Some checks are pending
CI Pipeline / Run Tests (push) Waiting to run
CI Pipeline / Lint Code (push) Waiting to run
CI Pipeline / Security Scan (push) Waiting to run
CI Pipeline / Build Docker Images (push) Blocked by required conditions
CI Pipeline / E2E Tests (push) Blocked by required conditions
226 lines
6.0 KiB
Go
226 lines
6.0 KiB
Go
package transcoding
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"path/filepath"
|
|
)
|
|
|
|
// Manager coordinates transcoding with the existing storage system
|
|
type Manager struct {
|
|
transcoder *Transcoder
|
|
db *sql.DB
|
|
enabled bool
|
|
}
|
|
|
|
// NewManager creates a new transcoding manager
|
|
func NewManager(transcoder *Transcoder, db *sql.DB) *Manager {
|
|
return &Manager{
|
|
transcoder: transcoder,
|
|
db: db,
|
|
enabled: transcoder != nil && transcoder.IsEnabled(),
|
|
}
|
|
}
|
|
|
|
// QueueVideoForTranscoding adds a video file to the transcoding queue
|
|
func (tm *Manager) QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64) {
|
|
if !tm.enabled {
|
|
return
|
|
}
|
|
|
|
// Check if already transcoded
|
|
if tm.HasTranscodedVersion(fileHash) {
|
|
log.Printf("File %s already has transcoded version, skipping", fileHash)
|
|
return
|
|
}
|
|
|
|
// Check if file needs transcoding
|
|
needsTranscoding, err := tm.transcoder.NeedsTranscoding(filePath)
|
|
if err != nil {
|
|
log.Printf("Error checking if %s needs transcoding: %v", fileName, err)
|
|
// Continue anyway - better to try and fail than skip
|
|
}
|
|
|
|
if !needsTranscoding {
|
|
log.Printf("File %s doesn't need transcoding (already web-compatible)", fileName)
|
|
tm.markAsWebCompatible(fileHash)
|
|
return
|
|
}
|
|
|
|
// Create transcoding job
|
|
job := Job{
|
|
ID: fmt.Sprintf("transcode_%s", fileHash),
|
|
InputPath: filePath,
|
|
OutputDir: filepath.Join(tm.transcoder.workDir, fileHash),
|
|
FileHash: fileHash,
|
|
Qualities: DefaultQualities,
|
|
Priority: tm.calculatePriority(fileSize),
|
|
Callback: func(err error) {
|
|
if err != nil {
|
|
log.Printf("Transcoding failed for %s: %v", fileName, err)
|
|
tm.markTranscodingFailed(fileHash, err.Error())
|
|
} else {
|
|
log.Printf("Transcoding completed successfully for %s", fileName)
|
|
tm.markTranscodingCompleted(fileHash)
|
|
}
|
|
},
|
|
}
|
|
|
|
log.Printf("Queuing %s for transcoding (size: %.2f MB)", fileName, float64(fileSize)/1024/1024)
|
|
tm.transcoder.SubmitJob(job)
|
|
tm.markTranscodingQueued(fileHash)
|
|
}
|
|
|
|
// HasTranscodedVersion checks if a file has a transcoded version available
|
|
func (tm *Manager) HasTranscodedVersion(fileHash string) bool {
|
|
if !tm.enabled {
|
|
return false
|
|
}
|
|
|
|
// Check file system
|
|
transcodedPath := tm.transcoder.GetTranscodedPath(fileHash)
|
|
if transcodedPath != "" {
|
|
return true
|
|
}
|
|
|
|
// Check database record
|
|
var status string
|
|
err := tm.db.QueryRow(`
|
|
SELECT status FROM transcoding_status
|
|
WHERE file_hash = ? AND status IN ('completed', 'web_compatible')
|
|
`, fileHash).Scan(&status)
|
|
|
|
return err == nil && (status == "completed" || status == "web_compatible")
|
|
}
|
|
|
|
// GetTranscodedPath returns the path to transcoded version if available
|
|
func (tm *Manager) GetTranscodedPath(fileHash string) string {
|
|
if !tm.enabled {
|
|
return ""
|
|
}
|
|
|
|
return tm.transcoder.GetTranscodedPath(fileHash)
|
|
}
|
|
|
|
// GetTranscodingStatus returns the current status of transcoding for a file
|
|
func (tm *Manager) GetTranscodingStatus(fileHash string) string {
|
|
if !tm.enabled {
|
|
return "disabled"
|
|
}
|
|
|
|
// First check if job is in progress
|
|
if job, exists := tm.transcoder.GetJobStatus(fmt.Sprintf("transcode_%s", fileHash)); exists {
|
|
return job.Status
|
|
}
|
|
|
|
// Check database
|
|
var status string
|
|
err := tm.db.QueryRow(`
|
|
SELECT status FROM transcoding_status WHERE file_hash = ?
|
|
`, fileHash).Scan(&status)
|
|
|
|
if err != nil {
|
|
return "unknown"
|
|
}
|
|
|
|
return status
|
|
}
|
|
|
|
// GetJobProgress returns the progress percentage and whether the job exists
|
|
func (tm *Manager) GetJobProgress(fileHash string) (float64, bool) {
|
|
if !tm.enabled {
|
|
return 0.0, false
|
|
}
|
|
|
|
jobID := fmt.Sprintf("transcode_%s", fileHash)
|
|
job, exists := tm.transcoder.GetJobStatus(jobID)
|
|
if !exists {
|
|
return 0.0, false
|
|
}
|
|
|
|
return job.Progress, true
|
|
}
|
|
|
|
// calculatePriority determines transcoding priority based on file characteristics
|
|
func (tm *Manager) calculatePriority(fileSize int64) int {
|
|
priority := 5 // Default medium priority
|
|
|
|
if fileSize < 500*1024*1024 { // < 500MB
|
|
priority = 8 // Higher priority for smaller files (faster to transcode)
|
|
}
|
|
|
|
if fileSize > 5*1024*1024*1024 { // > 5GB
|
|
priority = 2 // Lower priority for very large files
|
|
}
|
|
|
|
return priority
|
|
}
|
|
|
|
// markTranscodingQueued records that a file has been queued for transcoding
|
|
func (tm *Manager) markTranscodingQueued(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "queued")
|
|
}
|
|
|
|
// markTranscodingCompleted records that transcoding completed successfully
|
|
func (tm *Manager) markTranscodingCompleted(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "completed")
|
|
}
|
|
|
|
// markTranscodingFailed records that transcoding failed
|
|
func (tm *Manager) markTranscodingFailed(fileHash string, errorMsg string) {
|
|
_, err := tm.db.Exec(`
|
|
INSERT OR REPLACE INTO transcoding_status
|
|
(file_hash, status, error_message, updated_at)
|
|
VALUES (?, ?, ?, datetime('now'))
|
|
`, fileHash, "failed", errorMsg)
|
|
|
|
if err != nil {
|
|
log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err)
|
|
}
|
|
}
|
|
|
|
// markAsWebCompatible records that a file doesn't need transcoding
|
|
func (tm *Manager) markAsWebCompatible(fileHash string) {
|
|
tm.updateTranscodingStatus(fileHash, "web_compatible")
|
|
}
|
|
|
|
// updateTranscodingStatus updates the transcoding status in database
|
|
func (tm *Manager) updateTranscodingStatus(fileHash, status string) {
|
|
if tm.db == nil {
|
|
return
|
|
}
|
|
|
|
_, err := tm.db.Exec(`
|
|
INSERT OR REPLACE INTO transcoding_status
|
|
(file_hash, status, updated_at)
|
|
VALUES (?, ?, datetime('now'))
|
|
`, fileHash, status)
|
|
|
|
if err != nil {
|
|
log.Printf("Warning: Failed to update transcoding status for %s: %v", fileHash, err)
|
|
}
|
|
}
|
|
|
|
// InitializeDatabase creates the transcoding status table if it doesn't exist
|
|
func (tm *Manager) InitializeDatabase() error {
|
|
if tm.db == nil {
|
|
return fmt.Errorf("no database connection")
|
|
}
|
|
|
|
_, err := tm.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS transcoding_status (
|
|
file_hash TEXT PRIMARY KEY,
|
|
status TEXT NOT NULL,
|
|
error_message TEXT,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
`)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create transcoding_status table: %w", err)
|
|
}
|
|
|
|
return nil
|
|
} |