Fix metadataStore sync issue for transcoded files
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / E2E Tests (push) Blocked by required conditions

This commit is contained in:
Enki 2025-08-27 20:20:32 -07:00
parent 28e32c33de
commit 2ccfcd0718
2 changed files with 48 additions and 3 deletions

View File

@ -298,6 +298,7 @@ type TranscodingManager interface {
GetTranscodingStatus(fileHash string) string GetTranscodingStatus(fileHash string) string
GetJobProgress(fileHash string) (float64, bool) GetJobProgress(fileHash string) (float64, bool)
InitializeDatabase() error InitializeDatabase() error
SetUpdateMetadataCallback(callback func(fileHash string) error)
// Admin methods // Admin methods
GetAllJobs() map[string]interface{} GetAllJobs() map[string]interface{}
GetFailedJobsCount() (int, error) GetFailedJobsCount() (int, error)
@ -1390,6 +1391,32 @@ func (g *Gateway) storeMetadata(fileHash string, metadata FileMetadata) error {
return nil return nil
} }
// UpdateMetadataStreamingInfo updates the streaming info for an existing file in metadataStore
func (g *Gateway) UpdateMetadataStreamingInfo(fileHash string) error {
// Get existing metadata from store
existingMetadata, exists := metadataStore[fileHash]
if !exists {
log.Printf("DEBUG: File %s not found in metadataStore, skipping streaming info update", fileHash)
return nil
}
// Get fresh metadata with streaming info from database
freshMetadata, err := g.getMetadata(fileHash)
if err != nil {
log.Printf("DEBUG: Failed to get fresh metadata for %s: %v", fileHash, err)
return err
}
// Update the existing metadata with streaming info
if freshMetadata.StreamingInfo != nil {
existingMetadata.StreamingInfo = freshMetadata.StreamingInfo
metadataStore[fileHash] = existingMetadata
log.Printf("DEBUG: Updated metadataStore with streaming info for file %s", fileHash)
}
return nil
}
func (g *Gateway) getMetadata(fileHash string) (*FileMetadata, error) { func (g *Gateway) getMetadata(fileHash string) (*FileMetadata, error) {
// Get metadata from storage backend // Get metadata from storage backend
dbMetadata, err := g.storage.GetFileMetadata(fileHash) dbMetadata, err := g.storage.GetFileMetadata(fileHash)
@ -3145,6 +3172,11 @@ func (g *Gateway) DHTStatsHandler(w http.ResponseWriter, r *http.Request) {
func RegisterRoutes(r *mux.Router, cfg *config.Config, storage *storage.Backend) *Gateway { func RegisterRoutes(r *mux.Router, cfg *config.Config, storage *storage.Backend) *Gateway {
gateway := NewGateway(cfg, storage) gateway := NewGateway(cfg, storage)
// Set up transcoding callback to update metadata store
if gateway.transcodingManager != nil {
gateway.transcodingManager.SetUpdateMetadataCallback(gateway.UpdateMetadataStreamingInfo)
}
// Initialize tracker if enabled // Initialize tracker if enabled
var trackerInstance *tracker.Tracker var trackerInstance *tracker.Tracker
var announceHandler *tracker.AnnounceHandler var announceHandler *tracker.AnnounceHandler

View File

@ -11,9 +11,10 @@ import (
// Manager coordinates transcoding with the existing storage system // Manager coordinates transcoding with the existing storage system
type Manager struct { type Manager struct {
transcoder *Transcoder transcoder *Transcoder
db *sql.DB db *sql.DB
enabled bool enabled bool
updateMetadataCallback func(fileHash string) error
} }
// NewManager creates a new transcoding manager // NewManager creates a new transcoding manager
@ -25,6 +26,11 @@ func NewManager(transcoder *Transcoder, db *sql.DB) *Manager {
} }
} }
// SetUpdateMetadataCallback sets the callback for updating metadata store
func (tm *Manager) SetUpdateMetadataCallback(callback func(fileHash string) error) {
tm.updateMetadataCallback = callback
}
// QueueVideoForTranscoding adds a video file to the transcoding queue // QueueVideoForTranscoding adds a video file to the transcoding queue
func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) { func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) {
if !tm.enabled { if !tm.enabled {
@ -65,6 +71,13 @@ func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string,
} else { } else {
log.Printf("Transcoding completed successfully for %s", fileName) log.Printf("Transcoding completed successfully for %s", fileName)
tm.markTranscodingCompleted(fileHash) tm.markTranscodingCompleted(fileHash)
// Update metadata store with streaming info
if tm.updateMetadataCallback != nil {
if updateErr := tm.updateMetadataCallback(fileHash); updateErr != nil {
log.Printf("Warning: Failed to update metadata store for %s: %v", fileHash, updateErr)
}
}
} }
}, },
} }