enki 7c92aa3ded
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / E2E Tests (push) Blocked by required conditions
Major DHT and Torrent fixes.
2025-08-29 21:18:36 -07:00

529 lines
14 KiB
Go

package main
import (
"flag"
"fmt"
"io/fs"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"torrentGateway/internal/api"
"torrentGateway/internal/blossom"
"torrentGateway/internal/config"
"torrentGateway/internal/dht"
"torrentGateway/internal/middleware"
"torrentGateway/internal/storage"
"torrentGateway/internal/web"
"torrentGateway/internal/p2p"
"github.com/gorilla/mux"
)
func main() {
// Parse command line flags
configPath := flag.String("config", "configs/config.yaml", "Path to configuration file")
mode := flag.String("mode", "", "Override mode (unified, gateway-only, blossom-only, dht-only)")
flag.Parse()
// Load configuration
cfg, err := config.LoadConfig(*configPath)
if err != nil {
log.Fatalf("Failed to load configuration: %v", err)
}
// Override mode if specified
if *mode != "" {
cfg.Mode = *mode
}
log.Printf("Starting Blossom-BitTorrent Gateway in %s mode", cfg.Mode)
// Initialize shared storage backend
var storageBackend *storage.Backend
if cfg.IsServiceEnabled("gateway") || cfg.IsServiceEnabled("blossom") {
storageBackend, err = storage.NewBackend(
cfg.Storage.MetadataDB,
cfg.Storage.ChunkStorage,
cfg.Storage.BlobStorage,
cfg.Storage.ChunkSize,
cfg,
)
if err != nil {
log.Fatalf("Failed to initialize storage backend: %v", err)
}
defer storageBackend.Close()
// Start automated cleanup service if enabled
if cfg.Admin.AutoCleanup {
log.Printf("Starting automated cleanup service")
go startCleanupService(storageBackend, cfg)
}
}
// Track running services
var wg sync.WaitGroup
var servers []ServiceManager
// Declare gateway instance for DHT integration
var gatewayInstance *api.Gateway
// Declare P2P gateway for unified coordination
var p2pGateway *p2p.UnifiedP2PGateway
// Start Gateway service
if cfg.IsServiceEnabled("gateway") {
log.Printf("Starting Gateway service on port %d", cfg.Gateway.Port)
r := mux.NewRouter()
// Apply security middleware to ALL routes (not just API)
r.Use(middleware.SecurityHeaders)
r.Use(middleware.InputSanitization)
maxUploadSize, _ := cfg.GetMaxUploadSizeBytes()
r.Use(middleware.RequestSizeLimit(maxUploadSize))
r.Use(middleware.AntiCrawler)
r.Use(middleware.CORS)
// Register API routes with /api prefix
apiRouter := r.PathPrefix("/api").Subrouter()
// Register main API routes and get gateway instance first
gatewayInstance = api.RegisterRoutes(apiRouter, cfg, storageBackend)
// Register tracker routes on main router (no /api prefix for BitTorrent compatibility)
wsTracker := api.RegisterTrackerRoutes(r, cfg, storageBackend, gatewayInstance)
if wsTracker != nil {
gatewayInstance.SetWebSocketTracker(wsTracker)
}
// Serve static files
webFS := web.GetFS()
staticFS, _ := fs.Sub(webFS, "static")
r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.FS(staticFS))))
// Serve main pages
r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
data, err := web.GetFile("index.html")
if err != nil {
http.Error(w, "Page not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/html")
w.Write(data)
})
r.HandleFunc("/player.html", func(w http.ResponseWriter, r *http.Request) {
data, err := web.GetFile("player.html")
if err != nil {
http.Error(w, "Page not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/html")
w.Write(data)
})
r.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) {
data, err := web.GetFile("admin.html")
if err != nil {
http.Error(w, "Page not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/html")
w.Write(data)
})
gatewayServer := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", cfg.Gateway.Port),
Handler: r,
}
servers = append(servers, &HTTPServiceManager{
name: "Gateway",
server: gatewayServer,
})
wg.Add(1)
go func() {
defer wg.Done()
if err := gatewayServer.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Gateway server error: %v", err)
}
}()
}
// Start Blossom server
if cfg.IsServiceEnabled("blossom") {
log.Printf("Starting Blossom server on port %d", cfg.BlossomServer.Port)
blossomServer := blossom.NewServer(storageBackend, &cfg.BlossomServer, cfg)
httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.BlossomServer.Port),
Handler: blossomServer,
}
servers = append(servers, &HTTPServiceManager{
name: "Blossom",
server: httpServer,
})
wg.Add(1)
go func() {
defer wg.Done()
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Blossom server error: %v", err)
}
}()
}
// Start DHT node
if cfg.IsServiceEnabled("dht") {
log.Printf("Starting DHT node on port %d", cfg.DHT.Port)
dhtNode, err := dht.NewDHT(&cfg.DHT)
if err != nil {
log.Fatalf("Failed to create DHT node: %v", err)
}
// Create DHT bootstrap manager (only if gateway is enabled)
var dhtBootstrap *dht.DHTBootstrap
if gatewayInstance != nil {
dhtBootstrap = dht.NewDHTBootstrap(dhtNode, gatewayInstance, &cfg.DHT)
}
servers = append(servers, &DHTServiceManager{
name: "DHT",
node: dhtNode,
bootstrap: dhtBootstrap,
})
wg.Add(1)
go func() {
defer wg.Done()
if err := dhtNode.Start(); err != nil {
log.Printf("DHT node error: %v", err)
return
}
// Initialize bootstrap functionality after DHT starts (only if gateway is enabled)
if dhtBootstrap != nil {
if err := dhtBootstrap.Initialize(); err != nil {
log.Printf("DHT bootstrap error: %v", err)
}
// Connect DHT bootstrap to gateway
gatewayInstance.SetDHTBootstrap(dhtBootstrap)
// Initialize unified P2P gateway after all components are connected
if gatewayInstance != nil {
log.Printf("Initializing unified P2P gateway...")
p2pGateway = p2p.NewUnifiedP2PGateway(cfg, storageBackend.GetDB())
if err := p2pGateway.Initialize(); err != nil {
log.Printf("P2P gateway initialization error: %v", err)
} else {
log.Printf("Unified P2P gateway initialized successfully")
gatewayInstance.SetP2PGateway(p2pGateway)
}
}
}
}()
}
// Handle shutdown signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Wait for shutdown signal
<-sigCh
log.Println("Shutdown signal received, stopping services...")
// Stop P2P gateway first
if p2pGateway != nil {
log.Println("Stopping P2P gateway...")
if err := p2pGateway.Shutdown(); err != nil {
log.Printf("Error stopping P2P gateway: %v", err)
}
}
// Stop all services
for _, srv := range servers {
if err := srv.Stop(); err != nil {
log.Printf("Error stopping %s: %v", srv.Name(), err)
}
}
// Wait for all services to stop
wg.Wait()
log.Println("All services stopped")
}
// ServiceManager interface for managing different types of services
type ServiceManager interface {
Stop() error
Name() string
}
// HTTPServiceManager manages HTTP servers
type HTTPServiceManager struct {
name string
server *http.Server
}
func (h *HTTPServiceManager) Stop() error {
log.Printf("Stopping %s server", h.name)
return h.server.Close()
}
func (h *HTTPServiceManager) Name() string {
return h.name
}
// DHTServiceManager manages DHT nodes
type DHTServiceManager struct {
name string
node *dht.DHT
bootstrap *dht.DHTBootstrap
}
func (d *DHTServiceManager) Stop() error {
log.Printf("Stopping %s node", d.name)
if d.bootstrap != nil {
d.bootstrap.Stop()
}
return d.node.Stop()
}
func (d *DHTServiceManager) Name() string {
return d.name
}
// startCleanupService starts the automated cleanup service
func startCleanupService(storageBackend *storage.Backend, cfg *config.Config) {
// Parse cleanup age
cleanupAge, err := parseDuration(cfg.Admin.CleanupAge)
if err != nil {
log.Printf("Invalid cleanup_age format '%s': %v", cfg.Admin.CleanupAge, err)
return
}
// Default cleanup interval to 1 hour if not set
cleanupInterval := 1 * time.Hour
log.Printf("Cleanup service started: cleaning files older than %v every %v", cleanupAge, cleanupInterval)
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
// Run initial cleanup after 1 minute
initialDelay := time.NewTimer(1 * time.Minute)
defer initialDelay.Stop()
for {
select {
case <-initialDelay.C:
runCleanupCycle(storageBackend, cfg, cleanupAge)
case <-ticker.C:
runCleanupCycle(storageBackend, cfg, cleanupAge)
}
}
}
// runCleanupCycle performs a full cleanup cycle
func runCleanupCycle(storageBackend *storage.Backend, cfg *config.Config, cleanupAge time.Duration) {
log.Printf("Starting cleanup cycle...")
start := time.Now()
// Clean old files
if fileResults, err := cleanupOldFiles(storageBackend, cleanupAge); err != nil {
log.Printf("Error cleaning old files: %v", err)
} else {
log.Printf("Cleaned old files: %s", formatCleanupResults(fileResults))
}
// Clean orphaned chunks
if chunkResults, err := cleanupOrphanedChunks(storageBackend, cfg); err != nil {
log.Printf("Error cleaning orphaned chunks: %v", err)
} else {
log.Printf("Cleaned orphaned chunks: %s", formatCleanupResults(chunkResults))
}
// Clean inactive users (users with no activity for 180 days)
if userResults, err := cleanupInactiveUsers(storageBackend, 180); err != nil {
log.Printf("Error cleaning inactive users: %v", err)
} else {
log.Printf("Cleaned inactive users: %s", formatCleanupResults(userResults))
}
log.Printf("Cleanup cycle completed in %v", time.Since(start))
}
// cleanupOldFiles removes files older than the specified duration
func cleanupOldFiles(storageBackend *storage.Backend, olderThan time.Duration) (map[string]interface{}, error) {
cutoffTime := time.Now().Add(-olderThan)
rows, err := storageBackend.GetDB().Query(`
SELECT hash, original_name, storage_type, created_at
FROM files
WHERE created_at < ? OR last_access < ?
`, cutoffTime, cutoffTime)
if err != nil {
return nil, err
}
defer rows.Close()
var deletedCount int
var totalSize int64
for rows.Next() {
var hash, originalName, storageType string
var createdAt time.Time
if err := rows.Scan(&hash, &originalName, &storageType, &createdAt); err != nil {
continue
}
// Get file size before deletion
if metadata, err := storageBackend.GetFileMetadata(hash); err == nil && metadata != nil {
totalSize += metadata.Size
}
// Delete the file
if err := storageBackend.AdminDeleteFile(hash); err != nil {
log.Printf("Error deleting file %s: %v", hash, err)
continue
}
deletedCount++
log.Printf("Deleted old file: %s (%s)", originalName, hash[:8])
}
return map[string]interface{}{
"deleted_count": deletedCount,
"total_size": totalSize,
}, nil
}
// cleanupOrphanedChunks removes chunks that don't belong to any file
func cleanupOrphanedChunks(storageBackend *storage.Backend, cfg *config.Config) (map[string]interface{}, error) {
rows, err := storageBackend.GetDB().Query(`
SELECT c.chunk_hash, c.size
FROM chunks c
LEFT JOIN files f ON c.file_hash = f.hash
WHERE f.hash IS NULL
`)
if err != nil {
return nil, err
}
defer rows.Close()
var deletedCount int
var totalSize int64
for rows.Next() {
var chunkHash string
var size int64
if err := rows.Scan(&chunkHash, &size); err != nil {
continue
}
// Delete chunk record from database
if _, err := storageBackend.GetDB().Exec("DELETE FROM chunks WHERE chunk_hash = ?", chunkHash); err != nil {
log.Printf("Error deleting chunk record %s: %v", chunkHash, err)
continue
}
// Delete chunk file from disk
chunkPath := fmt.Sprintf("%s/%s", cfg.Storage.ChunkStorage, chunkHash)
if err := os.Remove(chunkPath); err != nil {
log.Printf("Error deleting chunk file %s: %v", chunkHash, err)
}
deletedCount++
totalSize += size
}
return map[string]interface{}{
"deleted_count": deletedCount,
"total_size": totalSize,
}, nil
}
// cleanupInactiveUsers removes users with no activity for specified days
func cleanupInactiveUsers(storageBackend *storage.Backend, days int) (map[string]interface{}, error) {
cutoffTime := time.Now().AddDate(0, 0, -days)
result, err := storageBackend.GetDB().Exec(`
DELETE FROM users
WHERE last_activity < ? AND pubkey NOT IN (
SELECT DISTINCT owner_pubkey FROM files WHERE owner_pubkey IS NOT NULL
)
`, cutoffTime)
if err != nil {
return nil, err
}
deletedCount, _ := result.RowsAffected()
return map[string]interface{}{
"deleted_count": deletedCount,
}, nil
}
// formatCleanupResults formats cleanup results for logging
func formatCleanupResults(results map[string]interface{}) string {
deletedCount := results["deleted_count"].(int)
if totalSize, hasSize := results["total_size"]; hasSize {
return fmt.Sprintf("%d items (%s)", deletedCount, formatBytes(totalSize.(int64)))
}
return fmt.Sprintf("%d items", deletedCount)
}
// formatBytes formats byte count as human readable string
func formatBytes(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}
// parseDuration parses duration strings like "90d", "30d", "24h"
func parseDuration(s string) (time.Duration, error) {
if len(s) < 2 {
return 0, fmt.Errorf("invalid duration format")
}
unit := s[len(s)-1:]
valueStr := s[:len(s)-1]
value, err := strconv.Atoi(valueStr)
if err != nil {
return 0, err
}
switch strings.ToLower(unit) {
case "d":
return time.Duration(value) * 24 * time.Hour, nil
case "h":
return time.Duration(value) * time.Hour, nil
case "m":
return time.Duration(value) * time.Minute, nil
case "s":
return time.Duration(value) * time.Second, nil
default:
return time.ParseDuration(s)
}
}