enki b3204ea07a
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
first commit
2025-08-18 00:40:15 -07:00

656 lines
17 KiB
Go

package dht
import (
	"database/sql"
	"fmt"
	"log"
	"net"
	"net/url"
	"sync"
	"time"

	"git.sovbit.dev/enki/torrentGateway/internal/config"
)
// APINodeInfo is the address-only view of a DHT node (no ID, timestamps or
// reputation). It is the element type returned by GetBootstrapNodes.
type APINodeInfo struct {
IP string // node address; for the self entry this is the hostname taken from the gateway's public URL
Port int // DHT port of the node
}
// DHTBootstrap manages DHT bootstrap functionality and persistence.
//
// It couples a DHT node with the gateway that owns it, keeps in-memory sets of
// known nodes and announced torrents, and mirrors that state into the
// dht_nodes and dht_announces database tables.
type DHTBootstrap struct {
node *DHT // underlying DHT node used for Announce and GetStats
gateway Gateway // source of the public URL, DHT port, database handle and torrent list
knownNodes map[string]time.Time // nodeID -> last seen
torrents map[string]bool // set of announced torrent info hashes
db *sql.DB // obtained from gateway.GetDatabase() in NewDHTBootstrap
config *config.DHTConfig // DHT settings; BootstrapNodes is appended to during Initialize
mutex sync.RWMutex // taken around knownNodes and torrents access by the public methods and background loops
startTime time.Time // construction time, used to report uptime
}
// Gateway is the set of gateway capabilities the DHT bootstrap manager depends
// on.
type Gateway interface {
// GetPublicURL returns the gateway's public URL; its hostname is used when
// advertising the gateway itself as a bootstrap node.
GetPublicURL() string
// GetDHTPort returns the port used for DHT announcements.
GetDHTPort() int
// GetDatabase returns the database handle used for DHT persistence.
GetDatabase() *sql.DB
// GetAllTorrentHashes returns the info hashes held by the gateway; they are
// merged with the locally tracked torrents on every announce pass.
GetAllTorrentHashes() []string
}
// NodeInfo represents a DHT node with reputation. Its fields correspond to the
// node_id, ip, port, last_seen and reputation columns of the dht_nodes table.
type NodeInfo struct {
NodeID string `json:"node_id"` // node identifier as stored in dht_nodes.node_id
IP string `json:"ip"` // node address
Port int `json:"port"` // node DHT port
LastSeen time.Time `json:"last_seen"` // last time the node was recorded
Reputation int `json:"reputation"` // ranking score; queries order by it descending
}
// TorrentAnnounce represents a DHT torrent announcement. Its fields correspond
// to the info_hash, port, last_announce and peer_count columns of the
// dht_announces table.
type TorrentAnnounce struct {
InfoHash string `json:"info_hash"` // torrent info hash (primary key of dht_announces)
Port int `json:"port"` // port that was announced
LastAnnounce time.Time `json:"last_announce"` // time of the most recent announce
PeerCount int `json:"peer_count"` // stored peer count for the torrent
}
// NewDHTBootstrap returns a bootstrap manager bound to node and gateway.
//
// The database handle is taken from gateway.GetDatabase(), the node and
// torrent sets start out empty, and the start time is stamped with the
// current time. Call Initialize before using the returned value.
func NewDHTBootstrap(node *DHT, gateway Gateway, config *config.DHTConfig) *DHTBootstrap {
	bootstrap := new(DHTBootstrap)

	bootstrap.node = node
	bootstrap.gateway = gateway
	bootstrap.knownNodes = map[string]time.Time{}
	bootstrap.torrents = map[string]bool{}
	bootstrap.db = gateway.GetDatabase()
	bootstrap.config = config
	bootstrap.startTime = time.Now()

	return bootstrap
}
// Initialize prepares the bootstrap manager for use.
//
// It creates the DHT tables, restores persisted state, extends the bootstrap
// node list (self first when BootstrapSelf is set, then the well-known
// defaults) and finally launches the announce, maintenance and discovery
// goroutines. Only a table-creation failure is returned as an error; problems
// restoring state or adding self are logged as warnings.
func (d *DHTBootstrap) Initialize() error {
	log.Printf("Initializing DHT bootstrap functionality")

	// The schema is mandatory: bail out if it cannot be created.
	err := d.initializeTables()
	if err != nil {
		return fmt.Errorf("failed to initialize DHT tables: %w", err)
	}

	// Restoring earlier state is best-effort.
	if loadErr := d.loadPersistedData(); loadErr != nil {
		log.Printf("Warning: Failed to load persisted DHT data: %v", loadErr)
	}

	// Optionally advertise this gateway ahead of every other bootstrap node.
	if d.config.BootstrapSelf {
		if selfErr := d.addSelfAsBootstrap(); selfErr != nil {
			log.Printf("Warning: Failed to add self as bootstrap: %v", selfErr)
		}
	}

	// Make sure the well-known public routers are present.
	d.addDefaultBootstrapNodes()

	// Background workers, started in order: periodic announces, routing table
	// maintenance, node discovery.
	workers := []func(){
		d.announceLoop,
		d.maintainRoutingTable,
		d.nodeDiscoveryLoop,
	}
	for _, worker := range workers {
		go worker()
	}

	log.Printf("DHT bootstrap initialized successfully")
	return nil
}
// initializeTables makes sure the dht_announces and dht_nodes tables and their
// indexes exist. Statements run in order and the first failure aborts the
// sequence and is returned wrapped.
func (d *DHTBootstrap) initializeTables() error {
	const createAnnounces = `CREATE TABLE IF NOT EXISTS dht_announces (
info_hash TEXT PRIMARY KEY,
port INTEGER NOT NULL,
last_announce TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
peer_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)`
	const createNodes = `CREATE TABLE IF NOT EXISTS dht_nodes (
node_id TEXT PRIMARY KEY,
ip TEXT NOT NULL,
port INTEGER NOT NULL,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
reputation INTEGER DEFAULT 0,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)`
	const (
		indexAnnounceTime = `CREATE INDEX IF NOT EXISTS idx_dht_announces_last_announce ON dht_announces(last_announce)`
		indexNodeSeen     = `CREATE INDEX IF NOT EXISTS idx_dht_nodes_last_seen ON dht_nodes(last_seen)`
		indexNodeRep      = `CREATE INDEX IF NOT EXISTS idx_dht_nodes_reputation ON dht_nodes(reputation)`
	)

	statements := [...]string{
		createAnnounces,
		createNodes,
		indexAnnounceTime,
		indexNodeSeen,
		indexNodeRep,
	}
	for i := range statements {
		_, err := d.db.Exec(statements[i])
		if err != nil {
			return fmt.Errorf("failed to create table: %w", err)
		}
	}

	log.Printf("DHT database tables initialized")
	return nil
}
// loadPersistedData restores the in-memory torrent and node sets from the
// database: announces from the last day and up to 1000 nodes seen within the
// last six hours.
//
// It writes d.torrents and d.knownNodes without taking the mutex, so it must
// only run before the background goroutines are started (as Initialize does).
// Rows that cannot be scanned are logged and skipped; query and iteration
// errors are returned with context.
func (d *DHTBootstrap) loadPersistedData() error {
	torrentCount, err := d.loadAnnouncedTorrents()
	if err != nil {
		return err
	}

	nodeCount, err := d.loadKnownNodes()
	if err != nil {
		return err
	}

	log.Printf("Loaded %d announced torrents and %d known DHT nodes", torrentCount, nodeCount)
	return nil
}

// loadAnnouncedTorrents fills d.torrents from recent dht_announces rows and
// returns how many were loaded.
func (d *DHTBootstrap) loadAnnouncedTorrents() (int, error) {
	rows, err := d.db.Query(`
SELECT info_hash, port FROM dht_announces
WHERE last_announce > datetime('now', '-1 day')
`)
	if err != nil {
		return 0, fmt.Errorf("querying announced torrents: %w", err)
	}
	defer rows.Close()

	count := 0
	for rows.Next() {
		var infoHash string
		var port int // selected by the query but not needed in memory
		if err := rows.Scan(&infoHash, &port); err != nil {
			log.Printf("Skipping unreadable DHT announce row: %v", err)
			continue
		}
		d.torrents[infoHash] = true
		count++
	}
	// rows.Next returning false can hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		return count, fmt.Errorf("reading announced torrents: %w", err)
	}
	return count, nil
}

// loadKnownNodes fills d.knownNodes from recent dht_nodes rows and returns how
// many were loaded.
func (d *DHTBootstrap) loadKnownNodes() (int, error) {
	rows, err := d.db.Query(`
SELECT node_id, ip, port, last_seen FROM dht_nodes
WHERE last_seen > datetime('now', '-6 hours')
ORDER BY reputation DESC, last_seen DESC
LIMIT 1000
`)
	if err != nil {
		return 0, fmt.Errorf("querying known DHT nodes: %w", err)
	}
	defer rows.Close()

	count := 0
	for rows.Next() {
		var nodeID, ip string
		var port int
		var lastSeen time.Time
		if err := rows.Scan(&nodeID, &ip, &port, &lastSeen); err != nil {
			log.Printf("Skipping unreadable DHT node row: %v", err)
			continue
		}
		d.knownNodes[nodeID] = lastSeen
		count++
	}
	if err := rows.Err(); err != nil {
		return count, fmt.Errorf("reading known DHT nodes: %w", err)
	}
	return count, nil
}
// addSelfAsBootstrap puts this gateway at the front of the configured
// bootstrap node list, using the hostname of the gateway's public URL and its
// DHT port.
//
// It returns an error, and leaves the list untouched, when no hostname can be
// derived from the public URL.
func (d *DHTBootstrap) addSelfAsBootstrap() error {
	publicURL := d.gateway.GetPublicURL()
	dhtPort := d.gateway.GetDHTPort()

	host := extractHostname(publicURL)
	if host == "" {
		return fmt.Errorf("cannot derive hostname from public URL %q", publicURL)
	}

	// JoinHostPort brackets IPv6 literals, which plain "%s:%d" formatting
	// would turn into an ambiguous address.
	selfAddr := net.JoinHostPort(host, fmt.Sprint(dhtPort))

	// Prepend so that self is tried before any other bootstrap node.
	d.config.BootstrapNodes = append([]string{selfAddr}, d.config.BootstrapNodes...)
	log.Printf("Added self as DHT bootstrap node: %s", selfAddr)
	return nil
}
// addDefaultBootstrapNodes appends the well-known public DHT routers to the
// configured bootstrap list, skipping any address that is already present,
// and logs the resulting list.
func (d *DHTBootstrap) addDefaultBootstrapNodes() {
	// Index what is already configured so each default needs one lookup.
	present := make(map[string]bool, len(d.config.BootstrapNodes))
	for _, addr := range d.config.BootstrapNodes {
		present[addr] = true
	}

	wellKnown := [...]string{
		"router.bittorrent.com:6881",
		"dht.transmissionbt.com:6881",
		"router.utorrent.com:6881",
		"dht.libtorrent.org:25401",
	}
	for _, addr := range wellKnown {
		if present[addr] {
			continue
		}
		d.config.BootstrapNodes = append(d.config.BootstrapNodes, addr)
		present[addr] = true
	}

	log.Printf("DHT bootstrap nodes: %v", d.config.BootstrapNodes)
}
// announceLoop re-announces every tracked torrent once per AnnounceInterval.
// A non-positive interval disables the loop. Once started it runs until the
// process exits; it is meant to be launched in its own goroutine.
func (d *DHTBootstrap) announceLoop() {
	interval := d.config.AnnounceInterval
	if interval <= 0 {
		log.Printf("DHT announce loop disabled (interval <= 0)")
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	log.Printf("Starting DHT announce loop (interval: %v)", interval)
	for range ticker.C {
		d.announceAllTorrents()
	}
}
// announceAllTorrents announces the union of the locally tracked torrents and
// the hashes reported by the gateway, recording each announce in the
// database. The number announced is logged when it is non-zero.
func (d *DHTBootstrap) announceAllTorrents() {
	pending := make(map[string]bool)

	// Snapshot the tracked set under the read lock so the announces below run
	// without holding it.
	d.mutex.RLock()
	for hash := range d.torrents {
		pending[hash] = true
	}
	d.mutex.RUnlock()

	// Fold in whatever the gateway currently stores.
	for _, hash := range d.gateway.GetAllTorrentHashes() {
		pending[hash] = true
	}

	port := d.gateway.GetDHTPort()
	for hash := range pending {
		d.node.Announce(hash, port)
		d.updateDHTAnnounce(hash, port)
	}

	if announced := len(pending); announced > 0 {
		log.Printf("Announced %d torrents to DHT", announced)
	}
}
// AnnounceNewTorrent starts tracking infoHash and announces it to the DHT
// right away on the given port, recording the announce in the database.
func (d *DHTBootstrap) AnnounceNewTorrent(infoHash string, port int) {
	d.mutex.Lock()
	d.torrents[infoHash] = true
	d.mutex.Unlock()

	// Immediately announce to DHT
	d.node.Announce(infoHash, port)
	d.updateDHTAnnounce(infoHash, port)

	// Log only a short prefix; guard the slice so a hash shorter than eight
	// bytes cannot panic.
	short := infoHash
	if len(short) > 8 {
		short = short[:8]
	}
	log.Printf("Announced new torrent to DHT: %s", short)
}
// updateDHTAnnounce records an announce of infoHash on port in dht_announces,
// stamping last_announce with the current time. Failures are logged, not
// returned.
//
// INSERT OR REPLACE removes the existing row before inserting the new one, so
// every column that must survive a re-announce is carried over explicitly:
// peer_count and created_at are read back from the old row, falling back to 0
// and the current time for a first announce.
func (d *DHTBootstrap) updateDHTAnnounce(infoHash string, port int) {
	_, err := d.db.Exec(`
INSERT OR REPLACE INTO dht_announces (info_hash, port, last_announce, peer_count, created_at)
VALUES (?, ?, CURRENT_TIMESTAMP,
COALESCE((SELECT peer_count FROM dht_announces WHERE info_hash = ?), 0),
COALESCE((SELECT created_at FROM dht_announces WHERE info_hash = ?), CURRENT_TIMESTAMP))
`, infoHash, port, infoHash, infoHash)
	if err != nil {
		log.Printf("Failed to update DHT announce record: %v", err)
	}
}
// maintainRoutingTable runs the periodic housekeeping pass: bucket refresh,
// removal of dead nodes and pruning of old announces. The period is
// config.CleanupInterval when positive, otherwise five minutes. It runs until
// the process exits and is meant to be launched in its own goroutine.
func (d *DHTBootstrap) maintainRoutingTable() {
	interval := d.config.CleanupInterval
	if interval <= 0 {
		interval = 5 * time.Minute
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	log.Printf("Starting DHT routing table maintenance (interval: %v)", interval)
	for {
		<-ticker.C
		d.refreshBuckets()
		d.cleanDeadNodes()
		d.pruneOldData()
	}
}
// refreshBuckets drops in-memory known nodes not seen within the last 30
// minutes and logs the routing table size, the number of remaining known
// nodes and the stored item count.
//
// No network traffic is generated here; a full implementation would issue
// find_node queries for inactive buckets.
func (d *DHTBootstrap) refreshBuckets() {
	nodeStats := d.node.GetStats()

	d.mutex.Lock()
	defer d.mutex.Unlock()

	staleBefore := time.Now().Add(-30 * time.Minute)
	fresh := 0
	for id, seen := range d.knownNodes {
		if !seen.After(staleBefore) {
			// Not heard from recently enough: forget it.
			delete(d.knownNodes, id)
			continue
		}
		fresh++
	}

	log.Printf("DHT bucket refresh: %d nodes in routing table, %d known nodes, %d stored items",
		nodeStats.NodesInTable, fresh, nodeStats.StoredItems)
}
// cleanDeadNodes deletes dht_nodes rows whose last_seen is more than six hours
// old and logs how many were removed. Failures are logged, not returned.
func (d *DHTBootstrap) cleanDeadNodes() {
	// The cutoff is bound in UTC because the column defaults
	// (CURRENT_TIMESTAMP) and the datetime('now') filters used elsewhere in
	// this file are UTC; a local-time value would skew the comparison by the
	// zone offset when timestamps are compared as text.
	// NOTE(review): confirm how the SQL driver in use serialises time.Time.
	cutoff := time.Now().UTC().Add(-6 * time.Hour)

	result, err := d.db.Exec(`
DELETE FROM dht_nodes WHERE last_seen < ?
`, cutoff)
	if err != nil {
		log.Printf("Failed to clean dead DHT nodes: %v", err)
		return
	}

	// The row count only feeds the log line, so an error reading it is
	// deliberately ignored.
	if rowsAffected, _ := result.RowsAffected(); rowsAffected > 0 {
		log.Printf("Cleaned %d dead DHT nodes", rowsAffected)
	}
}
// pruneOldData deletes dht_announces rows whose last_announce is more than
// seven days old and logs how many were removed. Failures are logged, not
// returned.
func (d *DHTBootstrap) pruneOldData() {
	// last_announce is always written with CURRENT_TIMESTAMP (UTC), so the
	// cutoff is bound in UTC as well; a local-time value would skew the
	// comparison by the zone offset when timestamps are compared as text.
	// NOTE(review): confirm how the SQL driver in use serialises time.Time.
	cutoff := time.Now().UTC().Add(-7 * 24 * time.Hour)

	result, err := d.db.Exec(`
DELETE FROM dht_announces WHERE last_announce < ?
`, cutoff)
	if err != nil {
		log.Printf("Failed to prune old DHT announces: %v", err)
		return
	}

	// The row count only feeds the log line, so an error reading it is
	// deliberately ignored.
	if rowsAffected, _ := result.RowsAffected(); rowsAffected > 0 {
		log.Printf("Pruned %d old DHT announces", rowsAffected)
	}
}
// nodeDiscoveryLoop triggers a node discovery pass every ten minutes. It runs
// until the process exits and is meant to be launched in its own goroutine.
func (d *DHTBootstrap) nodeDiscoveryLoop() {
	const discoveryPeriod = 10 * time.Minute

	ticker := time.NewTicker(discoveryPeriod)
	defer ticker.Stop()

	log.Printf("Starting DHT node discovery loop")
	for {
		<-ticker.C
		d.discoverNewNodes()
	}
}
// discoverNewNodes is a placeholder for active node discovery: at present it
// only logs the current routing table size.
//
// A full implementation would query known nodes with find_node, parse the
// replies for unseen nodes, and add those to the routing table and database.
func (d *DHTBootstrap) discoverNewNodes() {
	tableSize := d.node.GetStats().NodesInTable
	log.Printf("DHT node discovery: %d nodes in routing table", tableSize)
}
// AddKnownNode records a discovered node as seen now, both in the in-memory
// set and in the dht_nodes table. Database failures are logged, not returned.
func (d *DHTBootstrap) AddKnownNode(nodeID, ip string, port int, reputation int) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	// Stored in UTC so last_seen lines up with the datetime('now') filters
	// used by the readers of this table, which are UTC.
	// NOTE(review): confirm how the SQL driver in use serialises time.Time.
	now := time.Now().UTC()
	d.knownNodes[nodeID] = now

	// INSERT OR REPLACE removes the existing row before inserting, which
	// would reset first_seen to its default on every sighting; carry the
	// original value over and fall back to the current time for new nodes.
	_, err := d.db.Exec(`
INSERT OR REPLACE INTO dht_nodes (node_id, ip, port, last_seen, reputation, first_seen)
VALUES (?, ?, ?, ?, ?,
COALESCE((SELECT first_seen FROM dht_nodes WHERE node_id = ?), CURRENT_TIMESTAMP))
`, nodeID, ip, port, now, reputation, nodeID)
	if err != nil {
		log.Printf("Failed to store DHT node: %v", err)
	}
}
// GetDHTStats returns a snapshot of DHT statistics: in-memory counts, node
// counters from the underlying DHT node, row counts from the database, uptime
// and the number of configured bootstrap nodes.
//
// A database counter that cannot be read is reported as 0 and the failure is
// logged.
func (d *DHTBootstrap) GetDHTStats() map[string]interface{} {
	d.mutex.RLock()
	knownNodesCount := len(d.knownNodes)
	announcedTorrents := len(d.torrents)
	d.mutex.RUnlock()

	nodeStats := d.node.GetStats()

	// countRows runs a single-value COUNT query; errors are logged rather
	// than silently dropped and yield 0.
	countRows := func(query string) int64 {
		var n int64
		if err := d.db.QueryRow(query).Scan(&n); err != nil {
			log.Printf("Failed to query DHT statistics: %v", err)
			return 0
		}
		return n
	}

	// Get database stats
	totalAnnounces := countRows(`SELECT COUNT(*) FROM dht_announces`)
	totalNodes := countRows(`SELECT COUNT(*) FROM dht_nodes`)

	// Get recent activity
	recentAnnounces := countRows(`SELECT COUNT(*) FROM dht_announces WHERE last_announce > datetime('now', '-1 hour')`)
	activeNodes := countRows(`SELECT COUNT(*) FROM dht_nodes WHERE last_seen > datetime('now', '-1 hour')`)

	return map[string]interface{}{
		"node_id":            fmt.Sprintf("%x", d.node.nodeID),
		"routing_table_size": nodeStats.NodesInTable,
		"active_torrents":    announcedTorrents,
		"total_announces":    totalAnnounces,
		"recent_announces":   recentAnnounces,
		"known_nodes":        knownNodesCount,
		"total_nodes":        totalNodes,
		"active_nodes":       activeNodes,
		"packets_sent":       nodeStats.PacketsSent,
		"packets_received":   nodeStats.PacketsReceived,
		"stored_items":       nodeStats.StoredItems,
		"uptime":             time.Since(d.startTime).String(),
		"bootstrap_nodes":    len(d.config.BootstrapNodes),
	}
}
// GetTorrentStats returns the stored announce record for infoHash.
//
// When a record exists the result holds "announced": true together with the
// port, the last announce time formatted as RFC 3339, and the peer count.
// When no record can be read the result holds "announced": false with a nil
// last_announce and a zero peer_count; database errors other than a missing
// row are logged.
func (d *DHTBootstrap) GetTorrentStats(infoHash string) map[string]interface{} {
	var announce TorrentAnnounce
	err := d.db.QueryRow(`
SELECT info_hash, port, last_announce, peer_count
FROM dht_announces
WHERE info_hash = ?
`, infoHash).Scan(&announce.InfoHash, &announce.Port, &announce.LastAnnounce, &announce.PeerCount)
	if err != nil {
		// A missing row just means "never announced"; anything else is a real
		// failure that should not pass unnoticed.
		if err != sql.ErrNoRows {
			log.Printf("Failed to query DHT announce record for %s: %v", infoHash, err)
		}
		return map[string]interface{}{
			"info_hash":     infoHash,
			"announced":     false,
			"last_announce": nil,
			"peer_count":    0,
		}
	}

	return map[string]interface{}{
		"info_hash":     announce.InfoHash,
		"announced":     true,
		"port":          announce.Port,
		"last_announce": announce.LastAnnounce.Format(time.RFC3339),
		"peer_count":    announce.PeerCount,
	}
}
// Stop writes a final announce record for every tracked torrent and always
// returns nil.
//
// The read lock is held for the whole pass. The background loops started by
// Initialize are not signalled or waited for here.
func (d *DHTBootstrap) Stop() error {
	log.Printf("Stopping DHT bootstrap functionality")

	d.mutex.RLock()
	defer d.mutex.RUnlock()

	// Refresh the persisted announce timestamp of each tracked torrent.
	for hash := range d.torrents {
		port := d.gateway.GetDHTPort()
		d.updateDHTAnnounce(hash, port)
	}

	persisted := len(d.torrents)
	log.Printf("DHT bootstrap stopped, persisted %d torrents", persisted)
	return nil
}
// Helper functions
// extractHostname returns the host part of rawURL without scheme, port, path
// or IPv6 brackets.
//
// It accepts absolute URLs ("https://host:8443/path"), bare "host:port" pairs
// (including "[::1]:6881") and plain host names. Input in which no host can
// be recognised is returned unchanged.
func extractHostname(rawURL string) string {
	// Absolute URLs come first: net.SplitHostPort would split "http://host"
	// at the scheme colon and report "http" as the host.
	if u, err := url.Parse(rawURL); err == nil && u.Host != "" {
		if host := u.Hostname(); host != "" {
			return host
		}
	}

	// Bare host:port, e.g. "example.com:6881" or "[::1]:6881".
	if host, _, err := net.SplitHostPort(rawURL); err == nil {
		return host
	}

	// Nothing to strip: treat the input as a plain host name.
	return rawURL
}
// isValidNodeID reports whether nodeID has the length of a hex-encoded node
// ID, i.e. two characters per byte of NodeIDLength. Only the length is
// checked, not the characters.
func isValidNodeID(nodeID string) bool {
	wantLen := NodeIDLength * 2 // two hex digits per byte
	return len(nodeID) == wantLen
}
// ForceAnnounce runs an announce pass for all torrents immediately and
// returns the DHT statistics captured just before ("before") and just after
// ("after") the pass, plus "action": "force_announce".
func (d *DHTBootstrap) ForceAnnounce() map[string]interface{} {
	report := make(map[string]interface{}, 3)

	report["before"] = d.GetDHTStats()
	d.announceAllTorrents()
	report["after"] = d.GetDHTStats()
	report["action"] = "force_announce"

	return report
}
// GetActiveBootstrapNodes returns up to 50 nodes from dht_nodes that were seen
// within the last hour and have a positive reputation, best reputation first.
//
// Query failures are logged and yield the nodes collected so far (nil when the
// query itself fails); rows that cannot be scanned are logged and skipped.
func (d *DHTBootstrap) GetActiveBootstrapNodes() []NodeInfo {
	var activeNodes []NodeInfo

	// Bound in UTC to match the UTC timestamps produced by the column default
	// and the datetime('now') filters used elsewhere in this file.
	// NOTE(review): confirm how the SQL driver in use serialises time.Time.
	cutoff := time.Now().UTC().Add(-1 * time.Hour)

	rows, err := d.db.Query(`
SELECT node_id, ip, port, last_seen, reputation
FROM dht_nodes
WHERE last_seen > ? AND reputation > 0
ORDER BY reputation DESC, last_seen DESC
LIMIT 50
`, cutoff)
	if err != nil {
		log.Printf("Failed to query active DHT bootstrap nodes: %v", err)
		return activeNodes
	}
	defer rows.Close()

	for rows.Next() {
		var node NodeInfo
		if err := rows.Scan(&node.NodeID, &node.IP, &node.Port, &node.LastSeen, &node.Reputation); err != nil {
			log.Printf("Skipping unreadable DHT node row: %v", err)
			continue
		}
		activeNodes = append(activeNodes, node)
	}
	// rows.Next returning false can hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		log.Printf("Failed to read active DHT bootstrap nodes: %v", err)
	}
	return activeNodes
}
// GetBootstrapNodes returns bootstrap nodes in the address-only API format.
//
// When BootstrapSelf is set the gateway itself comes first (hostname of its
// public URL plus its DHT port), followed by up to 20 nodes from dht_nodes
// seen within the last hour, best reputation first. Database failures are
// logged and whatever was collected so far is returned.
func (d *DHTBootstrap) GetBootstrapNodes() []APINodeInfo {
	var nodes []APINodeInfo

	// Add self if configured
	if d.config.BootstrapSelf {
		publicURL := d.gateway.GetPublicURL()
		nodes = append(nodes, APINodeInfo{
			IP:   extractHostname(publicURL),
			Port: d.gateway.GetDHTPort(),
		})
	}

	// Add other good nodes from database
	rows, err := d.db.Query(`
SELECT ip, port FROM dht_nodes
WHERE last_seen > datetime('now', '-1 hour')
ORDER BY reputation DESC, last_seen DESC
LIMIT 20
`)
	if err != nil {
		log.Printf("Failed to query DHT nodes: %v", err)
		return nodes
	}
	defer rows.Close()

	for rows.Next() {
		var node APINodeInfo
		if err := rows.Scan(&node.IP, &node.Port); err != nil {
			log.Printf("Skipping unreadable DHT node row: %v", err)
			continue
		}
		nodes = append(nodes, node)
	}
	// rows.Next returning false can hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		log.Printf("Failed to read DHT nodes: %v", err)
	}
	return nodes
}
// GetBootstrapNodesInternal returns bootstrap nodes with the full NodeInfo
// structure.
//
// When BootstrapSelf is set the gateway itself comes first, with its own node
// ID, the current time as LastSeen and a reputation of 100, followed by up to
// 20 nodes from dht_nodes seen within the last hour, best reputation first.
// Database failures are logged and whatever was collected so far is returned.
func (d *DHTBootstrap) GetBootstrapNodesInternal() []NodeInfo {
	var nodes []NodeInfo

	// Add self if configured
	if d.config.BootstrapSelf {
		publicURL := d.gateway.GetPublicURL()
		nodes = append(nodes, NodeInfo{
			NodeID:     fmt.Sprintf("%x", d.node.nodeID),
			IP:         extractHostname(publicURL),
			Port:       d.gateway.GetDHTPort(),
			LastSeen:   time.Now(),
			Reputation: 100, // High reputation for self
		})
	}

	// Add other good nodes from database
	rows, err := d.db.Query(`
SELECT node_id, ip, port, last_seen, reputation
FROM dht_nodes
WHERE last_seen > datetime('now', '-1 hour')
ORDER BY reputation DESC, last_seen DESC
LIMIT 20
`)
	if err != nil {
		log.Printf("Failed to query DHT nodes: %v", err)
		return nodes
	}
	defer rows.Close()

	for rows.Next() {
		var node NodeInfo
		if err := rows.Scan(&node.NodeID, &node.IP, &node.Port, &node.LastSeen, &node.Reputation); err != nil {
			log.Printf("Skipping unreadable DHT node row: %v", err)
			continue
		}
		nodes = append(nodes, node)
	}
	// rows.Next returning false can hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		log.Printf("Failed to read DHT nodes: %v", err)
	}
	return nodes
}