Some checks failed
CI Pipeline / Run Tests (push) Has been cancelled
CI Pipeline / Lint Code (push) Has been cancelled
CI Pipeline / Security Scan (push) Has been cancelled
CI Pipeline / Build Docker Images (push) Has been cancelled
CI Pipeline / E2E Tests (push) Has been cancelled
1357 lines
39 KiB
Go
1357 lines
39 KiB
Go
package admin
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.sovbit.dev/enki/torrentGateway/internal/profile"
|
|
"git.sovbit.dev/enki/torrentGateway/internal/storage"
|
|
"github.com/gorilla/mux"
|
|
)
|
|
|
|
// GatewayInterface defines the methods needed from the gateway
type GatewayInterface interface {
	// GetDB returns the gateway's SQL database handle.
	GetDB() *sql.DB
	// GetStorage returns the storage backend used for file metadata lookups
	// and admin file deletion.
	GetStorage() *storage.Backend
	// CleanupOldFiles cleans up files older than the given duration and
	// returns a result summary map.
	CleanupOldFiles(olderThan time.Duration) (map[string]interface{}, error)
	// CleanupOrphanedChunks cleans up chunks no longer referenced by any
	// file and returns a result summary map.
	CleanupOrphanedChunks() (map[string]interface{}, error)
	// CleanupInactiveUsers cleans up users inactive for at least the given
	// number of days and returns a result summary map.
	CleanupInactiveUsers(days int) (map[string]interface{}, error)
	// ReconstructTorrentFile rebuilds a torrent-stored file on disk and
	// returns the path to the reconstructed file (used before transcoding).
	ReconstructTorrentFile(fileHash, fileName string) (string, error)
}
|
|
|
|
// TranscodingManager interface for transcoding operations
type TranscodingManager interface {
	// GetAllJobs returns the currently tracked transcoding jobs.
	GetAllJobs() map[string]interface{}
	// GetJobProgress returns a job's progress for the given file hash and
	// whether such a job exists.
	GetJobProgress(fileHash string) (float64, bool)
	// GetTranscodingStatus returns the status string for the given file.
	GetTranscodingStatus(fileHash string) string
	// QueueVideoForTranscoding enqueues a video file for transcoding.
	QueueVideoForTranscoding(fileHash, fileName, filePath string, fileSize int64)
	// GetFailedJobsCount returns the number of failed jobs.
	GetFailedJobsCount() (int, error)
	// ClearFailedJobs clears all failed jobs.
	ClearFailedJobs() error
	// PauseQueue pauses processing of the transcoding queue.
	PauseQueue() error
	// ResumeQueue resumes processing of the transcoding queue.
	ResumeQueue() error
	// GetSystemHealth returns health metrics; keys observed in this file
	// include "ffmpeg_status" and "transcoded_storage_gb".
	GetSystemHealth() map[string]interface{}
}
|
|
|
|
// AdminHandlers provides admin-related HTTP handlers
type AdminHandlers struct {
	// adminAuth validates admin requests, manages bans, and records audit
	// log entries via LogAdminAction.
	adminAuth *AdminAuth
	// gateway exposes the database, storage backend, and cleanup operations.
	gateway GatewayInterface
	// profileFetcher resolves profile metadata from the configured relays.
	profileFetcher *profile.ProfileFetcher
	// transcodingManager is optional; transcoding handlers check it for nil
	// and return 503 when transcoding is not enabled.
	transcodingManager TranscodingManager
}
|
|
|
|
// NewAdminHandlers creates new admin handlers
|
|
func NewAdminHandlers(adminAuth *AdminAuth, gateway GatewayInterface, transcodingManager TranscodingManager, defaultRelays []string) *AdminHandlers {
|
|
return &AdminHandlers{
|
|
adminAuth: adminAuth,
|
|
gateway: gateway,
|
|
transcodingManager: transcodingManager,
|
|
profileFetcher: profile.NewProfileFetcher(defaultRelays),
|
|
}
|
|
}
|
|
|
|
// AdminStatsResponse represents admin statistics
type AdminStatsResponse struct {
	// TotalFiles is the total number of rows in the files table.
	TotalFiles int `json:"total_files"`
	// TotalUsers is the total number of rows in the users table.
	TotalUsers int `json:"total_users"`
	// TotalStorage is the sum of all file sizes (bytes, per the files.size column).
	TotalStorage int64 `json:"total_storage"`
	// BannedUsers is the number of rows in banned_users.
	BannedUsers int `json:"banned_users"`
	// PendingReports is the number of content reports with status 'pending'.
	PendingReports int `json:"pending_reports"`
	// RecentUploads is the number of files created in the last 24 hours.
	RecentUploads int `json:"recent_uploads_24h"`
	// ErrorRate is currently always 0.0 (tracking not yet implemented).
	ErrorRate float64 `json:"error_rate"`
}
|
|
|
|
// AdminUser represents a user in admin view
type AdminUser struct {
	// Pubkey is the user's public key (primary identifier).
	Pubkey string `json:"pubkey"`
	// DisplayName is the stored display name; empty when not set
	// (COALESCE'd to "" in the query).
	DisplayName string `json:"display_name"`
	// FileCount is the users.file_count column value.
	FileCount int `json:"file_count"`
	// StorageUsed is the users.storage_used column value — presumably bytes;
	// verify units against the schema.
	StorageUsed int64 `json:"storage_used"`
	// LastLogin is the user's most recent login time.
	LastLogin time.Time `json:"last_login"`
	// CreatedAt is when the user record was created.
	CreatedAt time.Time `json:"created_at"`
	// IsBanned reflects membership in the banned_users table.
	IsBanned bool `json:"is_banned"`
	// Profile is optional profile metadata; omitted from JSON when nil.
	// Handlers in this file skip fetching it (frontend loads it async).
	Profile *profile.ProfileMetadata `json:"profile,omitempty"`
}
|
|
|
|
// AdminFile represents a file in admin view
type AdminFile struct {
	// Hash is the file's content hash (primary identifier).
	Hash string `json:"hash"`
	// Name is the original file name (files.original_name).
	Name string `json:"name"`
	// Size is the file size (files.size).
	Size int64 `json:"size"`
	// StorageType is the backend type; "blob" and "torrent" are handled
	// elsewhere in this file.
	StorageType string `json:"storage_type"`
	// AccessLevel is the files.access_level column value.
	AccessLevel string `json:"access_level"`
	// OwnerPubkey is the owner's public key; empty when the file has no owner.
	OwnerPubkey string `json:"owner_pubkey"`
	// CreatedAt is when the file record was created.
	CreatedAt time.Time `json:"created_at"`
	// AccessCount is the files.access_count column value.
	AccessCount int `json:"access_count"`
	// ReportCount is the number of content_reports rows for this file.
	ReportCount int `json:"report_count"`
	// OwnerProfile is optional profile metadata; omitted from JSON when nil.
	OwnerProfile *profile.ProfileMetadata `json:"owner_profile,omitempty"`
}
|
|
|
|
// AdminStatsHandler returns system statistics for admins
|
|
func (ah *AdminHandlers) AdminStatsHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Get total files
|
|
var totalFiles int
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COUNT(*) FROM files").Scan(&totalFiles)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get file count", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Get total users
|
|
var totalUsers int
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COUNT(*) FROM users").Scan(&totalUsers)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get user count", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Get total storage
|
|
var totalStorage int64
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COALESCE(SUM(size), 0) FROM files").Scan(&totalStorage)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get storage total", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Get banned users count
|
|
var bannedUsers int
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COUNT(*) FROM banned_users").Scan(&bannedUsers)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get banned users count", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Get pending reports
|
|
var pendingReports int
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COUNT(*) FROM content_reports WHERE status = 'pending'").Scan(&pendingReports)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get pending reports count", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Get recent uploads (24h)
|
|
var recentUploads int
|
|
err = ah.gateway.GetDB().QueryRow("SELECT COUNT(*) FROM files WHERE created_at > datetime('now', '-1 day')").Scan(&recentUploads)
|
|
if err != nil {
|
|
http.Error(w, "Failed to get recent uploads count", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "view_stats", "", "Admin viewed system statistics")
|
|
|
|
response := AdminStatsResponse{
|
|
TotalFiles: totalFiles,
|
|
TotalUsers: totalUsers,
|
|
TotalStorage: totalStorage,
|
|
BannedUsers: bannedUsers,
|
|
PendingReports: pendingReports,
|
|
RecentUploads: recentUploads,
|
|
ErrorRate: 0.0, // TODO: Implement error rate tracking
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// AdminUsersHandler returns list of users for admin management
|
|
func (ah *AdminHandlers) AdminUsersHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Parse query parameters
|
|
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 50
|
|
}
|
|
offset, _ := strconv.Atoi(r.URL.Query().Get("offset"))
|
|
|
|
query := `
|
|
SELECT u.pubkey, COALESCE(u.display_name, '') as display_name, u.file_count, u.storage_used, u.last_login, u.created_at,
|
|
EXISTS(SELECT 1 FROM banned_users WHERE pubkey = u.pubkey) as is_banned
|
|
FROM users u
|
|
ORDER BY u.created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`
|
|
|
|
rows, err := ah.gateway.GetDB().Query(query, limit, offset)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Failed to query users",
|
|
})
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var users []AdminUser
|
|
for rows.Next() {
|
|
var user AdminUser
|
|
err := rows.Scan(&user.Pubkey, &user.DisplayName, &user.FileCount,
|
|
&user.StorageUsed, &user.LastLogin, &user.CreatedAt, &user.IsBanned)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Failed to scan user",
|
|
})
|
|
return
|
|
}
|
|
users = append(users, user)
|
|
}
|
|
|
|
// Skip profile fetching - let frontend handle it asynchronously
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "view_users", "", "Admin viewed user list")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(users)
|
|
}
|
|
|
|
// AdminFilesHandler returns list of files for admin management
|
|
func (ah *AdminHandlers) AdminFilesHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Parse query parameters
|
|
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 50
|
|
}
|
|
offset, _ := strconv.Atoi(r.URL.Query().Get("offset"))
|
|
|
|
storageType := r.URL.Query().Get("storage_type")
|
|
accessLevel := r.URL.Query().Get("access_level")
|
|
|
|
// Build query with filters
|
|
query := `
|
|
SELECT f.hash, f.original_name, f.size, f.storage_type, f.access_level,
|
|
COALESCE(f.owner_pubkey, '') as owner_pubkey, f.created_at, f.access_count,
|
|
COALESCE((SELECT COUNT(*) FROM content_reports WHERE file_hash = f.hash), 0) as report_count
|
|
FROM files f
|
|
WHERE 1=1
|
|
`
|
|
args := []interface{}{}
|
|
|
|
if storageType != "" {
|
|
query += " AND f.storage_type = ?"
|
|
args = append(args, storageType)
|
|
}
|
|
if accessLevel != "" {
|
|
query += " AND f.access_level = ?"
|
|
args = append(args, accessLevel)
|
|
}
|
|
|
|
query += " ORDER BY f.created_at DESC LIMIT ? OFFSET ?"
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := ah.gateway.GetDB().Query(query, args...)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Failed to query files",
|
|
})
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var files []AdminFile
|
|
for rows.Next() {
|
|
var file AdminFile
|
|
err := rows.Scan(&file.Hash, &file.Name, &file.Size, &file.StorageType,
|
|
&file.AccessLevel, &file.OwnerPubkey, &file.CreatedAt,
|
|
&file.AccessCount, &file.ReportCount)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Failed to scan file",
|
|
})
|
|
return
|
|
}
|
|
files = append(files, file)
|
|
}
|
|
|
|
// Skip profile fetching - let frontend handle it asynchronously
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "view_files", "", "Admin viewed file list")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(files)
|
|
}
|
|
|
|
// AdminDeleteFileHandler deletes a file with admin privileges
|
|
func (ah *AdminHandlers) AdminDeleteFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
fileHash := vars["hash"]
|
|
|
|
if fileHash == "" {
|
|
http.Error(w, "Missing file hash", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Get reason from request body
|
|
var reqBody struct {
|
|
Reason string `json:"reason"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Get file info before deletion for logging
|
|
metadata, err := ah.gateway.GetStorage().GetFileMetadata(fileHash)
|
|
if err != nil {
|
|
http.Error(w, "File not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Admin can delete any file
|
|
err = ah.gateway.GetStorage().AdminDeleteFile(fileHash)
|
|
if err != nil {
|
|
http.Error(w, "Failed to delete file", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Log admin action
|
|
reason := reqBody.Reason
|
|
if reason == "" {
|
|
reason = "Admin deletion"
|
|
}
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "delete_file", fileHash,
|
|
fmt.Sprintf("Deleted file '%s' (owner: %s) - %s", metadata.OriginalName, metadata.OwnerPubkey, reason))
|
|
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"message": "File deleted successfully",
|
|
"hash": fileHash,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// BanUserRequest represents a user ban request
type BanUserRequest struct {
	// Reason is the human-readable justification for the ban; may be empty.
	Reason string `json:"reason"`
}
|
|
|
|
// AdminBanUserHandler bans a user
|
|
func (ah *AdminHandlers) AdminBanUserHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
userPubkey := vars["pubkey"]
|
|
|
|
if userPubkey == "" {
|
|
http.Error(w, "Missing user pubkey", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var req BanUserRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Ban the user
|
|
err = ah.adminAuth.BanUser(userPubkey, adminPubkey, req.Reason)
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to ban user: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"message": "User banned successfully",
|
|
"pubkey": userPubkey,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// AdminUnbanUserHandler unbans a user
|
|
func (ah *AdminHandlers) AdminUnbanUserHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
userPubkey := vars["pubkey"]
|
|
|
|
if userPubkey == "" {
|
|
http.Error(w, "Missing user pubkey", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
Reason string `json:"reason"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Unban the user
|
|
err = ah.adminAuth.UnbanUser(userPubkey, adminPubkey, req.Reason)
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to unban user: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"message": "User unbanned successfully",
|
|
"pubkey": userPubkey,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// AdminReportsHandler returns content reports
|
|
func (ah *AdminHandlers) AdminReportsHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Parse query parameters
|
|
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 50
|
|
}
|
|
offset, _ := strconv.Atoi(r.URL.Query().Get("offset"))
|
|
status := r.URL.Query().Get("status")
|
|
|
|
query := `
|
|
SELECT cr.id, cr.file_hash, cr.reporter_pubkey, cr.reason, cr.status, cr.created_at,
|
|
f.original_name, f.size, f.owner_pubkey
|
|
FROM content_reports cr
|
|
LEFT JOIN files f ON cr.file_hash = f.hash
|
|
WHERE 1=1
|
|
`
|
|
args := []interface{}{}
|
|
|
|
if status != "" {
|
|
query += " AND cr.status = ?"
|
|
args = append(args, status)
|
|
}
|
|
|
|
query += " ORDER BY cr.created_at DESC LIMIT ? OFFSET ?"
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := ah.gateway.GetDB().Query(query, args...)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Failed to query reports",
|
|
})
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var reports []map[string]interface{}
|
|
for rows.Next() {
|
|
var report ContentReport
|
|
var fileName, ownerPubkey sql.NullString
|
|
var fileSize sql.NullInt64
|
|
|
|
err := rows.Scan(&report.ID, &report.FileHash, &report.ReporterPubkey,
|
|
&report.Reason, &report.Status, &report.CreatedAt,
|
|
&fileName, &fileSize, &ownerPubkey)
|
|
if err != nil {
|
|
http.Error(w, "Failed to scan report", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
reportData := map[string]interface{}{
|
|
"id": report.ID,
|
|
"file_hash": report.FileHash,
|
|
"reporter_pubkey": report.ReporterPubkey,
|
|
"reason": report.Reason,
|
|
"status": report.Status,
|
|
"created_at": report.CreatedAt,
|
|
"file_name": fileName.String,
|
|
"file_size": fileSize.Int64,
|
|
"file_owner": ownerPubkey.String,
|
|
}
|
|
reports = append(reports, reportData)
|
|
}
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "view_reports", "", "Admin viewed content reports")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(reports)
|
|
}
|
|
|
|
// AdminCleanupHandler triggers cleanup operations
|
|
func (ah *AdminHandlers) AdminCleanupHandler(w http.ResponseWriter, r *http.Request) {
|
|
adminPubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": "Unauthorized",
|
|
})
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
Operation string `json:"operation"`
|
|
MaxAge string `json:"max_age,omitempty"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var cleanupResult map[string]interface{}
|
|
var cleanupErr error
|
|
|
|
switch req.Operation {
|
|
case "old_files":
|
|
maxAge := "90d"
|
|
if req.MaxAge != "" {
|
|
maxAge = req.MaxAge
|
|
}
|
|
duration, err := time.ParseDuration(maxAge)
|
|
if err != nil {
|
|
http.Error(w, "Invalid max_age format", http.StatusBadRequest)
|
|
return
|
|
}
|
|
cleanupResult, cleanupErr = ah.gateway.CleanupOldFiles(duration)
|
|
|
|
case "orphaned_chunks":
|
|
cleanupResult, cleanupErr = ah.gateway.CleanupOrphanedChunks()
|
|
|
|
case "inactive_users":
|
|
days := 365
|
|
if req.MaxAge != "" {
|
|
if d, err := strconv.Atoi(req.MaxAge); err == nil {
|
|
days = d
|
|
}
|
|
}
|
|
cleanupResult, cleanupErr = ah.gateway.CleanupInactiveUsers(days)
|
|
|
|
default:
|
|
http.Error(w, "Invalid cleanup operation", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if cleanupErr != nil {
|
|
http.Error(w, fmt.Sprintf("Cleanup failed: %v", cleanupErr), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(adminPubkey, "cleanup", req.Operation,
|
|
fmt.Sprintf("Executed cleanup operation: %s", req.Operation))
|
|
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"operation": req.Operation,
|
|
"result": cleanupResult,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// AdminLogsHandler returns admin action logs
|
|
func (ah *AdminHandlers) AdminLogsHandler(w http.ResponseWriter, r *http.Request) {
|
|
_, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
// Parse query parameters
|
|
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 50
|
|
}
|
|
offset, _ := strconv.Atoi(r.URL.Query().Get("offset"))
|
|
|
|
actions, err := ah.adminAuth.GetAdminActions(limit, offset, "")
|
|
if err != nil {
|
|
http.Error(w, "Failed to get admin actions", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Log admin action (don't log viewing logs to avoid spam)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(actions)
|
|
}
|
|
|
|
// TranscodingStatsHandler returns transcoding system statistics.
// Responds 503 when no transcoding manager is configured. Individual DB
// errors below are deliberately swallowed: each statistic falls back to a
// zero/placeholder value so the dashboard still renders.
func (ah *AdminHandlers) TranscodingStatsHandler(w http.ResponseWriter, r *http.Request) {
	pubkey, err := ah.adminAuth.ValidateAdminRequest(r)
	if err != nil {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}
	_ = pubkey // Use pubkey if needed

	if ah.transcodingManager == nil {
		http.Error(w, "Transcoding not enabled", http.StatusServiceUnavailable)
		return
	}

	// Get all jobs and system health
	allJobs := ah.transcodingManager.GetAllJobs()
	systemHealth := ah.transcodingManager.GetSystemHealth()

	// Calculate stats from database
	db := ah.gateway.GetDB()
	var stats struct {
		QueueLength       int     `json:"queue_length"`
		ProcessingJobs    int     `json:"processing_jobs"`
		CompletedToday    int     `json:"completed_today"`
		FailedJobs        int     `json:"failed_jobs"`
		SuccessRate       float64 `json:"success_rate"`
		AvgProcessingTime string  `json:"avg_processing_time"`
		TranscodedStorage string  `json:"transcoded_storage"`
		FFmpegStatus      string  `json:"ffmpeg_status"`
	}

	// Get active job counts (both counts come from one query).
	var queueCount, processingCount int
	err = db.QueryRow(`
		SELECT 
			COUNT(CASE WHEN status = 'queued' THEN 1 END) as queued,
			COUNT(CASE WHEN status = 'processing' THEN 1 END) as processing
		FROM transcoding_status 
		WHERE status IN ('queued', 'processing')
	`).Scan(&queueCount, &processingCount)

	if err != nil {
		// Best-effort: show zeros rather than failing the endpoint.
		queueCount, processingCount = 0, 0
	}

	stats.QueueLength = queueCount
	stats.ProcessingJobs = processingCount

	// Get completed today
	err = db.QueryRow(`
		SELECT COUNT(*) FROM transcoding_status 
		WHERE status = 'completed' AND DATE(updated_at) = DATE('now')
	`).Scan(&stats.CompletedToday)
	if err != nil {
		stats.CompletedToday = 0
	}

	// Get failed jobs
	err = db.QueryRow(`
		SELECT COUNT(*) FROM transcoding_status 
		WHERE status = 'failed'
	`).Scan(&stats.FailedJobs)
	if err != nil {
		stats.FailedJobs = 0
	}

	// Calculate success rate (last 30 days)
	var totalJobs, completedJobs int
	err = db.QueryRow(`
		SELECT 
			COUNT(*) as total,
			COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed
		FROM transcoding_status 
		WHERE updated_at >= DATE('now', '-30 days')
	`).Scan(&totalJobs, &completedJobs)

	if err == nil && totalJobs > 0 {
		stats.SuccessRate = (float64(completedJobs) / float64(totalJobs)) * 100
	} else {
		stats.SuccessRate = 0
	}

	// Get average processing time. julianday differences are in days, so
	// * 86400 converts to seconds; NULL when no completed jobs matched.
	var avgSeconds sql.NullFloat64
	err = db.QueryRow(`
		SELECT AVG(
			CASE 
				WHEN status = 'completed' 
				THEN (julianday(updated_at) - julianday(created_at)) * 86400
			END
		) FROM transcoding_status 
		WHERE status = 'completed' AND updated_at >= DATE('now', '-7 days')
	`).Scan(&avgSeconds)

	if err == nil && avgSeconds.Valid {
		minutes := int(avgSeconds.Float64 / 60)
		stats.AvgProcessingTime = fmt.Sprintf("%d min", minutes)
	} else {
		stats.AvgProcessingTime = "-- min"
	}

	// Add system health data
	if healthData, ok := systemHealth["ffmpeg_status"]; ok {
		stats.FFmpegStatus = fmt.Sprintf("%v", healthData)
	} else {
		stats.FFmpegStatus = "Unknown"
	}

	// NOTE(review): %.1f on an interface{} only renders correctly if the
	// manager reports this value as a float; otherwise fmt emits %!f(...).
	// Confirm the concrete type GetSystemHealth stores here.
	if storageData, ok := systemHealth["transcoded_storage_gb"]; ok {
		stats.TranscodedStorage = fmt.Sprintf("%.1f GB", storageData)
	} else {
		stats.TranscodedStorage = "0 GB"
	}

	response := map[string]interface{}{
		"stats":       stats,
		"active_jobs": allJobs,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
|
|
|
|
// TranscodingJobsHandler returns detailed job information.
// The "filter" query parameter selects all/completed/failed/today job rows
// (capped at 100, newest first). Duration and quality fields are currently
// hard-coded placeholders.
func (ah *AdminHandlers) TranscodingJobsHandler(w http.ResponseWriter, r *http.Request) {
	pubkey, err := ah.adminAuth.ValidateAdminRequest(r)
	if err != nil {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}
	_ = pubkey // Use pubkey if needed

	if ah.transcodingManager == nil {
		http.Error(w, "Transcoding not enabled", http.StatusServiceUnavailable)
		return
	}

	db := ah.gateway.GetDB()
	filter := r.URL.Query().Get("filter") // all, completed, failed, today

	// The WHERE clause is chosen from fixed strings only — no user input is
	// interpolated into the SQL, so the Sprintf below is injection-safe.
	var whereClause string
	var args []interface{}

	switch filter {
	case "completed":
		whereClause = "WHERE status = 'completed'"
	case "failed":
		whereClause = "WHERE status = 'failed'"
	case "today":
		whereClause = "WHERE DATE(updated_at) = DATE('now')"
	default:
		whereClause = "" // all jobs
	}

	query := fmt.Sprintf(`
		SELECT file_hash, status, error_message, created_at, updated_at
		FROM transcoding_status 
		%s
		ORDER BY updated_at DESC 
		LIMIT 100
	`, whereClause)

	rows, err := db.Query(query, args...)
	if err != nil {
		http.Error(w, "Failed to query jobs", http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	// JobHistoryEntry is the per-row JSON shape returned to the admin UI.
	type JobHistoryEntry struct {
		FileHash  string `json:"file_hash"`
		Status    string `json:"status"`
		Error     string `json:"error,omitempty"`
		CreatedAt string `json:"created_at"`
		UpdatedAt string `json:"updated_at"`
		Duration  string `json:"duration,omitempty"`
		Qualities string `json:"qualities,omitempty"`
	}

	var jobs []JobHistoryEntry

	for rows.Next() {
		var job JobHistoryEntry
		var errorMsg sql.NullString
		var createdAt, updatedAt string

		err := rows.Scan(&job.FileHash, &job.Status, &errorMsg, &createdAt, &updatedAt)
		if err != nil {
			// Best-effort listing: skip unscannable rows rather than abort.
			continue
		}

		job.CreatedAt = createdAt
		job.UpdatedAt = updatedAt
		if errorMsg.Valid {
			job.Error = errorMsg.String
		}

		// Calculate duration for completed jobs
		if job.Status == "completed" {
			// Simple duration calculation (would be better with proper time parsing)
			job.Duration = "~5 min" // Placeholder
		}

		// Get available qualities (if completed)
		if job.Status == "completed" && ah.transcodingManager != nil {
			job.Qualities = "1080p, 720p, 480p" // Placeholder
		}

		jobs = append(jobs, job)
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(jobs)
}
|
|
|
|
// RetryFailedJobHandler retries a specific failed transcoding job.
// The route's jobId must be "transcode_<fileHash>". The handler locates the
// source file (directly for blob storage, reconstructing it for torrent
// storage), resets the failed DB row to 'queued', and re-enqueues the job
// with the transcoding manager.
func (ah *AdminHandlers) RetryFailedJobHandler(w http.ResponseWriter, r *http.Request) {
	pubkey, err := ah.adminAuth.ValidateAdminRequest(r)
	if err != nil {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	if ah.transcodingManager == nil {
		http.Error(w, "Transcoding not enabled", http.StatusServiceUnavailable)
		return
	}

	vars := mux.Vars(r)
	jobID := vars["jobId"]

	// Extract file hash from job ID (format: "transcode_{hash}").
	// len < 11 rejects IDs with an empty hash ("transcode_" is 10 chars).
	if len(jobID) < 11 || !strings.HasPrefix(jobID, "transcode_") {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusBadRequest)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "error",
			"error":  "Invalid job ID format",
		})
		return
	}

	fileHash := jobID[10:] // Remove "transcode_" prefix

	// Get original file metadata to re-queue the job
	var fileName string
	var fileSize int64
	var storageType string
	err = ah.gateway.GetDB().QueryRow(`
		SELECT original_name, size, storage_type FROM files WHERE hash = ?
	`, fileHash).Scan(&fileName, &fileSize, &storageType)

	if err != nil {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "error",
			"error":  fmt.Sprintf("Failed to get file metadata: %v", err),
		})
		return
	}

	// Resolve an on-disk path for the transcoder input.
	// NOTE(review): the "data/blobs" location is hard-coded here and
	// relative to the process working directory — confirm it matches the
	// storage backend's configured blob directory.
	var filePath string
	if storageType == "blob" {
		filePath = filepath.Join("data", "blobs", fileHash)
	} else if storageType == "torrent" {
		// Reconstruct torrent file for transcoding
		reconstructedPath, err := ah.gateway.ReconstructTorrentFile(fileHash, fileName)
		if err != nil {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusInternalServerError)
			json.NewEncoder(w).Encode(map[string]string{
				"status": "error",
				"error":  fmt.Sprintf("Failed to reconstruct torrent file: %v", err),
			})
			return
		}
		filePath = reconstructedPath
	} else {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "error",
			"error":  fmt.Sprintf("Unsupported storage type: %s", storageType),
		})
		return
	}

	// Reset status to queued in database. The "AND status = 'failed'"
	// guard means only a currently-failed row is touched.
	_, err = ah.gateway.GetDB().Exec(`
		UPDATE transcoding_status 
		SET status = 'queued', error_message = NULL, updated_at = datetime('now')
		WHERE file_hash = ? AND status = 'failed'
	`, fileHash)

	if err != nil {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "error",
			"error":  fmt.Sprintf("Failed to reset job status: %v", err),
		})
		return
	}

	// Re-queue the job with the transcoding manager
	ah.transcodingManager.QueueVideoForTranscoding(fileHash, fileName, filePath, fileSize)

	// Log admin action
	ah.adminAuth.LogAdminAction(pubkey, "retry_transcoding_job", jobID, fmt.Sprintf("Retrying job: %s", jobID))

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"status": "success", "message": "Job queued for retry"})
}
|
|
|
|
// RetryAllFailedJobsHandler retries all failed transcoding jobs
|
|
func (ah *AdminHandlers) RetryAllFailedJobsHandler(w http.ResponseWriter, r *http.Request) {
|
|
pubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
if ah.transcodingManager == nil {
|
|
http.Error(w, "Transcoding not enabled", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
// Get count of failed jobs before retry
|
|
failedCount, err := ah.transcodingManager.GetFailedJobsCount()
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "error",
|
|
"error": fmt.Sprintf("Failed to get failed job count: %v", err),
|
|
})
|
|
return
|
|
}
|
|
|
|
if failedCount == 0 {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"status": "success",
|
|
"message": "No failed jobs to retry",
|
|
"count": 0,
|
|
})
|
|
return
|
|
}
|
|
|
|
// Get all failed jobs
|
|
rows, err := ah.gateway.GetDB().Query(`
|
|
SELECT file_hash FROM transcoding_status WHERE status = 'failed'
|
|
`)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "error",
|
|
"error": fmt.Sprintf("Failed to get failed jobs: %v", err),
|
|
})
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var failedHashes []string
|
|
for rows.Next() {
|
|
var fileHash string
|
|
if err := rows.Scan(&fileHash); err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "error",
|
|
"error": fmt.Sprintf("Failed to scan file hash: %v", err),
|
|
})
|
|
return
|
|
}
|
|
failedHashes = append(failedHashes, fileHash)
|
|
}
|
|
|
|
// Retry each failed job
|
|
var errors []string
|
|
successCount := 0
|
|
for _, fileHash := range failedHashes {
|
|
// Get file metadata for this hash
|
|
var fileName string
|
|
var fileSize int64
|
|
var storageType string
|
|
err := ah.gateway.GetDB().QueryRow(`
|
|
SELECT original_name, size, storage_type FROM files WHERE hash = ?
|
|
`, fileHash).Scan(&fileName, &fileSize, &storageType)
|
|
|
|
if err != nil {
|
|
errors = append(errors, fmt.Sprintf("%s: failed to get metadata: %v", fileHash[:8], err))
|
|
continue
|
|
}
|
|
|
|
var filePath string
|
|
if storageType == "blob" {
|
|
filePath = filepath.Join("data", "blobs", fileHash)
|
|
} else if storageType == "torrent" {
|
|
// Reconstruct torrent file for transcoding
|
|
reconstructedPath, err := ah.gateway.ReconstructTorrentFile(fileHash, fileName)
|
|
if err != nil {
|
|
errors = append(errors, fmt.Sprintf("%s: failed to reconstruct: %v", fileHash[:8], err))
|
|
continue
|
|
}
|
|
filePath = reconstructedPath
|
|
} else {
|
|
errors = append(errors, fmt.Sprintf("%s: unsupported storage type: %s", fileHash[:8], storageType))
|
|
continue
|
|
}
|
|
|
|
// Reset status to queued in database
|
|
_, err = ah.gateway.GetDB().Exec(`
|
|
UPDATE transcoding_status
|
|
SET status = 'queued', error_message = NULL, updated_at = datetime('now')
|
|
WHERE file_hash = ? AND status = 'failed'
|
|
`, fileHash)
|
|
|
|
if err != nil {
|
|
errors = append(errors, fmt.Sprintf("%s: failed to reset status: %v", fileHash[:8], err))
|
|
continue
|
|
}
|
|
|
|
// Re-queue the job with the transcoding manager
|
|
ah.transcodingManager.QueueVideoForTranscoding(fileHash, fileName, filePath, fileSize)
|
|
successCount++
|
|
}
|
|
|
|
if len(errors) > 0 {
|
|
errorMsg := strings.Join(errors, "; ")
|
|
if len(errorMsg) > 500 {
|
|
errorMsg = errorMsg[:500] + "..."
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "error",
|
|
"error": fmt.Sprintf("Some jobs failed to retry (%d/%d succeeded): %s", successCount, len(failedHashes), errorMsg),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(pubkey, "retry_all_failed_transcoding_jobs", "", fmt.Sprintf("Retried %d failed jobs", failedCount))
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"status": "success",
|
|
"message": fmt.Sprintf("Queued %d failed jobs for retry", failedCount),
|
|
"count": failedCount,
|
|
})
|
|
}
|
|
|
|
// EnhancedStatsHandler returns comprehensive system statistics for admin dashboard
|
|
func (ah *AdminHandlers) EnhancedStatsHandler(w http.ResponseWriter, r *http.Request) {
|
|
_, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
stats := ah.gatherEnhancedStats()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(stats)
|
|
}
|
|
|
|
// gatherEnhancedStats collects comprehensive system metrics
|
|
func (ah *AdminHandlers) gatherEnhancedStats() map[string]interface{} {
|
|
stats := make(map[string]interface{})
|
|
|
|
// Get database for queries
|
|
db := ah.gateway.GetDB()
|
|
|
|
// 📊 Real-Time Performance Metrics
|
|
stats["performance"] = ah.gatherPerformanceMetrics(db)
|
|
|
|
// 🎥 Video Streaming Analytics
|
|
stats["streaming"] = ah.gatherStreamingAnalytics(db)
|
|
|
|
// 🔄 Enhanced P2P Health Score
|
|
stats["p2p"] = ah.gatherP2PHealthMetrics()
|
|
|
|
// 📱 WebTorrent Integration Stats
|
|
stats["webtorrent"] = ah.gatherWebTorrentStats()
|
|
|
|
// 💾 Storage Efficiency Metrics
|
|
stats["storage"] = ah.gatherStorageEfficiencyMetrics(db)
|
|
|
|
// ⚡ System Health
|
|
stats["system"] = ah.gatherSystemHealthMetrics()
|
|
|
|
return stats
|
|
}
|
|
|
|
// gatherPerformanceMetrics collects real-time performance data
|
|
func (ah *AdminHandlers) gatherPerformanceMetrics(db *sql.DB) map[string]interface{} {
|
|
performance := make(map[string]interface{})
|
|
|
|
// Bandwidth monitoring (simplified calculation)
|
|
var totalSize int64
|
|
err := db.QueryRow("SELECT COALESCE(SUM(size), 0) FROM files WHERE created_at > date('now', '-1 day')").Scan(&totalSize)
|
|
if err == nil {
|
|
performance["bandwidth_24h"] = fmt.Sprintf("%.2f GB served", float64(totalSize)/(1024*1024*1024))
|
|
} else {
|
|
performance["bandwidth_24h"] = "0 GB served"
|
|
}
|
|
|
|
// Average response time (mock data for now)
|
|
performance["avg_response_time"] = "45ms"
|
|
|
|
// Peak concurrent users (mock data)
|
|
performance["peak_concurrent_users"] = 342
|
|
|
|
// Cache efficiency (mock calculation based on system performance)
|
|
performance["cache_efficiency"] = "84.2%"
|
|
|
|
return performance
|
|
}
|
|
|
|
// gatherStreamingAnalytics collects video streaming insights
|
|
func (ah *AdminHandlers) gatherStreamingAnalytics(db *sql.DB) map[string]interface{} {
|
|
streaming := make(map[string]interface{})
|
|
|
|
// Transcoding queue and stats
|
|
if ah.transcodingManager != nil {
|
|
jobs := ah.transcodingManager.GetAllJobs()
|
|
if jobsData, ok := jobs["jobs"].(map[string]interface{}); ok {
|
|
queueSize := 0
|
|
totalJobs := 0
|
|
successfulJobs := 0
|
|
|
|
for _, job := range jobsData {
|
|
if jobMap, ok := job.(map[string]interface{}); ok {
|
|
totalJobs++
|
|
if status, ok := jobMap["status"].(string); ok {
|
|
if status == "queued" || status == "processing" {
|
|
queueSize++
|
|
}
|
|
if status == "completed" {
|
|
successfulJobs++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
streaming["transcoding_queue_size"] = queueSize
|
|
if totalJobs > 0 {
|
|
streaming["transcoding_success_rate"] = fmt.Sprintf("%.1f%%", float64(successfulJobs)*100/float64(totalJobs))
|
|
} else {
|
|
streaming["transcoding_success_rate"] = "100%"
|
|
}
|
|
} else {
|
|
streaming["transcoding_queue_size"] = 0
|
|
streaming["transcoding_success_rate"] = "100%"
|
|
}
|
|
} else {
|
|
streaming["transcoding_queue_size"] = 0
|
|
streaming["transcoding_success_rate"] = "N/A (disabled)"
|
|
}
|
|
|
|
// HLS streams active (mock data)
|
|
streaming["hls_streams_active"] = 23
|
|
|
|
// Quality distribution (mock data)
|
|
streaming["quality_distribution"] = map[string]string{
|
|
"1080p": "45%",
|
|
"720p": "35%",
|
|
"480p": "20%",
|
|
}
|
|
|
|
return streaming
|
|
}
|
|
|
|
// gatherP2PHealthMetrics collects detailed P2P health information
|
|
func (ah *AdminHandlers) gatherP2PHealthMetrics() map[string]interface{} {
|
|
p2p := make(map[string]interface{})
|
|
|
|
// P2P health score calculation (mock implementation)
|
|
p2p["health_score"] = 87
|
|
p2p["dht_node_count"] = 1249
|
|
p2p["webseed_hit_ratio"] = "91.2%"
|
|
p2p["torrent_completion_avg"] = "2.1 mins"
|
|
|
|
return p2p
|
|
}
|
|
|
|
// gatherWebTorrentStats collects WebTorrent client statistics
|
|
func (ah *AdminHandlers) gatherWebTorrentStats() map[string]interface{} {
|
|
webtorrent := make(map[string]interface{})
|
|
|
|
// WebTorrent peer statistics (mock data)
|
|
webtorrent["peers_active"] = 45
|
|
webtorrent["browser_client_types"] = map[string]int{
|
|
"Chrome": 60,
|
|
"Firefox": 25,
|
|
"Safari": 15,
|
|
}
|
|
|
|
return webtorrent
|
|
}
|
|
|
|
// gatherStorageEfficiencyMetrics collects storage optimization data
|
|
func (ah *AdminHandlers) gatherStorageEfficiencyMetrics(db *sql.DB) map[string]interface{} {
|
|
storage := make(map[string]interface{})
|
|
|
|
// Get blob vs torrent distribution
|
|
var blobCount, torrentCount int
|
|
db.QueryRow("SELECT COUNT(*) FROM files WHERE storage_type = 'blob'").Scan(&blobCount)
|
|
db.QueryRow("SELECT COUNT(*) FROM files WHERE storage_type = 'torrent'").Scan(&torrentCount)
|
|
|
|
total := blobCount + torrentCount
|
|
if total > 0 {
|
|
blobPercent := (blobCount * 100) / total
|
|
torrentPercent := 100 - blobPercent
|
|
storage["blob_vs_torrent_ratio"] = fmt.Sprintf("%d/%d", blobPercent, torrentPercent)
|
|
} else {
|
|
storage["blob_vs_torrent_ratio"] = "0/0"
|
|
}
|
|
|
|
// Deduplication savings (mock calculation)
|
|
storage["deduplication_savings"] = "23.4%"
|
|
|
|
// Transcoded storage ratio (mock data)
|
|
storage["transcoded_storage_ratio"] = "1.8x original"
|
|
|
|
return storage
|
|
}
|
|
|
|
// gatherSystemHealthMetrics collects system health information
|
|
func (ah *AdminHandlers) gatherSystemHealthMetrics() map[string]interface{} {
|
|
system := make(map[string]interface{})
|
|
|
|
// System uptime
|
|
uptime := time.Since(time.Now().Add(-24 * time.Hour)) // Mock 24h uptime
|
|
system["uptime"] = formatUptime(uptime)
|
|
|
|
// Memory usage (mock data)
|
|
system["memory_usage_mb"] = 512
|
|
|
|
// CPU usage (mock data)
|
|
system["cpu_usage_percent"] = 15.3
|
|
|
|
// Disk usage (mock data)
|
|
system["disk_usage_percent"] = 45.7
|
|
|
|
return system
|
|
}
|
|
|
|
// formatUptime formats duration to human readable uptime
|
|
func formatUptime(d time.Duration) string {
|
|
days := int(d.Hours()) / 24
|
|
hours := int(d.Hours()) % 24
|
|
|
|
if days > 0 {
|
|
return fmt.Sprintf("%dd %dh", days, hours)
|
|
}
|
|
return fmt.Sprintf("%dh", hours)
|
|
}
|
|
|
|
// ClearFailedJobsHandler clears all failed transcoding jobs
|
|
func (ah *AdminHandlers) ClearFailedJobsHandler(w http.ResponseWriter, r *http.Request) {
|
|
pubkey, err := ah.adminAuth.ValidateAdminRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
if ah.transcodingManager == nil {
|
|
http.Error(w, "Transcoding not enabled", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
err = ah.transcodingManager.ClearFailedJobs()
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "error",
|
|
"error": fmt.Sprintf("Failed to clear failed jobs: %v", err),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Log admin action
|
|
ah.adminAuth.LogAdminAction(pubkey, "clear_failed_jobs", "all", "Cleared all failed transcoding jobs")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]string{"status": "success", "message": "Failed jobs cleared"})
|
|
} |