vpn-btcpay-provisioner/scripts/migrate_db.py

149 lines
5.9 KiB
Python

import sys
from pathlib import Path
import sqlite3
import uuid
import logging
from datetime import datetime
# Add the project root to Python path
project_root = Path(__file__).resolve().parent.parent
sys.path.append(str(project_root))
from app.utils.db.models import Base, SubscriptionStatus
from app.utils.db import get_db_path, get_session
from sqlalchemy import create_engine
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def backup_database():
"""Create a backup of the existing database"""
db_path = get_db_path()
if not db_path.exists():
logger.info("No existing database found, skipping backup")
return
backup_path = db_path.parent / f"{db_path.stem}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}{db_path.suffix}"
db_path.rename(backup_path)
logger.info(f"Created database backup at {backup_path}")
return backup_path
def migrate_database():
"""Perform database migration"""
try:
# Create backup
backup_path = backup_database()
# Initialize new database
engine = create_engine(f"sqlite:///{get_db_path()}")
Base.metadata.create_all(engine)
# If there's an old database, migrate the data
if backup_path and backup_path.exists():
logger.info("Migrating data from old database...")
# Connect to old database
old_conn = sqlite3.connect(backup_path)
old_cursor = old_conn.cursor()
# Get schema information
old_cursor.execute("SELECT sql FROM sqlite_master WHERE type='table'")
old_schema = old_cursor.fetchall()
logger.info("Old database schema:")
for table in old_schema:
logger.info(table[0])
with get_session() as session:
try:
# Start transaction
session.begin()
# Migrate users - note we only use user_id now
logger.info("Migrating users...")
old_cursor.execute("SELECT id, user_id, created_at FROM users")
users = old_cursor.fetchall()
user_id_map = {} # Map old IDs to new UUIDs
for old_id, user_id, created_at in users:
user_id_map[old_id] = user_id # Keep the same user_id
session.execute(
"INSERT INTO users (user_id, created_at) VALUES (?, ?)",
[user_id, created_at or datetime.utcnow()]
)
# Migrate subscriptions
logger.info("Migrating subscriptions...")
old_cursor.execute("""
SELECT id, user_id, invoice_id, public_key, start_time,
expiry_time, status, warning_sent, assigned_ip
FROM subscriptions
""")
subscriptions = old_cursor.fetchall()
for sub in subscriptions:
old_id, old_user_id, invoice_id, public_key, start_time, \
expiry_time, status, warning_sent, assigned_ip = sub
if old_user_id in user_id_map:
session.execute("""
INSERT INTO subscriptions
(user_id, invoice_id, public_key, start_time, expiry_time,
status, warning_sent, assigned_ip)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", [
old_user_id, # Use the original user_id
invoice_id,
public_key,
start_time,
expiry_time,
status,
warning_sent,
assigned_ip
])
# Migrate payments
logger.info("Migrating payments...")
old_cursor.execute("""
SELECT user_id, subscription_id, invoice_id, amount, timestamp
FROM payments
""")
payments = old_cursor.fetchall()
for payment in payments:
old_user_id, sub_id, invoice_id, amount, timestamp = payment
if old_user_id in user_id_map:
session.execute("""
INSERT INTO payments
(user_id, subscription_id, invoice_id, amount, timestamp)
VALUES (?, ?, ?, ?, ?)
""", [
old_user_id, # Use the original user_id
sub_id,
invoice_id,
amount,
timestamp
])
# Commit transaction
session.commit()
logger.info("Migration completed successfully")
except Exception as e:
session.rollback()
logger.error(f"Migration failed: {str(e)}")
raise
old_conn.close()
else:
logger.info("No existing database to migrate")
except Exception as e:
logger.error(f"Migration failed: {str(e)}")
raise
if __name__ == '__main__':
try:
migrate_database()
except Exception as e:
logger.error(f"Migration failed: {str(e)}")
sys.exit(1)