enki b6fb938a02
Some checks failed
CI Pipeline / Run Tests (push) Has been cancelled
CI Pipeline / Lint Code (push) Has been cancelled
CI Pipeline / Security Scan (push) Has been cancelled
CI Pipeline / Build Docker Images (push) Has been cancelled
CI Pipeline / E2E Tests (push) Has been cancelled
player rework, UI updates, streaming fixes
2025-08-25 22:01:13 -07:00

459 lines
12 KiB
Go

package transcoding
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// Quality represents a transcoding quality profile: one output rendition
// described by its label, target frame size, video bitrate and encoder preset.
type Quality struct {
// Name labels the rendition and doubles as the output file's base name
// ("<Name>.mp4").
Name string `json:"name"`
// Width is the target frame width in pixels handed to FFmpeg's scale filter.
Width int `json:"width"`
// Height is the target frame height in pixels; it is also the value compared
// against the input height when deciding whether a rendition would upscale.
Height int `json:"height"`
// Bitrate is the video bitrate in FFmpeg notation (e.g. "5000k"); it is used
// for both -b:v and -maxrate.
Bitrate string `json:"bitrate"`
// Preset is passed verbatim to FFmpeg's -preset option (e.g. "fast").
Preset string `json:"preset"`
}
// Job represents a transcoding job: the request parameters supplied by the
// caller plus the status fields the Transcoder fills in while it runs.
type Job struct {
// ID is the key under which the job is tracked in the Transcoder's job map.
ID string `json:"id"`
// InputPath is the source media file handed to FFmpeg.
InputPath string `json:"input_path"`
// OutputDir is the directory (created if missing) that receives the outputs.
OutputDir string `json:"output_dir"`
// FileHash is carried with the job but not read by the processing code in
// this file — presumably the identifier used for the workDir/<hash> lookup
// helpers; confirm against callers.
FileHash string `json:"file_hash"`
// Qualities lists the renditions to produce. When empty, a single
// "stream.mp4" is produced instead.
Qualities []Quality `json:"qualities"`
// Priority is stored only; the queue in this file is a plain FIFO channel and
// does not consult it.
Priority int `json:"priority"`
Status string `json:"status"` // "queued", "processing", "completed", "failed"
// Progress is set to 100 on successful completion; no intermediate values are
// reported by this file.
Progress float64 `json:"progress"`
// Error holds the failure message when Status is "failed".
Error string `json:"error,omitempty"`
// StartTime is stamped when the job is submitted to the queue.
StartTime time.Time `json:"start_time"`
// CompletedAt is stamped when processing finishes, whether it succeeded or not.
// NOTE(review): encoding/json's omitempty never omits struct values such as
// time.Time, so completed_at is emitted (as the zero time) for unfinished jobs.
CompletedAt time.Time `json:"completed_at,omitempty"`
// Callback, when non-nil, is invoked once with the job's final error (nil on
// success). It is excluded from JSON.
Callback func(error) `json:"-"`
}
// Transcoder handles video transcoding operations. It owns a buffered job
// queue drained by worker goroutines and a mutex-guarded map recording the
// latest known state of every submitted job.
type Transcoder struct {
// workDir is the root directory; transcoded outputs are looked up under
// workDir/<fileHash>/.
workDir string
// ffmpegPath is the executable invoked for every transcode.
ffmpegPath string
// concurrent is the number of worker goroutines started by NewTranscoder.
concurrent int
// queue carries submitted jobs to the workers; closing it stops them.
queue chan Job
jobs map[string]*Job // Track job status
jobsMutex sync.RWMutex // Protect jobs map
// enabled is false for a transcoder created in disabled mode, in which case
// queue is nil and most methods return immediately.
enabled bool
}
// DefaultQualities provides standard quality profiles.
//
// The entries are ordered from highest to lowest resolution. That order
// matters: CreateMultiQualityVersions treats the first surviving entry as the
// best rendition, and filterQualities treats the last entry as the lowest.
// GetAvailableQualities also uses this list to decide which "<Name>.mp4" files
// to look for on disk.
var DefaultQualities = []Quality{
{Name: "1080p", Width: 1920, Height: 1080, Bitrate: "5000k", Preset: "fast"},
{Name: "720p", Width: 1280, Height: 720, Bitrate: "2500k", Preset: "fast"},
{Name: "480p", Width: 854, Height: 480, Bitrate: "1000k", Preset: "fast"},
}
// NewTranscoder creates a new transcoder instance.
//
// When enabled is false it returns a disabled Transcoder without touching the
// filesystem, looking up FFmpeg or starting any goroutine; workDir and
// concurrent are ignored in that case.
//
// When enabled is true it resolves "ffmpeg" from PATH, creates workDir (and
// any missing parents) and starts the worker goroutines. A concurrent value
// below 1 is raised to 1: with zero workers nothing would ever drain the
// queue, so jobs would stay "queued" forever and SubmitJob would eventually
// block once the queue buffer filled up.
//
// It returns an error if FFmpeg cannot be found or workDir cannot be created.
func NewTranscoder(workDir string, concurrent int, enabled bool) (*Transcoder, error) {
	if !enabled {
		return &Transcoder{
			enabled: false,
			jobs:    make(map[string]*Job),
		}, nil
	}

	// Resolve FFmpeg once and keep the result, so the binary that was verified
	// here is the one every later transcode actually runs.
	ffmpegPath, err := exec.LookPath("ffmpeg")
	if err != nil {
		return nil, fmt.Errorf("ffmpeg not found in PATH: %w", err)
	}

	// Create work directory if it doesn't exist.
	if err := os.MkdirAll(workDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create work directory: %w", err)
	}

	// Guarantee at least one worker; see the function comment.
	if concurrent < 1 {
		concurrent = 1
	}

	t := &Transcoder{
		workDir:    workDir,
		ffmpegPath: ffmpegPath,
		concurrent: concurrent,
		queue:      make(chan Job, 100),
		jobs:       make(map[string]*Job),
		enabled:    true,
	}

	// Workers run until the queue is closed.
	for i := 0; i < concurrent; i++ {
		go t.worker()
	}
	return t, nil
}
// IsEnabled returns whether transcoding is enabled, i.e. whether this
// Transcoder was constructed with enabled set to true. The flag is only
// assigned at construction time in this file.
func (t *Transcoder) IsEnabled() bool {
return t.enabled
}
// worker is the body of one worker goroutine: it takes jobs off the queue one
// at a time, runs each to completion, and returns once the queue has been
// closed and drained.
func (t *Transcoder) worker() {
	pending := t.queue
	for {
		next, open := <-pending
		if !open {
			return
		}
		t.processJob(next)
	}
}
// SubmitJob registers job under its ID and hands it to the workers.
//
// On a disabled transcoder nothing is queued or recorded; the job's Callback,
// if set, is invoked immediately with a "transcoding is disabled" error.
// Otherwise the job is marked "queued", stamped with the current time and
// sent to the queue. The send blocks while the queue buffer is full.
func (t *Transcoder) SubmitJob(job Job) {
	if !t.enabled {
		if notify := job.Callback; notify != nil {
			notify(fmt.Errorf("transcoding is disabled"))
		}
		return
	}

	job.Status, job.StartTime = "queued", time.Now()

	// Record the job before queueing it so a status lookup can never miss a
	// job that a worker has already picked up.
	t.jobsMutex.Lock()
	t.jobs[job.ID] = &job
	t.jobsMutex.Unlock()

	t.queue <- job
}
// GetJobStatus returns the current status of a job.
//
// The second result reports whether jobID is known. The returned *Job is a
// copy taken while holding the read lock, matching what GetAllJobs does:
// callers can read or modify it freely without touching, or racing with, the
// record the workers keep updating. Call GetJobStatus again to observe later
// progress.
func (t *Transcoder) GetJobStatus(jobID string) (*Job, bool) {
	t.jobsMutex.RLock()
	defer t.jobsMutex.RUnlock()

	job, exists := t.jobs[jobID]
	if !exists || job == nil {
		return nil, exists
	}
	snapshot := *job
	return &snapshot, true
}
// NeedsTranscoding reports whether filePath should be transcoded before it is
// served to a browser.
//
// A disabled transcoder always answers (false, nil). Otherwise the file is
// inspected with ffprobe and considered web-ready only when all of these hold:
// the probe output mentions an h264 stream, it mentions an aac stream, and the
// file name ends in ".mp4" (case-insensitive). If ffprobe itself fails the
// answer is true together with the error, i.e. an unreadable file is assumed
// to need transcoding.
func (t *Transcoder) NeedsTranscoding(filePath string) (bool, error) {
	if !t.enabled {
		return false, nil
	}

	probeArgs := []string{"-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", filePath}
	raw, probeErr := exec.Command("ffprobe", probeArgs...).Output()
	if probeErr != nil {
		return true, probeErr
	}

	// The JSON report is matched textually rather than decoded.
	report := string(raw)
	webReady := strings.Contains(report, "\"codec_name\": \"h264\"") &&
		strings.Contains(report, "\"codec_name\": \"aac\"") &&
		strings.HasSuffix(strings.ToLower(filePath), ".mp4")
	return !webReady, nil
}
// CreateStreamingVersion transcodes inputPath into a single browser-friendly
// MP4 at outputPath (kept for backward compatibility with the single-file
// layout).
//
// The directory containing outputPath is created if necessary and an existing
// output file is overwritten. It returns an error when transcoding is
// disabled, when the directory cannot be created, or when FFmpeg fails; in the
// last case the error text includes FFmpeg's combined output.
func (t *Transcoder) CreateStreamingVersion(inputPath, outputPath string) error {
	if !t.enabled {
		return fmt.Errorf("transcoding is disabled")
	}

	if mkErr := os.MkdirAll(filepath.Dir(outputPath), 0755); mkErr != nil {
		return fmt.Errorf("failed to create output directory: %w", mkErr)
	}

	ffmpegArgs := []string{
		"-i", inputPath,
		// H.264 video with AAC audio.
		"-c:v", "libx264",
		"-c:a", "aac",
		// Speed/quality trade-off; CRF 23 is the encoder default (lower is better).
		"-preset", "fast",
		"-crf", "23",
		// Cap the bitrate so the result stays streamable.
		"-maxrate", "2M",
		"-bufsize", "4M",
		// Put the index at the front so playback can start before download ends.
		"-movflags", "+faststart",
		// Overwrite any existing output.
		"-y",
		outputPath,
	}

	if log, runErr := exec.Command(t.ffmpegPath, ffmpegArgs...).CombinedOutput(); runErr != nil {
		return fmt.Errorf("transcoding failed: %w\nFFmpeg output: %s", runErr, string(log))
	}
	return nil
}
// CreateMultiQualityVersions transcodes inputPath into one MP4 per quality
// profile inside outputDir, for adaptive streaming.
//
// The input is probed first and the requested profiles are narrowed by
// filterQualities so the video is not upscaled. Each remaining profile is
// encoded, sequentially, to "<Name>.mp4". Afterwards the first remaining
// profile (profiles are expected best-first) is copied to "stream.mp4" for
// backward compatibility. When no profile remains nothing is written and nil
// is returned.
//
// It returns an error when transcoding is disabled, when outputDir cannot be
// created, when probing fails, when any FFmpeg run fails (renditions already
// written are left in place), or when the final copy fails.
func (t *Transcoder) CreateMultiQualityVersions(inputPath, outputDir string, qualities []Quality) error {
	if !t.enabled {
		return fmt.Errorf("transcoding is disabled")
	}

	if mkErr := os.MkdirAll(outputDir, 0755); mkErr != nil {
		return fmt.Errorf("failed to create output directory: %w", mkErr)
	}

	info, probeErr := t.getVideoInfo(inputPath)
	if probeErr != nil {
		return fmt.Errorf("failed to get input video info: %w", probeErr)
	}

	// renditionPath maps a profile to its output file inside outputDir.
	renditionPath := func(q Quality) string {
		return filepath.Join(outputDir, fmt.Sprintf("%s.mp4", q.Name))
	}

	renditions := t.filterQualities(qualities, info.Height)
	for i := range renditions {
		q := renditions[i]
		target := renditionPath(q)

		encode := exec.Command(t.ffmpegPath,
			"-i", inputPath,
			"-c:v", "libx264",
			"-c:a", "aac",
			"-preset", q.Preset,
			"-b:v", q.Bitrate,
			"-maxrate", q.Bitrate,
			// The rate-control buffer is sized at twice the target bitrate.
			"-bufsize", fmt.Sprintf("%dk", parseInt(q.Bitrate)*2),
			"-vf", fmt.Sprintf("scale=%d:%d", q.Width, q.Height),
			"-movflags", "+faststart",
			"-y",
			target,
		)
		if log, runErr := encode.CombinedOutput(); runErr != nil {
			return fmt.Errorf("failed to create %s quality: %w\nFFmpeg output: %s", q.Name, runErr, string(log))
		}
	}

	if len(renditions) == 0 {
		return nil
	}

	// Older clients request "stream.mp4"; give them the best rendition.
	best := renditionPath(renditions[0])
	if cpErr := t.copyFile(best, filepath.Join(outputDir, "stream.mp4")); cpErr != nil {
		return fmt.Errorf("failed to create stream.mp4: %w", cpErr)
	}
	return nil
}
// getVideoInfo extracts basic video information (currently the frame height)
// from inputPath using ffprobe.
//
// ffprobe is asked for exactly one value — the height of the first video
// stream — as bare text, so any resolution is reported accurately instead of
// only a handful of well-known ones. If ffprobe runs successfully but reports
// no usable height (for example, the file has no video stream), Height falls
// back to 1080.
//
// It returns an error only when ffprobe cannot be run or exits unsuccessfully.
func (t *Transcoder) getVideoInfo(inputPath string) (*VideoInfo, error) {
	cmd := exec.Command("ffprobe",
		"-v", "error",
		"-select_streams", "v:0", // first video stream only
		"-show_entries", "stream=height",
		"-of", "csv=p=0", // value only, no section name or key
		inputPath,
	)
	output, err := cmd.Output()
	if err != nil {
		return nil, err
	}
	return &VideoInfo{Height: parseProbedHeight(string(output))}, nil
}

// parseProbedHeight returns the first positive integer found in ffprobe's
// value-only output, or 1080 when there is none.
func parseProbedHeight(output string) int {
	const fallbackHeight = 1080

	// Tolerate trailing separators and repeated lines in the CSV output.
	fields := strings.FieldsFunc(output, func(r rune) bool {
		return r == ',' || r == '\n' || r == '\r' || r == ' ' || r == '\t'
	})
	for _, field := range fields {
		if height, err := strconv.Atoi(field); err == nil && height > 0 {
			return height
		}
	}
	return fallbackHeight
}
// VideoInfo holds the properties of an input video that getVideoInfo reports.
type VideoInfo struct {
// Height is the frame height in pixels; it is compared against each quality
// profile's Height to avoid upscaling.
Height int
}
// filterQualities returns, in their original order, the profiles whose Height
// does not exceed inputHeight, so the input is never upscaled.
//
// If every profile is taller than the input, the last profile in qualities is
// returned on its own (the list is expected to end with the lowest quality),
// so that a very low-resolution input still gets one rendition. An empty
// qualities list yields a nil slice.
func (t *Transcoder) filterQualities(qualities []Quality, inputHeight int) []Quality {
	var kept []Quality
	for i := range qualities {
		if qualities[i].Height > inputHeight {
			continue
		}
		kept = append(kept, qualities[i])
	}

	if len(kept) > 0 || len(qualities) == 0 {
		return kept
	}
	// Nothing fits: fall back to the final (lowest) profile.
	return append(kept, qualities[len(qualities)-1])
}
// parseInt converts an FFmpeg-style bitrate string into whole kilobits per
// second.
//
// Accepted forms are a number followed by "k" or "K" (kilobits), "M"
// (megabits) or "G" (gigabits), or a bare number meaning bits per second;
// fractional values such as "2.5M" are allowed and surrounding whitespace is
// ignored. Lowercase "m" is deliberately not accepted because FFmpeg reads it
// as the milli prefix.
//
// Anything that cannot be parsed, or that amounts to less than 1 kbit/s
// (including zero and negative values), yields the default of 2000.
func parseInt(bitrate string) int {
	const fallbackKbps = 2000

	value := strings.TrimSpace(bitrate)
	if value == "" {
		return fallbackKbps
	}

	// bitsPerUnit is how many bits per second one unit of the number denotes.
	bitsPerUnit := 1.0
	switch value[len(value)-1] {
	case 'k', 'K':
		bitsPerUnit = 1e3
	case 'M':
		bitsPerUnit = 1e6
	case 'G':
		bitsPerUnit = 1e9
	}
	if bitsPerUnit != 1.0 {
		value = value[:len(value)-1]
	}

	number, err := strconv.ParseFloat(value, 64)
	if err != nil {
		return fallbackKbps
	}

	kbps := number * bitsPerUnit / 1000
	// Written as a negated range check so that NaN is rejected as well.
	if !(kbps >= 1 && kbps <= 1e9) {
		return fallbackKbps
	}
	return int(kbps + 0.5)
}
// copyFile copies the contents of src to dst, creating dst or truncating it if
// it already exists.
//
// If src and dst already refer to the same file the call is a no-op; opening
// the destination would otherwise truncate the source before anything was
// read. The error from closing dst is reported too, because a failed close
// can mean the data never reached the disk.
//
// It returns the first error encountered while opening, copying or closing.
func (t *Transcoder) copyFile(src, dst string) (err error) {
	input, err := os.Open(src)
	if err != nil {
		return err
	}
	defer input.Close()

	// Skip the copy when both paths resolve to the same underlying file.
	if srcInfo, statErr := input.Stat(); statErr == nil {
		if dstInfo, statErr := os.Stat(dst); statErr == nil && os.SameFile(srcInfo, dstInfo) {
			return nil
		}
	}

	output, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Keep an earlier copy error; otherwise surface the close error.
		if closeErr := output.Close(); err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(output, input)
	return err
}
// processJob handles the actual transcoding work for one job and records its
// outcome.
//
// The job is published as "processing", transcoded, then published again as
// "completed" (Progress 100) or "failed" (Error set), with CompletedAt stamped
// in both cases. Finally the job's Callback, if any, receives the resulting
// error (nil on success).
//
// Every published state is an independent snapshot stored under the jobs
// lock; a snapshot is never modified after it has been stored, so readers of
// the jobs map cannot observe a half-updated job.
func (t *Transcoder) processJob(job Job) {
	job.Status = "processing"
	t.storeJobSnapshot(job)

	err := t.runJob(job)
	if err != nil {
		job.Status = "failed"
		job.Error = err.Error()
	} else {
		job.Status = "completed"
		job.Progress = 100.0
	}
	job.CompletedAt = time.Now()
	t.storeJobSnapshot(job)

	// Notify only after the final state is visible to status queries.
	if job.Callback != nil {
		job.Callback(err)
	}
}

// storeJobSnapshot publishes a private copy of job in the jobs map.
func (t *Transcoder) storeJobSnapshot(job Job) {
	t.jobsMutex.Lock()
	defer t.jobsMutex.Unlock()
	// job is this call's own copy, so the stored pointer is shared with no one.
	t.jobs[job.ID] = &job
}

// runJob creates the job's output directory and produces its output files.
func (t *Transcoder) runJob(job Job) error {
	if err := os.MkdirAll(job.OutputDir, 0755); err != nil {
		return fmt.Errorf("failed to create output directory: %w", err)
	}

	// Create multiple quality versions if qualities are specified.
	if len(job.Qualities) > 0 {
		if err := t.CreateMultiQualityVersions(job.InputPath, job.OutputDir, job.Qualities); err != nil {
			return fmt.Errorf("multi-quality transcoding failed: %w", err)
		}
		return nil
	}

	// Fall back to a single "stream.mp4" for backward compatibility.
	outputPath := filepath.Join(job.OutputDir, "stream.mp4")
	if err := t.CreateStreamingVersion(job.InputPath, outputPath); err != nil {
		return fmt.Errorf("transcoding failed: %w", err)
	}
	return nil
}
// GetTranscodedPath returns the location of the default transcoded file,
// <workDir>/<fileHash>/stream.mp4, when that file can be stat'ed. It returns
// the empty string when the file is not there or transcoding is disabled.
func (t *Transcoder) GetTranscodedPath(fileHash string) string {
	if t.enabled {
		candidate := filepath.Join(t.workDir, fileHash, "stream.mp4")
		if _, statErr := os.Stat(candidate); statErr == nil {
			return candidate
		}
	}
	return ""
}
// GetQualityPath returns the location of one rendition,
// <workDir>/<fileHash>/<quality>.mp4, when that file can be stat'ed. It
// returns the empty string when the file is not there or transcoding is
// disabled.
func (t *Transcoder) GetQualityPath(fileHash, quality string) string {
	if t.enabled {
		candidate := filepath.Join(t.workDir, fileHash, fmt.Sprintf("%s.mp4", quality))
		if _, statErr := os.Stat(candidate); statErr == nil {
			return candidate
		}
	}
	return ""
}
// GetAvailableQualities reports which of the DefaultQualities already exist on
// disk for fileHash, i.e. for which profiles <workDir>/<fileHash>/<Name>.mp4
// can be stat'ed. The result keeps the order of DefaultQualities. It is nil
// when transcoding is disabled or when none of the files exist.
func (t *Transcoder) GetAvailableQualities(fileHash string) []Quality {
	if !t.enabled {
		return nil
	}

	dir := filepath.Join(t.workDir, fileHash)

	var present []Quality
	for i := range DefaultQualities {
		profile := DefaultQualities[i]
		file := filepath.Join(dir, fmt.Sprintf("%s.mp4", profile.Name))
		if _, statErr := os.Stat(file); statErr != nil {
			continue
		}
		present = append(present, profile)
	}
	return present
}
// GetAllJobs returns a snapshot of every tracked job keyed by job ID, intended
// for admin monitoring. Both the map and each *Job in it are fresh copies made
// under the read lock, so the caller may keep or modify them freely. A
// disabled transcoder yields an empty, non-nil map.
func (t *Transcoder) GetAllJobs() map[string]*Job {
	snapshot := make(map[string]*Job)
	if !t.enabled {
		return snapshot
	}

	t.jobsMutex.RLock()
	defer t.jobsMutex.RUnlock()

	for id, tracked := range t.jobs {
		duplicate := *tracked
		snapshot[id] = &duplicate
	}
	return snapshot
}
// Close shuts down the transcoder by closing the job queue, which lets each
// worker return once the jobs already queued have been processed. It does
// nothing on a disabled transcoder or one without a queue. It must be called
// at most once, and no job may be submitted afterwards: closing or sending on
// an already-closed channel panics.
func (t *Transcoder) Close() {
	if !t.enabled || t.queue == nil {
		return
	}
	close(t.queue)
}