369 lines
8.4 KiB
Go

// internal/nostr/relay/manager.go
package relay
import (
"context"
"fmt"
"sync"
"time"
"git.sovbit.dev/Enki/nostr-poster/internal/models"
"git.sovbit.dev/Enki/nostr-poster/internal/nostr/events"
"github.com/nbd-wtf/go-nostr"
"go.uber.org/zap"
)
// Manager handles connections to Nostr relays.
//
// It keeps one connection per relay URL and records, per URL, whether the
// relay is used for reading, writing, or both. All fields are guarded by mu.
type Manager struct {
// relays maps a relay URL to its open connection.
relays map[string]*nostr.Relay
// readURLs lists the URLs of relays flagged for reading (subscriptions).
readURLs []string
// writeURLs lists the URLs of relays flagged for writing (publishing).
writeURLs []string
// mu protects relays, readURLs and writeURLs.
mu sync.RWMutex
// logger receives warnings and informational messages from relay operations.
logger *zap.Logger
}
// convertTags returns a deep copy of tags as plain string slices.
//
// The result always has len(tags) entries, and each entry is a freshly
// allocated slice, so the caller may modify it without affecting tags.
func convertTags(tags nostr.Tags) [][]string {
	out := make([][]string, 0, len(tags))
	for _, tag := range tags {
		// Allocate per tag so the copy does not alias the event's storage.
		fields := make([]string, len(tag))
		copy(fields, tag)
		out = append(out, fields)
	}
	return out
}
// NewManager creates a relay manager with no relays configured.
//
// When logger is nil a production zap logger is created; if that fails too,
// a no-op logger is used so the manager can always log safely.
func NewManager(logger *zap.Logger) *Manager {
	if logger == nil {
		if prod, err := zap.NewProduction(); err == nil {
			logger = prod
		} else {
			// Could not build a real logger; fall back to one that discards output.
			logger = zap.NewNop()
		}
	}

	mgr := &Manager{
		logger:    logger,
		relays:    make(map[string]*nostr.Relay),
		readURLs:  []string{},
		writeURLs: []string{},
	}
	return mgr
}
// AddRelay registers url with the manager, connecting to it when necessary.
//
// For a relay that is already connected only its read/write flags are
// updated; clearing both flags closes the connection and forgets the relay.
// For a new relay a connection is attempted (10-second timeout) only when at
// least one of read or write is set. The manager lock is held for the whole
// call, including the connection attempt.
//
// It returns an error only when connecting to a new relay fails.
func (m *Manager) AddRelay(url string, read, write bool) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	wanted := read || write

	if conn, known := m.relays[url]; known {
		// Bring the read list in line with the requested flag.
		switch {
		case !read:
			m.readURLs = removeFromSlice(url, m.readURLs)
		case !isInSlice(url, m.readURLs):
			m.readURLs = append(m.readURLs, url)
		}

		// Bring the write list in line with the requested flag.
		switch {
		case !write:
			m.writeURLs = removeFromSlice(url, m.writeURLs)
		case !isInSlice(url, m.writeURLs):
			m.writeURLs = append(m.writeURLs, url)
		}

		// A relay used for neither purpose has no reason to stay connected.
		if !wanted {
			conn.Close()
			delete(m.relays, url)
		}
		return nil
	}

	// Unknown relay that nobody wants to use: nothing to do.
	if !wanted {
		return nil
	}

	dialCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()

	conn, err := nostr.RelayConnect(dialCtx, url)
	if err != nil {
		return fmt.Errorf("failed to connect to relay %s: %w", url, err)
	}
	m.relays[url] = conn

	if read {
		m.readURLs = append(m.readURLs, url)
	}
	if write {
		m.writeURLs = append(m.writeURLs, url)
	}
	return nil
}
// RemoveRelay forgets url entirely: an open connection is closed and the URL
// is dropped from both the read and the write list. Removing an unknown URL
// is a no-op apart from rebuilding the two lists.
func (m *Manager) RemoveRelay(url string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	conn, connected := m.relays[url]
	if connected {
		conn.Close()
		delete(m.relays, url)
	}

	m.writeURLs = removeFromSlice(url, m.writeURLs)
	m.readURLs = removeFromSlice(url, m.readURLs)
}
// PublishEvent publishes event to every relay currently flagged for writing.
//
// Relays are contacted in parallel; each attempt is bounded by a 15-second
// timeout derived from ctx. The returned slice holds the URLs of the relays
// whose Publish call returned no error, in completion order (which is not
// deterministic).
//
// An error is returned when no write relays are configured, when event is
// nil, or when publishing did not succeed on any relay. Individual relay
// failures are logged as warnings and do not abort the other attempts.
func (m *Manager) PublishEvent(ctx context.Context, event *nostr.Event) ([]string, error) {
	// Snapshot the write list so the lock is not held during network I/O.
	m.mu.RLock()
	writeURLs := make([]string, len(m.writeURLs))
	copy(writeURLs, m.writeURLs)
	m.mu.RUnlock()

	if len(writeURLs) == 0 {
		return nil, fmt.Errorf("no write relays configured")
	}
	if event == nil {
		// A nil event would be dereferenced inside the worker goroutines,
		// where the resulting panic cannot be recovered by the caller.
		return nil, fmt.Errorf("cannot publish nil event")
	}

	var (
		successful []string
		wg         sync.WaitGroup
		mu         sync.Mutex // guards successful
	)

	for _, url := range writeURLs {
		wg.Add(1)
		go func(relayURL string) {
			defer wg.Done()

			// The relay may have been removed since the snapshot was taken.
			m.mu.RLock()
			relay, exists := m.relays[relayURL]
			m.mu.RUnlock()
			if !exists {
				m.logger.Warn("Relay not found in connection pool", zap.String("relay", relayURL))
				return
			}

			publishCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
			defer cancel()

			if err := relay.Publish(publishCtx, *event); err != nil {
				m.logger.Warn("Failed to publish event",
					zap.String("relay", relayURL),
					zap.Error(err))
				return
			}

			mu.Lock()
			successful = append(successful, relayURL)
			mu.Unlock()
			m.logger.Info("Event published successfully",
				zap.String("relay", relayURL),
				zap.String("event_id", event.ID))
		}(url)
	}

	wg.Wait()

	if len(successful) == 0 {
		return nil, fmt.Errorf("failed to publish event to any relay")
	}
	return successful, nil
}
// PublishEventWithEncoding publishes event via PublishEvent and returns a
// response describing it.
//
// On a successful publish the response comes from events.EncodeEvent, which
// is given the event and the relays that accepted it. If encoding fails the
// failure is logged and a basic response built directly from the event's
// fields is returned instead, so an encoding problem never turns a successful
// publish into an error. A publish error is returned unchanged.
func (m *Manager) PublishEventWithEncoding(ctx context.Context, event *nostr.Event) (*models.EventResponse, error) {
	relaysReached, err := m.PublishEvent(ctx, event)
	if err != nil {
		return nil, err
	}

	encoded, encErr := events.EncodeEvent(event, relaysReached)
	if encErr == nil {
		return encoded, nil
	}

	m.logger.Warn("Failed to encode event with NIP-19",
		zap.String("event_id", event.ID),
		zap.Error(encErr))

	// Encoding failed: hand back the raw event data without encoded identifiers.
	fallback := &models.EventResponse{
		ID:        event.ID,
		Pubkey:    event.PubKey,
		CreatedAt: int64(event.CreatedAt),
		Kind:      event.Kind,
		Content:   event.Content,
		Tags:      convertTags(event.Tags),
		Relays:    relaysReached,
	}
	return fallback, nil
}
// SubscribeToEvents subscribes to events matching filters on every relay
// currently flagged for reading and merges them into a single channel.
//
// Subscriptions are set up in parallel and the call returns once every
// attempt has finished. Relays that are missing from the connection pool or
// fail to subscribe are logged and skipped. The returned channel is
// unbuffered, so a slow consumer delays delivery from all relays.
//
// The subscriptions live until ctx is cancelled. At that point every
// subscription is unsubscribed, all forwarding goroutines are stopped, and
// only then is the returned channel closed, so no goroutine can send on it
// after the close. Callers should therefore pass a cancellable ctx.
//
// An error is returned only when no read relays are configured.
func (m *Manager) SubscribeToEvents(ctx context.Context, filters []nostr.Filter) (<-chan *nostr.Event, error) {
	// Snapshot the read list so the lock is not held during network I/O.
	m.mu.RLock()
	readURLs := make([]string, len(m.readURLs))
	copy(readURLs, m.readURLs)
	m.mu.RUnlock()

	if len(readURLs) == 0 {
		return nil, fmt.Errorf("no read relays configured")
	}

	eventChan := make(chan *nostr.Event)

	// subCtx is cancelled together with ctx and stops subscriptions and forwarders.
	subCtx, cancel := context.WithCancel(ctx)

	var (
		subsMu        sync.Mutex // guards subscriptions during parallel setup
		subscriptions []*nostr.Subscription
		setup         sync.WaitGroup // subscription attempts in flight
		forwarders    sync.WaitGroup // goroutines that may send on eventChan
	)

	for _, url := range readURLs {
		setup.Add(1)
		go func(relayURL string) {
			defer setup.Done()

			// The relay may have been removed since the snapshot was taken.
			m.mu.RLock()
			relay, exists := m.relays[relayURL]
			m.mu.RUnlock()
			if !exists {
				m.logger.Warn("Relay not found in connection pool", zap.String("relay", relayURL))
				return
			}

			sub, err := relay.Subscribe(subCtx, filters)
			if err != nil {
				m.logger.Warn("Failed to subscribe to relay",
					zap.String("relay", relayURL),
					zap.Error(err))
				return
			}

			subsMu.Lock()
			subscriptions = append(subscriptions, sub)
			subsMu.Unlock()

			// Registered before setup.Done runs, so the cleanup goroutine
			// below is guaranteed to see every forwarder.
			forwarders.Add(1)
			go func() {
				defer forwarders.Done()
				for {
					select {
					case <-subCtx.Done():
						return
					case ev, ok := <-sub.Events:
						if !ok {
							// The subscription's event stream has ended.
							return
						}
						select {
						case eventChan <- ev:
						case <-subCtx.Done():
							return
						}
					}
				}
			}()
		}(url)
	}

	// Wait until every subscription attempt has either succeeded or failed.
	setup.Wait()

	// Tear everything down once the caller's context ends.
	go func() {
		<-subCtx.Done()
		cancel()
		for _, sub := range subscriptions {
			sub.Unsub()
		}
		// Closing while a forwarder could still send would panic, so wait
		// for all of them to exit first.
		forwarders.Wait()
		close(eventChan)
	}()

	return eventChan, nil
}
// Close shuts down every relay connection and resets the manager to an empty
// state: no connections, no read relays and no write relays.
func (m *Manager) Close() {
	m.mu.Lock()
	defer m.mu.Unlock()

	for url := range m.relays {
		m.relays[url].Close()
		delete(m.relays, url)
	}

	m.readURLs, m.writeURLs = []string{}, []string{}
}
// GetRelays reports the currently connected relays.
//
// The result maps each connected relay URL to a pair of flags telling whether
// the relay is in the read list and whether it is in the write list. The map
// is built fresh on every call, so the caller may modify it freely.
func (m *Manager) GetRelays() map[string]struct{ Read, Write bool } {
	m.mu.RLock()
	defer m.mu.RUnlock()

	out := make(map[string]struct{ Read, Write bool })
	for url := range m.relays {
		var entry struct{ Read, Write bool }
		entry.Read = isInSlice(url, m.readURLs)
		entry.Write = isInSlice(url, m.writeURLs)
		out[url] = entry
	}
	return out
}
// Helper functions
// isInSlice reports whether s occurs in slice. A nil or empty slice never
// contains anything.
func isInSlice(s string, slice []string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == s {
			return true
		}
	}
	return false
}
// removeFromSlice returns a new slice holding every element of slice that is
// not equal to s, in the original order. The input is left untouched. When
// nothing remains the result is nil.
func removeFromSlice(s string, slice []string) []string {
	var kept []string
	for i := 0; i < len(slice); i++ {
		if slice[i] == s {
			continue
		}
		kept = append(kept, slice[i])
	}
	return kept
}