325 lines
7.0 KiB
Go
325 lines
7.0 KiB
Go
![]() |
// internal/nostr/relay/manager.go
|
||
|
package relay
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/nbd-wtf/go-nostr"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// Manager handles connections to Nostr relays
|
||
|
type Manager struct {
|
||
|
relays map[string]*nostr.Relay
|
||
|
readURLs []string
|
||
|
writeURLs []string
|
||
|
mu sync.RWMutex
|
||
|
logger *zap.Logger
|
||
|
}
|
||
|
|
||
|
// NewManager creates a new relay manager
|
||
|
func NewManager(logger *zap.Logger) *Manager {
|
||
|
if logger == nil {
|
||
|
// Create a default logger if none is provided
|
||
|
var err error
|
||
|
logger, err = zap.NewProduction()
|
||
|
if err != nil {
|
||
|
// If we can't create a logger, use a no-op logger
|
||
|
logger = zap.NewNop()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return &Manager{
|
||
|
relays: make(map[string]*nostr.Relay),
|
||
|
readURLs: []string{},
|
||
|
writeURLs: []string{},
|
||
|
logger: logger,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// AddRelay adds a relay to the manager and connects to it
|
||
|
func (m *Manager) AddRelay(url string, read, write bool) error {
|
||
|
m.mu.Lock()
|
||
|
defer m.mu.Unlock()
|
||
|
|
||
|
// Check if we're already connected to this relay
|
||
|
if relay, exists := m.relays[url]; exists {
|
||
|
// Update read/write flags
|
||
|
if read && !isInSlice(url, m.readURLs) {
|
||
|
m.readURLs = append(m.readURLs, url)
|
||
|
} else if !read {
|
||
|
m.readURLs = removeFromSlice(url, m.readURLs)
|
||
|
}
|
||
|
|
||
|
if write && !isInSlice(url, m.writeURLs) {
|
||
|
m.writeURLs = append(m.writeURLs, url)
|
||
|
} else if !write {
|
||
|
m.writeURLs = removeFromSlice(url, m.writeURLs)
|
||
|
}
|
||
|
|
||
|
// If we don't need to read or write to this relay, close the connection
|
||
|
if !read && !write {
|
||
|
relay.Close()
|
||
|
delete(m.relays, url)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Only connect if we need to read or write
|
||
|
if !read && !write {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Connect to the relay
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
|
defer cancel()
|
||
|
|
||
|
relay, err := nostr.RelayConnect(ctx, url)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to connect to relay %s: %w", url, err)
|
||
|
}
|
||
|
|
||
|
// Store the relay
|
||
|
m.relays[url] = relay
|
||
|
|
||
|
// Update read/write lists
|
||
|
if read {
|
||
|
m.readURLs = append(m.readURLs, url)
|
||
|
}
|
||
|
|
||
|
if write {
|
||
|
m.writeURLs = append(m.writeURLs, url)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// RemoveRelay removes a relay from the manager and closes the connection
|
||
|
func (m *Manager) RemoveRelay(url string) {
|
||
|
m.mu.Lock()
|
||
|
defer m.mu.Unlock()
|
||
|
|
||
|
// Check if we're connected to this relay
|
||
|
if relay, exists := m.relays[url]; exists {
|
||
|
relay.Close()
|
||
|
delete(m.relays, url)
|
||
|
}
|
||
|
|
||
|
// Remove from read/write lists
|
||
|
m.readURLs = removeFromSlice(url, m.readURLs)
|
||
|
m.writeURLs = removeFromSlice(url, m.writeURLs)
|
||
|
}
|
||
|
|
||
|
// PublishEvent publishes an event to all write relays
|
||
|
func (m *Manager) PublishEvent(ctx context.Context, event *nostr.Event) ([]string, error) {
|
||
|
m.mu.RLock()
|
||
|
writeURLs := make([]string, len(m.writeURLs))
|
||
|
copy(writeURLs, m.writeURLs)
|
||
|
m.mu.RUnlock()
|
||
|
|
||
|
if len(writeURLs) == 0 {
|
||
|
return nil, fmt.Errorf("no write relays configured")
|
||
|
}
|
||
|
|
||
|
// Keep track of successful publishes
|
||
|
var successful []string
|
||
|
var wg sync.WaitGroup
|
||
|
var mu sync.Mutex
|
||
|
|
||
|
// Publish to all write relays in parallel
|
||
|
for _, url := range writeURLs {
|
||
|
wg.Add(1)
|
||
|
go func(relayURL string) {
|
||
|
defer wg.Done()
|
||
|
|
||
|
m.mu.RLock()
|
||
|
relay, exists := m.relays[relayURL]
|
||
|
m.mu.RUnlock()
|
||
|
|
||
|
if !exists {
|
||
|
m.logger.Warn("Relay not found in connection pool", zap.String("relay", relayURL))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Create a new context with timeout
|
||
|
publishCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||
|
defer cancel()
|
||
|
|
||
|
// Publish the event
|
||
|
err := relay.Publish(publishCtx, *event)
|
||
|
status := err == nil // Assuming no error means success
|
||
|
if err != nil {
|
||
|
m.logger.Warn("Failed to publish event",
|
||
|
zap.String("relay", relayURL),
|
||
|
zap.Error(err))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Check if the publish was successful
|
||
|
if status {
|
||
|
mu.Lock()
|
||
|
successful = append(successful, relayURL)
|
||
|
mu.Unlock()
|
||
|
|
||
|
m.logger.Info("Event published successfully",
|
||
|
zap.String("relay", relayURL),
|
||
|
zap.String("event_id", event.ID))
|
||
|
} else {
|
||
|
m.logger.Warn("Relay rejected event",
|
||
|
zap.String("relay", relayURL),
|
||
|
zap.String("event_id", event.ID))
|
||
|
}
|
||
|
}(url)
|
||
|
}
|
||
|
|
||
|
// Wait for all publish operations to complete
|
||
|
wg.Wait()
|
||
|
|
||
|
if len(successful) == 0 {
|
||
|
return nil, fmt.Errorf("failed to publish event to any relay")
|
||
|
}
|
||
|
|
||
|
return successful, nil
|
||
|
}
|
||
|
|
||
|
// SubscribeToEvents subscribes to events matching the given filters
|
||
|
func (m *Manager) SubscribeToEvents(ctx context.Context, filters []nostr.Filter) (<-chan *nostr.Event, error) {
|
||
|
m.mu.RLock()
|
||
|
readURLs := make([]string, len(m.readURLs))
|
||
|
copy(readURLs, m.readURLs)
|
||
|
m.mu.RUnlock()
|
||
|
|
||
|
if len(readURLs) == 0 {
|
||
|
return nil, fmt.Errorf("no read relays configured")
|
||
|
}
|
||
|
|
||
|
// Create a channel for events
|
||
|
eventChan := make(chan *nostr.Event)
|
||
|
|
||
|
// Create a new context with timeout for subscriptions
|
||
|
subCtx, cancel := context.WithCancel(ctx)
|
||
|
|
||
|
// Keep track of subscriptions
|
||
|
var subscriptions []*nostr.Subscription
|
||
|
|
||
|
// Subscribe to all read relays
|
||
|
var wg sync.WaitGroup
|
||
|
for _, url := range readURLs {
|
||
|
wg.Add(1)
|
||
|
go func(relayURL string) {
|
||
|
defer wg.Done()
|
||
|
|
||
|
m.mu.RLock()
|
||
|
relay, exists := m.relays[relayURL]
|
||
|
m.mu.RUnlock()
|
||
|
|
||
|
if !exists {
|
||
|
m.logger.Warn("Relay not found in connection pool", zap.String("relay", relayURL))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Subscribe to events
|
||
|
sub, err := relay.Subscribe(subCtx, filters)
|
||
|
if err != nil {
|
||
|
m.logger.Warn("Failed to subscribe to relay",
|
||
|
zap.String("relay", relayURL),
|
||
|
zap.Error(err))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
m.mu.Lock()
|
||
|
subscriptions = append(subscriptions, sub)
|
||
|
m.mu.Unlock()
|
||
|
|
||
|
// Handle events
|
||
|
go func() {
|
||
|
for ev := range sub.Events {
|
||
|
select {
|
||
|
case eventChan <- ev:
|
||
|
// Event sent to caller
|
||
|
case <-subCtx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}(url)
|
||
|
}
|
||
|
|
||
|
// Wait for all subscriptions to be set up
|
||
|
wg.Wait()
|
||
|
|
||
|
// Return a cleanup function
|
||
|
go func() {
|
||
|
<-ctx.Done()
|
||
|
cancel()
|
||
|
close(eventChan)
|
||
|
|
||
|
// Close all subscriptions
|
||
|
for _, sub := range subscriptions {
|
||
|
sub.Unsub()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return eventChan, nil
|
||
|
}
|
||
|
|
||
|
// Close closes all relay connections
|
||
|
func (m *Manager) Close() {
|
||
|
m.mu.Lock()
|
||
|
defer m.mu.Unlock()
|
||
|
|
||
|
for url, relay := range m.relays {
|
||
|
relay.Close()
|
||
|
delete(m.relays, url)
|
||
|
}
|
||
|
|
||
|
m.readURLs = []string{}
|
||
|
m.writeURLs = []string{}
|
||
|
}
|
||
|
|
||
|
// GetRelays returns the list of connected relays
|
||
|
func (m *Manager) GetRelays() map[string]struct{ Read, Write bool } {
|
||
|
m.mu.RLock()
|
||
|
defer m.mu.RUnlock()
|
||
|
|
||
|
result := make(map[string]struct{ Read, Write bool })
|
||
|
|
||
|
for url := range m.relays {
|
||
|
result[url] = struct {
|
||
|
Read bool
|
||
|
Write bool
|
||
|
}{
|
||
|
Read: isInSlice(url, m.readURLs),
|
||
|
Write: isInSlice(url, m.writeURLs),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return result
|
||
|
}
|
||
|
|
||
|
// Helper functions
|
||
|
|
||
|
// isInSlice checks if a string is in a slice
|
||
|
func isInSlice(s string, slice []string) bool {
|
||
|
for _, item := range slice {
|
||
|
if item == s {
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
// removeFromSlice removes a string from a slice
|
||
|
func removeFromSlice(s string, slice []string) []string {
|
||
|
var result []string
|
||
|
for _, item := range slice {
|
||
|
if item != s {
|
||
|
result = append(result, item)
|
||
|
}
|
||
|
}
|
||
|
return result
|
||
|
}
|