torrent-gateway/internal/p2p/peer_ranker.go
enki b3204ea07a
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
first commit
2025-08-18 00:40:15 -07:00

258 lines
6.8 KiB
Go

package p2p
import (
"math"
"net"
"sort"
"strings"
"sync"
"time"
)
// PeerRanker implements smart peer selection and load balancing
type PeerRanker struct {
maxPeersToReturn int
preferLocal bool
geoIP *GeoIPDatabase
peerHistory map[string]*PeerQuality
qualityMutex sync.RWMutex
}
// PeerQuality tracks peer performance history
type PeerQuality struct {
SuccessfulConnections int64
FailedConnections int64
AverageSpeed float64 // bytes/sec
LastConnected time.Time
ReputationScore float64 // 0.0 - 1.0
}
// GeoIPDatabase simulates geographic IP lookup
type GeoIPDatabase struct {
// In production, this would be a real GeoIP database
enabled bool
}
// NewPeerRanker creates a new peer ranking system
func NewPeerRanker(maxPeers int, preferLocal bool) *PeerRanker {
return &PeerRanker{
maxPeersToReturn: maxPeers,
preferLocal: preferLocal,
geoIP: &GeoIPDatabase{enabled: false}, // Disabled for now
peerHistory: make(map[string]*PeerQuality),
}
}
// RankPeers intelligently ranks and selects best peers for a client
func (pr *PeerRanker) RankPeers(peers []PeerInfo, clientIP string) []PeerInfo {
if len(peers) == 0 {
return peers
}
// Calculate scores for each peer
type scoredPeer struct {
peer PeerInfo
score float64
}
var scored []scoredPeer
clientCountry := pr.getCountryCode(clientIP)
for _, peer := range peers {
score := pr.calculatePeerScore(peer, clientIP, clientCountry)
scored = append(scored, scoredPeer{peer: peer, score: score})
}
// Sort by score (highest first)
sort.Slice(scored, func(i, j int) bool {
return scored[i].score > scored[j].score
})
// Return top peers up to limit
result := make([]PeerInfo, 0, pr.maxPeersToReturn)
for i, sp := range scored {
if i >= pr.maxPeersToReturn {
break
}
result = append(result, sp.peer)
}
return result
}
// calculatePeerScore computes a comprehensive score for peer ranking
func (pr *PeerRanker) calculatePeerScore(peer PeerInfo, clientIP, clientCountry string) float64 {
score := float64(peer.Quality) // Base quality from source
// 1. WebSeed gets highest priority (always first)
if peer.Source == "webseed" {
return 1000.0 // Always highest score
}
// 2. Geographic proximity bonus
if pr.preferLocal && pr.geoIP.enabled {
peerCountry := pr.getCountryCode(peer.IP)
if peerCountry == clientCountry {
score += 50.0 // Local peers get significant boost
} else if pr.isSameContinent(clientCountry, peerCountry) {
score += 20.0 // Same continent gets smaller boost
}
}
// 3. Network proximity (same subnet bonus)
if pr.isSameSubnet(clientIP, peer.IP) {
score += 30.0
}
// 4. Peer history and reputation
pr.qualityMutex.RLock()
if quality, exists := pr.peerHistory[pr.peerKey(peer)]; exists {
// Factor in success rate
if quality.SuccessfulConnections+quality.FailedConnections > 0 {
successRate := float64(quality.SuccessfulConnections) /
float64(quality.SuccessfulConnections+quality.FailedConnections)
score += successRate * 40.0 // Up to 40 point bonus for reliable peers
}
// Factor in speed history
if quality.AverageSpeed > 0 {
// Bonus for fast peers (normalized, max 20 points)
speedBonus := math.Min(quality.AverageSpeed/1024/1024, 20.0) // MB/s -> points
score += speedBonus
}
// Reputation score
score += quality.ReputationScore * 25.0
// Recency bonus - prefer recently seen peers
recencyHours := time.Since(quality.LastConnected).Hours()
if recencyHours < 1 {
score += 15.0 // Very recent
} else if recencyHours < 24 {
score += 10.0 // Recent
}
}
pr.qualityMutex.RUnlock()
// 5. Port analysis (avoid suspicious ports)
if pr.isSuspiciousPort(peer.Port) {
score -= 20.0
}
// 6. Ensure minimum score for valid peers
if score < 1.0 {
score = 1.0
}
return score
}
// UpdatePeerQuality updates peer performance history
func (pr *PeerRanker) UpdatePeerQuality(peer PeerInfo, success bool, speed float64) {
pr.qualityMutex.Lock()
defer pr.qualityMutex.Unlock()
key := pr.peerKey(peer)
quality, exists := pr.peerHistory[key]
if !exists {
quality = &PeerQuality{
ReputationScore: 0.5, // Start with neutral reputation
}
pr.peerHistory[key] = quality
}
// Update connection statistics
if success {
quality.SuccessfulConnections++
// Boost reputation for successful connections
quality.ReputationScore = math.Min(1.0, quality.ReputationScore+0.1)
} else {
quality.FailedConnections++
// Decrease reputation for failed connections
quality.ReputationScore = math.Max(0.0, quality.ReputationScore-0.2)
}
// Update speed (exponential moving average)
if speed > 0 {
if quality.AverageSpeed == 0 {
quality.AverageSpeed = speed
} else {
// 80% old speed, 20% new speed
quality.AverageSpeed = quality.AverageSpeed*0.8 + speed*0.2
}
}
quality.LastConnected = time.Now()
}
// Helper methods for peer analysis
func (pr *PeerRanker) peerKey(peer PeerInfo) string {
return peer.IP + ":" + string(rune(peer.Port))
}
func (pr *PeerRanker) getCountryCode(ip string) string {
if !pr.geoIP.enabled {
return "Unknown"
}
// In production, this would query a real GeoIP database
// For now, simulate based on IP ranges
if strings.HasPrefix(ip, "192.168.") || strings.HasPrefix(ip, "10.") {
return "Local"
}
return "Unknown"
}
func (pr *PeerRanker) isSameContinent(country1, country2 string) bool {
// Simplified continent mapping
continentMap := map[string]string{
"US": "NA", "CA": "NA", "MX": "NA",
"GB": "EU", "DE": "EU", "FR": "EU",
"JP": "AS", "CN": "AS", "IN": "AS",
}
return continentMap[country1] == continentMap[country2]
}
func (pr *PeerRanker) isSameSubnet(ip1, ip2 string) bool {
// Parse IPs and check if they're in same /24 subnet
parsedIP1 := net.ParseIP(ip1)
parsedIP2 := net.ParseIP(ip2)
if parsedIP1 == nil || parsedIP2 == nil {
return false
}
// Create /24 subnet mask
mask := net.CIDRMask(24, 32)
return parsedIP1.Mask(mask).Equal(parsedIP2.Mask(mask))
}
func (pr *PeerRanker) isSuspiciousPort(port int) bool {
// Flag potentially suspicious ports
suspiciousPorts := map[int]bool{
22: true, // SSH
23: true, // Telnet
25: true, // SMTP
53: true, // DNS
80: true, // HTTP (could be misconfigured server)
443: true, // HTTPS (could be misconfigured server)
3389: true, // RDP
}
// Also flag ports < 1024 (privileged ports are suspicious for P2P)
return suspiciousPorts[port] || port < 1024
}
// CleanupStaleEntries removes old peer quality data
func (pr *PeerRanker) CleanupStaleEntries() {
pr.qualityMutex.Lock()
defer pr.qualityMutex.Unlock()
cutoff := time.Now().Add(-7 * 24 * time.Hour) // Remove data older than 7 days
for key, quality := range pr.peerHistory {
if quality.LastConnected.Before(cutoff) {
delete(pr.peerHistory, key)
}
}
}