enki bbc7c259b4
Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / E2E Tests (push) Blocked by required conditions
Fix DHT bootstrap by adding configurable public URL
2025-08-27 17:47:08 -07:00

384 lines
9.9 KiB
Go

package transcoding
import (
"database/sql"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
)
// Manager ties a Transcoder to the SQL-backed transcoding_status table so the
// rest of the storage system can queue work and query results in one place.
type Manager struct {
	// transcoder performs the actual work; may be nil, in which case the
	// Manager is constructed as disabled.
	transcoder *Transcoder
	// db holds the transcoding_status table; some methods tolerate nil.
	db *sql.DB
	// enabled is computed once in NewManager from the transcoder's state.
	enabled bool
}
// NewManager builds a Manager around the given transcoder and database handle.
//
// The enabled flag is decided here and never recomputed: it is true only when
// transcoder is non-nil and transcoder.IsEnabled() reports true.
func NewManager(transcoder *Transcoder, db *sql.DB) *Manager {
	active := false
	if transcoder != nil {
		active = transcoder.IsEnabled()
	}

	m := &Manager{transcoder: transcoder, db: db}
	m.enabled = active
	return m
}
// QueueVideoForTranscoding submits a video file to the transcoder unless the
// manager is disabled, the file already has a transcoded version, or the
// transcoder reports that the file does not need transcoding.
//
// fileHash identifies the file (it is used for the job ID, the output
// directory and the status row), fileName is used only in log messages,
// filePath is the transcoder input, and fileSize (bytes) drives the priority.
//
// The function returns nothing; outcomes are reported through the log and the
// transcoding_status table.
func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) {
	if !tm.enabled {
		return
	}

	// Check if already transcoded
	if tm.HasTranscodedVersion(fileHash) {
		log.Printf("File %s already has transcoded version, skipping", fileHash)
		return
	}

	// Check if file needs transcoding
	needsTranscoding, err := tm.transcoder.NeedsTranscoding(filePath)
	if err != nil {
		log.Printf("Error checking if %s needs transcoding: %v", fileName, err)
		// The probe result is meaningless when the probe itself failed, so do
		// not let it mark the file as web-compatible. Better to try the
		// transcode and fail than to skip it silently.
		needsTranscoding = true
	}
	if !needsTranscoding {
		log.Printf("File %s doesn't need transcoding (already web-compatible)", fileName)
		tm.markAsWebCompatible(fileHash)
		return
	}

	// Create transcoding job
	job := Job{
		ID:        fmt.Sprintf("transcode_%s", fileHash),
		InputPath: filePath,
		OutputDir: filepath.Join(tm.transcoder.workDir, fileHash),
		FileHash:  fileHash,
		Qualities: DefaultQualities,
		Priority:  tm.calculatePriority(fileSize),
		Callback: func(err error) {
			if err != nil {
				log.Printf("Transcoding failed for %s: %v", fileName, err)
				tm.markTranscodingFailed(fileHash, err.Error())
				return
			}
			log.Printf("Transcoding completed successfully for %s", fileName)
			tm.markTranscodingCompleted(fileHash)
		},
	}

	log.Printf("Queuing %s for transcoding (size: %.2f MB)", fileName, float64(fileSize)/1024/1024)

	// Record "queued" before handing the job over: if the callback fires
	// before this function returns, its terminal status must not be
	// overwritten by a late "queued" write.
	tm.markTranscodingQueued(fileHash)
	tm.transcoder.SubmitJob(job)
}
// HasTranscodedVersion reports whether a usable version of the file exists.
//
// It returns true when the transcoder returns a non-empty transcoded path for
// fileHash, or when the transcoding_status table holds a row for fileHash
// with status "completed" or "web_compatible". It returns false when the
// manager is disabled, when there is no database to consult, or when the
// lookup fails for any reason (including no matching row).
func (tm *Manager) HasTranscodedVersion(fileHash string) bool {
	if !tm.enabled {
		return false
	}

	// Check file system
	if tm.transcoder.GetTranscodedPath(fileHash) != "" {
		return true
	}

	// A Manager may be built without a database (updateTranscodingStatus
	// tolerates that too); without one there is no record to consult.
	if tm.db == nil {
		return false
	}

	// Check database record
	var status string
	err := tm.db.QueryRow(`
SELECT status FROM transcoding_status
WHERE file_hash = ? AND status IN ('completed', 'web_compatible')
`, fileHash).Scan(&status)
	return err == nil && (status == "completed" || status == "web_compatible")
}
// GetTranscodedPath forwards to the transcoder's GetTranscodedPath for
// fileHash. A disabled manager always returns the empty string.
func (tm *Manager) GetTranscodedPath(fileHash string) string {
	if tm.enabled {
		return tm.transcoder.GetTranscodedPath(fileHash)
	}
	return ""
}
// GetQualityPath forwards to the transcoder's GetQualityPath for the given
// file hash and quality name. A disabled manager always returns the empty
// string.
func (tm *Manager) GetQualityPath(fileHash, quality string) string {
	if tm.enabled {
		return tm.transcoder.GetQualityPath(fileHash, quality)
	}
	return ""
}
// GetAvailableQualities forwards to the transcoder's GetAvailableQualities
// for fileHash. A disabled manager returns a nil slice.
func (tm *Manager) GetAvailableQualities(fileHash string) []Quality {
	if tm.enabled {
		return tm.transcoder.GetAvailableQualities(fileHash)
	}
	return nil
}
// GetAvailableQualitiesInterface is GetAvailableQualities with every element
// boxed into interface{}, for API layers that expect an untyped slice.
//
// The result is always a non-nil slice, even when there are no qualities.
func (tm *Manager) GetAvailableQualitiesInterface(fileHash string) []interface{} {
	typed := tm.GetAvailableQualities(fileHash)

	boxed := make([]interface{}, 0, len(typed))
	for idx := range typed {
		boxed = append(boxed, typed[idx])
	}
	return boxed
}
// GetTranscodingStatus returns the current transcoding status for fileHash.
//
// Resolution order:
//   - "disabled" when the manager is disabled;
//   - the Status of the transcoder's job "transcode_<fileHash>" if the
//     transcoder knows such a job;
//   - otherwise the status column stored in transcoding_status;
//   - "unknown" when there is no database, no row, or the query fails.
func (tm *Manager) GetTranscodingStatus(fileHash string) string {
	if !tm.enabled {
		return "disabled"
	}

	// First check if job is in progress
	if job, exists := tm.transcoder.GetJobStatus(fmt.Sprintf("transcode_%s", fileHash)); exists {
		return job.Status
	}

	// A Manager may be built without a database (updateTranscodingStatus
	// tolerates that too); with nothing to query the status is unknown.
	if tm.db == nil {
		return "unknown"
	}

	// Check database
	var status string
	err := tm.db.QueryRow(`
SELECT status FROM transcoding_status WHERE file_hash = ?
`, fileHash).Scan(&status)
	if err != nil {
		return "unknown"
	}
	return status
}
// GetJobProgress looks up the transcoder job "transcode_<fileHash>" and
// returns its Progress value together with true. When the manager is disabled
// or the transcoder does not know the job, it returns (0.0, false).
func (tm *Manager) GetJobProgress(fileHash string) (float64, bool) {
	if tm.enabled {
		id := fmt.Sprintf("transcode_%s", fileHash)
		if job, found := tm.transcoder.GetJobStatus(id); found {
			return job.Progress, true
		}
	}
	return 0.0, false
}
// calculatePriority maps a file size in bytes to a job priority value:
//
//   - larger than 5 GiB   -> 2 (very large files)
//   - smaller than 500 MiB -> 8 (small files, faster to transcode)
//   - anything in between -> 5 (default)
//
// Both thresholds are exclusive, so exactly 500 MiB and exactly 5 GiB get 5.
func (tm *Manager) calculatePriority(fileSize int64) int {
	const (
		smallFileLimit int64 = 500 * 1024 * 1024      // 500 MiB
		hugeFileLimit  int64 = 5 * 1024 * 1024 * 1024 // 5 GiB
	)

	switch {
	case fileSize > hugeFileLimit:
		return 2
	case fileSize < smallFileLimit:
		return 8
	default:
		return 5
	}
}
// markTranscodingQueued stores the status "queued" for fileHash via
// updateTranscodingStatus.
func (tm *Manager) markTranscodingQueued(fileHash string) {
	const queuedStatus = "queued"
	tm.updateTranscodingStatus(fileHash, queuedStatus)
}
// markTranscodingCompleted stores the status "completed" for fileHash via
// updateTranscodingStatus and then emits a debug log line. The log line is
// written regardless of whether the status write succeeded.
func (tm *Manager) markTranscodingCompleted(fileHash string) {
	const completedStatus = "completed"
	tm.updateTranscodingStatus(fileHash, completedStatus)

	log.Printf("DEBUG: Marked transcoding completed for file %s", fileHash)
}
// markTranscodingFailed stores the status "failed" for fileHash together with
// errorMsg in the error_message column.
//
// Like updateTranscodingStatus, it is a no-op when the manager has no
// database, and a failed write is logged rather than returned.
func (tm *Manager) markTranscodingFailed(fileHash string, errorMsg string) {
	// Same guard as updateTranscodingStatus: a Manager may be built without
	// a database, and this runs from a job callback where a panic would be
	// particularly unhelpful.
	if tm.db == nil {
		return
	}

	_, err := tm.db.Exec(`
INSERT OR REPLACE INTO transcoding_status
(file_hash, status, error_message, updated_at)
VALUES (?, ?, ?, datetime('now'))
`, fileHash, "failed", errorMsg)
	if err != nil {
		log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err)
	}
}
// markAsWebCompatible stores the status "web_compatible" for fileHash via
// updateTranscodingStatus, i.e. the file was judged not to need transcoding.
func (tm *Manager) markAsWebCompatible(fileHash string) {
	const webCompatibleStatus = "web_compatible"
	tm.updateTranscodingStatus(fileHash, webCompatibleStatus)
}
// updateTranscodingStatus writes (fileHash, status, now) into the
// transcoding_status table using INSERT OR REPLACE.
//
// It is a no-op when the manager has no database. A failed write is logged as
// a warning and otherwise ignored; nothing is returned to the caller.
//
// NOTE(review): the statement does not list error_message or created_at, and
// REPLACE swaps out the whole row, so earlier values of those columns are
// presumably not carried over — confirm this is intended.
func (tm *Manager) updateTranscodingStatus(fileHash, status string) {
	if tm.db == nil {
		return
	}

	const statement = `
INSERT OR REPLACE INTO transcoding_status
(file_hash, status, updated_at)
VALUES (?, ?, datetime('now'))
`
	if _, execErr := tm.db.Exec(statement, fileHash, status); execErr != nil {
		log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, execErr)
	}
}
// InitializeDatabase creates the transcoding_status table if it does not
// already exist. The table is keyed by file_hash and carries status,
// error_message, created_at and updated_at columns.
//
// It returns an error when the manager has no database connection or when
// the CREATE TABLE statement fails (the underlying error is wrapped).
func (tm *Manager) InitializeDatabase() error {
	if tm.db == nil {
		return fmt.Errorf("no database connection")
	}

	const schema = `
CREATE TABLE IF NOT EXISTS transcoding_status (
file_hash TEXT PRIMARY KEY,
status TEXT NOT NULL,
error_message TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`
	if _, execErr := tm.db.Exec(schema); execErr != nil {
		return fmt.Errorf("failed to create transcoding_status table: %w", execErr)
	}
	return nil
}
// GetFailedJobsCount returns the number of rows in transcoding_status whose
// status is "failed".
//
// A disabled manager reports (0, nil). An enabled manager without a database
// connection, or a failing query, yields a zero count and a non-nil error;
// query errors are wrapped so callers can still unwrap the cause.
func (tm *Manager) GetFailedJobsCount() (int, error) {
	if !tm.enabled {
		return 0, nil
	}
	// Same condition InitializeDatabase reports; avoids a nil dereference.
	if tm.db == nil {
		return 0, fmt.Errorf("no database connection")
	}

	var count int
	if err := tm.db.QueryRow(`
SELECT COUNT(*) FROM transcoding_status WHERE status = 'failed'
`).Scan(&count); err != nil {
		return 0, fmt.Errorf("failed to count failed jobs: %w", err)
	}
	return count, nil
}
// Admin Methods for monitoring and management
// GetAllJobs returns a map describing the transcoder's in-memory jobs.
//
// When the manager is enabled and has a transcoder, the map contains
// "enabled": true and "jobs": whatever the transcoder's GetAllJobs returns.
// Otherwise it contains only "enabled": false.
func (tm *Manager) GetAllJobs() map[string]interface{} {
	if tm.enabled && tm.transcoder != nil {
		// Get in-memory jobs from transcoder
		inMemory := tm.transcoder.GetAllJobs()
		return map[string]interface{}{
			"enabled": true,
			"jobs":    inMemory,
		}
	}
	return map[string]interface{}{"enabled": false}
}
// Note: RetryFailedJob is now handled by admin handlers which have access to Gateway reconstruction
// ClearFailedJobs deletes every row of transcoding_status whose status is
// "failed".
//
// It returns an error when transcoding is disabled, when the manager has no
// database connection, or when the DELETE fails (the underlying error is
// wrapped).
func (tm *Manager) ClearFailedJobs() error {
	if !tm.enabled {
		return fmt.Errorf("transcoding is disabled")
	}
	// Same condition InitializeDatabase reports; avoids a nil dereference.
	if tm.db == nil {
		return fmt.Errorf("no database connection")
	}

	if _, err := tm.db.Exec(`DELETE FROM transcoding_status WHERE status = 'failed'`); err != nil {
		return fmt.Errorf("failed to clear failed jobs: %w", err)
	}
	return nil
}
// PauseQueue is a placeholder for pausing the transcoding queue. It always
// returns an error: "transcoding is disabled" when the manager is disabled or
// has no transcoder, and "pause queue not implemented yet" otherwise.
func (tm *Manager) PauseQueue() error {
	if tm.enabled && tm.transcoder != nil {
		// TODO: Implement queue pausing in transcoder
		return fmt.Errorf("pause queue not implemented yet")
	}
	return fmt.Errorf("transcoding is disabled")
}
// ResumeQueue is a placeholder for resuming the transcoding queue. It always
// returns an error: "transcoding is disabled" when the manager is disabled or
// has no transcoder, and "resume queue not implemented yet" otherwise.
func (tm *Manager) ResumeQueue() error {
	if tm.enabled && tm.transcoder != nil {
		// TODO: Implement queue resuming in transcoder
		return fmt.Errorf("resume queue not implemented yet")
	}
	return fmt.Errorf("transcoding is disabled")
}
// GetSystemHealth returns a map describing the transcoding subsystem.
//
// Keys:
//   - "enabled": the manager's enabled flag.
//   - "ffmpeg_status": "Disabled" when the manager is disabled; otherwise
//     "Not Found" if ffmpeg is not on PATH, "Error" if `ffmpeg -version`
//     exits unsuccessfully, or "Available".
//   - "transcoded_storage_gb": present only when enabled and a transcoder is
//     set; the value of calculateTranscodedStorage.
//
// Each call on an enabled manager runs `ffmpeg -version` and walks the
// transcoder work directory.
func (tm *Manager) GetSystemHealth() map[string]interface{} {
	if !tm.enabled {
		return map[string]interface{}{
			"enabled":       false,
			"ffmpeg_status": "Disabled",
		}
	}

	report := map[string]interface{}{"enabled": true}

	// Check FFmpeg availability: first on PATH, then by actually running it.
	ffmpegStatus := "Available"
	if _, lookErr := exec.LookPath("ffmpeg"); lookErr != nil {
		ffmpegStatus = "Not Found"
	} else if runErr := exec.Command("ffmpeg", "-version").Run(); runErr != nil {
		ffmpegStatus = "Error"
	}
	report["ffmpeg_status"] = ffmpegStatus

	// Calculate transcoded storage usage
	if tm.transcoder != nil {
		report["transcoded_storage_gb"] = tm.calculateTranscodedStorage()
	}

	return report
}
// calculateTranscodedStorage sums the sizes of all non-directory entries under
// the transcoder's work directory and returns the total in gigabytes
// (bytes / 1024^3).
//
// It returns 0.0 when the work directory is unset. Entries that cannot be
// read during the walk are skipped, so the figure is best-effort.
func (tm *Manager) calculateTranscodedStorage() float64 {
	root := tm.transcoder.workDir
	if root == "" {
		return 0.0
	}

	var bytesUsed int64
	visit := func(_ string, info os.FileInfo, walkErr error) error {
		// Unreadable entries are ignored rather than aborting the walk.
		if walkErr == nil && !info.IsDir() {
			bytesUsed += info.Size()
		}
		return nil
	}
	// visit never returns an error, so Walk's result carries no information.
	_ = filepath.Walk(root, visit)

	// Convert bytes to gigabytes
	const bytesPerGB = 1024 * 1024 * 1024
	return float64(bytesUsed) / bytesPerGB
}