Some checks failed
CI Pipeline / Run Tests (push) Has been cancelled
CI Pipeline / Lint Code (push) Has been cancelled
CI Pipeline / Security Scan (push) Has been cancelled
CI Pipeline / Build Docker Images (push) Has been cancelled
CI Pipeline / E2E Tests (push) Has been cancelled
383 lines
9.8 KiB
Go
383 lines
9.8 KiB
Go
package transcoding
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
)
|
|
|
|
// Manager coordinates transcoding with the existing storage system
|
|
type Manager struct {
|
|
transcoder *Transcoder
|
|
db *sql.DB
|
|
enabled bool
|
|
}
|
|
|
|
// NewManager creates a new transcoding manager
|
|
func NewManager(transcoder *Transcoder, db *sql.DB) *Manager {
|
|
return &Manager{
|
|
transcoder: transcoder,
|
|
db: db,
|
|
enabled: transcoder != nil && transcoder.IsEnabled(),
|
|
}
|
|
}
|
|
|
|
// QueueVideoForTranscoding adds a video file to the transcoding queue
|
|
func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) {
|
|
if !tm.enabled {
|
|
return
|
|
}
|
|
|
|
// Check if already transcoded
|
|
if tm.HasTranscodedVersion(fileHash) {
|
|
log.Printf("File %s already has transcoded version, skipping", fileHash)
|
|
return
|
|
}
|
|
|
|
// Check if file needs transcoding
|
|
needsTranscoding, err := tm.transcoder.NeedsTranscoding(filePath)
|
|
if err != nil {
|
|
log.Printf("Error checking if %s needs transcoding: %v", fileName, err)
|
|
// Continue anyway - better to try and fail than skip
|
|
}
|
|
|
|
if !needsTranscoding {
|
|
log.Printf("File %s doesn't need transcoding (already web-compatible)", fileName)
|
|
tm.markAsWebCompatible(fileHash)
|
|
return
|
|
}
|
|
|
|
// Create transcoding job
|
|
job := Job{
|
|
ID: fmt.Sprintf("transcode_%s", fileHash),
|
|
InputPath: filePath,
|
|
OutputDir: filepath.Join(tm.transcoder.workDir, fileHash),
|
|
FileHash: fileHash,
|
|
Qualities: DefaultQualities,
|
|
Priority: tm.calculatePriority(fileSize),
|
|
Callback: func(err error) {
|
|
if err != nil {
|
|
log.Printf("Transcoding failed for %s: %v", fileName, err)
|
|
tm.markTranscodingFailed(fileHash, err.Error())
|
|
} else {
|
|
log.Printf("Transcoding completed successfully for %s", fileName)
|
|
tm.markTranscodingCompleted(fileHash)
|
|
}
|
|
},
|
|
}
|
|
|
|
log.Printf("Queuing %s for transcoding (size: %.2f MB)", fileName, float64(fileSize)/1024/1024)
|
|
tm.transcoder.SubmitJob(job)
|
|
tm.markTranscodingQueued(fileHash)
|
|
}
|
|
|
|
// HasTranscodedVersion checks if a file has a transcoded version available
|
|
func (tm *Manager) HasTranscodedVersion(fileHash string) bool {
|
|
if !tm.enabled {
|
|
return false
|
|
}
|
|
|
|
// Check file system
|
|
transcodedPath := tm.transcoder.GetTranscodedPath(fileHash)
|
|
if transcodedPath != "" {
|
|
return true
|
|
}
|
|
|
|
// Check database record
|
|
var status string
|
|
err := tm.db.QueryRow(`
|
|
SELECT status FROM transcoding_status
|
|
WHERE file_hash = ? AND status IN ('completed', 'web_compatible')
|
|
`, fileHash).Scan(&status)
|
|
|
|
return err == nil && (status == "completed" || status == "web_compatible")
|
|
}
|
|
|
|
// GetTranscodedPath returns the path to transcoded version if available
|
|
func (tm *Manager) GetTranscodedPath(fileHash string) string {
|
|
if !tm.enabled {
|
|
return ""
|
|
}
|
|
|
|
return tm.transcoder.GetTranscodedPath(fileHash)
|
|
}
|
|
|
|
// GetQualityPath returns the path to a specific quality version
|
|
func (tm *Manager) GetQualityPath(fileHash, quality string) string {
|
|
if !tm.enabled {
|
|
return ""
|
|
}
|
|
|
|
return tm.transcoder.GetQualityPath(fileHash, quality)
|
|
}
|
|
|
|
// GetAvailableQualities returns available quality versions for a file
|
|
func (tm *Manager) GetAvailableQualities(fileHash string) []Quality {
|
|
if !tm.enabled {
|
|
return nil
|
|
}
|
|
|
|
return tm.transcoder.GetAvailableQualities(fileHash)
|
|
}
|
|
|
|
// GetAvailableQualitiesInterface returns available quality versions as interface{} for API compatibility
|
|
func (tm *Manager) GetAvailableQualitiesInterface(fileHash string) []interface{} {
|
|
qualities := tm.GetAvailableQualities(fileHash)
|
|
result := make([]interface{}, len(qualities))
|
|
for i, q := range qualities {
|
|
result[i] = q
|
|
}
|
|
return result
|
|
}
|
|
|
|
// GetTranscodingStatus returns the current status of transcoding for a file
|
|
func (tm *Manager) GetTranscodingStatus(fileHash string) string {
|
|
if !tm.enabled {
|
|
return "disabled"
|
|
}
|
|
|
|
// First check if job is in progress
|
|
if job, exists := tm.transcoder.GetJobStatus(fmt.Sprintf("transcode_%s", fileHash)); exists {
|
|
return job.Status
|
|
}
|
|
|
|
// Check database
|
|
var status string
|
|
err := tm.db.QueryRow(`
|
|
SELECT status FROM transcoding_status WHERE file_hash = ?
|
|
`, fileHash).Scan(&status)
|
|
|
|
if err != nil {
|
|
return "unknown"
|
|
}
|
|
|
|
return status
|
|
}
|
|
|
|
// GetJobProgress returns the progress percentage and whether the job exists
|
|
func (tm *Manager) GetJobProgress(fileHash string) (float64, bool) {
|
|
if !tm.enabled {
|
|
return 0.0, false
|
|
}
|
|
|
|
jobID := fmt.Sprintf("transcode_%s", fileHash)
|
|
job, exists := tm.transcoder.GetJobStatus(jobID)
|
|
if !exists {
|
|
return 0.0, false
|
|
}
|
|
|
|
return job.Progress, true
|
|
}
|
|
|
|
// calculatePriority determines transcoding priority based on file characteristics
|
|
func (tm *Manager) calculatePriority(fileSize int64) int {
|
|
priority := 5 // Default medium priority
|
|
|
|
if fileSize < 500*1024*1024 { // < 500MB
|
|
priority = 8 // Higher priority for smaller files (faster to transcode)
|
|
}
|
|
|
|
if fileSize > 5*1024*1024*1024 { // > 5GB
|
|
priority = 2 // Lower priority for very large files
|
|
}
|
|
|
|
return priority
|
|
}
|
|
|
|
// markTranscodingQueued records that a file has been queued for transcoding
|
|
func (tm *Manager) markTranscodingQueued(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "queued")
|
|
}
|
|
|
|
// markTranscodingCompleted records that transcoding completed successfully
|
|
func (tm *Manager) markTranscodingCompleted(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "completed")
|
|
}
|
|
|
|
// markTranscodingFailed records that transcoding failed
|
|
func (tm *Manager) markTranscodingFailed(fileHash string, errorMsg string) {
|
|
_, err := tm.db.Exec(`
|
|
INSERT OR REPLACE INTO transcoding_status
|
|
(file_hash, status, error_message, updated_at)
|
|
VALUES (?, ?, ?, datetime('now'))
|
|
`, fileHash, "failed", errorMsg)
|
|
|
|
if err != nil {
|
|
log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err)
|
|
}
|
|
}
|
|
|
|
// markAsWebCompatible records that a file doesn't need transcoding
|
|
func (tm *Manager) markAsWebCompatible(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "web_compatible")
|
|
}
|
|
|
|
// updateTranscodingStatus updates the transcoding status in database
|
|
func (tm *Manager) updateTranscodingStatus(fileHash, status string) {
|
|
if tm.db == nil {
|
|
return
|
|
}
|
|
|
|
_, err := tm.db.Exec(`
|
|
INSERT OR REPLACE INTO transcoding_status
|
|
(file_hash, status, updated_at)
|
|
VALUES (?, ?, datetime('now'))
|
|
`, fileHash, status)
|
|
|
|
if err != nil {
|
|
log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err)
|
|
}
|
|
}
|
|
|
|
// InitializeDatabase creates the transcoding status table if it doesn't exist
|
|
func (tm *Manager) InitializeDatabase() error {
|
|
if tm.db == nil {
|
|
return fmt.Errorf("no database connection")
|
|
}
|
|
|
|
_, err := tm.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS transcoding_status (
|
|
file_hash TEXT PRIMARY KEY,
|
|
status TEXT NOT NULL,
|
|
error_message TEXT,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
`)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create transcoding_status table: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
|
|
// GetFailedJobsCount returns the count of failed transcoding jobs
|
|
func (tm *Manager) GetFailedJobsCount() (int, error) {
|
|
if !tm.enabled {
|
|
return 0, nil
|
|
}
|
|
|
|
var count int
|
|
err := tm.db.QueryRow(`
|
|
SELECT COUNT(*) FROM transcoding_status WHERE status = 'failed'
|
|
`).Scan(&count)
|
|
|
|
return count, err
|
|
}
|
|
|
|
// Admin Methods for monitoring and management
|
|
|
|
// GetAllJobs returns information about all transcoding jobs
|
|
func (tm *Manager) GetAllJobs() map[string]interface{} {
|
|
if !tm.enabled || tm.transcoder == nil {
|
|
return map[string]interface{}{"enabled": false}
|
|
}
|
|
|
|
// Get in-memory jobs from transcoder
|
|
jobs := tm.transcoder.GetAllJobs()
|
|
|
|
return map[string]interface{}{
|
|
"enabled": true,
|
|
"jobs": jobs,
|
|
}
|
|
}
|
|
|
|
// Note: RetryFailedJob is now handled by admin handlers which have access to Gateway reconstruction
|
|
|
|
// ClearFailedJobs removes all failed job records
|
|
func (tm *Manager) ClearFailedJobs() error {
|
|
if !tm.enabled {
|
|
return fmt.Errorf("transcoding is disabled")
|
|
}
|
|
|
|
_, err := tm.db.Exec(`DELETE FROM transcoding_status WHERE status = 'failed'`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clear failed jobs: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PauseQueue pauses the transcoding queue
|
|
func (tm *Manager) PauseQueue() error {
|
|
if !tm.enabled || tm.transcoder == nil {
|
|
return fmt.Errorf("transcoding is disabled")
|
|
}
|
|
|
|
// TODO: Implement queue pausing in transcoder
|
|
return fmt.Errorf("pause queue not implemented yet")
|
|
}
|
|
|
|
// ResumeQueue resumes the transcoding queue
|
|
func (tm *Manager) ResumeQueue() error {
|
|
if !tm.enabled || tm.transcoder == nil {
|
|
return fmt.Errorf("transcoding is disabled")
|
|
}
|
|
|
|
// TODO: Implement queue resuming in transcoder
|
|
return fmt.Errorf("resume queue not implemented yet")
|
|
}
|
|
|
|
// GetSystemHealth returns system health information
|
|
func (tm *Manager) GetSystemHealth() map[string]interface{} {
|
|
health := map[string]interface{}{}
|
|
|
|
if !tm.enabled {
|
|
health["enabled"] = false
|
|
health["ffmpeg_status"] = "Disabled"
|
|
return health
|
|
}
|
|
|
|
health["enabled"] = true
|
|
|
|
// Check FFmpeg availability
|
|
_, err := exec.LookPath("ffmpeg")
|
|
if err != nil {
|
|
health["ffmpeg_status"] = "Not Found"
|
|
} else {
|
|
// Try running ffmpeg to check if it works
|
|
cmd := exec.Command("ffmpeg", "-version")
|
|
err := cmd.Run()
|
|
if err != nil {
|
|
health["ffmpeg_status"] = "Error"
|
|
} else {
|
|
health["ffmpeg_status"] = "Available"
|
|
}
|
|
}
|
|
|
|
// Calculate transcoded storage usage
|
|
if tm.transcoder != nil {
|
|
storageGB := tm.calculateTranscodedStorage()
|
|
health["transcoded_storage_gb"] = storageGB
|
|
}
|
|
|
|
return health
|
|
}
|
|
|
|
// calculateTranscodedStorage calculates disk space used by transcoded files
|
|
func (tm *Manager) calculateTranscodedStorage() float64 {
|
|
workDir := tm.transcoder.workDir
|
|
if workDir == "" {
|
|
return 0.0
|
|
}
|
|
|
|
var totalSize int64
|
|
|
|
filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if !info.IsDir() {
|
|
totalSize += info.Size()
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Convert bytes to gigabytes
|
|
return float64(totalSize) / (1024 * 1024 * 1024)
|
|
} |