package transcoding import ( "database/sql" "fmt" "log" "os" "os/exec" "path/filepath" ) // Manager coordinates transcoding with the existing storage system type Manager struct { transcoder *Transcoder db *sql.DB enabled bool } // NewManager creates a new transcoding manager func NewManager(transcoder *Transcoder, db *sql.DB) *Manager { return &Manager{ transcoder: transcoder, db: db, enabled: transcoder != nil && transcoder.IsEnabled(), } } // QueueVideoForTranscoding adds a video file to the transcoding queue func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) { if !tm.enabled { return } // Check if already transcoded if tm.HasTranscodedVersion(fileHash) { log.Printf("File %s already has transcoded version, skipping", fileHash) return } // Check if file needs transcoding needsTranscoding, err := tm.transcoder.NeedsTranscoding(filePath) if err != nil { log.Printf("Error checking if %s needs transcoding: %v", fileName, err) // Continue anyway - better to try and fail than skip } if !needsTranscoding { log.Printf("File %s doesn't need transcoding (already web-compatible)", fileName) tm.markAsWebCompatible(fileHash) return } // Create transcoding job job := Job{ ID: fmt.Sprintf("transcode_%s", fileHash), InputPath: filePath, OutputDir: filepath.Join(tm.transcoder.workDir, fileHash), FileHash: fileHash, Qualities: DefaultQualities, Priority: tm.calculatePriority(fileSize), Callback: func(err error) { if err != nil { log.Printf("Transcoding failed for %s: %v", fileName, err) tm.markTranscodingFailed(fileHash, err.Error()) } else { log.Printf("Transcoding completed successfully for %s", fileName) tm.markTranscodingCompleted(fileHash) } }, } log.Printf("Queuing %s for transcoding (size: %.2f MB)", fileName, float64(fileSize)/1024/1024) tm.transcoder.SubmitJob(job) tm.markTranscodingQueued(fileHash) } // HasTranscodedVersion checks if a file has a transcoded version available func (tm *Manager) HasTranscodedVersion(fileHash string) bool { if !tm.enabled { return false } // Check file system transcodedPath := tm.transcoder.GetTranscodedPath(fileHash) if transcodedPath != "" { return true } // Check database record var status string err := tm.db.QueryRow(` SELECT status FROM transcoding_status WHERE file_hash = ? AND status IN ('completed', 'web_compatible') `, fileHash).Scan(&status) return err == nil && (status == "completed" || status == "web_compatible") } // GetTranscodedPath returns the path to transcoded version if available func (tm *Manager) GetTranscodedPath(fileHash string) string { if !tm.enabled { return "" } return tm.transcoder.GetTranscodedPath(fileHash) } // GetQualityPath returns the path to a specific quality version func (tm *Manager) GetQualityPath(fileHash, quality string) string { if !tm.enabled { return "" } return tm.transcoder.GetQualityPath(fileHash, quality) } // GetAvailableQualities returns available quality versions for a file func (tm *Manager) GetAvailableQualities(fileHash string) []Quality { if !tm.enabled { return nil } return tm.transcoder.GetAvailableQualities(fileHash) } // GetAvailableQualitiesInterface returns available quality versions as interface{} for API compatibility func (tm *Manager) GetAvailableQualitiesInterface(fileHash string) []interface{} { qualities := tm.GetAvailableQualities(fileHash) result := make([]interface{}, len(qualities)) for i, q := range qualities { result[i] = q } return result } // GetTranscodingStatus returns the current status of transcoding for a file func (tm *Manager) GetTranscodingStatus(fileHash string) string { if !tm.enabled { return "disabled" } // First check if job is in progress if job, exists := tm.transcoder.GetJobStatus(fmt.Sprintf("transcode_%s", fileHash)); exists { return job.Status } // Check database var status string err := tm.db.QueryRow(` SELECT status FROM transcoding_status WHERE file_hash = ? `, fileHash).Scan(&status) if err != nil { return "unknown" } return status } // GetJobProgress returns the progress percentage and whether the job exists func (tm *Manager) GetJobProgress(fileHash string) (float64, bool) { if !tm.enabled { return 0.0, false } jobID := fmt.Sprintf("transcode_%s", fileHash) job, exists := tm.transcoder.GetJobStatus(jobID) if !exists { return 0.0, false } return job.Progress, true } // calculatePriority determines transcoding priority based on file characteristics func (tm *Manager) calculatePriority(fileSize int64) int { priority := 5 // Default medium priority if fileSize < 500*1024*1024 { // < 500MB priority = 8 // Higher priority for smaller files (faster to transcode) } if fileSize > 5*1024*1024*1024 { // > 5GB priority = 2 // Lower priority for very large files } return priority } // markTranscodingQueued records that a file has been queued for transcoding func (tm *Manager) markTranscodingQueued(fileHash string) { tm.updateTranscodingStatus(fileHash, "queued") } // markTranscodingCompleted records that transcoding completed successfully func (tm *Manager) markTranscodingCompleted(fileHash string) { tm.updateTranscodingStatus(fileHash, "completed") log.Printf("DEBUG: Marked transcoding completed for file %s", fileHash) } // markTranscodingFailed records that transcoding failed func (tm *Manager) markTranscodingFailed(fileHash string, errorMsg string) { _, err := tm.db.Exec(` INSERT OR REPLACE INTO transcoding_status (file_hash, status, error_message, updated_at) VALUES (?, ?, ?, datetime('now')) `, fileHash, "failed", errorMsg) if err != nil { log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err) } } // markAsWebCompatible records that a file doesn't need transcoding func (tm *Manager) markAsWebCompatible(fileHash string) { tm.updateTranscodingStatus(fileHash, "web_compatible") } // updateTranscodingStatus updates the transcoding status in database func (tm *Manager) updateTranscodingStatus(fileHash, status string) { if tm.db == nil { return } _, err := tm.db.Exec(` INSERT OR REPLACE INTO transcoding_status (file_hash, status, updated_at) VALUES (?, ?, datetime('now')) `, fileHash, status) if err != nil { log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err) } } // InitializeDatabase creates the transcoding status table if it doesn't exist func (tm *Manager) InitializeDatabase() error { if tm.db == nil { return fmt.Errorf("no database connection") } _, err := tm.db.Exec(` CREATE TABLE IF NOT EXISTS transcoding_status ( file_hash TEXT PRIMARY KEY, status TEXT NOT NULL, error_message TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) `) if err != nil { return fmt.Errorf("failed to create transcoding_status table: %w", err) } return nil } // GetFailedJobsCount returns the count of failed transcoding jobs func (tm *Manager) GetFailedJobsCount() (int, error) { if !tm.enabled { return 0, nil } var count int err := tm.db.QueryRow(` SELECT COUNT(*) FROM transcoding_status WHERE status = 'failed' `).Scan(&count) return count, err } // Admin Methods for monitoring and management // GetAllJobs returns information about all transcoding jobs func (tm *Manager) GetAllJobs() map[string]interface{} { if !tm.enabled || tm.transcoder == nil { return map[string]interface{}{"enabled": false} } // Get in-memory jobs from transcoder jobs := tm.transcoder.GetAllJobs() return map[string]interface{}{ "enabled": true, "jobs": jobs, } } // Note: RetryFailedJob is now handled by admin handlers which have access to Gateway reconstruction // ClearFailedJobs removes all failed job records func (tm *Manager) ClearFailedJobs() error { if !tm.enabled { return fmt.Errorf("transcoding is disabled") } _, err := tm.db.Exec(`DELETE FROM transcoding_status WHERE status = 'failed'`) if err != nil { return fmt.Errorf("failed to clear failed jobs: %w", err) } return nil } // PauseQueue pauses the transcoding queue func (tm *Manager) PauseQueue() error { if !tm.enabled || tm.transcoder == nil { return fmt.Errorf("transcoding is disabled") } // TODO: Implement queue pausing in transcoder return fmt.Errorf("pause queue not implemented yet") } // ResumeQueue resumes the transcoding queue func (tm *Manager) ResumeQueue() error { if !tm.enabled || tm.transcoder == nil { return fmt.Errorf("transcoding is disabled") } // TODO: Implement queue resuming in transcoder return fmt.Errorf("resume queue not implemented yet") } // GetSystemHealth returns system health information func (tm *Manager) GetSystemHealth() map[string]interface{} { health := map[string]interface{}{} if !tm.enabled { health["enabled"] = false health["ffmpeg_status"] = "Disabled" return health } health["enabled"] = true // Check FFmpeg availability _, err := exec.LookPath("ffmpeg") if err != nil { health["ffmpeg_status"] = "Not Found" } else { // Try running ffmpeg to check if it works cmd := exec.Command("ffmpeg", "-version") err := cmd.Run() if err != nil { health["ffmpeg_status"] = "Error" } else { health["ffmpeg_status"] = "Available" } } // Calculate transcoded storage usage if tm.transcoder != nil { storageGB := tm.calculateTranscodedStorage() health["transcoded_storage_gb"] = storageGB } return health } // calculateTranscodedStorage calculates disk space used by transcoded files func (tm *Manager) calculateTranscodedStorage() float64 { workDir := tm.transcoder.workDir if workDir == "" { return 0.0 } var totalSize int64 filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error { if err != nil { return nil } if !info.IsDir() { totalSize += info.Size() } return nil }) // Convert bytes to gigabytes return float64(totalSize) / (1024 * 1024 * 1024) }