torrent-gateway/internal/p2p/coordinator.go
enki 7c92aa3ded
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / E2E Tests (push) Blocked by required conditions
Major DHT and Torrent fixes.
2025-08-29 21:18:36 -07:00

751 lines
20 KiB
Go

package p2p
import (
"fmt"
"log"
"net"
"sort"
"sync"
"time"
"torrentGateway/internal/dht"
trackerPkg "torrentGateway/internal/tracker"
)
// PeerInfo describes a single peer endpoint, regardless of which discovery
// source (tracker, DHT or WebSeed) reported it.
type PeerInfo struct {
// IP is the peer address in textual form; the ranking code parses it with
// net.ParseIP to detect private and loopback addresses.
IP string
// Port is the peer's port. Together with IP it forms the de-duplication key.
Port int
// PeerID identifies the peer. DHT-discovered peers get a synthetic
// "dht_<ip>_<port>" value generated by the coordinator.
PeerID string
Source string // "tracker", "dht", "webseed"; ranking rewrites it to "local" for private/loopback IPs
Quality int // Higher is better; WebSeed entries are pinned to 100
// LastSeen is when the peer was last observed. It drives the recency bonus
// during ranking and, for the first entry of a cached list, the cache TTL.
LastSeen time.Time
}
// P2PCoordinator manages integration between tracker, DHT, and WebSeed.
// It merges peers from every configured source into one ranked list, keeps a
// per-torrent peer cache, and re-announces active torrents periodically.
//
// Instances are expected to come from NewCoordinator, which initialises the
// maps and the stop channel and starts the background re-announce goroutine.
type P2PCoordinator struct {
// tracker and dht may be nil; methods in this file skip a nil source.
tracker *trackerPkg.Tracker
dht *dht.DHTBootstrap
gateway Gateway
// announcer is a pointer to an interface value, installed via SetAnnouncer.
// Both the pointer and the pointed-to interface are nil-checked before use.
announcer *Announcer
// Peer management
// peerCache holds merged lists under the plain infoHash key and raw DHT
// results under the "dht_"+infoHash key. Guarded by cacheMutex.
peerCache map[string][]PeerInfo // infoHash -> peers
cacheMutex sync.RWMutex
// Re-announce management
// activeTorrents is keyed by info hash and guarded by torrentsMutex.
activeTorrents map[string]*TorrentInfo // Track active torrents
torrentsMutex sync.RWMutex
// stopReannounce is closed by Stop to end the periodic re-announce loop.
stopReannounce chan struct{}
// Configuration
// preferWebSeed makes GetPeers always include the gateway's WebSeed peer.
preferWebSeed bool
// announceToAll and peerExchange are set by NewCoordinator and reported by
// GetStats; no other code visible in this file reads them.
announceToAll bool
peerExchange bool
// maxPeersReturn caps the number of peers returned by selectBestPeers.
maxPeersReturn int
}
// Gateway is the subset of the gateway service that the P2P coordinator
// depends on.
type Gateway interface {
// CreateTorrent returns torrent metadata for the file identified by
// fileHash, or an error. OnFileUploaded aborts when it fails.
CreateTorrent(fileHash string) (*TorrentInfo, error)
// WebSeedPeer returns the peer entry for the gateway's own WebSeed. The
// coordinator overwrites its Source, Quality and LastSeen fields.
WebSeedPeer() PeerInfo
// EnableWebSeed turns on WebSeed serving for the given torrent. A failure
// is logged by the coordinator but does not abort the upload flow.
EnableWebSeed(infoHash string) error
// PublishToNostr is not called anywhere in this file.
// NOTE(review): presumably publishes torrent metadata to Nostr; confirm
// against the implementation.
PublishToNostr(torrent *TorrentInfo) error
// GetPort returns the port the coordinator passes along with DHT announces.
GetPort() int
}
// TorrentInfo represents torrent metadata as produced by
// Gateway.CreateTorrent and tracked in P2PCoordinator.activeTorrents.
//
// Of these fields, only InfoHash is read by the coordinator code visible in
// this file; it is the map key for active torrents and the value announced
// to the DHT. The remaining fields are carried along unchanged.
type TorrentInfo struct {
// InfoHash is the torrent's info hash. DHT peer lookups require it to be a
// 40-character hex string (see hexToBytes).
InfoHash string `json:"info_hash"`
Name string `json:"name"`
Size int64 `json:"size"`
PieceLength int `json:"piece_length"`
Pieces []string `json:"pieces"`
// WebSeedURL is omitted from JSON output when empty.
WebSeedURL string `json:"webseed_url,omitempty"`
CreatedAt time.Time `json:"created_at"`
// NOTE(review): LastAnnounce and IsActive are never written by the
// coordinator code in this file; confirm who maintains them.
LastAnnounce time.Time `json:"last_announce"`
IsActive bool `json:"is_active"`
}
// Announcer handles Nostr announcements.
//
// The coordinator stores a pointer to an Announcer value (see SetAnnouncer)
// and invokes it from a background goroutine in OnFileUploaded.
type Announcer interface {
// AnnounceNewTorrent publishes a newly created torrent. A returned error is
// logged by the coordinator and otherwise ignored.
AnnounceNewTorrent(torrent *TorrentInfo) error
}
// NewCoordinator builds a P2PCoordinator wired to the given gateway, tracker
// and DHT bootstrapper, applies the default configuration (WebSeed preferred,
// announce-to-all and peer exchange enabled, at most 50 peers returned) and
// launches the background re-announce loop before returning.
//
// tracker and dht may be nil; the methods in this file skip a nil source.
// The background goroutine runs until Stop is called.
func NewCoordinator(gateway Gateway, tracker *trackerPkg.Tracker, dht *dht.DHTBootstrap) *P2PCoordinator {
	coordinator := new(P2PCoordinator)

	// Collaborators.
	coordinator.gateway = gateway
	coordinator.tracker = tracker
	coordinator.dht = dht

	// Mutable state.
	coordinator.peerCache = make(map[string][]PeerInfo)
	coordinator.activeTorrents = make(map[string]*TorrentInfo)
	coordinator.stopReannounce = make(chan struct{})

	// Default configuration.
	coordinator.preferWebSeed = true
	coordinator.announceToAll = true
	coordinator.peerExchange = true
	coordinator.maxPeersReturn = 50

	// Kick off the periodic re-announce loop.
	go coordinator.periodicReannounce()

	return coordinator
}
// OnFileUploaded coordinates all P2P components when a file is uploaded.
//
// Steps, in order:
//  1. ask the gateway to create a torrent for fileHash and record it in
//     activeTorrents so the periodic loop re-announces it;
//  2. enable WebSeed serving (a failure is logged, not returned);
//  3. register the WebSeed peer with the tracker, if one is configured;
//  4. announce to the DHT in the background after a 2 second delay;
//  5. publish to Nostr in the background, if an announcer has been set;
//  6. schedule one extra re-announce 5 seconds later.
//
// Only a failure to obtain the torrent is returned as an error (wrapped so
// callers can inspect it with errors.Is / errors.As); every later step is
// best-effort. filename is used for logging only.
func (p *P2PCoordinator) OnFileUploaded(fileHash string, filename string) error {
	// Truncate defensively for logging: slicing fileHash[:8] directly panics
	// when the caller passes a hash shorter than 8 bytes.
	shortFileHash := fileHash
	if len(shortFileHash) > 8 {
		shortFileHash = shortFileHash[:8]
	}
	log.Printf("P2P: Coordinating upload for file %s (%s)", shortFileHash, filename)

	// 1. Create torrent
	torrent, err := p.gateway.CreateTorrent(fileHash)
	if err != nil {
		return fmt.Errorf("failed to create torrent: %w", err)
	}
	if torrent == nil {
		// A gateway returning (nil, nil) would otherwise crash on the first
		// field access below.
		return fmt.Errorf("failed to create torrent: gateway returned no torrent for file %s", shortFileHash)
	}

	shortInfoHash := torrent.InfoHash
	if len(shortInfoHash) > 8 {
		shortInfoHash = shortInfoHash[:8]
	}

	// Store torrent for periodic re-announces
	p.torrentsMutex.Lock()
	p.activeTorrents[torrent.InfoHash] = torrent
	p.torrentsMutex.Unlock()

	// 2. Enable WebSeed serving first (most reliable source)
	if err := p.gateway.EnableWebSeed(torrent.InfoHash); err != nil {
		log.Printf("P2P: Failed to enable WebSeed: %v", err)
	} else {
		log.Printf("P2P: Enabled WebSeed for torrent %s", shortInfoHash)
	}

	// 3. Register WebSeed with tracker to make it available to peers
	if p.tracker != nil {
		webSeedPeer := p.createWebSeedPeerInfo(torrent.InfoHash)
		if err := p.storeWebSeedInTracker(torrent.InfoHash, webSeedPeer); err != nil {
			log.Printf("P2P: Failed to register WebSeed with tracker: %v", err)
		} else {
			log.Printf("P2P: Registered WebSeed with tracker for %s", shortInfoHash)
		}
	}

	// 4. Announce to DHT network for peer discovery
	if p.dht != nil {
		go func() {
			// Delay DHT announce slightly to ensure WebSeed is ready
			time.Sleep(2 * time.Second)
			p.dht.AnnounceNewTorrent(torrent.InfoHash, p.gateway.GetPort())
			log.Printf("P2P: Announced torrent %s to DHT", shortInfoHash)
		}()
	}

	// 5. Publish to Nostr if announcer is available. announcer is a pointer
	// to an interface, so both levels must be non-nil.
	if p.announcer != nil && *p.announcer != nil {
		go func() {
			if err := (*p.announcer).AnnounceNewTorrent(torrent); err != nil {
				log.Printf("P2P: Failed to announce to Nostr: %v", err)
			} else {
				log.Printf("P2P: Published torrent %s to Nostr", shortInfoHash)
			}
		}()
	}

	// 6. Schedule immediate re-announce to ensure availability
	go func() {
		time.Sleep(5 * time.Second) // Give systems time to initialize
		p.reannounceToAll(torrent)
	}()

	log.Printf("P2P: Successfully coordinated torrent %s across all systems", shortInfoHash)
	return nil
}
// createWebSeedPeerInfo returns the gateway's WebSeed peer entry stamped as a
// "webseed" source with the top quality score (100) and a LastSeen of now.
// infoHash is accepted for call-site symmetry but is not used.
func (p *P2PCoordinator) createWebSeedPeerInfo(infoHash string) PeerInfo {
	seed := p.gateway.WebSeedPeer()
	seed.Source, seed.Quality, seed.LastSeen = "webseed", 100, time.Now()
	return seed
}
// storeWebSeedInTracker is a placeholder for persisting the WebSeed peer in
// the tracker's store. At present it only logs what it would store.
//
// It returns an error when no tracker is configured and nil otherwise.
func (p *P2PCoordinator) storeWebSeedInTracker(infoHash string, webSeedPeer PeerInfo) error {
	if p.tracker != nil {
		// Real persistence depends on the tracker's internal structure and is
		// not implemented yet; record the intention only.
		log.Printf("P2P: Would store WebSeed %s:%d for torrent %s in tracker",
			webSeedPeer.IP, webSeedPeer.Port, infoHash[:8])
		return nil
	}
	return fmt.Errorf("no tracker available")
}
// reannounceToAll pushes a torrent out again to every configured system:
// first the DHT, then the tracker's WebSeed entry. Either step is skipped
// when the corresponding component is nil. Failures are logged only.
func (p *P2PCoordinator) reannounceToAll(torrent *TorrentInfo) {
	hash := torrent.InfoHash
	short := hash[:8]
	log.Printf("P2P: Re-announcing torrent %s to all systems", short)

	// DHT leg.
	if p.dht != nil {
		p.dht.AnnounceNewTorrent(hash, p.gateway.GetPort())
		log.Printf("P2P: DHT re-announced for %s", short)
	}

	// Tracker leg: refresh the WebSeed peer entry.
	if p.tracker == nil {
		return
	}
	seed := p.createWebSeedPeerInfo(hash)
	err := p.storeWebSeedInTracker(hash, seed)
	if err != nil {
		log.Printf("P2P: Tracker WebSeed update failed for %s: %v", short, err)
	}
}
// GetPeers implements unified peer discovery across all sources.
//
// It returns at most maxPeersReturn peers for infoHash, best first. A cached
// list is used when its first entry was seen less than 5 minutes ago;
// otherwise peers are gathered from the WebSeed (when preferWebSeed is set),
// the tracker and the DHT, de-duplicated by IP:Port, cached and ranked.
//
// Locking: cacheMutex is held only while the cache map is read or written,
// never while the sources are queried. getDHTPeers takes cacheMutex itself
// and sync.RWMutex is not reentrant, so holding the lock across that call
// would deadlock. The trade-off is that concurrent misses for the same hash
// may each query the sources; the last writer wins.
//
// Ranking sorts and rescores its input in place, so it always operates on a
// private copy. Cached entries therefore keep their insertion order and
// their quality bonuses do not compound across calls.
func (p *P2PCoordinator) GetPeers(infoHash string) []PeerInfo {
	// Check cache first (5 minute TTL, judged by the first cached entry).
	p.cacheMutex.RLock()
	var snapshot []PeerInfo
	if cached := p.peerCache[infoHash]; len(cached) > 0 && time.Since(cached[0].LastSeen) < 5*time.Minute {
		snapshot = append([]PeerInfo(nil), cached...)
	}
	p.cacheMutex.RUnlock()
	if len(snapshot) > 0 {
		return p.selectBestPeers(snapshot)
	}

	var allPeers []PeerInfo

	// 1. Always include WebSeed when preferred (highest priority)
	if p.preferWebSeed {
		webSeedPeer := p.gateway.WebSeedPeer()
		webSeedPeer.Quality = 100 // Highest quality
		webSeedPeer.Source = "webseed"
		webSeedPeer.LastSeen = time.Now()
		allPeers = append(allPeers, webSeedPeer)
	}

	// 2. Get tracker peers
	// NOTE(review): the flat Quality assigned here replaces the per-peer score
	// computed inside getTrackerPeers; confirm that this is intended.
	if p.tracker != nil {
		for _, peer := range p.getTrackerPeers(infoHash) {
			peer.Source = "tracker"
			peer.Quality = 80 // High quality
			allPeers = append(allPeers, peer)
		}
	}

	// 3. Get DHT peers (getDHTPeers locks cacheMutex internally)
	if p.dht != nil {
		for _, peer := range p.getDHTPeers(infoHash) {
			peer.Source = "dht"
			peer.Quality = 60 // Medium quality
			allPeers = append(allPeers, peer)
		}
	}

	// Deduplicate and cache
	dedupedPeers := p.deduplicate(allPeers)
	p.cacheMutex.Lock()
	p.peerCache[infoHash] = dedupedPeers
	p.cacheMutex.Unlock()

	// Rank a copy so the slice now owned by the cache is not mutated outside
	// the lock.
	return p.selectBestPeers(append([]PeerInfo(nil), dedupedPeers...))
}
// rankPeers rescores and sorts peers, best first, and returns the same slice.
//
// The input is modified in place: every element's Quality is replaced by the
// result of calculateEnhancedQuality (which may also relabel its Source as
// "local"), and the slice is reordered. Callers that need the original data
// must pass a copy.
//
// Ordering: higher Quality first; ties broken by source priority
// (webseed > local > tracker > dht > anything else); remaining ties broken
// by the more recent LastSeen.
func (p *P2PCoordinator) rankPeers(peers []PeerInfo) []PeerInfo {
	// Apply quality bonuses before ranking
	for i := range peers {
		peers[i].Quality = p.calculateEnhancedQuality(&peers[i])
	}

	// Built once per call. Constructing this map inside the comparator would
	// allocate it again on every comparison the sort performs.
	sourceWeight := map[string]int{
		"webseed": 4, // Highest priority
		"local":   3, // Local network peers
		"tracker": 2, // Tracker peers
		"dht":     1, // DHT peers
	}

	sort.Slice(peers, func(i, j int) bool {
		// Primary sort: quality (higher is better)
		if peers[i].Quality != peers[j].Quality {
			return peers[i].Quality > peers[j].Quality
		}
		// Secondary sort: source priority (unknown sources weigh 0)
		weightI := sourceWeight[peers[i].Source]
		weightJ := sourceWeight[peers[j].Source]
		if weightI != weightJ {
			return weightI > weightJ
		}
		// Tertiary sort: last seen (more recent is better)
		return peers[i].LastSeen.After(peers[j].LastSeen)
	})
	return peers
}
// calculateEnhancedQuality derives the ranking score for a single peer.
//
// Rules, evaluated in this order:
//   - a "webseed" peer always scores 100;
//   - a peer whose IP parses as private or loopback is relabelled "local"
//     (peer.Source is modified) and scores 90;
//   - otherwise the peer's current Quality gets a recency bonus (+15 within
//     10 minutes, +10 within 30, +5 within 60), is raised to the floor for
//     its source (80 for "tracker", 60 for "dht"), and is capped at 99 so
//     that only WebSeeds reach 100.
func (p *P2PCoordinator) calculateEnhancedQuality(peer *PeerInfo) int {
	if peer.Source == "webseed" {
		return 100
	}

	if addr := net.ParseIP(peer.IP); addr != nil && (addr.IsPrivate() || addr.IsLoopback()) {
		peer.Source = "local" // Mark as local
		return 90
	}

	score := peer.Quality

	// Recency bonus.
	switch age := time.Since(peer.LastSeen); {
	case age <= 10*time.Minute:
		score += 15
	case age <= 30*time.Minute:
		score += 10
	case age <= 60*time.Minute:
		score += 5
	}

	// Per-source minimum.
	if peer.Source == "tracker" && score < 80 {
		score = 80
	} else if peer.Source == "dht" && score < 60 {
		score = 60
	}

	// 100 is reserved for WebSeeds.
	if score > 99 {
		return 99
	}
	return score
}
// selectBestPeers ranks peers via rankPeers (which rescores and reorders the
// slice it is given) and returns the leading maxPeersReturn entries, or the
// whole ranked slice when it is not longer than that limit.
func (p *P2PCoordinator) selectBestPeers(peers []PeerInfo) []PeerInfo {
	best := p.rankPeers(peers)
	if limit := p.maxPeersReturn; len(best) > limit {
		best = best[:limit]
	}
	return best
}
// deduplicate drops peers whose "IP:Port" endpoint has already appeared,
// keeping the first occurrence and preserving input order. The result is nil
// when the input contains no peers.
func (p *P2PCoordinator) deduplicate(peers []PeerInfo) []PeerInfo {
	var result []PeerInfo
	known := make(map[string]struct{})
	for i := range peers {
		endpoint := fmt.Sprintf("%s:%d", peers[i].IP, peers[i].Port)
		if _, dup := known[endpoint]; dup {
			continue
		}
		known[endpoint] = struct{}{}
		result = append(result, peers[i])
	}
	return result
}
// getTrackerPeers fetches the peers the tracker knows for infoHash and
// converts them to coordinator PeerInfo values.
//
// It returns nil when no tracker is configured or the tracker lookup fails
// (the failure is logged). Peers last seen more than 45 minutes ago are
// dropped. Each returned peer has Source "tracker" and a Quality computed by
// calculateTrackerPeerQuality.
//
// infoHash is not validated here, so log output truncates it defensively:
// slicing infoHash[:8] directly would panic for hashes shorter than 8 bytes.
func (p *P2PCoordinator) getTrackerPeers(infoHash string) []PeerInfo {
	if p.tracker == nil {
		return nil
	}

	shortHash := infoHash
	if len(shortHash) > 8 {
		shortHash = shortHash[:8]
	}

	// Call tracker.GetPeersForTorrent to get actual peers
	trackerPeers, err := p.tracker.GetPeersForTorrent(infoHash)
	if err != nil {
		log.Printf("P2P: Failed to get tracker peers for %s: %v", shortHash, err)
		return nil
	}

	// Convert tracker.PeerInfo to coordinator.PeerInfo
	var peers []PeerInfo
	for _, trackerPeer := range trackerPeers {
		// Filter out expired peers (already filtered by tracker, but double-check)
		if time.Since(trackerPeer.LastSeen) > 45*time.Minute {
			continue
		}
		peers = append(peers, PeerInfo{
			IP:       trackerPeer.IP,
			Port:     trackerPeer.Port,
			PeerID:   trackerPeer.PeerID,
			Source:   "tracker",
			Quality:  p.calculateTrackerPeerQuality(trackerPeer),
			LastSeen: trackerPeer.LastSeen,
		})
	}

	log.Printf("P2P: Retrieved %d tracker peers for %s", len(peers), shortHash)
	return peers
}
// calculateTrackerPeerQuality scores a peer reported by the tracker.
//
// A WebSeed always scores 100. Any other peer starts at 80 and collects
// additive bonuses: +15 for a seeder (IsSeeder set or nothing left to
// download), +(Priority-50)/5 when Priority exceeds 50, +10 or +5 when seen
// within the last 10 or 30 minutes, and +10 for a private or loopback IP.
// The result is not capped here.
func (p *P2PCoordinator) calculateTrackerPeerQuality(trackerPeer *trackerPkg.PeerInfo) int {
	if trackerPeer.IsWebSeed {
		return 100
	}

	score := 80 // Base tracker quality

	if trackerPeer.IsSeeder || trackerPeer.Left == 0 {
		score += 15
	}

	if trackerPeer.Priority > 50 {
		score += (trackerPeer.Priority - 50) / 5 // Scale priority to quality boost
	}

	switch age := time.Since(trackerPeer.LastSeen); {
	case age < 10*time.Minute:
		score += 10
	case age < 30*time.Minute:
		score += 5
	}

	if addr := net.ParseIP(trackerPeer.IP); addr != nil && (addr.IsPrivate() || addr.IsLoopback()) {
		score += 10 // Local network peers get bonus
	}

	return score
}
// getDHTPeers looks up peers for infoHash through the DHT node and converts
// them to coordinator PeerInfo values.
//
// Results are cached under the "dht_"+infoHash key; a cached list is reused
// while its first entry's LastSeen is less than 5 minutes old. The method
// takes cacheMutex itself, so it must not be called with that mutex held.
//
// It returns nil when no DHT is configured, the DHT node is unavailable,
// infoHash is not a 40-character hex string, or the network lookup fails.
func (p *P2PCoordinator) getDHTPeers(infoHash string) []PeerInfo {
	if p.dht == nil {
		return nil
	}

	// Serve from cache while it is still fresh.
	cacheKey := "dht_" + infoHash
	p.cacheMutex.RLock()
	cached, exists := p.peerCache[cacheKey]
	fresh := exists && len(cached) > 0 && time.Since(cached[0].LastSeen) < 5*time.Minute
	p.cacheMutex.RUnlock()
	if fresh {
		return cached
	}

	// A live DHT node is needed for direct peer queries.
	node := p.dht.GetNode()
	if node == nil {
		log.Printf("P2P: DHT node not available for peer lookup")
		return nil
	}

	// The DHT works on raw bytes, not the hex representation.
	rawHash, err := hexToBytes(infoHash)
	if err != nil {
		log.Printf("P2P: Invalid infohash format for DHT lookup: %v", err)
		return nil
	}

	found, err := node.FindPeersFromNetwork(rawHash)
	if err != nil {
		log.Printf("P2P: Failed to find DHT peers for %s: %v", infoHash[:8], err)
		return nil
	}

	// Translate DHT peers into coordinator peers.
	var peers []PeerInfo
	for _, dhtPeer := range found {
		addr := dhtPeer.IP.String()
		peers = append(peers, PeerInfo{
			IP:       addr,
			Port:     dhtPeer.Port,
			PeerID:   fmt.Sprintf("dht_%s_%d", addr, dhtPeer.Port), // synthetic peer ID
			Source:   "dht",
			Quality:  p.calculateDHTPeerQuality(dhtPeer),
			LastSeen: dhtPeer.Added, // Added time doubles as LastSeen
		})
	}

	// Remember the result for subsequent lookups.
	p.cacheMutex.Lock()
	p.peerCache[cacheKey] = peers
	p.cacheMutex.Unlock()

	log.Printf("P2P: Retrieved %d DHT peers for %s", len(peers), infoHash[:8])
	return peers
}
// calculateDHTPeerQuality scores a peer discovered through the DHT. Every
// DHT peer currently receives the same base score of 60, below the tracker
// base; the argument is not inspected yet.
func (p *P2PCoordinator) calculateDHTPeerQuality(dhtPeer interface{}) int {
	const baseDHTQuality = 60
	_ = dhtPeer // reserved for per-peer scoring once the DHT peer structure is used
	return baseDHTQuality
}
// hexToBytes decodes a 40-character hexadecimal info hash into its 20 raw
// bytes. Upper- and lower-case digits are both accepted.
//
// It returns an error when the input is not exactly 40 characters long or
// when it contains a character that is not a hex digit; the first offending
// character, scanning left to right, is reported.
func hexToBytes(hexStr string) ([]byte, error) {
	const hashLen = 20 // bytes in an info hash; two hex digits each

	if len(hexStr) != 2*hashLen {
		return nil, fmt.Errorf("invalid infohash length: %d", len(hexStr))
	}

	decoded := make([]byte, hashLen)
	for pos := 0; pos < len(hexStr); pos++ {
		ch := hexStr[pos]

		var nibble byte
		switch {
		case '0' <= ch && ch <= '9':
			nibble = ch - '0'
		case 'a' <= ch && ch <= 'f':
			nibble = ch - 'a' + 10
		case 'A' <= ch && ch <= 'F':
			nibble = ch - 'A' + 10
		default:
			return nil, fmt.Errorf("invalid hex character: %c", ch)
		}

		// Even positions supply the high nibble, odd positions the low one.
		decoded[pos/2] = decoded[pos/2]<<4 | nibble
	}
	return decoded, nil
}
// AnnounceToExternalServices announces a torrent to the DHT (when one is
// configured) and is the extension point for other external services.
//
// infoHash and port are passed to the DHT announce unchanged. The DHT
// announce reports no error, so the function currently always returns nil;
// the error list is kept for services added later.
func (p *P2PCoordinator) AnnounceToExternalServices(infoHash string, port int) error {
	var errs []string

	// Announce to DHT
	if p.dht != nil {
		p.dht.AnnounceNewTorrent(infoHash, port)

		// Truncate defensively for logging: slicing infoHash[:8] directly
		// panics when the caller passes a hash shorter than 8 bytes.
		shortHash := infoHash
		if len(shortHash) > 8 {
			shortHash = shortHash[:8]
		}
		log.Printf("P2P: Successfully announced %s to DHT", shortHash)
	}

	// Could add other external services here (like PEX, other trackers, etc.)

	if len(errs) > 0 {
		return fmt.Errorf("external announce errors: %v", errs)
	}
	return nil
}
// GetStats assembles a snapshot of P2P statistics as a nested map with up to
// four sections: "tracker" and "dht" (present only when the component is
// configured, with values copied from that component's own stats),
// "webseed" (static), and "coordination" (cache and torrent counts plus the
// coordinator's configuration).
func (p *P2PCoordinator) GetStats() map[string]interface{} {
	report := map[string]interface{}{}

	// Tracker section, sourced from the tracker itself.
	if p.tracker != nil {
		ts := p.tracker.GetStats()
		trackerSection := map[string]interface{}{"status": "active"}
		trackerSection["torrents"] = ts["torrents"]
		trackerSection["peers"] = ts["peers"]
		trackerSection["seeders"] = ts["seeders"]
		trackerSection["leechers"] = ts["leechers"]
		trackerSection["webseeds"] = ts["webseeds"]
		report["tracker"] = trackerSection
	}

	// DHT section, sourced from the DHT itself.
	if p.dht != nil {
		ds := p.dht.GetDHTStats()
		dhtSection := map[string]interface{}{"status": "active"}
		dhtSection["routing_table"] = ds["routing_table_size"]
		dhtSection["active_torrents"] = ds["active_torrents"]
		dhtSection["packets_sent"] = ds["packets_sent"]
		dhtSection["packets_received"] = ds["packets_received"]
		dhtSection["stored_items"] = ds["stored_items"]
		report["dht"] = dhtSection
	}

	// WebSeed section is static.
	report["webseed"] = map[string]interface{}{
		"status":   "integrated",
		"priority": 100,
	}

	// Coordination section: take each count under its own read lock.
	p.cacheMutex.RLock()
	cachedLists := len(p.peerCache)
	p.cacheMutex.RUnlock()

	p.torrentsMutex.RLock()
	tracked := len(p.activeTorrents)
	p.torrentsMutex.RUnlock()

	coordination := make(map[string]interface{}, 7)
	coordination["cached_peer_lists"] = cachedLists
	coordination["active_torrents"] = tracked
	coordination["prefer_webseed"] = p.preferWebSeed
	coordination["announce_to_all"] = p.announceToAll
	coordination["peer_exchange"] = p.peerExchange
	coordination["max_peers_return"] = p.maxPeersReturn
	coordination["reannounce_interval"] = "15 minutes"
	report["coordination"] = coordination

	return report
}
// SetAnnouncer installs the Nostr announcer used by OnFileUploaded.
//
// The argument is a pointer to an Announcer interface value and is stored as
// given, without locking. Passing nil disables Nostr publishing.
func (p *P2PCoordinator) SetAnnouncer(announcer *Announcer) {
	p.announcer = announcer
}
// OnPeerConnect records a peer connection in the cached peer list for
// infoHash. A peer already cached under the same IP and port has its
// LastSeen refreshed and its Quality raised by 10; an unknown peer is
// appended with LastSeen set to now.
func (p *P2PCoordinator) OnPeerConnect(infoHash string, peer PeerInfo) {
	p.cacheMutex.Lock()
	defer p.cacheMutex.Unlock()

	known := p.peerCache[infoHash]

	for idx := range known {
		entry := &known[idx]
		if entry.IP != peer.IP || entry.Port != peer.Port {
			continue
		}
		// Existing peer: refresh and reward activity.
		entry.LastSeen = time.Now()
		entry.Quality += 10
		p.peerCache[infoHash] = known
		return
	}

	// First time this endpoint is seen for the torrent.
	peer.LastSeen = time.Now()
	p.peerCache[infoHash] = append(known, peer)
}
// ============ PERIODIC RE-ANNOUNCE FUNCTIONALITY ============

// periodicReannounce is the background loop started by NewCoordinator. Every
// 15 minutes it re-announces all active torrents, and it exits once the
// stopReannounce channel is closed.
func (p *P2PCoordinator) periodicReannounce() {
	const interval = 15 * time.Minute // Re-announce every 15 minutes per BEP-3

	tick := time.NewTicker(interval)
	defer tick.Stop()

	log.Printf("P2P: Starting periodic re-announce routine (15 minute interval)")

	for {
		select {
		case <-p.stopReannounce:
			log.Printf("P2P: Stopping periodic re-announce routine")
			return
		case <-tick.C:
			p.performReannouncements()
		}
	}
}
// performReannouncements re-announces every active torrent and waits for all
// of them to finish. The torrent list is snapshotted under the read lock, so
// the announcements themselves run without holding torrentsMutex. At most
// five re-announces run at the same time.
func (p *P2PCoordinator) performReannouncements() {
	// Snapshot the active torrents.
	p.torrentsMutex.RLock()
	snapshot := make([]*TorrentInfo, 0, len(p.activeTorrents))
	for _, info := range p.activeTorrents {
		snapshot = append(snapshot, info)
	}
	p.torrentsMutex.RUnlock()

	total := len(snapshot)
	if total == 0 {
		return
	}

	log.Printf("P2P: Performing periodic re-announce for %d torrents", total)

	// A buffered channel acts as a counting semaphore for the workers.
	const maxConcurrent = 5
	slots := make(chan struct{}, maxConcurrent)

	var pending sync.WaitGroup
	pending.Add(total)
	for i := range snapshot {
		info := snapshot[i]
		go func() {
			defer pending.Done()
			slots <- struct{}{}        // take a slot
			defer func() { <-slots }() // give it back
			p.reannounceToAll(info)
		}()
	}
	pending.Wait()

	log.Printf("P2P: Completed periodic re-announce for %d torrents", total)
}
// RemoveTorrent removes a torrent from active tracking and drops both of its
// peer-cache entries (the merged list and the DHT-only list). Removing a
// torrent that is not tracked is a no-op apart from the log line.
func (p *P2PCoordinator) RemoveTorrent(infoHash string) {
	p.torrentsMutex.Lock()
	delete(p.activeTorrents, infoHash)
	p.torrentsMutex.Unlock()

	// Clean up peer cache
	p.cacheMutex.Lock()
	delete(p.peerCache, infoHash)
	delete(p.peerCache, fmt.Sprintf("dht_%s", infoHash))
	p.cacheMutex.Unlock()

	// Truncate defensively for logging: slicing infoHash[:8] directly panics
	// when the caller passes a hash shorter than 8 bytes, and would do so
	// only after the state above had already been modified.
	shortHash := infoHash
	if len(shortHash) > 8 {
		shortHash = shortHash[:8]
	}
	log.Printf("P2P: Removed torrent %s from coordination", shortHash)
}
// Stop gracefully shuts down the coordinator: it signals the periodic
// re-announce loop to exit and logs a final stop notice for every active
// torrent.
//
// Calling Stop again after it has completed is a no-op; closing the stop
// channel a second time would otherwise panic. The guard is not
// synchronised, so Stop must not be called from several goroutines at once.
func (p *P2PCoordinator) Stop() {
	log.Printf("P2P: Shutting down coordinator")

	// Nothing ever sends on stopReannounce, so a successful receive means the
	// channel has already been closed by an earlier Stop.
	select {
	case <-p.stopReannounce:
		return
	default:
		close(p.stopReannounce)
	}

	// Final announce "stopped" event for all torrents
	p.torrentsMutex.RLock()
	torrents := make([]*TorrentInfo, 0, len(p.activeTorrents))
	for _, torrent := range p.activeTorrents {
		torrents = append(torrents, torrent)
	}
	p.torrentsMutex.RUnlock()

	log.Printf("P2P: Sending final stop announcements for %d torrents", len(torrents))

	// Send stop events
	for _, torrent := range torrents {
		if p.dht != nil {
			// DHT doesn't have a stop event, but we can stop announcing
			log.Printf("P2P: Stopping DHT announcements for %s", torrent.InfoHash[:8])
		}
	}
}
// GetActiveTorrents returns a snapshot of the currently active torrents keyed
// by info hash. The returned map is a fresh copy, so adding or removing keys
// does not affect the coordinator; the *TorrentInfo values themselves are
// shared with the coordinator, not cloned.
func (p *P2PCoordinator) GetActiveTorrents() map[string]*TorrentInfo {
	p.torrentsMutex.RLock()
	defer p.torrentsMutex.RUnlock()

	snapshot := make(map[string]*TorrentInfo, len(p.activeTorrents))
	for hash, info := range p.activeTorrents {
		snapshot[hash] = info
	}
	return snapshot
}