Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
254 lines
6.7 KiB
Go
254 lines
6.7 KiB
Go
package p2p
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
// P2PRateLimiter manages rate limiting for P2P operations
|
|
type P2PRateLimiter struct {
|
|
// Per-IP rate limiters
|
|
ipLimiters map[string]*IPLimiter
|
|
ipMutex sync.RWMutex
|
|
|
|
// Global rate limits
|
|
announceLimit *rate.Limiter // Global announce rate
|
|
scrapeLimit *rate.Limiter // Global scrape rate
|
|
dhtLimit *rate.Limiter // Global DHT query rate
|
|
|
|
// Configuration
|
|
perIPAnnounceRate rate.Limit
|
|
perIPScrapeRate rate.Limit
|
|
perIPDHTRate rate.Limit
|
|
perIPBurst int
|
|
|
|
// Cleanup
|
|
cleanupInterval time.Duration
|
|
lastCleanup time.Time
|
|
}
|
|
|
|
// IPLimiter tracks rate limits for a specific IP
|
|
type IPLimiter struct {
|
|
announceLimit *rate.Limiter
|
|
scrapeLimit *rate.Limiter
|
|
dhtLimit *rate.Limiter
|
|
lastSeen time.Time
|
|
}
|
|
|
|
// NewP2PRateLimiter creates a new P2P rate limiter
|
|
func NewP2PRateLimiter() *P2PRateLimiter {
|
|
return &P2PRateLimiter{
|
|
ipLimiters: make(map[string]*IPLimiter),
|
|
|
|
// Global limits (very high to prevent DoS)
|
|
announceLimit: rate.NewLimiter(1000, 2000), // 1000/sec, burst 2000
|
|
scrapeLimit: rate.NewLimiter(100, 200), // 100/sec, burst 200
|
|
dhtLimit: rate.NewLimiter(500, 1000), // 500/sec, burst 1000
|
|
|
|
// Per-IP limits (reasonable for normal clients)
|
|
perIPAnnounceRate: rate.Limit(1.0 / 30), // 1 announce per 30 seconds
|
|
perIPScrapeRate: rate.Limit(1.0 / 5), // 1 scrape per 5 seconds
|
|
perIPDHTRate: rate.Limit(10), // 10 DHT queries per second
|
|
perIPBurst: 5, // Small burst allowance
|
|
|
|
cleanupInterval: 10 * time.Minute,
|
|
lastCleanup: time.Now(),
|
|
}
|
|
}
|
|
|
|
// AllowAnnounce checks if an announce request should be allowed
|
|
func (rl *P2PRateLimiter) AllowAnnounce(clientIP string) (bool, error) {
|
|
// Check global limit first
|
|
if !rl.announceLimit.Allow() {
|
|
return false, fmt.Errorf("global announce rate limit exceeded")
|
|
}
|
|
|
|
// Check per-IP limit
|
|
ipLimiter := rl.getIPLimiter(clientIP)
|
|
if !ipLimiter.announceLimit.Allow() {
|
|
return false, fmt.Errorf("per-IP announce rate limit exceeded for %s", clientIP)
|
|
}
|
|
|
|
// Update last seen
|
|
rl.updateLastSeen(clientIP)
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// AllowScrape checks if a scrape request should be allowed
|
|
func (rl *P2PRateLimiter) AllowScrape(clientIP string) (bool, error) {
|
|
// Check global limit first
|
|
if !rl.scrapeLimit.Allow() {
|
|
return false, fmt.Errorf("global scrape rate limit exceeded")
|
|
}
|
|
|
|
// Check per-IP limit
|
|
ipLimiter := rl.getIPLimiter(clientIP)
|
|
if !ipLimiter.scrapeLimit.Allow() {
|
|
return false, fmt.Errorf("per-IP scrape rate limit exceeded for %s", clientIP)
|
|
}
|
|
|
|
// Update last seen
|
|
rl.updateLastSeen(clientIP)
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// AllowDHTQuery checks if a DHT query should be allowed
|
|
func (rl *P2PRateLimiter) AllowDHTQuery(clientIP string) (bool, error) {
|
|
// Check global limit first
|
|
if !rl.dhtLimit.Allow() {
|
|
return false, fmt.Errorf("global DHT rate limit exceeded")
|
|
}
|
|
|
|
// Check per-IP limit
|
|
ipLimiter := rl.getIPLimiter(clientIP)
|
|
if !ipLimiter.dhtLimit.Allow() {
|
|
return false, fmt.Errorf("per-IP DHT rate limit exceeded for %s", clientIP)
|
|
}
|
|
|
|
// Update last seen
|
|
rl.updateLastSeen(clientIP)
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// getIPLimiter returns or creates an IP limiter
|
|
func (rl *P2PRateLimiter) getIPLimiter(ip string) *IPLimiter {
|
|
rl.ipMutex.RLock()
|
|
limiter, exists := rl.ipLimiters[ip]
|
|
rl.ipMutex.RUnlock()
|
|
|
|
if exists {
|
|
return limiter
|
|
}
|
|
|
|
// Create new limiter
|
|
rl.ipMutex.Lock()
|
|
defer rl.ipMutex.Unlock()
|
|
|
|
// Double-check after acquiring write lock
|
|
if limiter, exists := rl.ipLimiters[ip]; exists {
|
|
return limiter
|
|
}
|
|
|
|
limiter = &IPLimiter{
|
|
announceLimit: rate.NewLimiter(rl.perIPAnnounceRate, rl.perIPBurst),
|
|
scrapeLimit: rate.NewLimiter(rl.perIPScrapeRate, rl.perIPBurst),
|
|
dhtLimit: rate.NewLimiter(rl.perIPDHTRate, rl.perIPBurst*2), // DHT gets more burst
|
|
lastSeen: time.Now(),
|
|
}
|
|
|
|
rl.ipLimiters[ip] = limiter
|
|
|
|
// Trigger cleanup if needed
|
|
if time.Since(rl.lastCleanup) > rl.cleanupInterval {
|
|
go rl.cleanupStaleIPs()
|
|
}
|
|
|
|
return limiter
|
|
}
|
|
|
|
// updateLastSeen updates the last seen time for an IP
|
|
func (rl *P2PRateLimiter) updateLastSeen(ip string) {
|
|
rl.ipMutex.RLock()
|
|
if limiter, exists := rl.ipLimiters[ip]; exists {
|
|
limiter.lastSeen = time.Now()
|
|
}
|
|
rl.ipMutex.RUnlock()
|
|
}
|
|
|
|
// cleanupStaleIPs removes IP limiters that haven't been seen recently
|
|
func (rl *P2PRateLimiter) cleanupStaleIPs() {
|
|
rl.ipMutex.Lock()
|
|
defer rl.ipMutex.Unlock()
|
|
|
|
cutoff := time.Now().Add(-1 * time.Hour) // Remove IPs not seen for 1 hour
|
|
|
|
for ip, limiter := range rl.ipLimiters {
|
|
if limiter.lastSeen.Before(cutoff) {
|
|
delete(rl.ipLimiters, ip)
|
|
}
|
|
}
|
|
|
|
rl.lastCleanup = time.Now()
|
|
}
|
|
|
|
// GetStats returns rate limiting statistics
|
|
func (rl *P2PRateLimiter) GetStats() map[string]interface{} {
|
|
rl.ipMutex.RLock()
|
|
activeIPs := len(rl.ipLimiters)
|
|
rl.ipMutex.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"active_ips": activeIPs,
|
|
"global_announce_limit": rl.announceLimit.Limit(),
|
|
"global_scrape_limit": rl.scrapeLimit.Limit(),
|
|
"global_dht_limit": rl.dhtLimit.Limit(),
|
|
"per_ip_announce_rate": float64(rl.perIPAnnounceRate),
|
|
"per_ip_scrape_rate": float64(rl.perIPScrapeRate),
|
|
"per_ip_dht_rate": float64(rl.perIPDHTRate),
|
|
}
|
|
}
|
|
|
|
// IsRateLimited checks if an IP is currently rate limited
|
|
func (rl *P2PRateLimiter) IsRateLimited(ip string) bool {
|
|
rl.ipMutex.RLock()
|
|
limiter, exists := rl.ipLimiters[ip]
|
|
rl.ipMutex.RUnlock()
|
|
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
// Check if any of the limiters would deny a request
|
|
return !limiter.announceLimit.Allow() &&
|
|
!limiter.scrapeLimit.Allow() &&
|
|
!limiter.dhtLimit.Allow()
|
|
}
|
|
|
|
// GetClientIP returns the client address for r, for use as a rate-limit key.
//
// Sources are tried in order:
//  1. the first entry of the X-Forwarded-For header,
//  2. the X-Real-IP header,
//  3. the host part of r.RemoteAddr (or RemoteAddr verbatim if it has no
//     port).
//
// A header value is used only if it parses as an IP address, and it is
// returned in canonical textual form so that different spellings of the same
// address map to the same key. Empty or malformed header values are skipped
// and the next source is tried.
//
// SECURITY: both headers are supplied by the client unless a trusted reverse
// proxy overwrites them. When the service is directly reachable, a client can
// choose its own rate-limit key by setting them.
func GetClientIP(r *http.Request) string {
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		// Only the left-most entry of the chain is considered.
		first := xff
		if i := strings.IndexByte(xff, ','); i >= 0 {
			first = xff[:i]
		}
		if ip := net.ParseIP(strings.TrimSpace(first)); ip != nil {
			return ip.String()
		}
	}

	if xri := r.Header.Get("X-Real-IP"); xri != "" {
		if ip := net.ParseIP(strings.TrimSpace(xri)); ip != nil {
			return ip.String()
		}
	}

	// RemoteAddr is normally "host:port"; if it cannot be split, return it
	// unchanged rather than failing.
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		return r.RemoteAddr
	}
	return host
}
|
|
|
|
// AdjustLimitsForLoad dynamically adjusts rate limits based on system load
|
|
func (rl *P2PRateLimiter) AdjustLimitsForLoad(cpuUsage, memoryUsage float64) {
|
|
// If system is under heavy load, reduce limits
|
|
if cpuUsage > 80.0 || memoryUsage > 80.0 {
|
|
// Reduce global limits by 50%
|
|
rl.announceLimit.SetLimit(500)
|
|
rl.scrapeLimit.SetLimit(50)
|
|
rl.dhtLimit.SetLimit(250)
|
|
} else if cpuUsage < 40.0 && memoryUsage < 40.0 {
|
|
// System has capacity, restore normal limits
|
|
rl.announceLimit.SetLimit(1000)
|
|
rl.scrapeLimit.SetLimit(100)
|
|
rl.dhtLimit.SetLimit(500)
|
|
}
|
|
} |