torrent-gateway/internal/p2p/coordinator.go
enki b3204ea07a
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
first commit
2025-08-18 00:40:15 -07:00

331 lines
8.4 KiB
Go

package p2p
import (
"fmt"
"log"
"net"
"sort"
"sync"
"time"
"git.sovbit.dev/enki/torrentGateway/internal/dht"
"git.sovbit.dev/enki/torrentGateway/internal/tracker"
)
// PeerInfo describes one peer endpoint together with the metadata the
// coordinator uses to rank it. A peer may come from any source
// (tracker, DHT, WebSeed).
type PeerInfo struct {
// IP and Port identify the endpoint. deduplicate and OnPeerConnect treat two
// entries with the same IP and Port as the same peer.
IP string
Port int
// PeerID is carried along but is not read by any code visible in this file.
PeerID string
Source string // "tracker", "dht", "webseed"
Quality int // Higher is better
// LastSeen breaks ties between peers of equal Quality (most recent first) and
// is the timestamp GetPeers compares against its cache TTL.
LastSeen time.Time
}
// P2PCoordinator manages integration between tracker, DHT, and WebSeed.
// It also keeps a per-torrent peer cache that is guarded by cacheMutex.
type P2PCoordinator struct {
// tracker and dht are optional: the methods in this file skip a backend
// whose pointer is nil.
tracker *tracker.Tracker
dht *dht.DHTBootstrap
// gateway is called without a nil check, so it must be set.
gateway Gateway
// announcer stays nil until SetAnnouncer is called.
// NOTE(review): Announcer is an interface, so this is a pointer to an
// interface value and method calls on it need an explicit dereference.
// Storing the interface directly would be more idiomatic, but that changes
// SetAnnouncer's signature — confirm with its callers first.
announcer *Announcer
// Peer management
peerCache map[string][]PeerInfo // infoHash -> peers
cacheMutex sync.RWMutex // guards peerCache
// Configuration
// preferWebSeed makes GetPeers include the gateway's WebSeed peer.
preferWebSeed bool
// announceToAll and peerExchange are only reported by GetStats in the code
// visible here; nothing else reads them.
announceToAll bool
peerExchange bool
// maxPeersReturn caps the number of peers selectBestPeers returns.
maxPeersReturn int
}
// Gateway is the set of gateway operations the P2P coordinator depends on.
type Gateway interface {
// CreateTorrent is called by OnFileUploaded with the uploaded file's hash;
// an error from it aborts the upload coordination.
CreateTorrent(fileHash string) (*TorrentInfo, error)
// WebSeedPeer returns the peer entry that the coordinator registers with the
// tracker and includes in GetPeers results.
WebSeedPeer() PeerInfo
// EnableWebSeed is called with the torrent's info hash; OnFileUploaded only
// logs a failure.
EnableWebSeed(infoHash string) error
// PublishToNostr is not called by any code visible in this file.
PublishToNostr(torrent *TorrentInfo) error
// GetPort supplies the port that OnFileUploaded passes to the DHT announce.
GetPort() int
}
// TorrentInfo represents torrent metadata as produced by Gateway.CreateTorrent.
type TorrentInfo struct {
// InfoHash is the only field read in this file: it keys tracker
// registration, DHT announces and WebSeed enabling.
InfoHash string
// The remaining fields are not read by the code visible here.
// NOTE(review): units and encodings (bytes for Size/PieceLength, hex piece
// hashes in Pieces) are presumed from the names — confirm with the producer.
Name string
Size int64
PieceLength int
Pieces []string
WebSeedURL string
}
// Announcer handles Nostr announcements. OnFileUploaded calls it as the last
// coordination step when an announcer has been configured.
type Announcer interface {
// AnnounceNewTorrent publishes the given torrent; OnFileUploaded logs a
// returned error and does not propagate it.
AnnounceNewTorrent(torrent *TorrentInfo) error
}
// NewCoordinator builds a P2PCoordinator wired to the given gateway, tracker
// and DHT. tracker and dht may be nil, in which case the corresponding steps
// are skipped by the coordinator's methods.
//
// The coordinator starts with an empty peer cache, WebSeed preference,
// announce-to-all and peer exchange switched on, and a limit of 50 peers per
// GetPeers result. No announcer is configured; use SetAnnouncer for that.
func NewCoordinator(gateway Gateway, tracker *tracker.Tracker, dht *dht.DHTBootstrap) *P2PCoordinator {
	coordinator := new(P2PCoordinator)

	// Collaborators.
	coordinator.gateway = gateway
	coordinator.tracker = tracker
	coordinator.dht = dht

	// State.
	coordinator.peerCache = map[string][]PeerInfo{}

	// Defaults.
	coordinator.preferWebSeed = true
	coordinator.announceToAll = true
	coordinator.peerExchange = true
	coordinator.maxPeersReturn = 50

	return coordinator
}
// OnFileUploaded coordinates all P2P components when a file is uploaded.
//
// It creates the torrent through the gateway and then, on a best-effort
// basis, registers it with the tracker, announces it to the DHT, enables
// WebSeed serving and publishes it through the announcer. Only the torrent
// creation step can fail the call: every later step logs its error and the
// remaining steps still run.
//
// fileHash is passed to Gateway.CreateTorrent; filename is used for logging
// only. The returned error wraps the gateway's error, so errors.Is/As work
// on it.
func (p *P2PCoordinator) OnFileUploaded(fileHash string, filename string) error {
	// short trims a hash to its first eight bytes for log output. Slicing
	// unconditionally would panic on hashes shorter than that.
	short := func(hash string) string {
		if len(hash) > 8 {
			return hash[:8]
		}
		return hash
	}

	log.Printf("P2P: Coordinating upload for file %s (%s)", short(fileHash), filename)

	// 1. Create torrent
	torrent, err := p.gateway.CreateTorrent(fileHash)
	if err != nil {
		return fmt.Errorf("failed to create torrent: %w", err)
	}
	if torrent == nil {
		// A nil torrent with a nil error would otherwise be dereferenced below.
		return fmt.Errorf("failed to create torrent: gateway returned no torrent for %s", short(fileHash))
	}
	hashTag := short(torrent.InfoHash)

	// 2. Register with tracker if available
	if p.tracker != nil {
		webSeedPeer := p.gateway.WebSeedPeer()
		if err := p.tracker.RegisterTorrent(torrent.InfoHash, []PeerInfo{webSeedPeer}); err != nil {
			log.Printf("P2P: Failed to register with tracker: %v", err)
		} else {
			log.Printf("P2P: Registered torrent %s with tracker", hashTag)
		}
	}

	// 3. Announce to DHT if available
	if p.dht != nil {
		if err := p.dht.AnnounceNewTorrent(torrent.InfoHash, p.gateway.GetPort()); err != nil {
			log.Printf("P2P: Failed to announce to DHT: %v", err)
		} else {
			log.Printf("P2P: Announced torrent %s to DHT", hashTag)
		}
	}

	// 4. Enable WebSeed serving
	if err := p.gateway.EnableWebSeed(torrent.InfoHash); err != nil {
		log.Printf("P2P: Failed to enable WebSeed: %v", err)
	} else {
		log.Printf("P2P: Enabled WebSeed for torrent %s", hashTag)
	}

	// 5. Publish to Nostr if announcer is available.
	// announcer is a pointer to an interface value, so it must be dereferenced
	// before the method call, and the interface it points at may itself be nil.
	if p.announcer != nil && *p.announcer != nil {
		if err := (*p.announcer).AnnounceNewTorrent(torrent); err != nil {
			log.Printf("P2P: Failed to announce to Nostr: %v", err)
		} else {
			log.Printf("P2P: Published torrent %s to Nostr", hashTag)
		}
	}

	return nil
}
// GetPeers implements unified peer discovery across all sources.
//
// A cached peer list for infoHash is reused while its first entry was seen
// less than five minutes ago. Otherwise the list is rebuilt from the WebSeed
// (when preferWebSeed is set), the tracker and the DHT, deduplicated by
// endpoint, and stored in the cache. In both cases the result is ranked and
// capped by selectBestPeers.
//
// The returned slice is a copy: callers may keep or modify it without
// touching the cache, whose entries OnPeerConnect updates under the lock.
//
// NOTE(review): freshness is derived from a peer's LastSeen rather than from
// the time the list was built, so an entry created by OnPeerConnect alone is
// served as-is until it ages out.
func (p *P2PCoordinator) GetPeers(infoHash string) []PeerInfo {
	p.cacheMutex.Lock()
	defer p.cacheMutex.Unlock()

	// detach copies the selected peers so the result does not share a backing
	// array with the cached slice.
	detach := func(selected []PeerInfo) []PeerInfo {
		return append([]PeerInfo(nil), selected...)
	}

	// Check cache first (5 minute TTL)
	if cached, exists := p.peerCache[infoHash]; exists {
		if len(cached) > 0 && time.Since(cached[0].LastSeen) < 5*time.Minute {
			return detach(p.selectBestPeers(cached))
		}
	}

	now := time.Now()
	var allPeers []PeerInfo

	// 1. Always include WebSeed if available (highest priority)
	if p.preferWebSeed {
		webSeedPeer := p.gateway.WebSeedPeer()
		webSeedPeer.Quality = 100 // Highest quality
		webSeedPeer.Source = "webseed"
		webSeedPeer.LastSeen = now
		allPeers = append(allPeers, webSeedPeer)
	}

	// 2. Get tracker peers
	if p.tracker != nil {
		for _, peer := range p.getTrackerPeers(infoHash) {
			peer.Source = "tracker"
			peer.Quality = 80 // High quality
			// The TTL check reads LastSeen, so a peer that arrives without
			// one is stamped with its discovery time.
			if peer.LastSeen.IsZero() {
				peer.LastSeen = now
			}
			allPeers = append(allPeers, peer)
		}
	}

	// 3. Get DHT peers
	if p.dht != nil {
		for _, peer := range p.getDHTPeers(infoHash) {
			peer.Source = "dht"
			peer.Quality = 60 // Medium quality
			if peer.LastSeen.IsZero() {
				peer.LastSeen = now
			}
			allPeers = append(allPeers, peer)
		}
	}

	// Deduplicate and cache
	dedupedPeers := p.deduplicate(allPeers)
	p.peerCache[infoHash] = dedupedPeers

	return detach(p.selectBestPeers(dedupedPeers))
}
// rankPeers orders peers best-first and returns the same slice. The sort is
// done in place: higher Quality wins, and among peers of equal Quality the
// one with the more recent LastSeen comes first.
func (p *P2PCoordinator) rankPeers(peers []PeerInfo) []PeerInfo {
	better := func(a, b int) bool {
		left, right := &peers[a], &peers[b]
		if left.Quality == right.Quality {
			// Tie on quality: prefer the peer seen most recently.
			return left.LastSeen.After(right.LastSeen)
		}
		return left.Quality > right.Quality
	}
	sort.Slice(peers, better)
	return peers
}
// selectBestPeers ranks peers with rankPeers and keeps at most
// maxPeersReturn of them. The result shares storage with the input slice.
func (p *P2PCoordinator) selectBestPeers(peers []PeerInfo) []PeerInfo {
	best := p.rankPeers(peers)
	if limit := p.maxPeersReturn; len(best) > limit {
		best = best[:limit]
	}
	return best
}
// deduplicate drops every peer whose IP:Port endpoint has already appeared
// earlier in the slice. The first occurrence wins and the input order is
// preserved. It returns nil when there is nothing to keep.
func (p *P2PCoordinator) deduplicate(peers []PeerInfo) []PeerInfo {
	var unique []PeerInfo
	seen := map[string]struct{}{}

	for i := range peers {
		endpoint := fmt.Sprintf("%s:%d", peers[i].IP, peers[i].Port)
		if _, dup := seen[endpoint]; dup {
			continue
		}
		seen[endpoint] = struct{}{}
		unique = append(unique, peers[i])
	}

	return unique
}
// Helper methods to get peers from different sources

// getTrackerPeers returns the peers the tracker knows for infoHash. It yields
// nil when no tracker is configured. Tracker integration is not implemented
// yet, so a configured tracker currently produces an empty, non-nil slice.
func (p *P2PCoordinator) getTrackerPeers(infoHash string) []PeerInfo {
	if p.tracker != nil {
		// Placeholder until this is wired to the tracker's peer storage.
		return make([]PeerInfo, 0)
	}
	return nil
}
// getDHTPeers returns the peers discovered through the DHT for infoHash. It
// yields nil when no DHT is configured. DHT integration is not implemented
// yet, so a configured DHT currently produces an empty, non-nil slice.
func (p *P2PCoordinator) getDHTPeers(infoHash string) []PeerInfo {
	if p.dht != nil {
		// Placeholder until this is wired to DHT peer discovery.
		return make([]PeerInfo, 0)
	}
	return nil
}
// AnnounceToExternalServices announces torrent to DHT and other external
// services.
//
// The DHT is currently the only external service; it is skipped when no DHT
// is configured. Failures are collected and returned as one combined error,
// and nil is returned when every attempted announce succeeded.
func (p *P2PCoordinator) AnnounceToExternalServices(infoHash string, port int) error {
	var errs []string

	// Announce to DHT
	if p.dht != nil {
		if err := p.dht.AnnounceNewTorrent(infoHash, port); err != nil {
			errs = append(errs, fmt.Sprintf("DHT: %v", err))
		} else {
			// Log at most the first eight bytes of the hash. Slicing
			// unconditionally would panic on shorter hashes.
			short := infoHash
			if len(short) > 8 {
				short = short[:8]
			}
			log.Printf("P2P: Successfully announced %s to DHT", short)
		}
	}

	// Could add other external services here (like PEX, other trackers, etc.)

	if len(errs) > 0 {
		return fmt.Errorf("external announce errors: %v", errs)
	}
	return nil
}
// GetStats returns a snapshot of the coordinator's state as nested maps.
//
// The result always contains "webseed" and "coordination". "tracker" and
// "dht" are present only when the respective backend is configured. The
// cache size is read under the read lock.
func (p *P2PCoordinator) GetStats() map[string]interface{} {
	p.cacheMutex.RLock()
	cachedLists := len(p.peerCache)
	p.cacheMutex.RUnlock()

	stats := map[string]interface{}{
		// WebSeed stats (from existing implementation)
		"webseed": map[string]interface{}{
			"status": "integrated",
		},
		// Coordination stats
		"coordination": map[string]interface{}{
			"cached_peer_lists": cachedLists,
			"prefer_webseed":    p.preferWebSeed,
			"announce_to_all":   p.announceToAll,
			"peer_exchange":     p.peerExchange,
		},
	}

	// Tracker stats (would need tracker interface methods)
	if p.tracker != nil {
		stats["tracker"] = map[string]interface{}{"status": "active"}
	}

	// DHT stats (would need DHT interface methods)
	if p.dht != nil {
		stats["dht"] = map[string]interface{}{"status": "active"}
	}

	return stats
}
// SetAnnouncer sets the Nostr announcer that OnFileUploaded uses for its
// publish step. Passing nil disables that step.
//
// The field is assigned without taking any lock.
// NOTE(review): the parameter is a pointer to the Announcer interface rather
// than the interface itself; callers must pass the address of an interface
// variable. Confirm whether this is intended before changing the signature.
func (p *P2PCoordinator) SetAnnouncer(announcer *Announcer) {
p.announcer = announcer
}
// OnPeerConnect records a peer connection in the cache entry for infoHash.
//
// A peer already cached under the same IP and Port has its LastSeen refreshed
// and its Quality raised by 10. An unknown peer is appended as given, with
// LastSeen set to the current time. The cache is updated under the write
// lock.
func (p *P2PCoordinator) OnPeerConnect(infoHash string, peer PeerInfo) {
	p.cacheMutex.Lock()
	defer p.cacheMutex.Unlock()

	known := p.peerCache[infoHash]

	for idx := range known {
		entry := &known[idx]
		if entry.IP != peer.IP || entry.Port != peer.Port {
			continue
		}
		// Existing peer: refresh it in place and stop at the first match.
		entry.LastSeen = time.Now()
		entry.Quality += 10 // Boost quality for active peers
		p.peerCache[infoHash] = known
		return
	}

	// First time this endpoint is seen for the torrent.
	peer.LastSeen = time.Now()
	p.peerCache[infoHash] = append(known, peer)
}