vpn-btcpay-provisioner/app/handlers/webhook_handler.py

479 lines
19 KiB
Python

import tempfile
from flask import jsonify
import subprocess
import os
import logging
import hmac
import hashlib
import yaml
import datetime
import uuid
import traceback
from pathlib import Path
from dotenv import load_dotenv
from sqlalchemy.orm import joinedload
from ..utils.db.models import Subscription, Payment
from ..utils.db.operations import DatabaseManager
from ..utils.db.models import SubscriptionStatus
from ..utils.ansible_logger import AnsibleLogger
# Load variables from a .env file (if any) into the process environment.
load_dotenv()

# Root logging setup: every record goes to the console and to a log file
# created relative to the current working directory.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('vpn_provisioner.log'),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
ansible_logger = AnsibleLogger()

# Filesystem layout: BASE_DIR is three directory levels above this file.
BASE_DIR = Path(__file__).resolve().parent.parent.parent
PLAYBOOK_PATH = BASE_DIR.joinpath('ansible', 'playbooks', 'vpn_provision.yml')
CLEANUP_PLAYBOOK = BASE_DIR.joinpath('ansible', 'playbooks', 'vpn_cleanup.yml')
class WebhookError(Exception):
    """Raised when a webhook cannot be processed (vault, signature or playbook failure)."""
def get_vault_values():
    """Decrypt the Ansible vault and return its contents as a dictionary.

    The vault password is read from the ``ANSIBLE_VAULT_PASSWORD`` environment
    variable, written to a temporary file and handed to ``ansible-vault view``
    through ``ANSIBLE_VAULT_PASSWORD_FILE``. The temporary file is removed
    whether or not decryption succeeds, so the plaintext password is not left
    behind on disk after a failed run.

    Returns:
        dict: The parsed vault contents plus a derived ``webhook_full_url``
        entry (``btcpay_base_url`` followed by ``btcpay_webhook_path``, which
        defaults to ``/webhook/vpn``).

    Raises:
        WebhookError: If the password is not set, ``ansible-vault`` fails, the
            output is not valid YAML, the vault is empty, or one of the
            required keys is missing.
    """
    try:
        vault_pass = os.getenv('ANSIBLE_VAULT_PASSWORD')
        if not vault_pass:
            raise WebhookError("Vault password not found in environment variables")

        # delete=False: the file must outlive the ``with`` block so that the
        # ansible-vault subprocess can read it; leaving the block closes (and
        # therefore flushes) it.
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as vault_pass_file:
            vault_pass_file.write(vault_pass)

        try:
            # check=True raises CalledProcessError on a non-zero exit status.
            result = subprocess.run(
                ['ansible-vault', 'view', str(BASE_DIR / 'ansible/group_vars/vpn_servers/vault.yml')],
                capture_output=True,
                text=True,
                env={**os.environ, 'ANSIBLE_VAULT_PASSWORD_FILE': vault_pass_file.name},
                check=True
            )
        finally:
            # Always remove the password file, including on decryption failure.
            try:
                os.unlink(vault_pass_file.name)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary vault password file: {cleanup_error}")

        try:
            vault_contents = yaml.safe_load(result.stdout)
        except yaml.YAMLError as e:
            raise WebhookError(f"Failed to parse vault contents: {str(e)}") from e

        if not vault_contents:
            raise WebhookError("Empty vault contents")

        # Validate required vault values
        required_keys = ['btcpay_base_url', 'btcpay_api_key', 'btcpay_store_id', 'webhook_secret']
        missing_keys = [key for key in required_keys if key not in vault_contents]
        if missing_keys:
            raise WebhookError(f"Missing required vault values: {', '.join(missing_keys)}")

        vault_contents['webhook_full_url'] = (
            f"{vault_contents['btcpay_base_url']}"
            f"{vault_contents.get('btcpay_webhook_path', '/webhook/vpn')}"
        )
        return vault_contents
    except subprocess.CalledProcessError as e:
        logger.error(f"Ansible vault command failed: {e.stderr}")
        raise WebhookError("Failed to decrypt vault") from e
    except Exception as e:
        # WebhookErrors raised above also land here and are re-wrapped with the
        # "Vault operation failed" prefix; callers only ever see WebhookError.
        logger.error(f"Error reading vault: {traceback.format_exc()}")
        raise WebhookError(f"Vault operation failed: {str(e)}") from e
def verify_signature(payload_body: bytes, signature_header: str) -> bool:
    """
    Check a BTCPay webhook signature against the secret stored in the vault.

    The expected value is ``sha256=<hex HMAC-SHA256 of the body>``; both sides
    are lower-cased before a constant-time comparison. Any failure (missing
    header, missing secret, vault error, ...) is logged and reported as an
    invalid signature rather than raised.

    Args:
        payload_body: Raw request body bytes
        signature_header: BTCPay-Sig header value
    Returns:
        bool: True if signature is valid
    """
    try:
        if not signature_header:
            logger.error("Missing signature header")
            return False

        secret = get_vault_values().get('webhook_secret')
        if not secret:
            logger.error("Webhook secret not found in vault")
            return False

        # HMAC-SHA256 over the raw body, keyed with the shared secret.
        mac = hmac.new(secret.encode('utf-8'), payload_body, hashlib.sha256)
        expected_signature = mac.hexdigest()

        presented = signature_header.lower()
        wanted = f"sha256={expected_signature}".lower()
        # compare_digest avoids leaking the match position through timing.
        return hmac.compare_digest(presented, wanted)
    except Exception:
        logger.error(f"Signature verification failed: {traceback.format_exc()}")
        return False
def run_ansible_playbook(invoice_id: str, cleanup: bool = False, extra_vars: dict = None) -> subprocess.CompletedProcess:
    """Run the provisioning or cleanup playbook and record the outcome.

    The vault password from ``ANSIBLE_VAULT_PASSWORD`` is written to a
    temporary file that is passed via ``--vault-password-file`` and removed
    again before this function returns or raises.

    Args:
        invoice_id: Passed to the playbook as the ``invoice_id`` extra var.
        cleanup: Run ``CLEANUP_PLAYBOOK`` instead of ``PLAYBOOK_PATH``.
        extra_vars: Optional additional ``key=value`` extra vars. A truthy
            ``is_test`` entry is forwarded to the Ansible logger.

    Returns:
        subprocess.CompletedProcess: The finished ``ansible-playbook`` run.

    Raises:
        WebhookError: If the vault password is missing, the playbook exits
            non-zero, its output contains ``fatal:``, or anything else fails.
    """
    operation_type = 'cleanup' if cleanup else 'provision'
    is_test = bool(extra_vars and extra_vars.get('is_test'))
    vault_pass_path = None
    try:
        vault_pass = os.getenv('ANSIBLE_VAULT_PASSWORD')
        if not vault_pass:
            raise WebhookError("Vault password not found in environment variables")

        # delete=False: the file has to survive the ``with`` block so that
        # ansible-playbook can read it; it is removed in ``finally`` below.
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as vault_pass_file:
            vault_pass_path = vault_pass_file.name
            vault_pass_file.write(vault_pass)

        playbook = CLEANUP_PLAYBOOK if cleanup else PLAYBOOK_PATH
        cmd = [
            'ansible-playbook',
            str(playbook),
            '-i', str(BASE_DIR / 'inventory.ini'),
            '-e', f'invoice_id={invoice_id}'
        ]
        # NOTE(review): values are interpolated into key=value extra vars
        # as-is; a value containing whitespace may be parsed by Ansible as
        # several assignments. Confirm callers only pass trusted values.
        for key, value in (extra_vars or {}).items():
            cmd.extend(['-e', f'{key}={value}'])
        cmd.extend([
            '--vault-password-file', vault_pass_path,
            '-vvv'
        ])
        logger.info(f"Running ansible-playbook command: {' '.join(cmd)}")

        # No check=True: the return code is inspected manually below, so
        # subprocess.run never raises CalledProcessError here.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True
        )

        # Log detailed output for debugging
        logger.info("Ansible STDOUT:")
        logger.info(result.stdout)
        if result.stderr:
            logger.error("Ansible STDERR:")
            logger.error(result.stderr)

        if result.returncode != 0:
            logger.error(f"Ansible playbook failed with return code {result.returncode}")
            logger.error(f"Error output: {result.stderr}")
            # Record the failed run as well. This is best-effort so that a
            # logger problem cannot replace the real failure reported below.
            try:
                ansible_logger.log_operation(
                    invoice_id,
                    operation_type,
                    result,
                    is_test=is_test
                )
            except Exception:
                logger.error(f"Could not record failed {operation_type} operation: {traceback.format_exc()}")
            raise WebhookError(f"Failed {operation_type} for subscription {invoice_id}")

        # Log successful operation
        ansible_logger.log_operation(
            invoice_id,
            operation_type,
            result,
            is_test=is_test
        )

        # A zero exit status with "fatal:" in the output is still treated as failure.
        if "fatal:" in result.stdout or "fatal:" in result.stderr:
            logger.error("Fatal error detected in Ansible output")
            raise WebhookError("Ansible playbook reported fatal error")

        logger.info(f"Successfully completed {operation_type} for {invoice_id}")
        return result
    except Exception as e:
        # WebhookErrors raised above are re-wrapped here too, so callers
        # always receive the "Playbook execution failed" prefix.
        logger.error(f"Error running playbook: {traceback.format_exc()}")
        raise WebhookError(f"Playbook execution failed: {str(e)}") from e
    finally:
        if vault_pass_path is not None:
            try:
                os.unlink(vault_pass_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary vault password file: {cleanup_error}")
def handle_subscription_status(data: dict) -> tuple:
    """Process a SubscriptionStatusUpdated webhook.

    An ``Active`` status activates the subscription. Any other status runs the
    cleanup playbook and then marks the subscription expired.

    Args:
        data: Webhook payload; ``subscriptionId`` and ``status`` are required.

    Returns:
        tuple: ``(response, status_code)`` -- 200 on success, 400 when a
        required field is missing, 404 when the subscription is unknown and
        500 on any unexpected error.
    """
    try:
        sub_id, status = data.get('subscriptionId'), data.get('status')
        if not (sub_id and status):
            return jsonify({"error": "Missing required fields"}), 400

        logger.info(f"Processing subscription status update: {sub_id} -> {status}")

        subscription = DatabaseManager.get_subscription_by_invoice(sub_id)
        if not subscription:
            logger.error(f"Subscription {sub_id} not found")
            return jsonify({"error": "Subscription not found"}), 404

        if status != 'Active':
            # Tear down the VPN for inactive subscriptions. A failing cleanup
            # playbook is only logged.
            # NOTE(review): expiry below is assumed to run regardless of the
            # cleanup outcome -- confirm against the original indentation.
            try:
                result = run_ansible_playbook(sub_id, cleanup=True)
                logger.info(f"Cleanup playbook completed for {sub_id}: {result.stdout}")
            except WebhookError as e:
                logger.error(f"Failed to clean up subscription {sub_id}: {str(e)}")
            DatabaseManager.expire_subscription(subscription.id)
            logger.info(f"Subscription {sub_id} is no longer active")
        else:
            DatabaseManager.activate_subscription(sub_id)

        body = {
            "status": "success",
            "message": f"Subscription {sub_id} status updated to {status}"
        }
        return jsonify(body), 200
    except Exception as e:
        logger.error(f"Error handling subscription status: {traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500
def handle_test_webhook(data, webhook_type):
    """Handle a webhook whose invoice ID carries the ``__test__`` prefix.

    Settlement events (``InvoiceSettled`` / ``InvoicePaymentSettled``) create
    a 30-minute test subscription, run the provisioning playbook with
    ``is_test`` set, then activate the subscription and record the payment.
    ``SubscriptionStatusUpdated`` is delegated to
    ``handle_subscription_status``; every other type is only acknowledged.

    Args:
        data: Parsed webhook payload (a dict).
        webhook_type: Value of the payload's ``type`` field.

    Returns:
        tuple: ``(response, status_code)`` -- 200 on success, 400 when the
        invoice ID is not a ``__test__`` string, 500 on any failure.
    """
    logger.info(f"Processing test webhook: {webhook_type}")
    invoice_id = data.get('invoiceId', '')
    # A JSON null or non-string invoiceId is rejected here instead of raising
    # AttributeError on .startswith().
    if not isinstance(invoice_id, str) or not invoice_id.startswith('__test__'):
        logger.error("Invalid test invoice ID format")
        return jsonify({"error": "Invalid test invoice ID"}), 400

    # Process both types of invoice settlement webhooks
    if webhook_type in ('InvoiceSettled', 'InvoicePaymentSettled'):
        try:
            # Single source of truth for the test lifetime.
            test_duration = 30  # minutes
            test_user_id = f"test_{uuid.uuid4()}"
            test_pubkey = f"TEST_KEY_{uuid.uuid4()}"
            logger.info(f"Creating test subscription for {test_duration} minutes")

            # create_subscription returns a dictionary (accessed by key below).
            subscription_data = DatabaseManager.create_subscription(
                user_id=test_user_id,
                invoice_id=invoice_id,
                public_key=test_pubkey,
                duration_hours=test_duration / 60  # 30 minutes -> 0.5 hours
            )
            if not subscription_data:
                logger.error("Failed to create test subscription")
                return jsonify({"error": "Failed to create test subscription"}), 500
            logger.info(f"Created test subscription: {subscription_data['id']}")

            # Run the provisioning playbook with test flag
            try:
                logger.info("Running test VPN provision playbook")
                result = run_ansible_playbook(
                    invoice_id=invoice_id,
                    cleanup=False,
                    extra_vars={
                        "is_test": True,
                        "test_duration_minutes": test_duration,
                        "test_public_key": test_pubkey
                    }
                )
                if result.returncode != 0:
                    logger.error(f"Test provisioning failed: {result.stderr}")
                    return jsonify({"error": "Test provisioning failed"}), 500

                logger.info(f"Test VPN provisioned successfully for {test_duration} minutes")
                # Activate subscription and record payment
                activated_data = DatabaseManager.activate_subscription(invoice_id)
                if not activated_data:
                    logger.error("Failed to activate subscription")
                    return jsonify({"error": "Failed to activate subscription"}), 500

                DatabaseManager.record_payment(
                    test_user_id,
                    subscription_data['id'],
                    invoice_id,
                    data.get('amount', 0)
                )
                # Naive UTC timestamp, same value and isoformat() shape as the
                # deprecated datetime.utcnow().
                now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                cleanup_time = now_utc + datetime.timedelta(minutes=test_duration)
                # NOTE(review): only the time is computed and logged here;
                # nothing in this function actually schedules the cleanup.
                logger.info(f"Scheduling cleanup for {cleanup_time}")
                return jsonify({
                    "status": "success",
                    "message": f"Test VPN provisioned for {test_duration} minutes",
                    "test_user_id": test_user_id,
                    "subscription_id": subscription_data['id'],
                    "assigned_ip": subscription_data['assigned_ip'],
                    "cleanup_scheduled": cleanup_time.isoformat()
                }), 200
            except WebhookError as e:
                logger.error(f"Error in test provision playbook: {str(e)}")
                return jsonify({"error": str(e)}), 500
        except Exception as e:
            logger.error(f"Error in test provisioning: {str(e)}")
            logger.error(traceback.format_exc())
            return jsonify({"error": str(e)}), 500
    # Handle test subscription status updates
    elif webhook_type == 'SubscriptionStatusUpdated':
        return handle_subscription_status(data)
    # For other test webhook types, just acknowledge
    else:
        logger.info(f"Acknowledged test webhook: {webhook_type}")
        return jsonify({
            "status": "success",
            "message": f"Test webhook {webhook_type} acknowledged"
        }), 200
def handle_payment_webhook(request) -> tuple:
    """Handle a BTCPay Server webhook and provision the VPN when an invoice settles.

    Flow: load the vault, verify the ``BTCPay-Sig`` header against the raw
    body, parse the JSON payload, then dispatch on its ``type``:

    * invoice IDs starting with ``__test__`` go to ``handle_test_webhook``;
    * ``SubscriptionStatusUpdated`` goes to ``handle_subscription_status``;
    * ``InvoiceSettled`` / ``InvoicePaymentSettled`` run the provisioning
      playbook, activate the subscription and record the payment, unless a
      payment for that invoice already exists;
    * any other type is acknowledged without action.

    Args:
        request: The incoming request object; ``headers``, ``get_data()`` and
            ``json`` are read from it.

    Returns:
        tuple: ``(response, status_code)`` -- 200 on success, 400 for a
        malformed payload, 401 for a missing or invalid signature, 404 when
        no subscription matches the invoice, 500 on any other failure.
    """
    try:
        vault_values = get_vault_values()
        logger.info(f"Processing webhook on endpoint: {vault_values['webhook_full_url']}")

        # Verify signature
        signature = request.headers.get('BTCPay-Sig')
        if not signature:
            logger.error("Missing BTCPay-Sig header")
            return jsonify({"error": "Missing signature"}), 401
        # NOTE(review): verify_signature() reads the vault again on its own,
        # so each request decrypts the vault twice.
        if not verify_signature(request.get_data(), signature):
            logger.error("Invalid signature")
            return jsonify({"error": "Invalid signature"}), 401

        # Parse and validate payload
        try:
            data = request.json
        except Exception as e:
            logger.error(f"Invalid JSON payload: {str(e)}")
            return jsonify({"error": "Invalid JSON"}), 400
        if not data:
            return jsonify({"error": "Empty payload"}), 400
        # Valid JSON that is not an object (e.g. a list) has no .get(); answer
        # 400 instead of falling through to the generic 500 handler.
        if not isinstance(data, dict):
            logger.error(f"Unexpected payload type: {type(data).__name__}")
            return jsonify({"error": "Invalid payload"}), 400
        logger.info(f"Received webhook data: {data}")

        # Extract webhook type and invoice ID
        webhook_type = data.get('type')
        # A JSON null invoiceId is treated the same as a missing one.
        invoice_id = data.get('invoiceId') or ''
        if not webhook_type:
            return jsonify({"error": "Missing webhook type"}), 400
        if not isinstance(invoice_id, str):
            logger.error("invoiceId is not a string")
            return jsonify({"error": "Invalid invoiceId"}), 400

        # Handle test webhooks with special processing
        if invoice_id.startswith('__test__'):
            return handle_test_webhook(data, webhook_type)

        # Handle different webhook types for production
        if webhook_type == 'SubscriptionStatusUpdated':
            return handle_subscription_status(data)
        elif webhook_type in ('InvoiceSettled', 'InvoicePaymentSettled'):
            if not invoice_id:
                logger.error("Missing invoiceId in webhook data")
                return jsonify({"error": "Missing invoiceId"}), 400

            from ..utils.db import get_session
            with get_session() as session:
                try:
                    # A repeated delivery for an already-recorded invoice is
                    # acknowledged without provisioning again.
                    existing_payment = session.query(Payment).filter(
                        Payment.invoice_id == invoice_id
                    ).first()
                    if existing_payment:
                        logger.info(f"Payment already recorded for invoice {invoice_id}")
                        return jsonify({
                            "status": "success",
                            "message": "Payment already processed",
                            "invoice_id": invoice_id
                        }), 200

                    # Run VPN provisioning (raises WebhookError on failure).
                    # NOTE(review): this runs before the subscription lookup,
                    # so an unknown invoice is provisioned and then answered
                    # with 404 -- confirm that ordering is intended.
                    logger.info(f"Starting VPN provisioning for invoice {invoice_id}")
                    run_ansible_playbook(invoice_id)

                    # Update subscription status within session
                    subscription = session.query(Subscription).filter(
                        Subscription.invoice_id == invoice_id
                    ).options(joinedload(Subscription.user)).first()
                    if not subscription:
                        logger.error(f"Subscription not found for invoice {invoice_id}")
                        return jsonify({"error": "Subscription not found"}), 404

                    # Activate subscription
                    subscription.status = SubscriptionStatus.ACTIVE
                    # Record the payment for this invoice
                    payment = Payment(
                        user_id=subscription.user_id,
                        subscription_id=subscription.id,
                        invoice_id=invoice_id,
                        amount=data.get('amount', 0)
                    )
                    session.add(payment)
                    # Commit all changes
                    session.commit()
                    logger.info(f"VPN provisioning completed for invoice {invoice_id}")
                    return jsonify({
                        "status": "success",
                        "invoice_id": invoice_id,
                        "message": "VPN provisioning completed"
                    }), 200
                except Exception as e:
                    session.rollback()
                    logger.error(f"VPN provisioning failed: {str(e)}")
                    logger.error(traceback.format_exc())
                    return jsonify({
                        "error": "Provisioning failed",
                        "details": str(e)
                    }), 500
        else:
            logger.info(f"Received {webhook_type} webhook - no action required")
            return jsonify({
                "status": "success",
                "message": f"Webhook {webhook_type} acknowledged"
            }), 200
    except WebhookError as e:
        logger.error(f"Webhook error: {str(e)}")
        return jsonify({"error": str(e)}), 500
    except Exception:
        logger.error(f"Unexpected error: {traceback.format_exc()}")
        return jsonify({"error": "Internal server error"}), 500