package p2p

import (
	"fmt"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

const (
	// Default global rates in requests per second. NewP2PRateLimiter installs
	// them and AdjustLimitsForLoad halves or restores them.
	defaultGlobalAnnounceRate rate.Limit = 1000
	defaultGlobalScrapeRate   rate.Limit = 100
	defaultGlobalDHTRate      rate.Limit = 500

	// staleIPTimeout is how long an IP may go without an allowed request
	// before cleanupStaleIPs drops its limiter.
	staleIPTimeout = time.Hour
)

// P2PRateLimiter manages rate limiting for P2P operations. Every request is
// checked against a global limiter for its operation type and against a
// limiter dedicated to the client IP.
type P2PRateLimiter struct {
	// Per-IP rate limiters, keyed by the client IP string.
	// ipMutex guards the map, each entry's lastSeen, and lastCleanup.
	ipLimiters map[string]*IPLimiter
	ipMutex    sync.RWMutex

	// Global rate limits
	announceLimit *rate.Limiter // Global announce rate
	scrapeLimit   *rate.Limiter // Global scrape rate
	dhtLimit      *rate.Limiter // Global DHT query rate

	// Configuration applied to every newly created IPLimiter
	perIPAnnounceRate rate.Limit
	perIPScrapeRate   rate.Limit
	perIPDHTRate      rate.Limit
	perIPBurst        int

	// Cleanup
	cleanupInterval time.Duration // minimum time between stale-IP sweeps
	lastCleanup     time.Time     // when the most recent sweep was scheduled or finished
}

// IPLimiter tracks rate limits for a specific IP.
type IPLimiter struct {
	announceLimit *rate.Limiter
	scrapeLimit   *rate.Limiter
	dhtLimit      *rate.Limiter
	lastSeen      time.Time // last allowed request; guarded by P2PRateLimiter.ipMutex
}

// NewP2PRateLimiter creates a new P2P rate limiter with the default global
// and per-IP limits.
func NewP2PRateLimiter() *P2PRateLimiter {
	return &P2PRateLimiter{
		ipLimiters: make(map[string]*IPLimiter),

		// Global limits (very high to prevent DoS)
		announceLimit: rate.NewLimiter(defaultGlobalAnnounceRate, 2000), // 1000/sec, burst 2000
		scrapeLimit:   rate.NewLimiter(defaultGlobalScrapeRate, 200),    // 100/sec, burst 200
		dhtLimit:      rate.NewLimiter(defaultGlobalDHTRate, 1000),      // 500/sec, burst 1000

		// Per-IP limits (reasonable for normal clients)
		perIPAnnounceRate: rate.Limit(1.0 / 30), // 1 announce per 30 seconds
		perIPScrapeRate:   rate.Limit(1.0 / 5),  // 1 scrape per 5 seconds
		perIPDHTRate:      rate.Limit(10),       // 10 DHT queries per second
		perIPBurst:        5,                    // Small burst allowance

		cleanupInterval: 10 * time.Minute,
		lastCleanup:     time.Now(),
	}
}

// takeToken tries to take one token from lim at instant now without waiting.
// On success it returns the reservation, so the caller can hand the token back
// with CancelAt(now). On failure nothing stays reserved and it returns nil.
func takeToken(lim *rate.Limiter, now time.Time) *rate.Reservation {
	res := lim.ReserveN(now, 1)
	if !res.OK() {
		return nil
	}
	if res.DelayFrom(now) > 0 {
		// The token only becomes available in the future: release it again.
		res.CancelAt(now)
		return nil
	}
	return res
}

// wouldDeny reports whether lim has no token available at instant now. The
// probe is cancelled at the same instant it was made, so it does not use up
// the limiter's budget.
func wouldDeny(lim *rate.Limiter, now time.Time) bool {
	res := takeToken(lim, now)
	if res == nil {
		return true
	}
	res.CancelAt(now)
	return false
}

// allow applies the global limiter and then the per-IP limiter selected by
// perIP. op is the operation name used in the error messages.
//
// The global limiter is consulted first, so a request rejected globally never
// creates a per-IP entry. If the per-IP limiter then rejects the request, the
// global token is returned, so requests that are refused per-IP do not drain
// the budget shared by all clients.
func (rl *P2PRateLimiter) allow(clientIP, op string, global *rate.Limiter, perIP func(*IPLimiter) *rate.Limiter) (bool, error) {
	now := time.Now()

	globalRes := takeToken(global, now)
	if globalRes == nil {
		return false, fmt.Errorf("global %s rate limit exceeded", op)
	}

	if !perIP(rl.getIPLimiter(clientIP)).Allow() {
		globalRes.CancelAt(now)
		return false, fmt.Errorf("per-IP %s rate limit exceeded for %s", op, clientIP)
	}

	rl.updateLastSeen(clientIP)
	return true, nil
}

// AllowAnnounce checks if an announce request should be allowed. It returns
// true and a nil error when allowed, otherwise false and an error naming the
// limit (global or per-IP) that was exceeded.
func (rl *P2PRateLimiter) AllowAnnounce(clientIP string) (bool, error) {
	return rl.allow(clientIP, "announce", rl.announceLimit,
		func(l *IPLimiter) *rate.Limiter { return l.announceLimit })
}

// AllowScrape checks if a scrape request should be allowed. It returns true
// and a nil error when allowed, otherwise false and an error naming the limit
// (global or per-IP) that was exceeded.
func (rl *P2PRateLimiter) AllowScrape(clientIP string) (bool, error) {
	return rl.allow(clientIP, "scrape", rl.scrapeLimit,
		func(l *IPLimiter) *rate.Limiter { return l.scrapeLimit })
}

// AllowDHTQuery checks if a DHT query should be allowed. It returns true and a
// nil error when allowed, otherwise false and an error naming the limit
// (global or per-IP) that was exceeded.
func (rl *P2PRateLimiter) AllowDHTQuery(clientIP string) (bool, error) {
	return rl.allow(clientIP, "DHT", rl.dhtLimit,
		func(l *IPLimiter) *rate.Limiter { return l.dhtLimit })
}

// getIPLimiter returns the limiter for ip, creating it on first use. Creating
// an entry also schedules a stale-IP sweep when cleanupInterval has elapsed
// since the previous one.
func (rl *P2PRateLimiter) getIPLimiter(ip string) *IPLimiter {
	// Fast path: the entry already exists, a read lock is enough.
	rl.ipMutex.RLock()
	limiter, exists := rl.ipLimiters[ip]
	rl.ipMutex.RUnlock()
	if exists {
		return limiter
	}

	rl.ipMutex.Lock()
	defer rl.ipMutex.Unlock()

	// Re-check: another goroutine may have created the entry while this one
	// was waiting for the write lock.
	if existing, ok := rl.ipLimiters[ip]; ok {
		return existing
	}

	limiter = &IPLimiter{
		announceLimit: rate.NewLimiter(rl.perIPAnnounceRate, rl.perIPBurst),
		scrapeLimit:   rate.NewLimiter(rl.perIPScrapeRate, rl.perIPBurst),
		dhtLimit:      rate.NewLimiter(rl.perIPDHTRate, rl.perIPBurst*2), // DHT gets more burst
		lastSeen:      time.Now(),
	}
	rl.ipLimiters[ip] = limiter

	// Trigger cleanup if needed. lastCleanup is advanced here, under the
	// lock, so that later callers do not each spawn another sweep before the
	// first one has had a chance to run.
	if time.Since(rl.lastCleanup) > rl.cleanupInterval {
		rl.lastCleanup = time.Now()
		go rl.cleanupStaleIPs()
	}

	return limiter
}

// updateLastSeen updates the last seen time for an IP. It is a no-op when the
// IP has no limiter.
func (rl *P2PRateLimiter) updateLastSeen(ip string) {
	// lastSeen is written here, so the write lock is required: a read lock
	// would let two requests from the same IP write the field concurrently.
	rl.ipMutex.Lock()
	defer rl.ipMutex.Unlock()

	if limiter, exists := rl.ipLimiters[ip]; exists {
		limiter.lastSeen = time.Now()
	}
}

// cleanupStaleIPs removes IP limiters whose lastSeen is older than
// staleIPTimeout and records the time of the sweep.
func (rl *P2PRateLimiter) cleanupStaleIPs() {
	rl.ipMutex.Lock()
	defer rl.ipMutex.Unlock()

	cutoff := time.Now().Add(-staleIPTimeout)
	for ip, limiter := range rl.ipLimiters {
		if limiter.lastSeen.Before(cutoff) {
			delete(rl.ipLimiters, ip)
		}
	}

	rl.lastCleanup = time.Now()
}

// GetStats returns rate limiting statistics: the number of tracked IPs, the
// current global limits, and the configured per-IP rates.
func (rl *P2PRateLimiter) GetStats() map[string]interface{} {
	rl.ipMutex.RLock()
	activeIPs := len(rl.ipLimiters)
	rl.ipMutex.RUnlock()

	return map[string]interface{}{
		"active_ips":            activeIPs,
		"global_announce_limit": rl.announceLimit.Limit(),
		"global_scrape_limit":   rl.scrapeLimit.Limit(),
		"global_dht_limit":      rl.dhtLimit.Limit(),
		"per_ip_announce_rate":  float64(rl.perIPAnnounceRate),
		"per_ip_scrape_rate":    float64(rl.perIPScrapeRate),
		"per_ip_dht_rate":       float64(rl.perIPDHTRate),
	}
}

// IsRateLimited checks if an IP is currently rate limited, i.e. whether any of
// its per-IP limiters (announce, scrape, DHT) would deny a request right now.
// IPs without a limiter are reported as not limited. The check only probes the
// limiters and does not spend the client's request budget.
func (rl *P2PRateLimiter) IsRateLimited(ip string) bool {
	rl.ipMutex.RLock()
	limiter, exists := rl.ipLimiters[ip]
	rl.ipMutex.RUnlock()

	if !exists {
		return false
	}

	now := time.Now()
	return wouldDeny(limiter.announceLimit, now) ||
		wouldDeny(limiter.scrapeLimit, now) ||
		wouldDeny(limiter.dhtLimit, now)
}

// headerIP extracts an IP address from a forwarding-header value. It accepts a
// bare IP or an "ip:port" / "[ip]:port" pair and returns the address in the
// canonical form produced by net.IP.String, or "" when the value is not an IP.
func headerIP(value string) string {
	value = strings.TrimSpace(value)
	if value == "" {
		return ""
	}
	if host, _, err := net.SplitHostPort(value); err == nil {
		value = host
	}
	ip := net.ParseIP(value)
	if ip == nil {
		return ""
	}
	return ip.String()
}

// GetClientIP extracts the client IP from a request. Sources are tried in
// order: the first entry of X-Forwarded-For, then X-Real-IP, then the host
// part of RemoteAddr (or RemoteAddr unchanged if it has no port). Header
// values that are not valid IP addresses are skipped.
//
// NOTE(review): both headers are supplied by the client unless a reverse proxy
// overwrites them. If this server can be reached directly, a caller can pick
// its own rate-limit key — confirm the deployment always sits behind a trusted
// proxy.
func GetClientIP(r *http.Request) string {
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		// Take the first IP in the chain
		first := xff
		if i := strings.IndexByte(xff, ','); i >= 0 {
			first = xff[:i]
		}
		if ip := headerIP(first); ip != "" {
			return ip
		}
	}

	if ip := headerIP(r.Header.Get("X-Real-IP")); ip != "" {
		return ip
	}

	// Fall back to RemoteAddr
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		return r.RemoteAddr
	}
	return host
}

// AdjustLimitsForLoad dynamically adjusts the global rate limits based on
// system load. cpuUsage and memoryUsage are compared against percentage
// thresholds: above 80 on either halves the global rates, below 40 on both
// restores the defaults, and anything in between leaves the current limits
// untouched. Burst sizes are not changed.
func (rl *P2PRateLimiter) AdjustLimitsForLoad(cpuUsage, memoryUsage float64) {
	switch {
	case cpuUsage > 80.0 || memoryUsage > 80.0:
		// Heavy load: reduce global limits by 50%
		rl.announceLimit.SetLimit(defaultGlobalAnnounceRate / 2)
		rl.scrapeLimit.SetLimit(defaultGlobalScrapeRate / 2)
		rl.dhtLimit.SetLimit(defaultGlobalDHTRate / 2)
	case cpuUsage < 40.0 && memoryUsage < 40.0:
		// System has capacity, restore normal limits
		rl.announceLimit.SetLimit(defaultGlobalAnnounceRate)
		rl.scrapeLimit.SetLimit(defaultGlobalScrapeRate)
		rl.dhtLimit.SetLimit(defaultGlobalDHTRate)
	}
}