package p2p

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"sync"
	"time"
)

// HealthStatus represents the health status of a P2P component.
type HealthStatus struct {
	IsHealthy    bool                   `json:"is_healthy"`
	Score        int                    `json:"score"`  // 0-100 health score
	Issues       []string               `json:"issues"` // List of detected issues
	LastChecked  time.Time              `json:"last_checked"`
	ResponseTime int64                  `json:"response_time"` // milliseconds
	Details      map[string]interface{} `json:"details"`
}

// P2PHealthMonitor monitors the health of all P2P components.
type P2PHealthMonitor struct {
	coordinator *P2PCoordinator

	// Health check intervals
	checkInterval  time.Duration
	alertThreshold int // Health score below this triggers alerts

	// Current status
	trackerHealth *HealthStatus
	dhtHealth     *HealthStatus
	webseedHealth *HealthStatus
	overallHealth *HealthStatus

	// mutex is write-locked by performHealthChecks while the status fields
	// and lastFullCheck are updated, and read-locked by GetHealth.
	mutex         sync.RWMutex
	lastFullCheck time.Time

	// Alert callbacks
	alertCallbacks []func(component string, status *HealthStatus)

	// Background monitoring
	stopChannel chan bool
	running     bool

	// lifecycleMu serializes Start and Stop and guards running. It is kept
	// separate from mutex so that starting or stopping the monitor never has
	// to wait for the status lock.
	lifecycleMu sync.Mutex
}

// NewP2PHealthMonitor creates a new P2P health monitor for coordinator.
//
// The monitor is created with a 30 second check interval and an alert
// threshold of 70; every component (and the overall status) starts out
// healthy with a score of 100. Monitoring does not begin until Start is
// called.
func NewP2PHealthMonitor(coordinator *P2PCoordinator) *P2PHealthMonitor {
	return &P2PHealthMonitor{
		coordinator:    coordinator,
		checkInterval:  30 * time.Second,
		alertThreshold: 70, // Alert if health score < 70
		trackerHealth:  &HealthStatus{IsHealthy: true, Score: 100},
		dhtHealth:      &HealthStatus{IsHealthy: true, Score: 100},
		webseedHealth:  &HealthStatus{IsHealthy: true, Score: 100},
		overallHealth:  &HealthStatus{IsHealthy: true, Score: 100},
		stopChannel:    make(chan bool),
	}
}

// Start begins background health monitoring.
//
// Calling Start on a monitor that is already running is a no-op. Start is
// safe to call concurrently with Stop and with other Start calls.
func (hm *P2PHealthMonitor) Start() {
	hm.lifecycleMu.Lock()
	defer hm.lifecycleMu.Unlock()

	if hm.running {
		return
	}
	hm.running = true

	go hm.monitoringLoop()
	log.Printf("P2P Health Monitor started with %v check interval", hm.checkInterval)
}

// Stop stops background health monitoring.
//
// Calling Stop on a monitor that is not running is a no-op. Stop returns
// once the monitoring goroutine has received the stop signal, which may take
// as long as the health check that is currently in progress.
func (hm *P2PHealthMonitor) Stop() {
	hm.lifecycleMu.Lock()
	defer hm.lifecycleMu.Unlock()

	if !hm.running {
		return
	}
	hm.running = false

	// stopChannel is unbuffered, so this send blocks until the monitoring
	// goroutine receives it. lifecycleMu stays held across the send so that a
	// concurrent Start cannot launch a second loop that would consume the
	// signal, and a concurrent Stop cannot queue a second send that nobody
	// would ever receive.
	hm.stopChannel <- true
	log.Printf("P2P Health Monitor stopped")
}

// monitoringLoop runs periodic health checks until a stop signal is received.
func (hm *P2PHealthMonitor) monitoringLoop() { ticker := time.NewTicker(hm.checkInterval) defer ticker.Stop() for { select { case <-ticker.C: hm.performHealthChecks() case <-hm.stopChannel: return } } } // performHealthChecks runs health checks on all components func (hm *P2PHealthMonitor) performHealthChecks() { hm.mutex.Lock() defer hm.mutex.Unlock() // Check each component in parallel var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() hm.trackerHealth = hm.CheckTrackerHealth() }() wg.Add(1) go func() { defer wg.Done() hm.dhtHealth = hm.CheckDHTHealth() }() wg.Add(1) go func() { defer wg.Done() hm.webseedHealth = hm.CheckWebSeedHealth() }() wg.Wait() // Calculate overall health hm.calculateOverallHealth() hm.lastFullCheck = time.Now() // Check for alerts hm.checkAndAlert() } // CheckTrackerHealth checks the health of the BitTorrent tracker func (hm *P2PHealthMonitor) CheckTrackerHealth() *HealthStatus { startTime := time.Now() status := &HealthStatus{ IsHealthy: true, Score: 100, Issues: []string{}, LastChecked: time.Now(), Details: make(map[string]interface{}), } // Check if tracker is accessible if hm.coordinator.tracker != nil { // Test tracker announce endpoint if !hm.testTrackerEndpoint() { status.IsHealthy = false status.Score -= 50 status.Issues = append(status.Issues, "Tracker announce endpoint not responding") } // Check for high error rates (would need tracker metrics) // This is a placeholder - real implementation would check actual metrics status.Details["active_torrents"] = "N/A" status.Details["peer_count"] = "N/A" status.Details["announce_rate"] = "N/A" } else { status.IsHealthy = false status.Score = 0 status.Issues = append(status.Issues, "Tracker not initialized") } status.ResponseTime = time.Since(startTime).Milliseconds() return status } // CheckDHTHealth checks the health of the DHT node func (hm *P2PHealthMonitor) CheckDHTHealth() *HealthStatus { startTime := time.Now() status := &HealthStatus{ IsHealthy: true, Score: 100, Issues: 
[]string{}, LastChecked: time.Now(), Details: make(map[string]interface{}), } if hm.coordinator.dht != nil { // Check DHT node connectivity if !hm.testDHTConnectivity() { status.IsHealthy = false status.Score -= 30 status.Issues = append(status.Issues, "DHT node connectivity issues") } // Check routing table size (healthy DHT should have many nodes) // This would use real DHT metrics in production status.Details["routing_table_size"] = "N/A" status.Details["active_searches"] = "N/A" status.Details["bootstrap_status"] = "active" } else { status.IsHealthy = false status.Score = 0 status.Issues = append(status.Issues, "DHT not initialized") } status.ResponseTime = time.Since(startTime).Milliseconds() return status } // CheckWebSeedHealth checks the health of WebSeed functionality func (hm *P2PHealthMonitor) CheckWebSeedHealth() *HealthStatus { startTime := time.Now() status := &HealthStatus{ IsHealthy: true, Score: 100, Issues: []string{}, LastChecked: time.Now(), Details: make(map[string]interface{}), } // Test WebSeed endpoint accessibility if !hm.testWebSeedEndpoint() { status.IsHealthy = false status.Score -= 40 status.Issues = append(status.Issues, "WebSeed endpoint not accessible") } // Check cache performance cacheStats := hm.getWebSeedCacheStats() if cacheStats["hit_rate"].(float64) < 0.5 { status.Score -= 20 status.Issues = append(status.Issues, "Low cache hit rate") } // Check for storage issues if !hm.testWebSeedStorage() { status.IsHealthy = false status.Score -= 30 status.Issues = append(status.Issues, "WebSeed storage backend issues") } status.Details = cacheStats status.ResponseTime = time.Since(startTime).Milliseconds() return status } // Test helper methods func (hm *P2PHealthMonitor) testTrackerEndpoint() bool { // In production, this would make a test announce request // For now, just check if we have a tracker instance return hm.coordinator.tracker != nil } func (hm *P2PHealthMonitor) testDHTConnectivity() bool { // In production, this would test 
DHT node reachability // For now, just check if we have a DHT instance return hm.coordinator.dht != nil } func (hm *P2PHealthMonitor) testWebSeedEndpoint() bool { // Test WebSeed health endpoint client := &http.Client{Timeout: 5 * time.Second} _, err := client.Get("http://localhost:9877/api/webseed/health") return err == nil } func (hm *P2PHealthMonitor) testWebSeedStorage() bool { // In production, this would test storage backend connectivity // For now, always return true return true } func (hm *P2PHealthMonitor) getWebSeedCacheStats() map[string]interface{} { // In production, this would get real cache statistics return map[string]interface{}{ "hit_rate": 0.85, "cache_size": "45MB", "active_conns": 12, } } // calculateOverallHealth computes overall P2P system health func (hm *P2PHealthMonitor) calculateOverallHealth() { // Weighted average of component health scores // WebSeed is most critical (40%), then Tracker (35%), then DHT (25%) overallScore := int( float64(hm.webseedHealth.Score)*0.4 + float64(hm.trackerHealth.Score)*0.35 + float64(hm.dhtHealth.Score)*0.25, ) var allIssues []string allIssues = append(allIssues, hm.trackerHealth.Issues...) allIssues = append(allIssues, hm.dhtHealth.Issues...) allIssues = append(allIssues, hm.webseedHealth.Issues...) 
hm.overallHealth = &HealthStatus{ IsHealthy: overallScore >= hm.alertThreshold, Score: overallScore, Issues: allIssues, LastChecked: time.Now(), ResponseTime: 0, // Overall doesn't have response time Details: map[string]interface{}{ "tracker_score": hm.trackerHealth.Score, "dht_score": hm.dhtHealth.Score, "webseed_score": hm.webseedHealth.Score, "component_weights": map[string]float64{ "webseed": 0.4, "tracker": 0.35, "dht": 0.25, }, }, } } // checkAndAlert checks for issues and triggers alerts if needed func (hm *P2PHealthMonitor) checkAndAlert() { // Check overall health for alerts if hm.overallHealth.Score < hm.alertThreshold { hm.triggerAlert("overall", hm.overallHealth) } // Check individual components if hm.trackerHealth.Score < hm.alertThreshold { hm.triggerAlert("tracker", hm.trackerHealth) } if hm.dhtHealth.Score < hm.alertThreshold { hm.triggerAlert("dht", hm.dhtHealth) } if hm.webseedHealth.Score < hm.alertThreshold { hm.triggerAlert("webseed", hm.webseedHealth) } } // triggerAlert triggers an alert for a component func (hm *P2PHealthMonitor) triggerAlert(component string, status *HealthStatus) { log.Printf("P2P ALERT: %s health degraded (score: %d, issues: %v)", component, status.Score, status.Issues) // Call registered alert callbacks for _, callback := range hm.alertCallbacks { go callback(component, status) } } // RegisterAlertCallback registers a callback for health alerts func (hm *P2PHealthMonitor) RegisterAlertCallback(callback func(component string, status *HealthStatus)) { hm.alertCallbacks = append(hm.alertCallbacks, callback) } // GetHealth returns current health status for all components func (hm *P2PHealthMonitor) GetHealth() map[string]*HealthStatus { hm.mutex.RLock() defer hm.mutex.RUnlock() return map[string]*HealthStatus{ "overall": hm.overallHealth, "tracker": hm.trackerHealth, "dht": hm.dhtHealth, "webseed": hm.webseedHealth, } } // ForceHealthCheck triggers an immediate health check func (hm *P2PHealthMonitor) ForceHealthCheck() { go 
hm.performHealthChecks() }