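// This program runs the Blossom-BitTorrent Gateway. Depending on the
// configured mode it starts an HTTP gateway (API, tracker, and web UI), a
// Blossom server, and/or a DHT node over a shared storage backend, and shuts
// all running services down together on SIGINT/SIGTERM.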
package main

import (
	"flag"
	"fmt"
	"io/fs"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/gorilla/mux"

	"torrentGateway/internal/api"
	"torrentGateway/internal/blossom"
	"torrentGateway/internal/config"
	"torrentGateway/internal/dht"
	"torrentGateway/internal/middleware"
	"torrentGateway/internal/p2p"
	"torrentGateway/internal/storage"
	"torrentGateway/internal/web"
)

func main() {
	// Parse command line flags
	configPath := flag.String("config", "configs/config.yaml", "Path to configuration file")
	mode := flag.String("mode", "", "Override mode (unified, gateway-only, blossom-only, dht-only)")
	flag.Parse()

	// Load configuration
	cfg, err := config.LoadConfig(*configPath)
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	// Override mode if specified
	if *mode != "" {
		cfg.Mode = *mode
	}

	log.Printf("Starting Blossom-BitTorrent Gateway in %s mode", cfg.Mode)

	// Initialize shared storage backend
	var storageBackend *storage.Backend
	if cfg.IsServiceEnabled("gateway") || cfg.IsServiceEnabled("blossom") {
		storageBackend, err = storage.NewBackend(
			cfg.Storage.MetadataDB,
			cfg.Storage.ChunkStorage,
			cfg.Storage.BlobStorage,
			cfg.Storage.ChunkSize,
			cfg,
		)
		if err != nil {
			log.Fatalf("Failed to initialize storage backend: %v", err)
		}
		defer storageBackend.Close()

		// Start automated cleanup service if enabled
		if cfg.Admin.AutoCleanup {
			log.Printf("Starting automated cleanup service")
			go startCleanupService(storageBackend, cfg)
		}
	}

	// Track running services
	var wg sync.WaitGroup
	var servers []ServiceManager

	// Declare gateway instance for DHT integration
	var gatewayInstance *api.Gateway

	// Declare P2P gateway for unified coordination
	var p2pGateway *p2p.UnifiedP2PGateway

	// Start Gateway service
	if cfg.IsServiceEnabled("gateway") {
		log.Printf("Starting Gateway service on port %d", cfg.Gateway.Port)

		r := mux.NewRouter()

		// Apply security middleware to ALL routes (not just API)
		r.Use(middleware.SecurityHeaders)
		r.Use(middleware.InputSanitization)
		maxUploadSize, _ := cfg.GetMaxUploadSizeBytes()
		r.Use(middleware.RequestSizeLimit(maxUploadSize))
		r.Use(middleware.AntiCrawler)
		r.Use(middleware.CORS)

		// Register API routes with /api prefix
		apiRouter := r.PathPrefix("/api").Subrouter()

		// Register main API routes and get gateway instance first
		gatewayInstance = api.RegisterRoutes(apiRouter, cfg, storageBackend)

		// Register tracker routes on main router (no /api prefix for BitTorrent compatibility)
		wsTracker := api.RegisterTrackerRoutes(r, cfg, storageBackend, gatewayInstance)
		if wsTracker != nil {
			gatewayInstance.SetWebSocketTracker(wsTracker)
		}

		// Serve static files
		webFS := web.GetFS()
		staticFS, _ := fs.Sub(webFS, "static")
		r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.FS(staticFS))))

		// Serve main pages
		r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			data, err := web.GetFile("index.html")
			if err != nil {
				http.Error(w, "Page not found", http.StatusNotFound)
				return
			}
			w.Header().Set("Content-Type", "text/html")
			w.Write(data)
		})

		r.HandleFunc("/player.html", func(w http.ResponseWriter, r *http.Request) {
			data, err := web.GetFile("player.html")
			if err != nil {
				http.Error(w, "Page not found", http.StatusNotFound)
				return
			}
			w.Header().Set("Content-Type", "text/html")
			w.Write(data)
		})

		r.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) {
			data, err := web.GetFile("admin.html")
			if err != nil {
				http.Error(w, "Page not found", http.StatusNotFound)
				return
			}
			w.Header().Set("Content-Type", "text/html")
			w.Write(data)
		})

		gatewayServer := &http.Server{
			Addr:    fmt.Sprintf("0.0.0.0:%d", cfg.Gateway.Port),
			Handler: r,
		}

		servers = append(servers, &HTTPServiceManager{
			name:   "Gateway",
			server: gatewayServer,
		})

		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := gatewayServer.ListenAndServe(); err != http.ErrServerClosed {
				log.Printf("Gateway server error: %v", err)
			}
		}()
	}

	// Start Blossom server
	if cfg.IsServiceEnabled("blossom") {
		log.Printf("Starting Blossom server on port %d", cfg.BlossomServer.Port)

		blossomServer := blossom.NewServer(storageBackend, &cfg.BlossomServer, cfg)

		httpServer := &http.Server{
			Addr:    fmt.Sprintf(":%d", cfg.BlossomServer.Port),
			Handler: blossomServer,
		}

		servers = append(servers, &HTTPServiceManager{
			name:   "Blossom",
			server: httpServer,
		})

		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
				log.Printf("Blossom server error: %v", err)
			}
		}()
	}

	// Start DHT node
	if cfg.IsServiceEnabled("dht") {
		log.Printf("Starting DHT node on port %d", cfg.DHT.Port)

		dhtNode, err := dht.NewDHT(&cfg.DHT)
		if err != nil {
			log.Fatalf("Failed to create DHT node: %v", err)
		}

		// Create DHT bootstrap manager (only if gateway is enabled)
		var dhtBootstrap *dht.DHTBootstrap
		if gatewayInstance != nil {
			dhtBootstrap = dht.NewDHTBootstrap(dhtNode, gatewayInstance, &cfg.DHT)
		}

		servers = append(servers, &DHTServiceManager{
			name:      "DHT",
			node:      dhtNode,
			bootstrap: dhtBootstrap,
		})

		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := dhtNode.Start(); err != nil {
				log.Printf("DHT node error: %v", err)
				return
			}

			// Initialize bootstrap functionality after DHT starts (only if gateway is enabled)
			if dhtBootstrap != nil {
				if err := dhtBootstrap.Initialize(); err != nil {
					log.Printf("DHT bootstrap error: %v", err)
				}

				// Connect DHT bootstrap to gateway
				gatewayInstance.SetDHTBootstrap(dhtBootstrap)

				// Initialize unified P2P gateway after all components are connected
				if gatewayInstance != nil {
					log.Printf("Initializing unified P2P gateway...")
					p2pGateway = p2p.NewUnifiedP2PGateway(cfg, storageBackend.GetDB())

					if err := p2pGateway.Initialize(); err != nil {
						log.Printf("P2P gateway initialization error: %v", err)
					} else {
						log.Printf("Unified P2P gateway initialized successfully")
						gatewayInstance.SetP2PGateway(p2pGateway)
					}
				}
			}
		}()
	}

	// Handle shutdown signals
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// Wait for shutdown signal
	<-sigCh
	log.Println("Shutdown signal received, stopping services...")

	// Stop P2P gateway first
	if p2pGateway != nil {
		log.Println("Stopping P2P gateway...")
		if err := p2pGateway.Shutdown(); err != nil {
			log.Printf("Error stopping P2P gateway: %v", err)
		}
	}

	// Stop all services
	for _, srv := range servers {
		if err := srv.Stop(); err != nil {
			log.Printf("Error stopping %s: %v", srv.Name(), err)
		}
	}

	// Wait for all services to stop
	wg.Wait()
	log.Println("All services stopped")
}

// ServiceManager interface for managing different types of services
type ServiceManager interface {
	Stop() error
	Name() string
}

// HTTPServiceManager manages HTTP servers
type HTTPServiceManager struct {
	name   string
	server *http.Server
}

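// Stop closes the HTTP server immediately; http.Server.Close does not wait
// for in-flight requests to finish.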
func (h *HTTPServiceManager) Stop() error {
	log.Printf("Stopping %s server", h.name)
	return h.server.Close()
}

func (h *HTTPServiceManager) Name() string {
	return h.name
}

// DHTServiceManager manages DHT nodes
type DHTServiceManager struct {
	name      string
	node      *dht.DHT
	bootstrap *dht.DHTBootstrap
}

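// Stop stops the bootstrap manager first (if present), then the DHT node.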
func (d *DHTServiceManager) Stop() error {
	log.Printf("Stopping %s node", d.name)
	if d.bootstrap != nil {
		d.bootstrap.Stop()
	}
	return d.node.Stop()
}

func (d *DHTServiceManager) Name() string {
	return d.name
}

// startCleanupService starts the automated cleanup service
func startCleanupService(storageBackend *storage.Backend, cfg *config.Config) {
	// Parse cleanup age
	cleanupAge, err := parseDuration(cfg.Admin.CleanupAge)
	if err != nil {
		log.Printf("Invalid cleanup_age format '%s': %v", cfg.Admin.CleanupAge, err)
		return
	}

	// Use a fixed 1-hour cleanup interval (no interval is read from config here)
	cleanupInterval := 1 * time.Hour

	log.Printf("Cleanup service started: cleaning files older than %v every %v", cleanupAge, cleanupInterval)

	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	// Run initial cleanup after 1 minute
	initialDelay := time.NewTimer(1 * time.Minute)
	defer initialDelay.Stop()

	for {
		select {
		case <-initialDelay.C:
			runCleanupCycle(storageBackend, cfg, cleanupAge)
		case <-ticker.C:
			runCleanupCycle(storageBackend, cfg, cleanupAge)
		}
	}
}

// runCleanupCycle performs a full cleanup cycle
func runCleanupCycle(storageBackend *storage.Backend, cfg *config.Config, cleanupAge time.Duration) {
	log.Printf("Starting cleanup cycle...")
	start := time.Now()

	// Clean old files
	if fileResults, err := cleanupOldFiles(storageBackend, cleanupAge); err != nil {
		log.Printf("Error cleaning old files: %v", err)
	} else {
		log.Printf("Cleaned old files: %s", formatCleanupResults(fileResults))
	}

	// Clean orphaned chunks
	if chunkResults, err := cleanupOrphanedChunks(storageBackend, cfg); err != nil {
		log.Printf("Error cleaning orphaned chunks: %v", err)
	} else {
		log.Printf("Cleaned orphaned chunks: %s", formatCleanupResults(chunkResults))
	}

	// Clean inactive users (users with no activity for 180 days)
	if userResults, err := cleanupInactiveUsers(storageBackend, 180); err != nil {
		log.Printf("Error cleaning inactive users: %v", err)
	} else {
		log.Printf("Cleaned inactive users: %s", formatCleanupResults(userResults))
	}

	log.Printf("Cleanup cycle completed in %v", time.Since(start))
}

// cleanupOldFiles removes files older than the specified duration
func cleanupOldFiles(storageBackend *storage.Backend, olderThan time.Duration) (map[string]interface{}, error) {
	cutoffTime := time.Now().Add(-olderThan)

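	// A file is considered stale if it was created before the cutoff or has
	// not been accessed since the cutoff.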
	rows, err := storageBackend.GetDB().Query(`
		SELECT hash, original_name, storage_type, created_at
		FROM files
		WHERE created_at < ? OR last_access < ?
	`, cutoffTime, cutoffTime)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var deletedCount int
	var totalSize int64

	for rows.Next() {
		var hash, originalName, storageType string
		var createdAt time.Time

		if err := rows.Scan(&hash, &originalName, &storageType, &createdAt); err != nil {
			continue
		}

		// Get file size before deletion
		if metadata, err := storageBackend.GetFileMetadata(hash); err == nil && metadata != nil {
			totalSize += metadata.Size
		}

		// Delete the file
		if err := storageBackend.AdminDeleteFile(hash); err != nil {
			log.Printf("Error deleting file %s: %v", hash, err)
			continue
		}

		deletedCount++
		log.Printf("Deleted old file: %s (%s)", originalName, hash[:8])
	}

	return map[string]interface{}{
		"deleted_count": deletedCount,
		"total_size":    totalSize,
	}, nil
}

// cleanupOrphanedChunks removes chunks that don't belong to any file
func cleanupOrphanedChunks(storageBackend *storage.Backend, cfg *config.Config) (map[string]interface{}, error) {
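	// Orphaned chunks are chunk rows whose file_hash no longer matches any row in files.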
	rows, err := storageBackend.GetDB().Query(`
		SELECT c.chunk_hash, c.size
		FROM chunks c
		LEFT JOIN files f ON c.file_hash = f.hash
		WHERE f.hash IS NULL
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var deletedCount int
	var totalSize int64

	for rows.Next() {
		var chunkHash string
		var size int64

		if err := rows.Scan(&chunkHash, &size); err != nil {
			continue
		}

		// Delete chunk record from database
		if _, err := storageBackend.GetDB().Exec("DELETE FROM chunks WHERE chunk_hash = ?", chunkHash); err != nil {
			log.Printf("Error deleting chunk record %s: %v", chunkHash, err)
			continue
		}

		// Delete chunk file from disk
		chunkPath := fmt.Sprintf("%s/%s", cfg.Storage.ChunkStorage, chunkHash)
		if err := os.Remove(chunkPath); err != nil {
			log.Printf("Error deleting chunk file %s: %v", chunkHash, err)
		}

		deletedCount++
		totalSize += size
	}

	return map[string]interface{}{
		"deleted_count": deletedCount,
		"total_size":    totalSize,
	}, nil
}

// cleanupInactiveUsers removes users with no activity for the specified number
// of days, keeping any user that still owns files
func cleanupInactiveUsers(storageBackend *storage.Backend, days int) (map[string]interface{}, error) {
	cutoffTime := time.Now().AddDate(0, 0, -days)

	result, err := storageBackend.GetDB().Exec(`
		DELETE FROM users
		WHERE last_activity < ? AND pubkey NOT IN (
			SELECT DISTINCT owner_pubkey FROM files WHERE owner_pubkey IS NOT NULL
		)
	`, cutoffTime)
	if err != nil {
		return nil, err
	}

	deletedCount, _ := result.RowsAffected()

	// Convert the int64 from RowsAffected to int so formatCleanupResults can
	// type-assert it the same way as the other cleanup results
	return map[string]interface{}{
		"deleted_count": int(deletedCount),
	}, nil
}

// formatCleanupResults formats cleanup results for logging
func formatCleanupResults(results map[string]interface{}) string {
	deletedCount := results["deleted_count"].(int)
	if totalSize, hasSize := results["total_size"]; hasSize {
		return fmt.Sprintf("%d items (%s)", deletedCount, formatBytes(totalSize.(int64)))
	}
	return fmt.Sprintf("%d items", deletedCount)
}

// formatBytes formats byte count as human readable string
func formatBytes(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

// parseDuration parses duration strings like "90d", "30d", "24h"
func parseDuration(s string) (time.Duration, error) {
	if len(s) < 2 {
		return 0, fmt.Errorf("invalid duration format")
	}

	unit := s[len(s)-1:]
	valueStr := s[:len(s)-1]
	value, err := strconv.Atoi(valueStr)
	if err != nil {
		return 0, err
	}

	switch strings.ToLower(unit) {
	case "d":
		return time.Duration(value) * 24 * time.Hour, nil
	case "h":
		return time.Duration(value) * time.Hour, nil
	case "m":
		return time.Duration(value) * time.Minute, nil
	case "s":
		return time.Duration(value) * time.Second, nil
	default:
		return time.ParseDuration(s)
	}
}