From 2ccfcd0718f79534e2a63d4b625f4b68fca03600 Mon Sep 17 00:00:00 2001 From: enki Date: Wed, 27 Aug 2025 20:20:32 -0700 Subject: [PATCH] Fix metadataStore sync issue for transcoded files --- internal/api/handlers.go | 32 ++++++++++++++++++++++++++++++++ internal/transcoding/manager.go | 19 ++++++++++++++++--- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/internal/api/handlers.go b/internal/api/handlers.go index f6ed790..dd9769d 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -298,6 +298,7 @@ type TranscodingManager interface { GetTranscodingStatus(fileHash string) string GetJobProgress(fileHash string) (float64, bool) InitializeDatabase() error + SetUpdateMetadataCallback(callback func(fileHash string) error) // Admin methods GetAllJobs() map[string]interface{} GetFailedJobsCount() (int, error) @@ -1390,6 +1391,32 @@ func (g *Gateway) storeMetadata(fileHash string, metadata FileMetadata) error { return nil } +// UpdateMetadataStreamingInfo updates the streaming info for an existing file in metadataStore +func (g *Gateway) UpdateMetadataStreamingInfo(fileHash string) error { + // Get existing metadata from store + existingMetadata, exists := metadataStore[fileHash] + if !exists { + log.Printf("DEBUG: File %s not found in metadataStore, skipping streaming info update", fileHash) + return nil + } + + // Get fresh metadata with streaming info from database + freshMetadata, err := g.getMetadata(fileHash) + if err != nil { + log.Printf("DEBUG: Failed to get fresh metadata for %s: %v", fileHash, err) + return err + } + + // Update the existing metadata with streaming info + if freshMetadata.StreamingInfo != nil { + existingMetadata.StreamingInfo = freshMetadata.StreamingInfo + metadataStore[fileHash] = existingMetadata + log.Printf("DEBUG: Updated metadataStore with streaming info for file %s", fileHash) + } + + return nil +} + func (g *Gateway) getMetadata(fileHash string) (*FileMetadata, error) { // Get metadata from storage backend dbMetadata, err := g.storage.GetFileMetadata(fileHash) @@ -3145,6 +3172,11 @@ func (g *Gateway) DHTStatsHandler(w http.ResponseWriter, r *http.Request) { func RegisterRoutes(r *mux.Router, cfg *config.Config, storage *storage.Backend) *Gateway { gateway := NewGateway(cfg, storage) + // Set up transcoding callback to update metadata store + if gateway.transcodingManager != nil { + gateway.transcodingManager.SetUpdateMetadataCallback(gateway.UpdateMetadataStreamingInfo) + } + // Initialize tracker if enabled var trackerInstance *tracker.Tracker var announceHandler *tracker.AnnounceHandler diff --git a/internal/transcoding/manager.go b/internal/transcoding/manager.go index 8331a38..2090a2a 100644 --- a/internal/transcoding/manager.go +++ b/internal/transcoding/manager.go @@ -11,9 +11,10 @@ import ( // Manager coordinates transcoding with the existing storage system type Manager struct { - transcoder *Transcoder - db *sql.DB - enabled bool + transcoder *Transcoder + db *sql.DB + enabled bool + updateMetadataCallback func(fileHash string) error } // NewManager creates a new transcoding manager @@ -25,6 +26,11 @@ func NewManager(transcoder *Transcoder, db *sql.DB) *Manager { } } +// SetUpdateMetadataCallback sets the callback for updating metadata store +func (tm *Manager) SetUpdateMetadataCallback(callback func(fileHash string) error) { + tm.updateMetadataCallback = callback +} + // QueueVideoForTranscoding adds a video file to the transcoding queue func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) { if !tm.enabled { @@ -65,6 +71,13 @@ func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, } else { log.Printf("Transcoding completed successfully for %s", fileName) tm.markTranscodingCompleted(fileHash) + + // Update metadata store with streaming info + if tm.updateMetadataCallback != nil { + if updateErr := tm.updateMetadataCallback(fileHash); updateErr != nil { + log.Printf("Warning: Failed to update metadata store for %s: %v", fileHash, updateErr) + } + } } }, }