Some checks failed
CI Pipeline / Run Tests (push) Has been cancelled
CI Pipeline / Lint Code (push) Has been cancelled
CI Pipeline / Security Scan (push) Has been cancelled
CI Pipeline / Build Docker Images (push) Has been cancelled
CI Pipeline / E2E Tests (push) Has been cancelled
459 lines
12 KiB
Go
459 lines
12 KiB
Go
package transcoding
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Quality represents a transcoding quality profile
|
|
type Quality struct {
|
|
Name string `json:"name"`
|
|
Width int `json:"width"`
|
|
Height int `json:"height"`
|
|
Bitrate string `json:"bitrate"`
|
|
Preset string `json:"preset"`
|
|
}
|
|
|
|
// Job represents a transcoding job
|
|
type Job struct {
|
|
ID string `json:"id"`
|
|
InputPath string `json:"input_path"`
|
|
OutputDir string `json:"output_dir"`
|
|
FileHash string `json:"file_hash"`
|
|
Qualities []Quality `json:"qualities"`
|
|
Priority int `json:"priority"`
|
|
Status string `json:"status"` // "queued", "processing", "completed", "failed"
|
|
Progress float64 `json:"progress"`
|
|
Error string `json:"error,omitempty"`
|
|
StartTime time.Time `json:"start_time"`
|
|
CompletedAt time.Time `json:"completed_at,omitempty"`
|
|
Callback func(error) `json:"-"`
|
|
}
|
|
|
|
// Transcoder handles video transcoding operations
|
|
type Transcoder struct {
|
|
workDir string
|
|
ffmpegPath string
|
|
concurrent int
|
|
queue chan Job
|
|
jobs map[string]*Job // Track job status
|
|
jobsMutex sync.RWMutex // Protect jobs map
|
|
enabled bool
|
|
}
|
|
|
|
// DefaultQualities provides standard quality profiles
|
|
var DefaultQualities = []Quality{
|
|
{Name: "1080p", Width: 1920, Height: 1080, Bitrate: "5000k", Preset: "fast"},
|
|
{Name: "720p", Width: 1280, Height: 720, Bitrate: "2500k", Preset: "fast"},
|
|
{Name: "480p", Width: 854, Height: 480, Bitrate: "1000k", Preset: "fast"},
|
|
}
|
|
|
|
// NewTranscoder creates a new transcoder instance
|
|
func NewTranscoder(workDir string, concurrent int, enabled bool) (*Transcoder, error) {
|
|
if !enabled {
|
|
return &Transcoder{
|
|
enabled: false,
|
|
jobs: make(map[string]*Job),
|
|
}, nil
|
|
}
|
|
|
|
// Verify FFmpeg is available
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
return nil, fmt.Errorf("ffmpeg not found in PATH: %w", err)
|
|
}
|
|
|
|
// Create work directory if it doesn't exist
|
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
|
|
t := &Transcoder{
|
|
workDir: workDir,
|
|
ffmpegPath: "ffmpeg",
|
|
concurrent: concurrent,
|
|
queue: make(chan Job, 100),
|
|
jobs: make(map[string]*Job),
|
|
enabled: true,
|
|
}
|
|
|
|
// Start worker goroutines
|
|
for i := 0; i < concurrent; i++ {
|
|
go t.worker()
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
// IsEnabled returns whether transcoding is enabled
|
|
func (t *Transcoder) IsEnabled() bool {
|
|
return t.enabled
|
|
}
|
|
|
|
// worker processes jobs from the queue
|
|
func (t *Transcoder) worker() {
|
|
for job := range t.queue {
|
|
t.processJob(job)
|
|
}
|
|
}
|
|
|
|
// SubmitJob adds a job to the transcoding queue
|
|
func (t *Transcoder) SubmitJob(job Job) {
|
|
if !t.enabled {
|
|
if job.Callback != nil {
|
|
job.Callback(fmt.Errorf("transcoding is disabled"))
|
|
}
|
|
return
|
|
}
|
|
|
|
job.Status = "queued"
|
|
job.StartTime = time.Now()
|
|
t.jobsMutex.Lock()
|
|
t.jobs[job.ID] = &job
|
|
t.jobsMutex.Unlock()
|
|
t.queue <- job
|
|
}
|
|
|
|
// GetJobStatus returns the current status of a job
|
|
func (t *Transcoder) GetJobStatus(jobID string) (*Job, bool) {
|
|
t.jobsMutex.RLock()
|
|
job, exists := t.jobs[jobID]
|
|
t.jobsMutex.RUnlock()
|
|
return job, exists
|
|
}
|
|
|
|
// NeedsTranscoding checks if a file needs transcoding for web compatibility
|
|
func (t *Transcoder) NeedsTranscoding(filePath string) (bool, error) {
|
|
if !t.enabled {
|
|
return false, nil
|
|
}
|
|
|
|
// Use ffprobe to analyze the file
|
|
cmd := exec.Command("ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", filePath)
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
return true, err // Assume needs transcoding if we can't analyze
|
|
}
|
|
|
|
outputStr := string(output)
|
|
|
|
// Check if already in web-friendly format
|
|
hasH264 := strings.Contains(outputStr, "\"codec_name\": \"h264\"")
|
|
hasAAC := strings.Contains(outputStr, "\"codec_name\": \"aac\"")
|
|
isMP4 := strings.HasSuffix(strings.ToLower(filePath), ".mp4")
|
|
|
|
// If it's H.264/AAC in MP4 container, probably doesn't need transcoding
|
|
if hasH264 && hasAAC && isMP4 {
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// CreateStreamingVersion creates a single web-compatible MP4 version (backward compatibility)
|
|
func (t *Transcoder) CreateStreamingVersion(inputPath, outputPath string) error {
|
|
if !t.enabled {
|
|
return fmt.Errorf("transcoding is disabled")
|
|
}
|
|
|
|
// Ensure output directory exists
|
|
if err := os.MkdirAll(filepath.Dir(outputPath), 0755); err != nil {
|
|
return fmt.Errorf("failed to create output directory: %w", err)
|
|
}
|
|
|
|
args := []string{
|
|
"-i", inputPath,
|
|
"-c:v", "libx264", // H.264 video codec
|
|
"-c:a", "aac", // AAC audio codec
|
|
"-preset", "fast", // Balance speed/quality
|
|
"-crf", "23", // Quality (23 is default, lower = better)
|
|
"-maxrate", "2M", // Max bitrate for streaming
|
|
"-bufsize", "4M", // Buffer size
|
|
"-movflags", "+faststart", // Enable fast start for web
|
|
"-y", // Overwrite output file
|
|
outputPath,
|
|
}
|
|
|
|
cmd := exec.Command(t.ffmpegPath, args...)
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("transcoding failed: %w\nFFmpeg output: %s", err, string(output))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CreateMultiQualityVersions creates multiple quality versions for adaptive streaming
|
|
func (t *Transcoder) CreateMultiQualityVersions(inputPath, outputDir string, qualities []Quality) error {
|
|
if !t.enabled {
|
|
return fmt.Errorf("transcoding is disabled")
|
|
}
|
|
|
|
// Ensure output directory exists
|
|
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create output directory: %w", err)
|
|
}
|
|
|
|
// Get input video info first to determine which qualities to generate
|
|
inputInfo, err := t.getVideoInfo(inputPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get input video info: %w", err)
|
|
}
|
|
|
|
// Filter qualities based on input resolution (don't upscale)
|
|
availableQualities := t.filterQualities(qualities, inputInfo.Height)
|
|
|
|
// Generate each quality version
|
|
for _, quality := range availableQualities {
|
|
outputPath := filepath.Join(outputDir, fmt.Sprintf("%s.mp4", quality.Name))
|
|
|
|
args := []string{
|
|
"-i", inputPath,
|
|
"-c:v", "libx264",
|
|
"-c:a", "aac",
|
|
"-preset", quality.Preset,
|
|
"-b:v", quality.Bitrate,
|
|
"-maxrate", quality.Bitrate,
|
|
"-bufsize", fmt.Sprintf("%dk", parseInt(quality.Bitrate)*2), // 2x bitrate for buffer
|
|
"-vf", fmt.Sprintf("scale=%d:%d", quality.Width, quality.Height),
|
|
"-movflags", "+faststart",
|
|
"-y",
|
|
outputPath,
|
|
}
|
|
|
|
cmd := exec.Command(t.ffmpegPath, args...)
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create %s quality: %w\nFFmpeg output: %s", quality.Name, err, string(output))
|
|
}
|
|
}
|
|
|
|
// Always create a "stream.mp4" version (highest available quality) for backward compatibility
|
|
if len(availableQualities) > 0 {
|
|
bestQuality := availableQualities[0] // Qualities should be ordered best to worst
|
|
srcPath := filepath.Join(outputDir, fmt.Sprintf("%s.mp4", bestQuality.Name))
|
|
dstPath := filepath.Join(outputDir, "stream.mp4")
|
|
|
|
// Copy the best quality as stream.mp4
|
|
if err := t.copyFile(srcPath, dstPath); err != nil {
|
|
return fmt.Errorf("failed to create stream.mp4: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getVideoInfo extracts basic video information
|
|
func (t *Transcoder) getVideoInfo(inputPath string) (*VideoInfo, error) {
|
|
cmd := exec.Command("ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", inputPath)
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Parse JSON to get video dimensions
|
|
// This is a simplified version - you might want to use a proper JSON parser
|
|
outputStr := string(output)
|
|
|
|
// Extract height from JSON (simplified)
|
|
height := 1080 // Default fallback
|
|
if strings.Contains(outputStr, "\"height\":") {
|
|
// Simple regex would be better here, but keeping it simple
|
|
if strings.Contains(outputStr, "\"height\": 720") || strings.Contains(outputStr, "\"height\":720") {
|
|
height = 720
|
|
} else if strings.Contains(outputStr, "\"height\": 480") || strings.Contains(outputStr, "\"height\":480") {
|
|
height = 480
|
|
} else if strings.Contains(outputStr, "\"height\": 1080") || strings.Contains(outputStr, "\"height\":1080") {
|
|
height = 1080
|
|
} else if strings.Contains(outputStr, "\"height\": 2160") || strings.Contains(outputStr, "\"height\":2160") {
|
|
height = 2160
|
|
}
|
|
}
|
|
|
|
return &VideoInfo{Height: height}, nil
|
|
}
|
|
|
|
type VideoInfo struct {
|
|
Height int
|
|
}
|
|
|
|
// filterQualities removes qualities that would upscale the video
|
|
func (t *Transcoder) filterQualities(qualities []Quality, inputHeight int) []Quality {
|
|
var filtered []Quality
|
|
|
|
for _, quality := range qualities {
|
|
if quality.Height <= inputHeight {
|
|
filtered = append(filtered, quality)
|
|
}
|
|
}
|
|
|
|
// If no qualities fit (very low resolution input), at least include the lowest
|
|
if len(filtered) == 0 && len(qualities) > 0 {
|
|
// Get the lowest quality
|
|
lowest := qualities[len(qualities)-1]
|
|
filtered = append(filtered, lowest)
|
|
}
|
|
|
|
return filtered
|
|
}
|
|
|
|
// Helper function to parse bitrate string to int
|
|
func parseInt(bitrate string) int {
|
|
// Remove 'k' suffix and convert to int
|
|
if strings.HasSuffix(bitrate, "k") {
|
|
if val := strings.TrimSuffix(bitrate, "k"); val != "" {
|
|
if i, err := strconv.Atoi(val); err == nil {
|
|
return i
|
|
}
|
|
}
|
|
}
|
|
return 2000 // Default fallback
|
|
}
|
|
|
|
// copyFile copies a file from src to dst
|
|
func (t *Transcoder) copyFile(src, dst string) error {
|
|
input, err := os.Open(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer input.Close()
|
|
|
|
output, err := os.Create(dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer output.Close()
|
|
|
|
_, err = io.Copy(output, input)
|
|
return err
|
|
}
|
|
|
|
// processJob handles the actual transcoding work
|
|
func (t *Transcoder) processJob(job Job) {
|
|
job.Status = "processing"
|
|
t.jobsMutex.Lock()
|
|
t.jobs[job.ID] = &job
|
|
t.jobsMutex.Unlock()
|
|
|
|
var err error
|
|
defer func() {
|
|
if err != nil {
|
|
job.Status = "failed"
|
|
job.Error = err.Error()
|
|
} else {
|
|
job.Status = "completed"
|
|
job.Progress = 100.0
|
|
}
|
|
job.CompletedAt = time.Now()
|
|
t.jobsMutex.Lock()
|
|
t.jobs[job.ID] = &job
|
|
t.jobsMutex.Unlock()
|
|
|
|
if job.Callback != nil {
|
|
job.Callback(err)
|
|
}
|
|
}()
|
|
|
|
// Create output directory
|
|
if err = os.MkdirAll(job.OutputDir, 0755); err != nil {
|
|
err = fmt.Errorf("failed to create output directory: %w", err)
|
|
return
|
|
}
|
|
|
|
// Create multiple quality versions if qualities are specified
|
|
if len(job.Qualities) > 0 {
|
|
err = t.CreateMultiQualityVersions(job.InputPath, job.OutputDir, job.Qualities)
|
|
if err != nil {
|
|
err = fmt.Errorf("multi-quality transcoding failed: %w", err)
|
|
return
|
|
}
|
|
} else {
|
|
// Fallback to single quality for backward compatibility
|
|
outputPath := filepath.Join(job.OutputDir, "stream.mp4")
|
|
err = t.CreateStreamingVersion(job.InputPath, outputPath)
|
|
if err != nil {
|
|
err = fmt.Errorf("transcoding failed: %w", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
job.Progress = 100.0
|
|
}
|
|
|
|
// GetTranscodedPath returns the path to a transcoded file if it exists
|
|
func (t *Transcoder) GetTranscodedPath(fileHash string) string {
|
|
if !t.enabled {
|
|
return ""
|
|
}
|
|
|
|
streamPath := filepath.Join(t.workDir, fileHash, "stream.mp4")
|
|
if _, err := os.Stat(streamPath); err == nil {
|
|
return streamPath
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// GetQualityPath returns the path to a specific quality version
|
|
func (t *Transcoder) GetQualityPath(fileHash, quality string) string {
|
|
if !t.enabled {
|
|
return ""
|
|
}
|
|
|
|
qualityPath := filepath.Join(t.workDir, fileHash, fmt.Sprintf("%s.mp4", quality))
|
|
if _, err := os.Stat(qualityPath); err == nil {
|
|
return qualityPath
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// GetAvailableQualities returns a list of available quality versions for a file
|
|
func (t *Transcoder) GetAvailableQualities(fileHash string) []Quality {
|
|
if !t.enabled {
|
|
return nil
|
|
}
|
|
|
|
var availableQualities []Quality
|
|
transcodedDir := filepath.Join(t.workDir, fileHash)
|
|
|
|
// Check which quality files exist
|
|
for _, quality := range DefaultQualities {
|
|
qualityPath := filepath.Join(transcodedDir, fmt.Sprintf("%s.mp4", quality.Name))
|
|
if _, err := os.Stat(qualityPath); err == nil {
|
|
availableQualities = append(availableQualities, quality)
|
|
}
|
|
}
|
|
|
|
return availableQualities
|
|
}
|
|
|
|
// GetAllJobs returns information about all jobs (for admin monitoring)
|
|
func (t *Transcoder) GetAllJobs() map[string]*Job {
|
|
if !t.enabled {
|
|
return make(map[string]*Job)
|
|
}
|
|
|
|
t.jobsMutex.RLock()
|
|
defer t.jobsMutex.RUnlock()
|
|
|
|
// Create a copy of the jobs map
|
|
jobs := make(map[string]*Job)
|
|
for id, job := range t.jobs {
|
|
// Create a copy of the job
|
|
jobCopy := *job
|
|
jobs[id] = &jobCopy
|
|
}
|
|
|
|
return jobs
|
|
}
|
|
|
|
// Close shuts down the transcoder
|
|
func (t *Transcoder) Close() {
|
|
if t.enabled && t.queue != nil {
|
|
close(t.queue)
|
|
}
|
|
} |