# app/utils/ansible_logger.py
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path

from .db.operations import DatabaseManager


def _as_text(value):
    """Return *value* decoded to ``str`` when it is bytes, else unchanged.

    JSON cannot serialise ``bytes``; decoding with ``errors='replace'`` keeps
    the log entry writable even when the captured output is not valid UTF-8.
    """
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode('utf-8', errors='replace')
    return value


class AnsibleLogger:
    """Record Ansible operation results to a log file, JSON files and the DB.

    Every instance shares the ``'ansible_operations'`` logger. Per-operation
    JSON files and the rolling ``ansible_operations.log`` are written into
    ``log_dir``.
    """

    def __init__(self, log_dir=None):
        """Initialize the Ansible logger.

        Args:
            log_dir: Directory for log output, as a ``str`` or ``Path``. When
                falsy, ``<base_dir>/data/logs`` is used, where ``base_dir`` is
                three levels above this file.
        """
        # Use data directory from project structure
        self.base_dir = Path(__file__).resolve().parent.parent.parent
        # Wrap in Path so callers may pass a plain string.
        self.log_dir = (
            Path(log_dir) if log_dir else self.base_dir / 'data' / 'logs'
        )
        self.log_dir.mkdir(parents=True, exist_ok=True)

        self.logger = logging.getLogger('ansible_operations')
        self.logger.setLevel(logging.DEBUG)

        # logging.getLogger returns the same object for the same name, so a
        # second AnsibleLogger must not attach another handler for the same
        # file: that would duplicate every line and leak a file handle.
        # FileHandler stores its target as an absolute path in baseFilename.
        detailed_log = os.path.abspath(self.log_dir / 'ansible_operations.log')
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == detailed_log
            for handler in self.logger.handlers
        )
        if not already_attached:
            file_handler = logging.FileHandler(detailed_log)
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(file_handler)

    def log_operation(self, subscription_id, operation_type, result, is_test=False):
        """Log an Ansible operation.

        Writes a JSON file named
        ``{operation_type}_{subscription_id}_{YYYYmmdd_HHMMSS}.json`` into
        ``log_dir``, creates a provision log through ``DatabaseManager`` and
        emits a summary line on the logger. Errors are logged, never raised.

        Args:
            subscription_id: Identifier passed to
                ``DatabaseManager.get_subscription_by_invoice``.
            operation_type: Label used in the file name and as the ``action``
                of the provision log.
            result: Object exposing ``returncode``, ``stdout`` and ``stderr``
                (presumably a ``subprocess.CompletedProcess`` -- confirm
                against callers).
            is_test: Flag stored verbatim in the JSON entry.
        """
        try:
            subscription = DatabaseManager.get_subscription_by_invoice(subscription_id)
            if not subscription:
                self.logger.error(f"Subscription {subscription_id} not found")
                return

            # One naive-UTC timestamp for both the entry and the file name.
            # Same value datetime.utcnow() produced, without the deprecation.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            succeeded = result.returncode == 0
            stdout = _as_text(result.stdout)
            stderr = _as_text(result.stderr)

            log_entry = {
                'timestamp': now.isoformat(),
                'subscription_id': subscription_id,
                'operation_type': operation_type,
                'is_test': is_test,
                'return_code': result.returncode,
                'stdout': stdout,
                'stderr': stderr,
                'assigned_ip': subscription.assigned_ip,
            }
            log_file = self.log_dir / (
                f"{operation_type}_{subscription_id}_"
                f"{now.strftime('%Y%m%d_%H%M%S')}.json"
            )

            # A failed file write must not prevent the database record.
            try:
                with open(log_file, 'w', encoding='utf-8') as f:
                    json.dump(log_entry, f, indent=2)
            except (OSError, TypeError, ValueError) as e:
                self.logger.exception(
                    f"Error writing log file {log_file}: {e}"
                )

            DatabaseManager.create_provision_log({
                'subscription_id': subscription.id,
                'action': operation_type,
                'status': 'success' if succeeded else 'failure',
                'ansible_output': stdout,
                'error_message': None if succeeded else stderr,
            })

            if succeeded:
                self.logger.info(
                    f"Successfully completed {operation_type} "
                    f"for subscription {subscription_id}"
                )
            else:
                self.logger.error(
                    f"Failed {operation_type} "
                    f"for subscription {subscription_id}: {stderr}"
                )
        except Exception as e:
            # Best-effort logging: never let a logging failure reach callers.
            self.logger.exception(f"Error logging operation: {str(e)}")

    def get_logs(self, subscription_id=None, hours=24, operation_type=None):
        """Get recent Ansible operation logs, most recently modified first.

        Matching is by substring of the file name: ``subscription_id`` is
        embedded in a glob pattern and ``operation_type`` is tested with
        ``in``.

        Args:
            subscription_id: When truthy, only files whose name contains it.
            hours: Only files modified within this many hours are returned.
                Pass ``None`` to disable the age filter.
            operation_type: When truthy, only files whose name contains it.

        Returns:
            A list of the decoded JSON contents. Files that cannot be read or
            parsed are skipped with a warning; ``[]`` on unexpected errors.
        """
        try:
            pattern = f"*{subscription_id if subscription_id else ''}*.json"
            cutoff = None
            if hours is not None:
                cutoff = datetime.now().timestamp() - float(hours) * 60 * 60

            candidates = []
            for log_file in self.log_dir.glob(pattern):
                if operation_type and operation_type not in log_file.name:
                    continue
                try:
                    mtime = log_file.stat().st_mtime
                except OSError:
                    # File vanished between glob() and stat(); ignore it.
                    continue
                if cutoff is not None and mtime < cutoff:
                    continue
                candidates.append((mtime, log_file))

            # Sort by modification time and return most recent first
            candidates.sort(key=lambda item: item[0], reverse=True)

            logs = []
            for _, log_file in candidates:
                # One corrupt or unreadable file must not hide all the others.
                try:
                    with open(log_file, encoding='utf-8') as f:
                        logs.append(json.load(f))
                except (OSError, ValueError) as e:
                    self.logger.warning(
                        f"Skipping unreadable log file {log_file}: {e}"
                    )
            return logs
        except Exception as e:
            self.logger.exception(f"Error retrieving logs: {str(e)}")
            return []

    def cleanup_old_logs(self, days=30):
        """Clean up logs older than specified days.

        Deletes every ``*.json`` file in ``log_dir`` whose modification time
        is more than ``days`` days in the past. A file that cannot be removed
        is logged and skipped so the rest of the sweep still runs.
        """
        try:
            cutoff = datetime.now().timestamp() - (days * 24 * 60 * 60)
            for log_file in self.log_dir.glob('*.json'):
                try:
                    if log_file.stat().st_mtime < cutoff:
                        log_file.unlink()
                        self.logger.info(f"Cleaned up old log file: {log_file}")
                except OSError as e:
                    self.logger.error(
                        f"Error cleaning up log file {log_file}: {e}"
                    )
        except Exception as e:
            self.logger.exception(f"Error cleaning up logs: {str(e)}")