Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
229 lines
5.4 KiB
Go
229 lines
5.4 KiB
Go
package blossom
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// BlossomPool manages a pool of Blossom server connections with load balancing
|
|
type BlossomPool struct {
|
|
servers []PooledClient
|
|
healthMutex sync.RWMutex
|
|
roundRobin uint64
|
|
config *PoolConfig
|
|
}
|
|
|
|
// PooledClient wraps a Blossom client with health status
|
|
type PooledClient struct {
|
|
client *Client
|
|
serverURL string
|
|
healthy bool
|
|
lastCheck time.Time
|
|
failures int
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// PoolConfig holds the tunables for a BlossomPool. Passing a nil *PoolConfig
// to NewBlossomPool selects a default configuration.
type PoolConfig struct {
	// HealthCheckInterval is the period between health-check rounds.
	HealthCheckInterval time.Duration

	// HealthCheckTimeout bounds a single health-check request.
	HealthCheckTimeout time.Duration

	// MaxFailures is the number of failed checks (without an intervening
	// success) at which a server is marked unhealthy.
	MaxFailures int

	// RetryDelay is not read by the pool code in this file.
	// NOTE(review): confirm its intended use.
	RetryDelay time.Duration

	// LoadBalanceMethod names the selection strategy: "round_robin",
	// "least_connections" or "health_weighted".
	LoadBalanceMethod string
}
|
|
|
|
// NewBlossomPool creates a new connection pool for Blossom servers
|
|
func NewBlossomPool(serverURLs []string, config *PoolConfig) (*BlossomPool, error) {
|
|
if len(serverURLs) == 0 {
|
|
return nil, fmt.Errorf("no Blossom servers provided")
|
|
}
|
|
|
|
if config == nil {
|
|
config = &PoolConfig{
|
|
HealthCheckInterval: 30 * time.Second,
|
|
HealthCheckTimeout: 5 * time.Second,
|
|
MaxFailures: 3,
|
|
RetryDelay: 10 * time.Second,
|
|
LoadBalanceMethod: "round_robin",
|
|
}
|
|
}
|
|
|
|
pool := &BlossomPool{
|
|
servers: make([]PooledClient, len(serverURLs)),
|
|
config: config,
|
|
}
|
|
|
|
// Initialize clients
|
|
for i, serverURL := range serverURLs {
|
|
client := NewClient(serverURL)
|
|
pool.servers[i] = PooledClient{
|
|
client: client,
|
|
serverURL: serverURL,
|
|
healthy: true, // Assume healthy initially
|
|
lastCheck: time.Now(),
|
|
}
|
|
}
|
|
|
|
// Start health check routine
|
|
go pool.healthCheckRoutine()
|
|
|
|
return pool, nil
|
|
}
|
|
|
|
// GetClient returns a healthy client using load balancing
|
|
func (p *BlossomPool) GetClient() *Client {
|
|
p.healthMutex.RLock()
|
|
defer p.healthMutex.RUnlock()
|
|
|
|
// Get healthy servers
|
|
var healthyServers []int
|
|
for i := range p.servers {
|
|
p.servers[i].mutex.RLock()
|
|
if p.servers[i].healthy {
|
|
healthyServers = append(healthyServers, i)
|
|
}
|
|
p.servers[i].mutex.RUnlock()
|
|
}
|
|
|
|
if len(healthyServers) == 0 {
|
|
log.Printf("Warning: No healthy Blossom servers available, using first server")
|
|
return p.servers[0].client
|
|
}
|
|
|
|
// Load balance among healthy servers
|
|
switch p.config.LoadBalanceMethod {
|
|
case "round_robin":
|
|
idx := atomic.AddUint64(&p.roundRobin, 1) % uint64(len(healthyServers))
|
|
return p.servers[healthyServers[idx]].client
|
|
default:
|
|
// Default to round robin
|
|
idx := atomic.AddUint64(&p.roundRobin, 1) % uint64(len(healthyServers))
|
|
return p.servers[healthyServers[idx]].client
|
|
}
|
|
}
|
|
|
|
// healthCheckRoutine periodically checks server health
|
|
func (p *BlossomPool) healthCheckRoutine() {
|
|
ticker := time.NewTicker(p.config.HealthCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
p.checkAllServers()
|
|
}
|
|
}
|
|
|
|
// checkAllServers performs health checks on all servers
|
|
func (p *BlossomPool) checkAllServers() {
|
|
var wg sync.WaitGroup
|
|
|
|
for i := range p.servers {
|
|
wg.Add(1)
|
|
go func(idx int) {
|
|
defer wg.Done()
|
|
p.checkServerHealth(idx)
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// checkServerHealth checks if a specific server is healthy
|
|
func (p *BlossomPool) checkServerHealth(idx int) {
|
|
server := &p.servers[idx]
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), p.config.HealthCheckTimeout)
|
|
defer cancel()
|
|
|
|
// Simple health check - try to get server info
|
|
req, err := http.NewRequestWithContext(ctx, "GET", server.serverURL+"/health", nil)
|
|
if err != nil {
|
|
p.markServerUnhealthy(idx, err)
|
|
return
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
p.markServerUnhealthy(idx, err)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode == http.StatusOK {
|
|
p.markServerHealthy(idx)
|
|
} else {
|
|
p.markServerUnhealthy(idx, fmt.Errorf("health check returned status %d", resp.StatusCode))
|
|
}
|
|
}
|
|
|
|
// markServerHealthy marks a server as healthy
|
|
func (p *BlossomPool) markServerHealthy(idx int) {
|
|
server := &p.servers[idx]
|
|
server.mutex.Lock()
|
|
defer server.mutex.Unlock()
|
|
|
|
if !server.healthy {
|
|
log.Printf("Blossom server %s is now healthy", server.serverURL)
|
|
}
|
|
|
|
server.healthy = true
|
|
server.failures = 0
|
|
server.lastCheck = time.Now()
|
|
}
|
|
|
|
// markServerUnhealthy marks a server as unhealthy
|
|
func (p *BlossomPool) markServerUnhealthy(idx int, err error) {
|
|
server := &p.servers[idx]
|
|
server.mutex.Lock()
|
|
defer server.mutex.Unlock()
|
|
|
|
server.failures++
|
|
server.lastCheck = time.Now()
|
|
|
|
if server.failures >= p.config.MaxFailures {
|
|
if server.healthy {
|
|
log.Printf("Blossom server %s marked unhealthy after %d failures: %v",
|
|
server.serverURL, server.failures, err)
|
|
}
|
|
server.healthy = false
|
|
}
|
|
}
|
|
|
|
// GetHealthyServerCount returns the number of healthy servers
|
|
func (p *BlossomPool) GetHealthyServerCount() int {
|
|
p.healthMutex.RLock()
|
|
defer p.healthMutex.RUnlock()
|
|
|
|
count := 0
|
|
for i := range p.servers {
|
|
p.servers[i].mutex.RLock()
|
|
if p.servers[i].healthy {
|
|
count++
|
|
}
|
|
p.servers[i].mutex.RUnlock()
|
|
}
|
|
return count
|
|
}
|
|
|
|
// GetServerStatus returns status of all servers
|
|
func (p *BlossomPool) GetServerStatus() []map[string]interface{} {
|
|
p.healthMutex.RLock()
|
|
defer p.healthMutex.RUnlock()
|
|
|
|
status := make([]map[string]interface{}, len(p.servers))
|
|
for i, server := range p.servers {
|
|
server.mutex.RLock()
|
|
status[i] = map[string]interface{}{
|
|
"url": server.serverURL,
|
|
"healthy": server.healthy,
|
|
"failures": server.failures,
|
|
"last_check": server.lastCheck,
|
|
}
|
|
server.mutex.RUnlock()
|
|
}
|
|
return status
|
|
} |