package dht

import (
	"database/sql"
	"errors"
	"fmt"
	"log"
	"net"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.sovbit.dev/enki/torrentGateway/internal/config"
)

// APINodeInfo represents a DHT node for API compatibility.
type APINodeInfo struct {
	IP   string
	Port int
}

// DHTBootstrap manages DHT bootstrap functionality and persistence.
//
// The knownNodes and torrents maps are guarded by mutex. The background
// loops started by Initialize run until Stop is called.
type DHTBootstrap struct {
	node       *DHT
	gateway    Gateway
	knownNodes map[string]time.Time // nodeID -> last seen
	torrents   map[string]bool      // announced torrents
	db         *sql.DB
	config     *config.DHTConfig
	mutex      sync.RWMutex
	startTime  time.Time

	stopCh   chan struct{}  // closed by Stop to terminate the background loops
	stopOnce sync.Once      // makes Stop safe to call more than once
	loops    sync.WaitGroup // tracks goroutines started by startLoop
}

// Gateway interface for DHT integration.
type Gateway interface {
	GetPublicURL() string
	GetDHTPort() int
	GetDatabase() *sql.DB
	GetAllTorrentHashes() []string
}

// NodeInfo represents a DHT node with reputation.
type NodeInfo struct {
	NodeID     string    `json:"node_id"`
	IP         string    `json:"ip"`
	Port       int       `json:"port"`
	LastSeen   time.Time `json:"last_seen"`
	Reputation int       `json:"reputation"`
}

// TorrentAnnounce represents a DHT torrent announcement.
type TorrentAnnounce struct {
	InfoHash     string    `json:"info_hash"`
	Port         int       `json:"port"`
	LastAnnounce time.Time `json:"last_announce"`
	PeerCount    int       `json:"peer_count"`
}

// NewDHTBootstrap creates a new DHT bootstrap manager. The database handle is
// taken from gateway.GetDatabase(); no I/O happens until Initialize is called.
func NewDHTBootstrap(node *DHT, gateway Gateway, config *config.DHTConfig) *DHTBootstrap {
	return &DHTBootstrap{
		node:       node,
		gateway:    gateway,
		knownNodes: make(map[string]time.Time),
		torrents:   make(map[string]bool),
		db:         gateway.GetDatabase(),
		config:     config,
		startTime:  time.Now(),
		stopCh:     make(chan struct{}),
	}
}

// Initialize sets up DHT bootstrap functionality: it creates the database
// tables, loads persisted state, extends the bootstrap node list and starts
// the announce, maintenance and discovery loops.
//
// Only a table-creation failure is returned as an error; failures to load
// persisted data or to add the gateway itself as a bootstrap node are logged
// as warnings and initialization continues.
func (d *DHTBootstrap) Initialize() error {
	log.Printf("Initializing DHT bootstrap functionality")

	if err := d.initializeTables(); err != nil {
		return fmt.Errorf("failed to initialize DHT tables: %w", err)
	}

	if err := d.loadPersistedData(); err != nil {
		log.Printf("Warning: Failed to load persisted DHT data: %v", err)
	}

	if d.config.BootstrapSelf {
		if err := d.addSelfAsBootstrap(); err != nil {
			log.Printf("Warning: Failed to add self as bootstrap: %v", err)
		}
	}

	d.addDefaultBootstrapNodes()

	// Background loops; all of them exit once Stop closes stopCh.
	d.startLoop(d.announceLoop)
	d.startLoop(d.maintainRoutingTable)
	d.startLoop(d.nodeDiscoveryLoop)

	log.Printf("DHT bootstrap initialized successfully")
	return nil
}

// startLoop runs fn in its own goroutine and registers it with the wait
// group so that Stop can wait for it to return.
func (d *DHTBootstrap) startLoop(fn func()) {
	d.loops.Add(1)
	go func() {
		defer d.loops.Done()
		fn()
	}()
}

// runEvery calls task once per interval until Stop is called. interval must
// be positive (time.NewTicker panics otherwise).
func (d *DHTBootstrap) runEvery(interval time.Duration, task func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-d.stopCh:
			return
		case <-ticker.C:
			task()
		}
	}
}

// initializeTables creates the DHT-related database tables and indexes if
// they do not exist yet.
func (d *DHTBootstrap) initializeTables() error {
	statements := []string{
		`CREATE TABLE IF NOT EXISTS dht_announces (
			info_hash TEXT PRIMARY KEY,
			port INTEGER NOT NULL,
			last_announce TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
			peer_count INTEGER DEFAULT 0,
			created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
		)`,
		`CREATE TABLE IF NOT EXISTS dht_nodes (
			node_id TEXT PRIMARY KEY,
			ip TEXT NOT NULL,
			port INTEGER NOT NULL,
			last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
			reputation INTEGER DEFAULT 0,
			first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
		)`,
		`CREATE INDEX IF NOT EXISTS idx_dht_announces_last_announce ON dht_announces(last_announce)`,
		`CREATE INDEX IF NOT EXISTS idx_dht_nodes_last_seen ON dht_nodes(last_seen)`,
		`CREATE INDEX IF NOT EXISTS idx_dht_nodes_reputation ON dht_nodes(reputation)`,
	}

	for _, stmt := range statements {
		if _, err := d.db.Exec(stmt); err != nil {
			return fmt.Errorf("failed to create table: %w", err)
		}
	}

	log.Printf("DHT database tables initialized")
	return nil
}

// loadPersistedData loads DHT state from the database into the in-memory
// maps: torrents announced within the last day and up to 1000 nodes seen
// within the last six hours.
func (d *DHTBootstrap) loadPersistedData() error {
	torrentCount, err := d.loadAnnouncedTorrents()
	if err != nil {
		return err
	}

	nodeCount, err := d.loadKnownNodes()
	if err != nil {
		return err
	}

	log.Printf("Loaded %d announced torrents and %d known DHT nodes", torrentCount, nodeCount)
	return nil
}

// loadAnnouncedTorrents fills d.torrents from dht_announces and returns the
// number of rows loaded. Rows that cannot be scanned are logged and skipped.
func (d *DHTBootstrap) loadAnnouncedTorrents() (int, error) {
	rows, err := d.db.Query(`
		SELECT info_hash, port
		FROM dht_announces
		WHERE last_announce > datetime('now', '-1 day')
	`)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	d.mutex.Lock()
	defer d.mutex.Unlock()

	count := 0
	for rows.Next() {
		var infoHash string
		var port int
		if err := rows.Scan(&infoHash, &port); err != nil {
			log.Printf("Skipping unreadable DHT announce row: %v", err)
			continue
		}
		d.torrents[infoHash] = true
		count++
	}
	return count, rows.Err()
}

// loadKnownNodes fills d.knownNodes from dht_nodes and returns the number of
// rows loaded. Rows that cannot be scanned are logged and skipped.
func (d *DHTBootstrap) loadKnownNodes() (int, error) {
	rows, err := d.db.Query(`
		SELECT node_id, ip, port, last_seen
		FROM dht_nodes
		WHERE last_seen > datetime('now', '-6 hours')
		ORDER BY reputation DESC, last_seen DESC
		LIMIT 1000
	`)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	d.mutex.Lock()
	defer d.mutex.Unlock()

	count := 0
	for rows.Next() {
		var nodeID, ip string
		var port int
		var lastSeen time.Time
		if err := rows.Scan(&nodeID, &ip, &port, &lastSeen); err != nil {
			log.Printf("Skipping unreadable DHT node row: %v", err)
			continue
		}
		d.knownNodes[nodeID] = lastSeen
		count++
	}
	return count, rows.Err()
}

// addSelfAsBootstrap prepends the gateway's own host:port to the configured
// bootstrap node list. It returns an error when no hostname can be derived
// from the gateway's public URL.
func (d *DHTBootstrap) addSelfAsBootstrap() error {
	publicURL := d.gateway.GetPublicURL()
	host := extractHostname(publicURL)
	if host == "" {
		return fmt.Errorf("public URL %q has no hostname", publicURL)
	}

	// JoinHostPort brackets IPv6 literals, which plain "%s:%d" would not.
	selfAddr := net.JoinHostPort(host, strconv.Itoa(d.gateway.GetDHTPort()))

	d.config.BootstrapNodes = append([]string{selfAddr}, d.config.BootstrapNodes...)

	log.Printf("Added self as DHT bootstrap node: %s", selfAddr)
	return nil
}

// addDefaultBootstrapNodes appends the well-known public DHT bootstrap nodes
// to the configured list, skipping any that are already present.
func (d *DHTBootstrap) addDefaultBootstrapNodes() {
	defaultNodes := []string{
		"router.bittorrent.com:6881",
		"dht.transmissionbt.com:6881",
		"router.utorrent.com:6881",
		"dht.libtorrent.org:25401",
	}

	for _, candidate := range defaultNodes {
		if !containsString(d.config.BootstrapNodes, candidate) {
			d.config.BootstrapNodes = append(d.config.BootstrapNodes, candidate)
		}
	}

	log.Printf("DHT bootstrap nodes: %v", d.config.BootstrapNodes)
}

// announceLoop periodically announces all tracked torrents. It returns
// immediately when the configured announce interval is not positive, and
// otherwise runs until Stop is called.
func (d *DHTBootstrap) announceLoop() {
	interval := d.config.AnnounceInterval
	if interval <= 0 {
		log.Printf("DHT announce loop disabled (interval <= 0)")
		return
	}

	log.Printf("Starting DHT announce loop (interval: %v)", interval)
	d.runEvery(interval, d.announceAllTorrents)
}

// announceAllTorrents announces every torrent known to either this manager
// or the gateway's storage, and records each announce in the database.
func (d *DHTBootstrap) announceAllTorrents() {
	// Union of locally tracked hashes and the gateway's hashes.
	allTorrents := make(map[string]bool)

	d.mutex.RLock()
	for infoHash := range d.torrents {
		allTorrents[infoHash] = true
	}
	d.mutex.RUnlock()

	for _, infoHash := range d.gateway.GetAllTorrentHashes() {
		allTorrents[infoHash] = true
	}

	port := d.gateway.GetDHTPort()
	for infoHash := range allTorrents {
		d.node.Announce(infoHash, port)
		d.updateDHTAnnounce(infoHash, port)
	}

	if count := len(allTorrents); count > 0 {
		log.Printf("Announced %d torrents to DHT", count)
	}
}

// AnnounceNewTorrent starts tracking infoHash and immediately announces it
// to the DHT on the given port.
func (d *DHTBootstrap) AnnounceNewTorrent(infoHash string, port int) {
	d.mutex.Lock()
	d.torrents[infoHash] = true
	d.mutex.Unlock()

	d.node.Announce(infoHash, port)
	d.updateDHTAnnounce(infoHash, port)

	// shortHash avoids a slice-bounds panic on hashes shorter than 8 bytes.
	log.Printf("Announced new torrent to DHT: %s", shortHash(infoHash))
}

// updateDHTAnnounce records an announce for infoHash in the database.
// Failures are logged, not returned.
//
// An upsert is used instead of INSERT OR REPLACE so that an existing row
// keeps its peer_count and created_at values; REPLACE deletes the old row
// and would reset created_at to its default on every announce.
func (d *DHTBootstrap) updateDHTAnnounce(infoHash string, port int) {
	_, err := d.db.Exec(`
		INSERT INTO dht_announces (info_hash, port, last_announce)
		VALUES (?, ?, CURRENT_TIMESTAMP)
		ON CONFLICT(info_hash) DO UPDATE SET
			port = excluded.port,
			last_announce = excluded.last_announce
	`, infoHash, port)
	if err != nil {
		log.Printf("Failed to update DHT announce record: %v", err)
	}
}

// maintainRoutingTable periodically refreshes buckets and purges stale
// nodes and announces. The interval is config.CleanupInterval when positive
// and five minutes otherwise. It runs until Stop is called.
func (d *DHTBootstrap) maintainRoutingTable() {
	cleanupInterval := 5 * time.Minute
	if d.config.CleanupInterval > 0 {
		cleanupInterval = d.config.CleanupInterval
	}

	log.Printf("Starting DHT routing table maintenance (interval: %v)", cleanupInterval)
	d.runEvery(cleanupInterval, func() {
		d.refreshBuckets()
		d.cleanDeadNodes()
		d.pruneOldData()
	})
}

// refreshBuckets drops in-memory known nodes not seen within the last 30
// minutes and logs routing table statistics.
//
// In a real implementation, this would also send find_node queries to
// refresh buckets that haven't been active.
func (d *DHTBootstrap) refreshBuckets() {
	stats := d.node.GetStats()
	cutoff := time.Now().Add(-30 * time.Minute)

	d.mutex.Lock()
	defer d.mutex.Unlock()

	activeNodes := 0
	for nodeID, lastSeen := range d.knownNodes {
		if lastSeen.After(cutoff) {
			activeNodes++
			continue
		}
		delete(d.knownNodes, nodeID)
	}

	log.Printf("DHT bucket refresh: %d nodes in routing table, %d known nodes, %d stored items",
		stats.NodesInTable, activeNodes, stats.StoredItems)
}

// cleanDeadNodes removes nodes not seen for six hours from the database.
func (d *DHTBootstrap) cleanDeadNodes() {
	// UTC to line up with CURRENT_TIMESTAMP / datetime('now'), which are UTC.
	// NOTE(review): the exact text format of a bound time.Time depends on
	// the SQL driver in use - confirm it sorts consistently with stored values.
	cutoff := time.Now().UTC().Add(-6 * time.Hour)

	result, err := d.db.Exec(`
		DELETE FROM dht_nodes WHERE last_seen < ?
	`, cutoff)
	if err != nil {
		log.Printf("Failed to clean dead DHT nodes: %v", err)
		return
	}

	// The row count is only used for logging, so a driver that cannot
	// report it is not treated as a failure.
	if removed, err := result.RowsAffected(); err == nil && removed > 0 {
		log.Printf("Cleaned %d dead DHT nodes", removed)
	}
}

// pruneOldData removes announce records older than seven days.
func (d *DHTBootstrap) pruneOldData() {
	// UTC for the same reason as in cleanDeadNodes.
	cutoff := time.Now().UTC().Add(-7 * 24 * time.Hour)

	result, err := d.db.Exec(`
		DELETE FROM dht_announces WHERE last_announce < ?
	`, cutoff)
	if err != nil {
		log.Printf("Failed to prune old DHT announces: %v", err)
		return
	}

	// Row count is informational only.
	if removed, err := result.RowsAffected(); err == nil && removed > 0 {
		log.Printf("Pruned %d old DHT announces", removed)
	}
}

// nodeDiscoveryLoop runs discoverNewNodes every ten minutes until Stop is
// called.
func (d *DHTBootstrap) nodeDiscoveryLoop() {
	log.Printf("Starting DHT node discovery loop")
	d.runEvery(10*time.Minute, d.discoverNewNodes)
}

// discoverNewNodes currently only logs the routing table size.
//
// In a real implementation, this would:
//  1. Send find_node queries to known nodes
//  2. Parse responses to discover new nodes
//  3. Add new nodes to routing table and database
func (d *DHTBootstrap) discoverNewNodes() {
	stats := d.node.GetStats()
	log.Printf("DHT node discovery: %d nodes in routing table", stats.NodesInTable)
}

// AddKnownNode records a newly discovered node in memory and in the
// database, with the current time as its last-seen time. Database failures
// are logged, not returned.
func (d *DHTBootstrap) AddKnownNode(nodeID, ip string, port int, reputation int) {
	now := time.Now()

	// Hold the lock only for the map update, not for the database write.
	d.mutex.Lock()
	d.knownNodes[nodeID] = now
	d.mutex.Unlock()

	// Upsert instead of INSERT OR REPLACE so first_seen of an existing row
	// is preserved. The timestamp is bound in UTC to match the UTC values
	// the queries in this file compare against (datetime('now', ...)).
	_, err := d.db.Exec(`
		INSERT INTO dht_nodes (node_id, ip, port, last_seen, reputation)
		VALUES (?, ?, ?, ?, ?)
		ON CONFLICT(node_id) DO UPDATE SET
			ip = excluded.ip,
			port = excluded.port,
			last_seen = excluded.last_seen,
			reputation = excluded.reputation
	`, nodeID, ip, port, now.UTC(), reputation)
	if err != nil {
		log.Printf("Failed to store DHT node: %v", err)
	}
}

// countRows runs a single-value COUNT query and returns the result. A
// failed query is logged and reported as 0.
func (d *DHTBootstrap) countRows(query string) int64 {
	var n int64
	if err := d.db.QueryRow(query).Scan(&n); err != nil {
		log.Printf("Failed to query DHT stats: %v", err)
		return 0
	}
	return n
}

// GetDHTStats returns comprehensive DHT statistics combining in-memory
// counters, the DHT node's own stats and row counts from the database.
// Counts whose query fails are reported as 0.
func (d *DHTBootstrap) GetDHTStats() map[string]interface{} {
	d.mutex.RLock()
	knownNodesCount := len(d.knownNodes)
	announcedTorrents := len(d.torrents)
	d.mutex.RUnlock()

	nodeStats := d.node.GetStats()

	totalAnnounces := d.countRows(`SELECT COUNT(*) FROM dht_announces`)
	totalNodes := d.countRows(`SELECT COUNT(*) FROM dht_nodes`)
	recentAnnounces := d.countRows(`SELECT COUNT(*) FROM dht_announces WHERE last_announce > datetime('now', '-1 hour')`)
	activeNodes := d.countRows(`SELECT COUNT(*) FROM dht_nodes WHERE last_seen > datetime('now', '-1 hour')`)

	return map[string]interface{}{
		"node_id":            fmt.Sprintf("%x", d.node.nodeID),
		"routing_table_size": nodeStats.NodesInTable,
		"active_torrents":    announcedTorrents,
		"total_announces":    totalAnnounces,
		"recent_announces":   recentAnnounces,
		"known_nodes":        knownNodesCount,
		"total_nodes":        totalNodes,
		"active_nodes":       activeNodes,
		"packets_sent":       nodeStats.PacketsSent,
		"packets_received":   nodeStats.PacketsReceived,
		"stored_items":       nodeStats.StoredItems,
		"uptime":             time.Since(d.startTime).String(),
		"bootstrap_nodes":    len(d.config.BootstrapNodes),
	}
}

// GetTorrentStats returns DHT statistics for a specific torrent. When no
// announce record exists, or it cannot be read, the result has
// "announced": false; read errors other than "no rows" are logged.
func (d *DHTBootstrap) GetTorrentStats(infoHash string) map[string]interface{} {
	var announce TorrentAnnounce
	err := d.db.QueryRow(`
		SELECT info_hash, port, last_announce, peer_count
		FROM dht_announces
		WHERE info_hash = ?
	`, infoHash).Scan(&announce.InfoHash, &announce.Port, &announce.LastAnnounce, &announce.PeerCount)
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			log.Printf("Failed to query DHT announce record for %s: %v", shortHash(infoHash), err)
		}
		return map[string]interface{}{
			"info_hash":     infoHash,
			"announced":     false,
			"last_announce": nil,
			"peer_count":    0,
		}
	}

	return map[string]interface{}{
		"info_hash":     announce.InfoHash,
		"announced":     true,
		"port":          announce.Port,
		"last_announce": announce.LastAnnounce.Format(time.RFC3339),
		"peer_count":    announce.PeerCount,
	}
}

// Stop gracefully shuts down DHT bootstrap functionality: it signals the
// background loops to exit, waits for them to return, and then refreshes the
// announce record of every tracked torrent. It is safe to call more than
// once and always returns nil.
func (d *DHTBootstrap) Stop() error {
	log.Printf("Stopping DHT bootstrap functionality")

	d.stopOnce.Do(func() {
		// stopCh is nil only if the struct was not built by NewDHTBootstrap.
		if d.stopCh != nil {
			close(d.stopCh)
		}
	})
	// Wait before touching the mutex: the maintenance loop takes the write
	// lock, so waiting while holding a read lock could deadlock.
	d.loops.Wait()

	// Snapshot under the lock, write to the database without it.
	d.mutex.RLock()
	hashes := make([]string, 0, len(d.torrents))
	for infoHash := range d.torrents {
		hashes = append(hashes, infoHash)
	}
	d.mutex.RUnlock()

	port := d.gateway.GetDHTPort()
	for _, infoHash := range hashes {
		d.updateDHTAnnounce(infoHash, port)
	}

	log.Printf("DHT bootstrap stopped, persisted %d torrents", len(hashes))
	return nil
}

// Helper functions

// extractHostname extracts the hostname from rawURL, which may be an
// absolute URL ("https://example.com:8443/x"), a bare "host:port" or a bare
// host. The port, path and any IPv6 brackets are dropped. It returns an
// empty string when rawURL contains no host.
func extractHostname(rawURL string) string {
	rawURL = strings.TrimSpace(rawURL)
	if rawURL == "" {
		return ""
	}

	// Absolute URLs: let net/url do the work.
	if u, err := url.Parse(rawURL); err == nil && u.Hostname() != "" {
		return u.Hostname()
	}

	// Not parseable as an absolute URL: strip any scheme and path by hand
	// and treat the remainder as host[:port].
	rest := rawURL
	if i := strings.Index(rest, "://"); i >= 0 {
		rest = rest[i+len("://"):]
	}
	if i := strings.IndexAny(rest, "/?#"); i >= 0 {
		rest = rest[:i]
	}
	if host, _, err := net.SplitHostPort(rest); err == nil {
		return host
	}
	// No port present.
	return strings.Trim(rest, "[]")
}

// shortHash returns at most the first eight bytes of infoHash, for logging.
func shortHash(infoHash string) string {
	const prefixLen = 8
	if len(infoHash) <= prefixLen {
		return infoHash
	}
	return infoHash[:prefixLen]
}

// containsString reports whether list contains s.
func containsString(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}

// isValidNodeID checks that a node ID has the length of a hex-encoded ID of
// NodeIDLength bytes. Only the length is checked, not the characters.
func isValidNodeID(nodeID string) bool {
	return len(nodeID) == NodeIDLength*2
}

// ForceAnnounce manually triggers announcement of all torrents and returns
// the DHT statistics captured before and after the announce.
func (d *DHTBootstrap) ForceAnnounce() map[string]interface{} {
	before := d.GetDHTStats()
	d.announceAllTorrents()
	after := d.GetDHTStats()

	return map[string]interface{}{
		"before": before,
		"after":  after,
		"action": "force_announce",
	}
}

// appendNodeInfos scans every row of rows (columns: node_id, ip, port,
// last_seen, reputation) and appends the results to dst. Rows that cannot be
// scanned are skipped; an iteration error is logged.
func appendNodeInfos(dst []NodeInfo, rows *sql.Rows) []NodeInfo {
	for rows.Next() {
		var node NodeInfo
		if err := rows.Scan(&node.NodeID, &node.IP, &node.Port, &node.LastSeen, &node.Reputation); err != nil {
			continue
		}
		dst = append(dst, node)
	}
	if err := rows.Err(); err != nil {
		log.Printf("Failed to read DHT nodes: %v", err)
	}
	return dst
}

// GetActiveBootstrapNodes returns up to 50 nodes with positive reputation
// that were seen within the last hour, best reputation first. The result is
// nil when there are none or the query fails.
func (d *DHTBootstrap) GetActiveBootstrapNodes() []NodeInfo {
	var activeNodes []NodeInfo

	// UTC to match the UTC timestamps the other queries compare against.
	cutoff := time.Now().UTC().Add(-1 * time.Hour)

	rows, err := d.db.Query(`
		SELECT node_id, ip, port, last_seen, reputation
		FROM dht_nodes
		WHERE last_seen > ? AND reputation > 0
		ORDER BY reputation DESC, last_seen DESC
		LIMIT 50
	`, cutoff)
	if err != nil {
		log.Printf("Failed to query DHT nodes: %v", err)
		return activeNodes
	}
	defer rows.Close()

	return appendNodeInfos(activeNodes, rows)
}

// GetBootstrapNodes returns nodes in API-compatible format for interface
// compliance: the gateway itself first (when BootstrapSelf is set), followed
// by up to 20 nodes seen within the last hour.
func (d *DHTBootstrap) GetBootstrapNodes() []APINodeInfo {
	var nodes []APINodeInfo

	if d.config.BootstrapSelf {
		nodes = append(nodes, APINodeInfo{
			IP:   extractHostname(d.gateway.GetPublicURL()),
			Port: d.gateway.GetDHTPort(),
		})
	}

	rows, err := d.db.Query(`
		SELECT ip, port
		FROM dht_nodes
		WHERE last_seen > datetime('now', '-1 hour')
		ORDER BY reputation DESC, last_seen DESC
		LIMIT 20
	`)
	if err != nil {
		log.Printf("Failed to query DHT nodes: %v", err)
		return nodes
	}
	defer rows.Close()

	for rows.Next() {
		var node APINodeInfo
		if err := rows.Scan(&node.IP, &node.Port); err != nil {
			continue
		}
		nodes = append(nodes, node)
	}
	if err := rows.Err(); err != nil {
		log.Printf("Failed to read DHT nodes: %v", err)
	}

	return nodes
}

// GetBootstrapNodesInternal returns the same selection as GetBootstrapNodes
// but with the full NodeInfo structure. The gateway's own entry, when
// present, gets reputation 100 and the current time as LastSeen.
func (d *DHTBootstrap) GetBootstrapNodesInternal() []NodeInfo {
	var nodes []NodeInfo

	if d.config.BootstrapSelf {
		nodes = append(nodes, NodeInfo{
			NodeID:     fmt.Sprintf("%x", d.node.nodeID),
			IP:         extractHostname(d.gateway.GetPublicURL()),
			Port:       d.gateway.GetDHTPort(),
			LastSeen:   time.Now(),
			Reputation: 100, // High reputation for self
		})
	}

	rows, err := d.db.Query(`
		SELECT node_id, ip, port, last_seen, reputation
		FROM dht_nodes
		WHERE last_seen > datetime('now', '-1 hour')
		ORDER BY reputation DESC, last_seen DESC
		LIMIT 20
	`)
	if err != nil {
		log.Printf("Failed to query DHT nodes: %v", err)
		return nodes
	}
	defer rows.Close()

	return appendNodeInfos(nodes, rows)
}