torrent-gateway/internal/p2p/rate_limiter.go
enki b3204ea07a
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
first commit
2025-08-18 00:40:15 -07:00

254 lines
6.7 KiB
Go

package p2p
import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
"golang.org/x/time/rate"
)
// P2PRateLimiter manages rate limiting for P2P operations
type P2PRateLimiter struct {
// Per-IP rate limiters
ipLimiters map[string]*IPLimiter
ipMutex sync.RWMutex
// Global rate limits
announceLimit *rate.Limiter // Global announce rate
scrapeLimit *rate.Limiter // Global scrape rate
dhtLimit *rate.Limiter // Global DHT query rate
// Configuration
perIPAnnounceRate rate.Limit
perIPScrapeRate rate.Limit
perIPDHTRate rate.Limit
perIPBurst int
// Cleanup
cleanupInterval time.Duration
lastCleanup time.Time
}
// IPLimiter tracks rate limits for a specific IP
type IPLimiter struct {
announceLimit *rate.Limiter
scrapeLimit *rate.Limiter
dhtLimit *rate.Limiter
lastSeen time.Time
}
// NewP2PRateLimiter creates a new P2P rate limiter
func NewP2PRateLimiter() *P2PRateLimiter {
return &P2PRateLimiter{
ipLimiters: make(map[string]*IPLimiter),
// Global limits (very high to prevent DoS)
announceLimit: rate.NewLimiter(1000, 2000), // 1000/sec, burst 2000
scrapeLimit: rate.NewLimiter(100, 200), // 100/sec, burst 200
dhtLimit: rate.NewLimiter(500, 1000), // 500/sec, burst 1000
// Per-IP limits (reasonable for normal clients)
perIPAnnounceRate: rate.Limit(1.0 / 30), // 1 announce per 30 seconds
perIPScrapeRate: rate.Limit(1.0 / 5), // 1 scrape per 5 seconds
perIPDHTRate: rate.Limit(10), // 10 DHT queries per second
perIPBurst: 5, // Small burst allowance
cleanupInterval: 10 * time.Minute,
lastCleanup: time.Now(),
}
}
// AllowAnnounce checks if an announce request should be allowed
func (rl *P2PRateLimiter) AllowAnnounce(clientIP string) (bool, error) {
// Check global limit first
if !rl.announceLimit.Allow() {
return false, fmt.Errorf("global announce rate limit exceeded")
}
// Check per-IP limit
ipLimiter := rl.getIPLimiter(clientIP)
if !ipLimiter.announceLimit.Allow() {
return false, fmt.Errorf("per-IP announce rate limit exceeded for %s", clientIP)
}
// Update last seen
rl.updateLastSeen(clientIP)
return true, nil
}
// AllowScrape checks if a scrape request should be allowed
func (rl *P2PRateLimiter) AllowScrape(clientIP string) (bool, error) {
// Check global limit first
if !rl.scrapeLimit.Allow() {
return false, fmt.Errorf("global scrape rate limit exceeded")
}
// Check per-IP limit
ipLimiter := rl.getIPLimiter(clientIP)
if !ipLimiter.scrapeLimit.Allow() {
return false, fmt.Errorf("per-IP scrape rate limit exceeded for %s", clientIP)
}
// Update last seen
rl.updateLastSeen(clientIP)
return true, nil
}
// AllowDHTQuery checks if a DHT query should be allowed
func (rl *P2PRateLimiter) AllowDHTQuery(clientIP string) (bool, error) {
// Check global limit first
if !rl.dhtLimit.Allow() {
return false, fmt.Errorf("global DHT rate limit exceeded")
}
// Check per-IP limit
ipLimiter := rl.getIPLimiter(clientIP)
if !ipLimiter.dhtLimit.Allow() {
return false, fmt.Errorf("per-IP DHT rate limit exceeded for %s", clientIP)
}
// Update last seen
rl.updateLastSeen(clientIP)
return true, nil
}
// getIPLimiter returns or creates an IP limiter
func (rl *P2PRateLimiter) getIPLimiter(ip string) *IPLimiter {
rl.ipMutex.RLock()
limiter, exists := rl.ipLimiters[ip]
rl.ipMutex.RUnlock()
if exists {
return limiter
}
// Create new limiter
rl.ipMutex.Lock()
defer rl.ipMutex.Unlock()
// Double-check after acquiring write lock
if limiter, exists := rl.ipLimiters[ip]; exists {
return limiter
}
limiter = &IPLimiter{
announceLimit: rate.NewLimiter(rl.perIPAnnounceRate, rl.perIPBurst),
scrapeLimit: rate.NewLimiter(rl.perIPScrapeRate, rl.perIPBurst),
dhtLimit: rate.NewLimiter(rl.perIPDHTRate, rl.perIPBurst*2), // DHT gets more burst
lastSeen: time.Now(),
}
rl.ipLimiters[ip] = limiter
// Trigger cleanup if needed
if time.Since(rl.lastCleanup) > rl.cleanupInterval {
go rl.cleanupStaleIPs()
}
return limiter
}
// updateLastSeen updates the last seen time for an IP
func (rl *P2PRateLimiter) updateLastSeen(ip string) {
rl.ipMutex.RLock()
if limiter, exists := rl.ipLimiters[ip]; exists {
limiter.lastSeen = time.Now()
}
rl.ipMutex.RUnlock()
}
// cleanupStaleIPs removes IP limiters that haven't been seen recently
func (rl *P2PRateLimiter) cleanupStaleIPs() {
rl.ipMutex.Lock()
defer rl.ipMutex.Unlock()
cutoff := time.Now().Add(-1 * time.Hour) // Remove IPs not seen for 1 hour
for ip, limiter := range rl.ipLimiters {
if limiter.lastSeen.Before(cutoff) {
delete(rl.ipLimiters, ip)
}
}
rl.lastCleanup = time.Now()
}
// GetStats returns rate limiting statistics
func (rl *P2PRateLimiter) GetStats() map[string]interface{} {
rl.ipMutex.RLock()
activeIPs := len(rl.ipLimiters)
rl.ipMutex.RUnlock()
return map[string]interface{}{
"active_ips": activeIPs,
"global_announce_limit": rl.announceLimit.Limit(),
"global_scrape_limit": rl.scrapeLimit.Limit(),
"global_dht_limit": rl.dhtLimit.Limit(),
"per_ip_announce_rate": float64(rl.perIPAnnounceRate),
"per_ip_scrape_rate": float64(rl.perIPScrapeRate),
"per_ip_dht_rate": float64(rl.perIPDHTRate),
}
}
// IsRateLimited checks if an IP is currently rate limited
func (rl *P2PRateLimiter) IsRateLimited(ip string) bool {
rl.ipMutex.RLock()
limiter, exists := rl.ipLimiters[ip]
rl.ipMutex.RUnlock()
if !exists {
return false
}
// Check if any of the limiters would deny a request
return !limiter.announceLimit.Allow() &&
!limiter.scrapeLimit.Allow() &&
!limiter.dhtLimit.Allow()
}
// GetClientIP extracts client IP from various sources
func GetClientIP(r *http.Request) string {
// Check X-Forwarded-For header first
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
// Take the first IP in the chain
if ips := strings.Split(xff, ","); len(ips) > 0 {
return strings.TrimSpace(ips[0])
}
}
// Check X-Real-IP header
if xri := r.Header.Get("X-Real-IP"); xri != "" {
return strings.TrimSpace(xri)
}
// Fall back to RemoteAddr
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return ip
}
// AdjustLimitsForLoad dynamically adjusts rate limits based on system load
func (rl *P2PRateLimiter) AdjustLimitsForLoad(cpuUsage, memoryUsage float64) {
// If system is under heavy load, reduce limits
if cpuUsage > 80.0 || memoryUsage > 80.0 {
// Reduce global limits by 50%
rl.announceLimit.SetLimit(500)
rl.scrapeLimit.SetLimit(50)
rl.dhtLimit.SetLimit(250)
} else if cpuUsage < 40.0 && memoryUsage < 40.0 {
// System has capacity, restore normal limits
rl.announceLimit.SetLimit(1000)
rl.scrapeLimit.SetLimit(100)
rl.dhtLimit.SetLimit(500)
}
}