import sys
from contextlib import closing
from pathlib import Path
import sqlite3
import uuid
import logging
from datetime import datetime
from typing import Optional

# Add the project root to Python path
project_root = Path(__file__).resolve().parent.parent
sys.path.append(str(project_root))

from app.utils.db.models import Base, SubscriptionStatus
from app.utils.db import get_db_path, get_session
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def backup_database() -> Optional[Path]:
    """Move the existing database file aside as a timestamped backup.

    The file is renamed, not copied, so after a successful call nothing is
    left at ``get_db_path()`` and a fresh database can be created there.

    Returns:
        The path of the backup file, or ``None`` when no database existed.
    """
    db_path = get_db_path()
    if not db_path.exists():
        logger.info("No existing database found, skipping backup")
        return None

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = db_path.parent / f"{db_path.stem}_backup_{timestamp}{db_path.suffix}"
    db_path.rename(backup_path)
    logger.info(f"Created database backup at {backup_path}")
    return backup_path


def _log_old_schema(old_cursor) -> None:
    """Log the CREATE statement of every table in the old database."""
    old_cursor.execute("SELECT sql FROM sqlite_master WHERE type='table'")
    old_schema = old_cursor.fetchall()
    logger.info("Old database schema:")
    for table in old_schema:
        logger.info(table[0])


def _migrate_users(old_cursor, session) -> dict:
    """Copy users into the new database; return a map of old row id -> user_id."""
    logger.info("Migrating users...")
    old_cursor.execute("SELECT id, user_id, created_at FROM users")
    insert_user = text(
        "INSERT INTO users (user_id, created_at) VALUES (:user_id, :created_at)"
    )

    user_id_map = {}
    for old_id, user_id, created_at in old_cursor.fetchall():
        # The new table is keyed by user_id only; the old row id is kept
        # solely to recognise rows that reference a migrated user.
        user_id_map[old_id] = user_id
        session.execute(
            insert_user,
            {
                "user_id": user_id,
                "created_at": created_at or datetime.utcnow(),
            },
        )
    return user_id_map


def _migrate_subscriptions(old_cursor, session, user_id_map) -> None:
    """Copy subscriptions whose user reference is present in ``user_id_map``."""
    logger.info("Migrating subscriptions...")
    old_cursor.execute("""
        SELECT id, user_id, invoice_id, public_key, start_time,
               expiry_time, status, warning_sent, assigned_ip
        FROM subscriptions
    """)
    insert_subscription = text("""
        INSERT INTO subscriptions
            (user_id, invoice_id, public_key, start_time,
             expiry_time, status, warning_sent, assigned_ip)
        VALUES
            (:user_id, :invoice_id, :public_key, :start_time,
             :expiry_time, :status, :warning_sent, :assigned_ip)
    """)

    skipped = 0
    for row in old_cursor.fetchall():
        (_old_id, old_user_id, invoice_id, public_key, start_time,
         expiry_time, status, warning_sent, assigned_ip) = row

        # NOTE(review): membership is tested against the old users.id values,
        # but the value written is this row's user_id column unchanged.
        # Confirm which users column the old subscriptions.user_id referenced.
        if old_user_id not in user_id_map:
            skipped += 1
            continue

        # NOTE(review): the old subscription id is not carried over, while
        # payments are migrated with their old subscription_id. Confirm the
        # new table ends up assigning matching ids.
        session.execute(
            insert_subscription,
            {
                "user_id": old_user_id,  # Use the original user_id
                "invoice_id": invoice_id,
                "public_key": public_key,
                "start_time": start_time,
                "expiry_time": expiry_time,
                "status": status,
                "warning_sent": warning_sent,
                "assigned_ip": assigned_ip,
            },
        )

    if skipped:
        logger.warning(
            "Skipped %d subscription(s) referencing unknown users", skipped
        )


def _migrate_payments(old_cursor, session, user_id_map) -> None:
    """Copy payments whose user reference is present in ``user_id_map``."""
    logger.info("Migrating payments...")
    old_cursor.execute("""
        SELECT user_id, subscription_id, invoice_id, amount, timestamp
        FROM payments
    """)
    insert_payment = text("""
        INSERT INTO payments
            (user_id, subscription_id, invoice_id, amount, timestamp)
        VALUES
            (:user_id, :subscription_id, :invoice_id, :amount, :timestamp)
    """)

    skipped = 0
    for old_user_id, sub_id, invoice_id, amount, timestamp in old_cursor.fetchall():
        if old_user_id not in user_id_map:
            skipped += 1
            continue

        session.execute(
            insert_payment,
            {
                "user_id": old_user_id,  # Use the original user_id
                "subscription_id": sub_id,
                "invoice_id": invoice_id,
                "amount": amount,
                "timestamp": timestamp,
            },
        )

    if skipped:
        logger.warning(
            "Skipped %d payment(s) referencing unknown users", skipped
        )


def migrate_database() -> None:
    """Recreate the database from the current models and copy old data in.

    Steps:
        1. Rename the existing database file to a timestamped backup.
        2. Create a fresh database with ``Base.metadata.create_all``.
        3. If a backup exists, copy users, subscriptions and payments from
           it into the new database inside a single session transaction.

    Raises:
        Exception: any error raised while backing up, creating the schema
            or copying data is logged once and re-raised. The session is
            rolled back first when the error happened during the copy.
    """
    backup_path = None
    try:
        # Create backup
        backup_path = backup_database()

        # Initialize new database; the engine is only needed for DDL here.
        engine = create_engine(f"sqlite:///{get_db_path()}")
        try:
            Base.metadata.create_all(engine)
        finally:
            engine.dispose()

        if not (backup_path and backup_path.exists()):
            logger.info("No existing database to migrate")
            return

        logger.info("Migrating data from old database...")

        # closing() guarantees the old connection is released even when the
        # migration fails part-way through.
        with closing(sqlite3.connect(backup_path)) as old_conn:
            old_cursor = old_conn.cursor()
            _log_old_schema(old_cursor)

            with get_session() as session:
                try:
                    # Start transaction
                    session.begin()

                    user_id_map = _migrate_users(old_cursor, session)
                    _migrate_subscriptions(old_cursor, session, user_id_map)
                    _migrate_payments(old_cursor, session, user_id_map)

                    # Commit transaction
                    session.commit()
                    logger.info("Migration completed successfully")
                except Exception:
                    # Logged once by the outer handler below.
                    session.rollback()
                    raise
    except Exception as e:
        logger.error(f"Migration failed: {str(e)}")
        if backup_path is not None and backup_path.exists():
            # The original file was renamed, so point the operator at it.
            logger.error(f"Original database is preserved at {backup_path}")
        raise


if __name__ == '__main__':
    try:
        migrate_database()
    except Exception:
        # migrate_database() has already logged the failure.
        sys.exit(1)