package blossom

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"sync"
	"sync/atomic"
	"time"
)

// BlossomPool manages a pool of Blossom server connections with load balancing.
type BlossomPool struct {
	servers     []PooledClient
	healthMutex sync.RWMutex
	roundRobin  atomic.Uint64 // atomic.Uint64 avoids 64-bit alignment pitfalls on 32-bit platforms
	config      *PoolConfig
}

// PooledClient wraps a Blossom client with its health status.
type PooledClient struct {
	client    *Client
	serverURL string
	healthy   bool
	lastCheck time.Time
	failures  int
	mutex     sync.RWMutex
}

// PoolConfig configures the connection pool.
type PoolConfig struct {
	HealthCheckInterval time.Duration
	HealthCheckTimeout  time.Duration
	MaxFailures         int
	RetryDelay          time.Duration
	LoadBalanceMethod   string // "round_robin", "least_connections", "health_weighted"
}

// NewBlossomPool creates a new connection pool for Blossom servers.
func NewBlossomPool(serverURLs []string, config *PoolConfig) (*BlossomPool, error) {
	if len(serverURLs) == 0 {
		return nil, fmt.Errorf("no Blossom servers provided")
	}
	if config == nil {
		config = &PoolConfig{
			HealthCheckInterval: 30 * time.Second,
			HealthCheckTimeout:  5 * time.Second,
			MaxFailures:         3,
			RetryDelay:          10 * time.Second,
			LoadBalanceMethod:   "round_robin",
		}
	}

	pool := &BlossomPool{
		servers: make([]PooledClient, len(serverURLs)),
		config:  config,
	}

	// Initialize one client per server; assume healthy until the first check.
	for i, serverURL := range serverURLs {
		pool.servers[i] = PooledClient{
			client:    NewClient(serverURL),
			serverURL: serverURL,
			healthy:   true,
			lastCheck: time.Now(),
		}
	}

	// Start the background health check routine. It runs for the lifetime
	// of the process; the pool has no Close/shutdown mechanism.
	go pool.healthCheckRoutine()

	return pool, nil
}

// GetClient returns a healthy client using the configured load balancing
// method. Only "round_robin" is implemented; any other configured value
// falls back to round robin.
func (p *BlossomPool) GetClient() *Client {
	p.healthMutex.RLock()
	defer p.healthMutex.RUnlock()

	// Collect the indices of currently healthy servers.
	var healthyServers []int
	for i := range p.servers {
		p.servers[i].mutex.RLock()
		if p.servers[i].healthy {
			healthyServers = append(healthyServers, i)
		}
		p.servers[i].mutex.RUnlock()
	}

	if len(healthyServers) == 0 {
		log.Printf("Warning: no healthy Blossom servers available, using first server")
		return p.servers[0].client
	}

	// Load balance among healthy servers. "least_connections" and
	// "health_weighted" are accepted in the config but not yet implemented,
	// so every method currently resolves to round robin.
	idx := p.roundRobin.Add(1) % uint64(len(healthyServers))
	return p.servers[healthyServers[idx]].client
}
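
// pickLeastFailures is an illustrative sketch, not wired into GetClient, of
// how a "health_weighted" strategy could choose among healthy servers by
// preferring the one with the fewest recent failures. The function name and
// the weighting rule are assumptions, not part of the original design.
// Assumes healthyServers is non-empty, as GetClient already guarantees.
func (p *BlossomPool) pickLeastFailures(healthyServers []int) *Client {
	best := healthyServers[0]
	p.servers[best].mutex.RLock()
	bestFailures := p.servers[best].failures
	p.servers[best].mutex.RUnlock()

	for _, idx := range healthyServers[1:] {
		p.servers[idx].mutex.RLock()
		failures := p.servers[idx].failures
		p.servers[idx].mutex.RUnlock()
		if failures < bestFailures {
			best, bestFailures = idx, failures
		}
	}
	return p.servers[best].client
}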

// healthCheckRoutine periodically checks server health. The loop runs until
// the process exits; nothing stops the ticker externally.
func (p *BlossomPool) healthCheckRoutine() {
	ticker := time.NewTicker(p.config.HealthCheckInterval)
	defer ticker.Stop()
	for range ticker.C {
		p.checkAllServers()
	}
}
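
// healthCheckRoutineWithStop is an illustrative variant (hypothetical; the
// pool as written never stops its health checks) showing how a caller-owned
// stop channel could end the loop cleanly. The name and channel parameter
// are assumptions, not part of the original API.
func (p *BlossomPool) healthCheckRoutineWithStop(stop <-chan struct{}) {
	ticker := time.NewTicker(p.config.HealthCheckInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			p.checkAllServers()
		case <-stop:
			return
		}
	}
}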

// checkAllServers performs health checks on all servers concurrently.
func (p *BlossomPool) checkAllServers() {
	var wg sync.WaitGroup
	for i := range p.servers {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			p.checkServerHealth(idx)
		}(i)
	}
	wg.Wait()
}

// checkServerHealth checks whether a specific server is healthy by issuing
// a GET to its /health endpoint within the configured timeout.
func (p *BlossomPool) checkServerHealth(idx int) {
	server := &p.servers[idx]

	ctx, cancel := context.WithTimeout(context.Background(), p.config.HealthCheckTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", server.serverURL+"/health", nil)
	if err != nil {
		p.markServerUnhealthy(idx, err)
		return
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		p.markServerUnhealthy(idx, err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		p.markServerHealthy(idx)
	} else {
		p.markServerUnhealthy(idx, fmt.Errorf("health check returned status %d", resp.StatusCode))
	}
}

// markServerHealthy marks a server as healthy and resets its failure count.
func (p *BlossomPool) markServerHealthy(idx int) {
	server := &p.servers[idx]
	server.mutex.Lock()
	defer server.mutex.Unlock()

	if !server.healthy {
		log.Printf("Blossom server %s is now healthy", server.serverURL)
	}
	server.healthy = true
	server.failures = 0
	server.lastCheck = time.Now()
}

// markServerUnhealthy records a failure for a server and marks it unhealthy
// once its failure count reaches MaxFailures.
func (p *BlossomPool) markServerUnhealthy(idx int, err error) {
	server := &p.servers[idx]
	server.mutex.Lock()
	defer server.mutex.Unlock()

	server.failures++
	server.lastCheck = time.Now()

	if server.failures >= p.config.MaxFailures {
		if server.healthy {
			log.Printf("Blossom server %s marked unhealthy after %d failures: %v",
				server.serverURL, server.failures, err)
		}
		server.healthy = false
	}
}

// GetHealthyServerCount returns the number of healthy servers.
func (p *BlossomPool) GetHealthyServerCount() int {
	p.healthMutex.RLock()
	defer p.healthMutex.RUnlock()

	count := 0
	for i := range p.servers {
		p.servers[i].mutex.RLock()
		if p.servers[i].healthy {
			count++
		}
		p.servers[i].mutex.RUnlock()
	}
	return count
}

// GetServerStatus returns the status of all servers.
func (p *BlossomPool) GetServerStatus() []map[string]interface{} {
	p.healthMutex.RLock()
	defer p.healthMutex.RUnlock()

	status := make([]map[string]interface{}, len(p.servers))
	for i := range p.servers {
		// Take a pointer rather than ranging by value: copying a
		// PooledClient would copy its RWMutex, which go vet rejects.
		server := &p.servers[i]
		server.mutex.RLock()
		status[i] = map[string]interface{}{
			"url":        server.serverURL,
			"healthy":    server.healthy,
			"failures":   server.failures,
			"last_check": server.lastCheck,
		}
		server.mutex.RUnlock()
	}
	return status
}
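
// examplePoolUsage is an illustrative sketch of how a caller might use the
// pool. The function name and server URLs are hypothetical, and it assumes
// the Client type defined elsewhere in this package carries whatever methods
// callers actually need for Blossom requests.
func examplePoolUsage() {
	pool, err := NewBlossomPool([]string{
		"https://blossom1.example.com",
		"https://blossom2.example.com",
	}, nil) // a nil config selects the defaults set in NewBlossomPool
	if err != nil {
		log.Fatal(err)
	}

	client := pool.GetClient() // round-robins across healthy servers
	_ = client                 // use the client for uploads/downloads

	log.Printf("healthy servers: %d", pool.GetHealthyServerCount())
}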