"""Logging utilities for Ansible provisioning operations (app/utils/ansible_logger.py)."""
# app/utils/ansible_logger.py
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from .db.operations import DatabaseManager
class AnsibleLogger:
    """Records Ansible runs as per-operation JSON files plus a rolling text log.

    Each operation produces one timestamped ``<op>_<subscription>_<ts>.json``
    file under the log directory and a provision-log row in the database.
    """

    def __init__(self, log_dir=None):
        """Initialize the Ansible logger.

        Args:
            log_dir: Optional log directory (str or Path). Defaults to
                ``<project root>/data/logs``.
        """
        # Project root is three levels up from this module (app/utils/..).
        self.base_dir = Path(__file__).resolve().parent.parent.parent
        # Coerce to Path so a plain string argument works too.
        self.log_dir = Path(log_dir) if log_dir else (self.base_dir / 'data' / 'logs')
        self.log_dir.mkdir(parents=True, exist_ok=True)

        self.logger = logging.getLogger('ansible_operations')
        self.logger.setLevel(logging.DEBUG)

        # logging.getLogger returns a process-wide singleton, so guard
        # against stacking a new FileHandler (and duplicated log lines)
        # every time AnsibleLogger is instantiated.
        if not self.logger.handlers:
            detailed_log = self.log_dir / 'ansible_operations.log'
            file_handler = logging.FileHandler(detailed_log)
            file_handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def log_operation(self, subscription_id, operation_type, result, is_test=False):
        """Log an Ansible operation to a JSON file and the database.

        Args:
            subscription_id: Invoice/subscription identifier.
            operation_type: Operation name, e.g. ``provision``.
            result: CompletedProcess-like object exposing
                ``returncode``/``stdout``/``stderr``.
            is_test: Whether this was a test run.
        """
        try:
            subscription = DatabaseManager.get_subscription_by_invoice(subscription_id)
            if not subscription:
                self.logger.error(f"Subscription {subscription_id} not found")
                return

            # Timezone-aware replacement for the deprecated datetime.utcnow().
            now = datetime.now(timezone.utc)
            log_entry = {
                'timestamp': now.isoformat(),
                'subscription_id': subscription_id,
                'operation_type': operation_type,
                'is_test': is_test,
                'return_code': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'assigned_ip': subscription.assigned_ip
            }

            # One JSON file per operation, timestamped to avoid collisions.
            log_file = self.log_dir / (
                f"{operation_type}_{subscription_id}_{now.strftime('%Y%m%d_%H%M%S')}.json"
            )
            with open(log_file, 'w') as f:
                json.dump(log_entry, f, indent=2)

            # Mirror the result into the database provision log.
            DatabaseManager.create_provision_log({
                'subscription_id': subscription.id,
                'action': operation_type,
                'status': 'success' if result.returncode == 0 else 'failure',
                'ansible_output': result.stdout,
                'error_message': result.stderr if result.returncode != 0 else None
            })

            if result.returncode == 0:
                self.logger.info(f"Successfully completed {operation_type} for subscription {subscription_id}")
            else:
                self.logger.error(f"Failed {operation_type} for subscription {subscription_id}: {result.stderr}")
        except Exception as e:
            # Logging must never break provisioning; record the failure and continue.
            self.logger.error(f"Error logging operation: {str(e)}")

    def get_logs(self, subscription_id=None, hours=24, operation_type=None):
        """Return recent Ansible operation log entries, newest first.

        Args:
            subscription_id: Only files whose name contains this id.
            hours: Only files modified within the last ``hours`` hours;
                pass ``None`` to disable the time filter. (Previously this
                parameter was accepted but ignored — it is now honored.)
            operation_type: Only files whose name contains this string.

        Returns:
            List of decoded JSON log entries (possibly empty).
        """
        try:
            # The old f-string produced "**.json" when subscription_id was
            # None, which pathlib.Path.glob rejects (ValueError) on
            # Python < 3.13 — making the no-filter call silently return [].
            pattern = f"*{subscription_id}*.json" if subscription_id else "*.json"

            cutoff = None
            if hours is not None:
                cutoff = datetime.now().timestamp() - hours * 3600

            log_files = []
            for log_file in self.log_dir.glob(pattern):
                if operation_type and operation_type not in log_file.name:
                    continue
                if cutoff is not None and log_file.stat().st_mtime < cutoff:
                    continue
                log_files.append(log_file)

            # Most recently modified first.
            log_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)

            logs = []
            for log_file in log_files:
                try:
                    with open(log_file) as f:
                        logs.append(json.load(f))
                except (OSError, json.JSONDecodeError) as e:
                    # Skip a corrupt/unreadable file instead of losing
                    # the entire result set.
                    self.logger.error(f"Skipping unreadable log {log_file}: {e}")
            return logs
        except Exception as e:
            self.logger.error(f"Error retrieving logs: {str(e)}")
            return []

    def cleanup_old_logs(self, days=30):
        """Delete per-operation JSON log files older than ``days`` days."""
        try:
            cutoff = datetime.now().timestamp() - (days * 24 * 60 * 60)
            for log_file in self.log_dir.glob('*.json'):
                if log_file.stat().st_mtime < cutoff:
                    log_file.unlink()
                    self.logger.info(f"Cleaned up old log file: {log_file}")
        except Exception as e:
            self.logger.error(f"Error cleaning up logs: {str(e)}")