package tracker import ( "fmt" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" ) type WebSocketTracker struct { upgrader websocket.Upgrader swarms map[string]*Swarm mu sync.RWMutex tracker *Tracker // Reference to main tracker for HTTP fallback statsTracker *StatsTracker } // StatsTracker collects WebRTC statistics type StatsTracker struct { mu sync.RWMutex totalConnections int64 activeConnections int64 totalBytesUploaded int64 totalBytesDownloaded int64 connectionFailures int64 iceFailures int64 lastReported time.Time } type Swarm struct { peers map[string]*WebRTCPeer mu sync.RWMutex } // PeerConnectionState represents the connection state of a peer type PeerConnectionState string const ( StateConnecting PeerConnectionState = "connecting" StateConnected PeerConnectionState = "connected" StateDisconnected PeerConnectionState = "disconnected" StateFailed PeerConnectionState = "failed" ) // PeerStats tracks statistics for a peer type PeerStats struct { BytesUploaded int64 `json:"bytes_uploaded"` BytesDownloaded int64 `json:"bytes_downloaded"` ConnectionTime time.Time `json:"connection_time"` LastActivity time.Time `json:"last_activity"` RTT int `json:"rtt_ms"` ConnectionQuality string `json:"connection_quality"` } type WebRTCPeer struct { ID string `json:"peer_id"` Conn *websocket.Conn `json:"-"` LastSeen time.Time `json:"last_seen"` InfoHashes []string `json:"info_hashes"` State PeerConnectionState `json:"state"` IsSeeder bool `json:"is_seeder"` Stats *PeerStats `json:"stats"` UserAgent string `json:"user_agent"` WebRTCPeers map[string]time.Time `json:"-"` // Track connections to other peers SupportsHTTP bool `json:"supports_http"` Endpoint string `json:"endpoint,omitempty"` } type WebTorrentMessage struct { Action string `json:"action"` InfoHash string `json:"info_hash,omitempty"` PeerID string `json:"peer_id,omitempty"` Answer map[string]interface{} `json:"answer,omitempty"` Offer map[string]interface{} `json:"offer,omitempty"` ToPeerID string `json:"to_peer_id,omitempty"` FromPeerID string `json:"from_peer_id,omitempty"` NumWant int `json:"numwant,omitempty"` // ICE candidate exchange Candidate map[string]interface{} `json:"candidate,omitempty"` // Connection state tracking ConnectionState string `json:"connection_state,omitempty"` // Statistics Uploaded int64 `json:"uploaded,omitempty"` Downloaded int64 `json:"downloaded,omitempty"` Left int64 `json:"left,omitempty"` Event string `json:"event,omitempty"` // Client information UserAgent string `json:"user_agent,omitempty"` SupportsHTTP bool `json:"supports_http,omitempty"` Port int `json:"port,omitempty"` // HTTP fallback RequestHTTP bool `json:"request_http,omitempty"` } func NewWebSocketTracker(tracker *Tracker) *WebSocketTracker { return &WebSocketTracker{ upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // Allow all origins for WebTorrent compatibility }, }, swarms: make(map[string]*Swarm), tracker: tracker, statsTracker: &StatsTracker{ lastReported: time.Now(), }, } } func (wt *WebSocketTracker) HandleWS(w http.ResponseWriter, r *http.Request) { conn, err := wt.upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade failed: %v", err) return } defer conn.Close() log.Printf("WebTorrent client connected from %s", r.RemoteAddr) // Handle WebTorrent protocol for { var msg WebTorrentMessage if err := conn.ReadJSON(&msg); err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket error: %v", err) } break } switch msg.Action { case "announce": wt.handleAnnounce(conn, msg) case "scrape": wt.handleScrape(conn, msg) case "offer": wt.handleOffer(conn, msg) case "answer": wt.handleAnswer(conn, msg) case "ice": wt.handleICE(conn, msg) case "connection_state": wt.handleConnectionState(conn, msg) case "stats": wt.handleStats(conn, msg) } } } func (wt *WebSocketTracker) handleAnnounce(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.Lock() defer wt.mu.Unlock() // Get or create swarm if wt.swarms[msg.InfoHash] == nil { wt.swarms[msg.InfoHash] = &Swarm{ peers: make(map[string]*WebRTCPeer), } } swarm := wt.swarms[msg.InfoHash] swarm.mu.Lock() defer swarm.mu.Unlock() now := time.Now() // Determine if peer is seeder isSeeder := msg.Left == 0 || msg.Event == "completed" // Create or update peer with enhanced state tracking existingPeer := swarm.peers[msg.PeerID] var stats *PeerStats if existingPeer != nil { stats = existingPeer.Stats // Update existing stats if msg.Uploaded > 0 { stats.BytesUploaded = msg.Uploaded } if msg.Downloaded > 0 { stats.BytesDownloaded = msg.Downloaded } stats.LastActivity = now } else { // New peer stats = &PeerStats{ BytesUploaded: msg.Uploaded, BytesDownloaded: msg.Downloaded, ConnectionTime: now, LastActivity: now, ConnectionQuality: "unknown", } wt.statsTracker.mu.Lock() wt.statsTracker.totalConnections++ wt.statsTracker.activeConnections++ wt.statsTracker.mu.Unlock() } peer := &WebRTCPeer{ ID: msg.PeerID, Conn: conn, LastSeen: now, InfoHashes: []string{msg.InfoHash}, State: StateConnecting, IsSeeder: isSeeder, Stats: stats, UserAgent: msg.UserAgent, WebRTCPeers: make(map[string]time.Time), SupportsHTTP: msg.SupportsHTTP, Endpoint: fmt.Sprintf("%s:%d", conn.RemoteAddr().String(), msg.Port), } swarm.peers[msg.PeerID] = peer // Handle different events switch msg.Event { case "stopped": wt.removePeer(swarm, msg.PeerID) return case "completed": log.Printf("Peer %s completed torrent %s", msg.PeerID, msg.InfoHash[:8]) } // Count active seeders and leechers var seeders, leechers int var activePeers []map[string]interface{} numWant := msg.NumWant if numWant == 0 { numWant = 30 // Default } count := 0 for peerID, p := range swarm.peers { if p.State != StateDisconnected && p.State != StateFailed { if p.IsSeeder { seeders++ } else { leechers++ } if peerID != msg.PeerID && count < numWant { peerData := map[string]interface{}{ "id": peerID, } // Include HTTP endpoint if available and WebRTC not working if p.SupportsHTTP && msg.RequestHTTP { peerData["endpoint"] = p.Endpoint peerData["protocol"] = "http" } activePeers = append(activePeers, peerData) count++ } } } response := map[string]interface{}{ "action": "announce", "interval": 300, // 5 minutes for WebTorrent "info_hash": msg.InfoHash, "complete": seeders, "incomplete": leechers, "peers": activePeers, } // Add HTTP fallback information if requested or if WebRTC is failing if msg.RequestHTTP || wt.shouldProvideHTTPFallback(msg.InfoHash) { wt.addHTTPFallback(response, msg.InfoHash) } if err := conn.WriteJSON(response); err != nil { log.Printf("Failed to send announce response: %v", err) // Mark peer as disconnected if we can't send to them peer.State = StateDisconnected } } func (wt *WebSocketTracker) handleScrape(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.RLock() defer wt.mu.RUnlock() files := make(map[string]map[string]int) if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.RLock() files[msg.InfoHash] = map[string]int{ "complete": len(swarm.peers), // Simplified "incomplete": 0, "downloaded": len(swarm.peers), } swarm.mu.RUnlock() } response := map[string]interface{}{ "action": "scrape", "files": files, } if err := conn.WriteJSON(response); err != nil { log.Printf("Failed to send scrape response: %v", err) } } func (wt *WebSocketTracker) handleOffer(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.RLock() defer wt.mu.RUnlock() if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.RLock() if targetPeer := swarm.peers[msg.ToPeerID]; targetPeer != nil { // Forward offer to target peer offerMsg := map[string]interface{}{ "action": "offer", "info_hash": msg.InfoHash, "peer_id": msg.FromPeerID, "offer": msg.Offer, "from_peer_id": msg.FromPeerID, "to_peer_id": msg.ToPeerID, } if err := targetPeer.Conn.WriteJSON(offerMsg); err != nil { log.Printf("Failed to forward offer: %v", err) targetPeer.State = StateDisconnected } else { // Track connection attempt if fromPeer := swarm.peers[msg.FromPeerID]; fromPeer != nil { fromPeer.WebRTCPeers[msg.ToPeerID] = time.Now() log.Printf("Forwarded offer from %s to %s for %s", msg.FromPeerID, msg.ToPeerID, msg.InfoHash[:8]) } } } else { log.Printf("Target peer %s not found for offer", msg.ToPeerID) } swarm.mu.RUnlock() } } func (wt *WebSocketTracker) handleAnswer(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.RLock() defer wt.mu.RUnlock() if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.RLock() if targetPeer := swarm.peers[msg.ToPeerID]; targetPeer != nil { // Forward answer to target peer answerMsg := map[string]interface{}{ "action": "answer", "info_hash": msg.InfoHash, "peer_id": msg.FromPeerID, "answer": msg.Answer, "from_peer_id": msg.FromPeerID, "to_peer_id": msg.ToPeerID, } if err := targetPeer.Conn.WriteJSON(answerMsg); err != nil { log.Printf("Failed to forward answer: %v", err) targetPeer.State = StateDisconnected } else { // Track connection completion if fromPeer := swarm.peers[msg.FromPeerID]; fromPeer != nil { fromPeer.WebRTCPeers[msg.ToPeerID] = time.Now() log.Printf("Forwarded answer from %s to %s for %s", msg.FromPeerID, msg.ToPeerID, msg.InfoHash[:8]) } } } else { log.Printf("Target peer %s not found for answer", msg.ToPeerID) } swarm.mu.RUnlock() } } // Cleanup expired peers func (wt *WebSocketTracker) StartCleanup() { ticker := time.NewTicker(5 * time.Minute) go func() { defer ticker.Stop() for range ticker.C { wt.cleanupExpiredPeers() } }() } func (wt *WebSocketTracker) cleanupExpiredPeers() { wt.mu.Lock() defer wt.mu.Unlock() now := time.Now() expiry := now.Add(-10 * time.Minute) // 10 minute timeout removedPeers := 0 for infoHash, swarm := range wt.swarms { swarm.mu.Lock() for peerID, peer := range swarm.peers { if peer.LastSeen.Before(expiry) || peer.State == StateDisconnected || peer.State == StateFailed { if peer.Conn != nil { peer.Conn.Close() } delete(swarm.peers, peerID) removedPeers++ // Update stats wt.statsTracker.mu.Lock() if wt.statsTracker.activeConnections > 0 { wt.statsTracker.activeConnections-- } wt.statsTracker.mu.Unlock() } else { // Update connection quality for active peers wt.updateConnectionQuality(peer) } } // Remove empty swarms if len(swarm.peers) == 0 { delete(wt.swarms, infoHash) } swarm.mu.Unlock() } if removedPeers > 0 { log.Printf("Cleaned up %d expired/disconnected peers", removedPeers) } } // updateConnectionQuality calculates connection quality based on various metrics func (wt *WebSocketTracker) updateConnectionQuality(peer *WebRTCPeer) { if peer.Stats == nil { return } now := time.Now() connectionAge := now.Sub(peer.Stats.ConnectionTime) timeSinceActivity := now.Sub(peer.Stats.LastActivity) quality := "good" // Determine quality based on multiple factors if timeSinceActivity > 5*time.Minute { quality = "poor" } else if timeSinceActivity > 2*time.Minute { quality = "fair" } else if connectionAge > 30*time.Minute && peer.Stats.BytesUploaded > 0 { quality = "excellent" } else if len(peer.WebRTCPeers) > 0 { quality = "good" } peer.Stats.ConnectionQuality = quality } // handleICE forwards ICE candidates between peers func (wt *WebSocketTracker) handleICE(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.RLock() defer wt.mu.RUnlock() if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.RLock() defer swarm.mu.RUnlock() if targetPeer := swarm.peers[msg.ToPeerID]; targetPeer != nil { // Forward ICE candidate to target peer iceMsg := map[string]interface{}{ "action": "ice", "info_hash": msg.InfoHash, "peer_id": msg.FromPeerID, "candidate": msg.Candidate, "from_peer_id": msg.FromPeerID, "to_peer_id": msg.ToPeerID, } if err := targetPeer.Conn.WriteJSON(iceMsg); err != nil { log.Printf("Failed to forward ICE candidate: %v", err) targetPeer.State = StateDisconnected // Track ICE failure wt.statsTracker.mu.Lock() wt.statsTracker.iceFailures++ wt.statsTracker.mu.Unlock() } else { log.Printf("Forwarded ICE candidate from %s to %s for %s", msg.FromPeerID, msg.ToPeerID, msg.InfoHash[:8]) } } else { log.Printf("Target peer %s not found for ICE candidate", msg.ToPeerID) } } } // handleConnectionState updates peer connection states func (wt *WebSocketTracker) handleConnectionState(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.Lock() defer wt.mu.Unlock() if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.Lock() defer swarm.mu.Unlock() if peer := swarm.peers[msg.PeerID]; peer != nil { oldState := peer.State newState := PeerConnectionState(msg.ConnectionState) peer.State = newState peer.LastSeen = time.Now() log.Printf("Peer %s connection state changed from %s to %s for %s", msg.PeerID, oldState, newState, msg.InfoHash[:8]) // Update stats based on state change wt.statsTracker.mu.Lock() if oldState != StateConnected && newState == StateConnected { wt.statsTracker.activeConnections++ } else if oldState == StateConnected && newState != StateConnected { wt.statsTracker.activeConnections-- if newState == StateFailed { wt.statsTracker.connectionFailures++ } } wt.statsTracker.mu.Unlock() // If peer disconnected, remove from WebRTC connections if newState == StateDisconnected || newState == StateFailed { wt.removePeerConnections(swarm, msg.PeerID) } } } } // handleStats processes peer statistics updates func (wt *WebSocketTracker) handleStats(conn *websocket.Conn, msg WebTorrentMessage) { wt.mu.Lock() defer wt.mu.Unlock() if swarm := wt.swarms[msg.InfoHash]; swarm != nil { swarm.mu.Lock() defer swarm.mu.Unlock() if peer := swarm.peers[msg.PeerID]; peer != nil && peer.Stats != nil { oldUploaded := peer.Stats.BytesUploaded oldDownloaded := peer.Stats.BytesDownloaded // Update peer stats peer.Stats.BytesUploaded = msg.Uploaded peer.Stats.BytesDownloaded = msg.Downloaded peer.Stats.LastActivity = time.Now() // Update global stats wt.statsTracker.mu.Lock() wt.statsTracker.totalBytesUploaded += (msg.Uploaded - oldUploaded) wt.statsTracker.totalBytesDownloaded += (msg.Downloaded - oldDownloaded) wt.statsTracker.mu.Unlock() // Report to main tracker periodically if wt.tracker != nil && time.Since(wt.statsTracker.lastReported) > 5*time.Minute { go wt.reportStatsToTracker() } log.Printf("Updated stats for peer %s: %d uploaded, %d downloaded", msg.PeerID, msg.Uploaded, msg.Downloaded) } } } // Helper methods for peer management func (wt *WebSocketTracker) removePeer(swarm *Swarm, peerID string) { if peer := swarm.peers[peerID]; peer != nil { peer.Conn.Close() delete(swarm.peers, peerID) wt.statsTracker.mu.Lock() wt.statsTracker.activeConnections-- wt.statsTracker.mu.Unlock() log.Printf("Removed peer %s", peerID) } } func (wt *WebSocketTracker) removePeerConnections(swarm *Swarm, peerID string) { // Remove peer from other peers' connection maps for _, peer := range swarm.peers { delete(peer.WebRTCPeers, peerID) } } // HTTP fallback functionality func (wt *WebSocketTracker) shouldProvideHTTPFallback(infoHash string) bool { if wt.tracker == nil { return false } // Check if WebRTC connections are failing for this torrent wt.statsTracker.mu.RLock() failureRate := float64(wt.statsTracker.connectionFailures) / float64(wt.statsTracker.totalConnections) wt.statsTracker.mu.RUnlock() return failureRate > 0.3 // If more than 30% of connections are failing } func (wt *WebSocketTracker) addHTTPFallback(response map[string]interface{}, infoHash string) { if wt.tracker == nil { return } // Get HTTP peers from main tracker httpPeers, err := wt.tracker.GetPeersForTorrent(infoHash) if err != nil { log.Printf("Failed to get HTTP peers: %v", err) return } // Add HTTP peer endpoints var fallbackPeers []map[string]interface{} for _, peer := range httpPeers { if peer.IsWebSeed { // Add WebSeed URLs - get URL from the tracker gateway if available webSeedURL := fmt.Sprintf("http://localhost/webseed/%s", infoHash) // Fallback URL fallbackPeers = append(fallbackPeers, map[string]interface{}{ "id": peer.PeerID, "url": webSeedURL, "protocol": "webseed", }) } else { // Add HTTP tracker peers fallbackPeers = append(fallbackPeers, map[string]interface{}{ "id": peer.PeerID, "endpoint": fmt.Sprintf("%s:%d", peer.IP, peer.Port), "protocol": "http", }) } } if len(fallbackPeers) > 0 { response["http_fallback"] = fallbackPeers response["supports_hybrid"] = true log.Printf("Added %d HTTP fallback peers for %s", len(fallbackPeers), infoHash[:8]) } } // Stats reporting to main tracker func (wt *WebSocketTracker) reportStatsToTracker() { wt.statsTracker.mu.Lock() stats := map[string]interface{}{ "webrtc_connections": wt.statsTracker.activeConnections, "total_connections": wt.statsTracker.totalConnections, "bytes_uploaded": wt.statsTracker.totalBytesUploaded, "bytes_downloaded": wt.statsTracker.totalBytesDownloaded, "connection_failures": wt.statsTracker.connectionFailures, "ice_failures": wt.statsTracker.iceFailures, "timestamp": time.Now(), } wt.statsTracker.lastReported = time.Now() wt.statsTracker.mu.Unlock() // Report to main tracker (placeholder - would integrate with tracker's stats system) log.Printf("WebRTC Stats: %+v", stats) } // GetStats returns comprehensive WebSocket tracker statistics func (wt *WebSocketTracker) GetStats() map[string]interface{} { wt.mu.RLock() defer wt.mu.RUnlock() totalPeers := 0 connectedPeers := 0 seeders := 0 leechers := 0 totalSwarms := len(wt.swarms) for _, swarm := range wt.swarms { swarm.mu.RLock() for _, peer := range swarm.peers { totalPeers++ if peer.State == StateConnected { connectedPeers++ if peer.IsSeeder { seeders++ } else { leechers++ } } } swarm.mu.RUnlock() } wt.statsTracker.mu.RLock() statsData := map[string]interface{}{ "total_swarms": totalSwarms, "total_peers": totalPeers, "connected_peers": connectedPeers, "seeders": seeders, "leechers": leechers, "total_connections": wt.statsTracker.totalConnections, "active_connections": wt.statsTracker.activeConnections, "connection_failures": wt.statsTracker.connectionFailures, "ice_failures": wt.statsTracker.iceFailures, "total_bytes_uploaded": wt.statsTracker.totalBytesUploaded, "total_bytes_downloaded": wt.statsTracker.totalBytesDownloaded, "last_stats_report": wt.statsTracker.lastReported, "status": "active", } wt.statsTracker.mu.RUnlock() return statsData }