enki b3204ea07a
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
first commit
2025-08-18 00:40:15 -07:00

291 lines
7.0 KiB
Go

package tracker
import (
"log"
"sync"
"time"
)
// PeerManager handles peer lifecycle and cleanup operations
type PeerManager struct {
tracker *Tracker
mutex sync.RWMutex
}
// NewPeerManager creates a new peer manager
func NewPeerManager(tracker *Tracker) *PeerManager {
pm := &PeerManager{
tracker: tracker,
}
// Start background cleanup routine
go pm.startCleanupRoutine()
return pm
}
// AddPeer adds or updates a peer for a specific torrent
func (pm *PeerManager) AddPeer(infoHash string, peer *PeerInfo) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
// Initialize torrent peer map if not exists
if pm.tracker.peers[infoHash] == nil {
pm.tracker.peers[infoHash] = make(map[string]*PeerInfo)
}
// Update last seen time
peer.LastSeen = time.Now()
// Store peer
pm.tracker.peers[infoHash][peer.PeerID] = peer
log.Printf("Added/updated peer %s for torrent %s (left: %d)",
peer.PeerID[:8], infoHash[:8], peer.Left)
}
// RemovePeer removes a peer from a specific torrent
func (pm *PeerManager) RemovePeer(infoHash, peerID string) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if torrentPeers, exists := pm.tracker.peers[infoHash]; exists {
if _, peerExists := torrentPeers[peerID]; peerExists {
delete(torrentPeers, peerID)
log.Printf("Removed peer %s from torrent %s", peerID[:8], infoHash[:8])
// Remove empty torrent entries
if len(torrentPeers) == 0 {
delete(pm.tracker.peers, infoHash)
log.Printf("Removed empty torrent %s", infoHash[:8])
}
}
}
}
// GetPeers returns all peers for a specific torrent
func (pm *PeerManager) GetPeers(infoHash string) map[string]*PeerInfo {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
if torrentPeers, exists := pm.tracker.peers[infoHash]; exists {
// Create a copy to avoid concurrent access issues
peersCopy := make(map[string]*PeerInfo)
for id, peer := range torrentPeers {
peersCopy[id] = &PeerInfo{
PeerID: peer.PeerID,
IP: peer.IP,
Port: peer.Port,
Uploaded: peer.Uploaded,
Downloaded: peer.Downloaded,
Left: peer.Left,
LastSeen: peer.LastSeen,
Event: peer.Event,
Key: peer.Key,
Compact: peer.Compact,
}
}
return peersCopy
}
return make(map[string]*PeerInfo)
}
// GetAllTorrents returns info hashes of all tracked torrents
func (pm *PeerManager) GetAllTorrents() []string {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
var torrents []string
for infoHash := range pm.tracker.peers {
torrents = append(torrents, infoHash)
}
return torrents
}
// UpdatePeerStats updates upload/download statistics for a peer
func (pm *PeerManager) UpdatePeerStats(infoHash, peerID string, uploaded, downloaded, left int64) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if torrentPeers, exists := pm.tracker.peers[infoHash]; exists {
if peer, peerExists := torrentPeers[peerID]; peerExists {
peer.Uploaded = uploaded
peer.Downloaded = downloaded
peer.Left = left
peer.LastSeen = time.Now()
}
}
}
// MarkPeerCompleted marks a peer as having completed the download
func (pm *PeerManager) MarkPeerCompleted(infoHash, peerID string) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if torrentPeers, exists := pm.tracker.peers[infoHash]; exists {
if peer, peerExists := torrentPeers[peerID]; peerExists {
peer.Left = 0
peer.Event = "completed"
peer.LastSeen = time.Now()
log.Printf("Peer %s completed torrent %s", peerID[:8], infoHash[:8])
}
}
}
// startCleanupRoutine starts the background cleanup process
func (pm *PeerManager) startCleanupRoutine() {
if pm.tracker.config.CleanupInterval <= 0 {
log.Printf("Cleanup routine disabled (interval <= 0)")
return
}
ticker := time.NewTicker(pm.tracker.config.CleanupInterval)
defer ticker.Stop()
log.Printf("Starting peer cleanup routine (interval: %v, timeout: %v)",
pm.tracker.config.CleanupInterval, pm.tracker.config.PeerTimeout)
for range ticker.C {
pm.cleanupExpiredPeers()
}
}
// cleanupExpiredPeers removes peers that haven't announced recently
func (pm *PeerManager) cleanupExpiredPeers() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if pm.tracker.config.PeerTimeout <= 0 {
return
}
now := time.Now()
expiry := now.Add(-pm.tracker.config.PeerTimeout)
removedPeers := 0
removedTorrents := 0
for infoHash, torrentPeers := range pm.tracker.peers {
initialPeerCount := len(torrentPeers)
// Remove expired peers
for peerID, peer := range torrentPeers {
if peer.LastSeen.Before(expiry) {
delete(torrentPeers, peerID)
removedPeers++
}
}
// Remove empty torrents
if len(torrentPeers) == 0 && initialPeerCount > 0 {
delete(pm.tracker.peers, infoHash)
removedTorrents++
}
}
if removedPeers > 0 || removedTorrents > 0 {
log.Printf("Cleanup completed: removed %d expired peers and %d empty torrents",
removedPeers, removedTorrents)
}
}
// GetTorrentStats returns statistics for a specific torrent
func (pm *PeerManager) GetTorrentStats(infoHash string) map[string]interface{} {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
stats := map[string]interface{}{
"info_hash": infoHash,
"seeders": 0,
"leechers": 0,
"total": 0,
"last_activity": "",
}
if torrentPeers, exists := pm.tracker.peers[infoHash]; exists {
var lastActivity time.Time
for _, peer := range torrentPeers {
if peer.Left == 0 {
stats["seeders"] = stats["seeders"].(int) + 1
} else {
stats["leechers"] = stats["leechers"].(int) + 1
}
if peer.LastSeen.After(lastActivity) {
lastActivity = peer.LastSeen
}
}
stats["total"] = len(torrentPeers)
if !lastActivity.IsZero() {
stats["last_activity"] = lastActivity.Format(time.RFC3339)
}
}
return stats
}
// GetAllStats returns comprehensive tracker statistics
func (pm *PeerManager) GetAllStats() map[string]interface{} {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
totalTorrents := len(pm.tracker.peers)
totalPeers := 0
totalSeeders := 0
totalLeechers := 0
var oldestPeer, newestPeer time.Time
for _, torrentPeers := range pm.tracker.peers {
totalPeers += len(torrentPeers)
for _, peer := range torrentPeers {
if peer.Left == 0 {
totalSeeders++
} else {
totalLeechers++
}
// Track oldest and newest peer activity
if oldestPeer.IsZero() || peer.LastSeen.Before(oldestPeer) {
oldestPeer = peer.LastSeen
}
if peer.LastSeen.After(newestPeer) {
newestPeer = peer.LastSeen
}
}
}
stats := map[string]interface{}{
"torrents": totalTorrents,
"total_peers": totalPeers,
"total_seeders": totalSeeders,
"total_leechers": totalLeechers,
"uptime": time.Since(pm.tracker.startTime).String(),
}
if !oldestPeer.IsZero() {
stats["oldest_peer"] = oldestPeer.Format(time.RFC3339)
}
if !newestPeer.IsZero() {
stats["newest_peer"] = newestPeer.Format(time.RFC3339)
}
return stats
}
// ForceCleanup manually triggers peer cleanup
func (pm *PeerManager) ForceCleanup() map[string]interface{} {
log.Printf("Manual cleanup triggered")
before := pm.GetAllStats()
pm.cleanupExpiredPeers()
after := pm.GetAllStats()
return map[string]interface{}{
"before": before,
"after": after,
}
}