diff --git a/internal/storage/backend.go b/internal/storage/backend.go
new file mode 100644
index 0000000..6af0ccd
--- /dev/null
+++ b/internal/storage/backend.go
@@ -0,0 +1,1070 @@
+package storage
+
+import (
+	"crypto/sha256"
+	"database/sql"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"torrentGateway/internal/config"
+	_ "github.com/mattn/go-sqlite3"
+)
+
+// Backend provides unified storage for chunks, blobs, and metadata
+type Backend struct {
+	db        *sql.DB
+	chunkDir  string
+	blobDir   string
+	chunkSize int64
+	config    *config.Config
+}
+
+// FileMetadata represents metadata about stored files
+type FileMetadata struct {
+	Hash         string
+	OriginalName string
+	Size         int64
+	ChunkCount   int
+	ContentType  string
+	StorageType  string
+	OwnerPubkey  string
+	AccessLevel  string
+	CreatedAt    time.Time
+	AccessCount  int64
+	LastAccess   time.Time
+}
+
+// ChunkInfo represents information about a file chunk
+type ChunkInfo struct {
+	FileHash   string
+	ChunkIndex int
+	ChunkHash  string
+	Size       int64
+	Offset     int64
+}
+
+// BlobInfo represents information about a Blossom blob
+type BlobInfo struct {
+	Hash      string
+	Size      int64
+	CreatedAt time.Time
+	MimeType  string
+}
+
+// NewBackend creates a new unified storage backend
+func NewBackend(dbPath, chunkDir, blobDir string, chunkSize int64, cfg *config.Config) (*Backend, error) {
+	// Create directories
+	if err := os.MkdirAll(chunkDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create chunk directory: %w", err)
+	}
+	if err := os.MkdirAll(blobDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create blob directory: %w", err)
+	}
+
+	// Ensure database directory exists
+	dbDir := filepath.Dir(dbPath)
+	if err := os.MkdirAll(dbDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create database directory: %w", err)
+	}
+
+	// Open database
+	db, err := sql.Open("sqlite3", dbPath+"?cache=shared&mode=rwc")
+	if err != nil {
+		return nil, fmt.Errorf("failed to open database: %w", err)
+	}
+
+	backend := &Backend{
+		db:        db,
+		chunkDir:  chunkDir,
+		blobDir:   blobDir,
+		chunkSize: chunkSize,
+		config:    cfg,
+	}
+
+	if err := backend.initializeSchema(); err != nil {
+		return nil, fmt.Errorf("failed to initialize database schema: %w", err)
+	}
+
+	// Run migrations
+	if err := backend.runMigrations(); err != nil {
+		return nil, fmt.Errorf("failed to run migrations: %w", err)
+	}
+
+	return backend, nil
+}
+
+// initializeSchema creates the database tables
+func (b *Backend) initializeSchema() error {
+	queries := []string{
+		`CREATE TABLE IF NOT EXISTS files (
+			hash TEXT PRIMARY KEY,
+			original_name TEXT NOT NULL,
+			size INTEGER NOT NULL,
+			chunk_count INTEGER NOT NULL,
+			content_type TEXT,
+			storage_type TEXT DEFAULT 'torrent',
+			owner_pubkey TEXT,
+			access_level TEXT DEFAULT 'public',
+			info_hash TEXT,
+			created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+			access_count INTEGER DEFAULT 0,
+			last_access DATETIME DEFAULT CURRENT_TIMESTAMP,
+			FOREIGN KEY (owner_pubkey) REFERENCES users(pubkey)
+		)`,
+		`CREATE TABLE IF NOT EXISTS chunks (
+			file_hash TEXT NOT NULL,
+			chunk_index INTEGER NOT NULL,
+			chunk_hash TEXT NOT NULL,
+			size INTEGER NOT NULL,
+			offset INTEGER NOT NULL,
+			PRIMARY KEY (file_hash, chunk_index),
+			FOREIGN KEY (file_hash) REFERENCES files(hash)
+		)`,
+		`CREATE TABLE IF NOT EXISTS blobs (
+			hash TEXT PRIMARY KEY,
+			size INTEGER NOT NULL,
+			created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+			mime_type TEXT,
+			access_count INTEGER DEFAULT 0,
+			last_access DATETIME DEFAULT CURRENT_TIMESTAMP
+		)`,
+		`CREATE TABLE IF NOT EXISTS users (
+			pubkey TEXT PRIMARY KEY,
+			display_name TEXT,
+			profile_image TEXT,
+			created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+			last_login DATETIME DEFAULT CURRENT_TIMESTAMP,
+			storage_used INTEGER DEFAULT 0,
+			file_count INTEGER DEFAULT 0
+		)`,
+		`CREATE TABLE IF NOT EXISTS sessions (
+			token TEXT PRIMARY KEY,
+			pubkey TEXT NOT NULL,
+			created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+			expires_at DATETIME NOT NULL,
+			FOREIGN KEY (pubkey) REFERENCES users(pubkey)
+		)`,
+		`CREATE TABLE IF NOT EXISTS admin_actions (
+			id INTEGER PRIMARY KEY,
+			admin_pubkey TEXT NOT NULL,
+			action_type TEXT NOT NULL,
+			target_id TEXT,
+			reason TEXT,
+			timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
+		)`,
+		`CREATE TABLE IF NOT EXISTS banned_users (
+			pubkey TEXT PRIMARY KEY,
+			banned_by TEXT NOT NULL,
+			reason TEXT,
+			banned_at DATETIME DEFAULT CURRENT_TIMESTAMP
+		)`,
+		`CREATE TABLE IF NOT EXISTS content_reports (
+			id INTEGER PRIMARY KEY,
+			file_hash TEXT NOT NULL,
+			reporter_pubkey TEXT,
+			reason TEXT,
+			status TEXT DEFAULT 'pending',
+			created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+			FOREIGN KEY (file_hash) REFERENCES files(hash)
+		)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_created_at ON files(created_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_owner_pubkey ON files(owner_pubkey)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_storage_type ON files(storage_type)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_access_level ON files(access_level)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_size ON files(size)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_last_access ON files(last_access)`,
+		`CREATE INDEX IF NOT EXISTS idx_files_owner_created ON files(owner_pubkey, created_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_blobs_created_at ON blobs(created_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_blobs_size ON blobs(size)`,
+		`CREATE INDEX IF NOT EXISTS idx_blobs_mime_type ON blobs(mime_type)`,
+		`CREATE INDEX IF NOT EXISTS idx_chunks_file_hash ON chunks(file_hash)`,
+		`CREATE INDEX IF NOT EXISTS idx_chunks_chunk_hash ON chunks(chunk_hash)`,
+		`CREATE INDEX IF NOT EXISTS idx_sessions_pubkey ON sessions(pubkey)`,
+		`CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_sessions_created_at ON sessions(created_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_users_last_login ON users(last_login)`,
+		`CREATE INDEX IF NOT EXISTS idx_admin_actions_timestamp ON admin_actions(timestamp)`,
+		`CREATE INDEX IF NOT EXISTS idx_admin_actions_admin_pubkey ON admin_actions(admin_pubkey)`,
+		`CREATE INDEX IF NOT EXISTS idx_admin_actions_action_type ON admin_actions(action_type)`,
+		`CREATE INDEX IF NOT EXISTS idx_banned_users_banned_at ON banned_users(banned_at)`,
+		`CREATE INDEX IF NOT EXISTS idx_banned_users_banned_by ON banned_users(banned_by)`,
+		`CREATE INDEX IF NOT EXISTS idx_content_reports_status ON content_reports(status)`,
+		`CREATE INDEX IF NOT EXISTS idx_content_reports_file_hash ON content_reports(file_hash)`,
+		`CREATE INDEX IF NOT EXISTS idx_content_reports_created_at ON content_reports(created_at)`,
+	}
+
+	for _, query := range queries {
+		if _, err := b.db.Exec(query); err != nil {
+			return fmt.Errorf("failed to execute schema query: %w", err)
+		}
+	}
+
+	// Migration: Add storage_type column if it doesn't exist
+	_, err := b.db.Exec(`ALTER TABLE files ADD COLUMN storage_type TEXT DEFAULT 'torrent'`)
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add storage_type column: %w", err)
+	}
+
+	// Migration: Add owner_pubkey column if it doesn't exist
+	_, err = b.db.Exec(`ALTER TABLE files ADD COLUMN owner_pubkey TEXT`)
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add owner_pubkey column: %w", err)
+	}
+
+	// Migration: Add access_level column if it doesn't exist
+	_, err = b.db.Exec(`ALTER TABLE files ADD COLUMN access_level TEXT DEFAULT 'public'`)
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add access_level column: %w", err)
+	}
+
+	// Migration: Add storage limit fields to users table.
+	// SQLite does not allow bound parameters in DDL, so the default value
+	// is formatted into the statement rather than passed as an argument.
+	defaultStorageLimit, _ := b.config.GetDefaultUserStorageLimitBytes()
+	_, err = b.db.Exec(fmt.Sprintf(`ALTER TABLE users ADD COLUMN storage_limit INTEGER DEFAULT %d`, defaultStorageLimit))
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add storage_limit column: %w", err)
+	}
+
+	_, err = b.db.Exec(`ALTER TABLE users ADD COLUMN account_tier TEXT DEFAULT 'free'`)
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add account_tier column: %w", err)
+	}
+
+	_, err = b.db.Exec(`ALTER TABLE users ADD COLUMN subscription_expires DATETIME`)
+	if err != nil && !isColumnExistsError(err) {
+		return fmt.Errorf("failed to add subscription_expires column: %w", err)
+	}
+
+	// Create indexes for new columns after they've been added
+	additionalIndexes := []string{
+		`CREATE INDEX IF NOT EXISTS idx_users_storage_used ON users(storage_used)`,
+		`CREATE INDEX IF NOT EXISTS idx_users_account_tier ON users(account_tier)`,
+	}
+
+	for _, query := range additionalIndexes {
+		if _, err := b.db.Exec(query); err != nil {
+			// Log but don't fail if index creation fails
+			log.Printf("Warning: failed to create index: %v", err)
+		}
+	}
+
+	return nil
+}
+
+// isColumnExistsError checks if the error is due to the column already existing
+func isColumnExistsError(err error) bool {
+	return err != nil && (strings.Contains(err.Error(), "duplicate column name") ||
+		strings.Contains(err.Error(), "column already exists"))
+}
+
+// StoreFile stores a file by splitting it into chunks
+func (b *Backend) StoreFile(reader io.Reader, originalName, contentType string) (*FileMetadata, error) {
+	return b.StoreFileWithOwner(reader, originalName, contentType, "", "public")
+}
+
+// StoreFileWithOwner stores a file with ownership information
+func (b *Backend) StoreFileWithOwner(reader io.Reader, originalName, contentType, ownerPubkey, accessLevel string) (*FileMetadata, error) {
+	// Create temporary file to calculate hash and size
+	tempFile, err := os.CreateTemp("", "upload_*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temp file: %w", err)
+	}
+	defer os.Remove(tempFile.Name())
+	defer tempFile.Close()
+
+	// Copy and hash the entire file
+	hasher := sha256.New()
+	size, err := io.Copy(io.MultiWriter(tempFile, hasher), reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to copy file: %w", err)
+	}
+
+	fileHash := hex.EncodeToString(hasher.Sum(nil))
+
+	// Check if file already exists
+	if existing, _ := b.GetFileMetadata(fileHash); existing != nil {
+		return existing, nil
+	}
+
+	// Seek back to beginning for chunking
+	if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
+		return nil, fmt.Errorf("failed to seek temp file: %w", err)
+	}
+
+	// Store chunks
+	chunkCount := int((size + b.chunkSize - 1) / b.chunkSize)
+	chunks := make([]ChunkInfo, 0, chunkCount)
+
+	for i := 0; i < chunkCount; i++ {
+		chunkData := make([]byte, b.chunkSize)
+		chunkSize, err := io.ReadFull(tempFile, chunkData)
+		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+			return nil, fmt.Errorf("failed to read chunk %d: %w", i, err)
+		}
+
+		// Trim to actual size
+		chunkData = chunkData[:chunkSize]
+
+		// Calculate chunk hash
+		chunkHasher := sha256.New()
+		chunkHasher.Write(chunkData)
+		chunkHash := hex.EncodeToString(chunkHasher.Sum(nil))
+
+		// Store chunk to disk
+		chunkPath := filepath.Join(b.chunkDir, chunkHash)
+		if err := os.WriteFile(chunkPath, chunkData, 0644); err != nil {
+			return nil, fmt.Errorf("failed to write chunk %s: %w", chunkHash, err)
+		}
+
+		chunk := ChunkInfo{
+			FileHash:   fileHash,
+			ChunkIndex: i,
+			ChunkHash:  chunkHash,
+			Size:       int64(chunkSize),
+			Offset:     int64(i) * b.chunkSize,
+		}
+		chunks = append(chunks, chunk)
+	}
+
+	// Store metadata in database
+	tx, err := b.db.Begin()
+	if err != nil {
+		return nil, fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	// Insert file metadata
+	_, err = tx.Exec(`
+		INSERT INTO files (hash, original_name, size, chunk_count, content_type, storage_type, owner_pubkey, access_level)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+	`, fileHash, originalName, size, chunkCount, contentType, "torrent", ownerPubkey, accessLevel)
+	if err != nil {
+		return nil, fmt.Errorf("failed to insert file metadata: %w", err)
+	}
+
+	// Insert chunk metadata
+	for _, chunk := range chunks {
+		_, err = tx.Exec(`
+			INSERT INTO chunks (file_hash, chunk_index, chunk_hash, size, offset)
+			VALUES (?, ?, ?, ?, ?)
+		`, chunk.FileHash, chunk.ChunkIndex, chunk.ChunkHash, chunk.Size, chunk.Offset)
+		if err != nil {
+			return nil, fmt.Errorf("failed to insert chunk metadata: %w", err)
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf("failed to commit transaction: %w", err)
+	}
+
+	metadata := &FileMetadata{
+		Hash:         fileHash,
+		OriginalName: originalName,
+		Size:         size,
+		ChunkCount:   chunkCount,
+		ContentType:  contentType,
+		StorageType:  "torrent",
+		OwnerPubkey:  ownerPubkey,
+		AccessLevel:  accessLevel,
+		CreatedAt:    time.Now(),
+		AccessCount:  0,
+		LastAccess:   time.Now(),
+	}
+
+	return metadata, nil
+}
+
+// StoreBlobAsFile stores a blob and also records it in the files table for unified access
+func (b *Backend) StoreBlobAsFile(reader io.Reader, originalName, contentType string) (*FileMetadata, error) {
+	return b.StoreBlobAsFileWithOwner(reader, originalName, contentType, "", "public")
+}
+
+// StoreBlobAsFileWithOwner stores a blob with ownership information
+func (b *Backend) StoreBlobAsFileWithOwner(reader io.Reader, originalName, contentType, ownerPubkey, accessLevel string) (*FileMetadata, error) {
+	// Read and hash the blob
+	hasher := sha256.New()
+	tempFile, err := os.CreateTemp("", "blob_*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temp file: %w", err)
+	}
+	defer os.Remove(tempFile.Name())
+	defer tempFile.Close()
+
+	// Copy and hash the entire file
+	size, err := io.Copy(io.MultiWriter(tempFile, hasher), reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to copy blob: %w", err)
+	}
+
+	blobHash := hex.EncodeToString(hasher.Sum(nil))
+
+	// Check if file already exists
+	if existing, _ := b.GetFileMetadata(blobHash); existing != nil {
+		return existing, nil
+	}
+
+	// Seek back to beginning for storage
+	if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
+		return nil, fmt.Errorf("failed to seek temp file: %w", err)
+	}
+
+	// Store blob to disk
+	blobPath := filepath.Join(b.blobDir, blobHash)
+	blobFile, err := os.Create(blobPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create blob file: %w", err)
+	}
+	defer blobFile.Close()
+
+	if _, err := io.Copy(blobFile, tempFile); err != nil {
+		os.Remove(blobPath)
+		return nil, fmt.Errorf("failed to write blob file: %w", err)
+	}
+
+	// Start transaction
+	tx, err := b.db.Begin()
+	if err != nil {
+		return nil, fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	// Insert blob metadata. OR REPLACE keeps this idempotent when the blob
+	// was already registered via StoreBlob and so has an existing blobs row.
+	_, err = tx.Exec(`
+		INSERT OR REPLACE INTO blobs (hash, size, mime_type)
+		VALUES (?, ?, ?)
+	`, blobHash, size, contentType)
+	if err != nil {
+		return nil, fmt.Errorf("failed to insert blob metadata: %w", err)
+	}
+
+	// Insert file metadata for unified access
+	_, err = tx.Exec(`
+		INSERT INTO files (hash, original_name, size, chunk_count, content_type, storage_type, owner_pubkey, access_level)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+	`, blobHash, originalName, size, 1, contentType, "blob", ownerPubkey, accessLevel)
+	if err != nil {
+		return nil, fmt.Errorf("failed to insert file metadata: %w", err)
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf("failed to commit transaction: %w", err)
+	}
+
+	metadata := &FileMetadata{
+		Hash:         blobHash,
+		OriginalName: originalName,
+		Size:         size,
+		ChunkCount:   1,
+		ContentType:  contentType,
+		StorageType:  "blob",
+		OwnerPubkey:  ownerPubkey,
+		AccessLevel:  accessLevel,
+		CreatedAt:    time.Now(),
+		AccessCount:  0,
+		LastAccess:   time.Now(),
+	}
+
+	return metadata, nil
+}
+
+// GetFileMetadata retrieves metadata for a file
+func (b *Backend) GetFileMetadata(hash string) (*FileMetadata, error) {
+	var metadata FileMetadata
+	err := b.db.QueryRow(`
+		SELECT hash, original_name, size, chunk_count, content_type,
+			COALESCE(storage_type, 'torrent'), COALESCE(owner_pubkey, ''),
+			COALESCE(access_level, 'public'), created_at, access_count, last_access
+		FROM files WHERE hash = ?
+	`, hash).Scan(
+		&metadata.Hash, &metadata.OriginalName, &metadata.Size,
+		&metadata.ChunkCount, &metadata.ContentType, &metadata.StorageType,
+		&metadata.OwnerPubkey, &metadata.AccessLevel, &metadata.CreatedAt,
+		&metadata.AccessCount, &metadata.LastAccess,
+	)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("failed to get file metadata: %w", err)
+	}
+
+	return &metadata, nil
+}
+
+// GetFileChunks retrieves all chunks for a file
+func (b *Backend) GetFileChunks(fileHash string) ([]ChunkInfo, error) {
+	rows, err := b.db.Query(`
+		SELECT file_hash, chunk_index, chunk_hash, size, offset
+		FROM chunks WHERE file_hash = ?
+		ORDER BY chunk_index
+	`, fileHash)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query chunks: %w", err)
+	}
+	defer rows.Close()
+
+	var chunks []ChunkInfo
+	for rows.Next() {
+		var chunk ChunkInfo
+		if err := rows.Scan(&chunk.FileHash, &chunk.ChunkIndex, &chunk.ChunkHash, &chunk.Size, &chunk.Offset); err != nil {
+			return nil, fmt.Errorf("failed to scan chunk: %w", err)
+		}
+		chunks = append(chunks, chunk)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating chunks: %w", err)
+	}
+
+	return chunks, nil
+}
+
+// GetChunkData retrieves raw data for a specific chunk
+func (b *Backend) GetChunkData(chunkHash string) ([]byte, error) {
+	chunkPath := filepath.Join(b.chunkDir, chunkHash)
+	data, err := os.ReadFile(chunkPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("failed to read chunk %s: %w", chunkHash, err)
+	}
+	return data, nil
+}
+
+// AssembleFile reassembles a complete file from its chunks
+func (b *Backend) AssembleFile(fileHash string, writer io.Writer) error {
+	chunks, err := b.GetFileChunks(fileHash)
+	if err != nil {
+		return fmt.Errorf("failed to get chunks: %w", err)
+	}
+
+	for _, chunk := range chunks {
+		chunkData, err := b.GetChunkData(chunk.ChunkHash)
+		if err != nil {
+			return fmt.Errorf("failed to get chunk data: %w", err)
+		}
+		if chunkData == nil {
+			return fmt.Errorf("chunk %s not found", chunk.ChunkHash)
+		}
+
+		if _, err := writer.Write(chunkData); err != nil {
+			return fmt.Errorf("failed to write chunk: %w", err)
+		}
+	}
+
+	// Update access statistics
+	_, err = b.db.Exec(`
+		UPDATE files SET access_count = access_count + 1, last_access = CURRENT_TIMESTAMP
+		WHERE hash = ?
+	`, fileHash)
+	if err != nil {
+		// Log but don't fail on stats update
+		log.Printf("Warning: failed to update access stats for %s: %v", fileHash, err)
+	}
+
+	return nil
+}
+
+// StoreBlob stores a Blossom blob
+func (b *Backend) StoreBlob(reader io.Reader, mimeType string) (string, error) {
+	// Read and hash the blob
+	hasher := sha256.New()
+	tempFile, err := os.CreateTemp("", "blob_*")
+	if err != nil {
+		return "", fmt.Errorf("failed to create temp file: %w", err)
+	}
+	defer os.Remove(tempFile.Name())
+	defer tempFile.Close()
+
+	size, err := io.Copy(io.MultiWriter(tempFile, hasher), reader)
+	if err != nil {
+		return "", fmt.Errorf("failed to copy blob: %w", err)
+	}
+
+	blobHash := hex.EncodeToString(hasher.Sum(nil))
+	blobPath := filepath.Join(b.blobDir, blobHash)
+
+	// Check if blob already exists
+	if _, err := os.Stat(blobPath); err == nil {
+		return blobHash, nil
+	}
+
+	// Move temp file to blob location
+	if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
+		return "", fmt.Errorf("failed to seek temp file: %w", err)
+	}
+
+	blobFile, err := os.Create(blobPath)
+	if err != nil {
+		return "", fmt.Errorf("failed to create blob file: %w", err)
+	}
+	defer blobFile.Close()
+
+	if _, err := io.Copy(blobFile, tempFile); err != nil {
+		os.Remove(blobPath)
+		return "", fmt.Errorf("failed to copy to blob file: %w", err)
+	}
+
+	// Store blob metadata
+	_, err = b.db.Exec(`
+		INSERT OR REPLACE INTO blobs (hash, size, mime_type)
+		VALUES (?, ?, ?)
+	`, blobHash, size, mimeType)
+	if err != nil {
+		os.Remove(blobPath)
+		return "", fmt.Errorf("failed to store blob metadata: %w", err)
+	}
+
+	return blobHash, nil
+}
+
+// GetBlobData retrieves blob data
+func (b *Backend) GetBlobData(blobHash string) (io.ReadCloser, *BlobInfo, error) {
+	// Get blob metadata
+	var info BlobInfo
+	err := b.db.QueryRow(`
+		SELECT hash, size, created_at, mime_type
+		FROM blobs WHERE hash = ?
+	`, blobHash).Scan(&info.Hash, &info.Size, &info.CreatedAt, &info.MimeType)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return nil, nil, nil
+		}
+		return nil, nil, fmt.Errorf("failed to get blob metadata: %w", err)
+	}
+
+	// Open blob file
+	blobPath := filepath.Join(b.blobDir, blobHash)
+	file, err := os.Open(blobPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil, nil
+		}
+		return nil, nil, fmt.Errorf("failed to open blob file: %w", err)
+	}
+
+	// Update access statistics
+	_, err = b.db.Exec(`
+		UPDATE blobs SET access_count = access_count + 1, last_access = CURRENT_TIMESTAMP
+		WHERE hash = ?
+	`, blobHash)
+	if err != nil {
+		// Log but don't fail on stats update
+		log.Printf("Warning: failed to update blob access stats for %s: %v", blobHash, err)
+	}
+
+	return file, &info, nil
+}
+
+// GetChunkHashes retrieves all chunk hashes for a file in order
+func (b *Backend) GetChunkHashes(fileHash string) ([]string, error) {
+	rows, err := b.db.Query(`
+		SELECT chunk_hash
+		FROM chunks
+		WHERE file_hash = ?
+		ORDER BY chunk_index
+	`, fileHash)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query chunks: %w", err)
+	}
+	defer rows.Close()
+
+	var chunkHashes []string
+	for rows.Next() {
+		var chunkHash string
+		if err := rows.Scan(&chunkHash); err != nil {
+			return nil, fmt.Errorf("failed to scan chunk hash: %w", err)
+		}
+		chunkHashes = append(chunkHashes, chunkHash)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating chunks: %w", err)
+	}
+
+	return chunkHashes, nil
+}
+
+// GetUserFiles retrieves all files owned by a user
+func (b *Backend) GetUserFiles(pubkey string) ([]*FileMetadata, error) {
+	rows, err := b.db.Query(`
+		SELECT hash, original_name, size, chunk_count, content_type,
+			COALESCE(storage_type, 'torrent'), COALESCE(owner_pubkey, ''),
+			COALESCE(access_level, 'public'), created_at, access_count, last_access
+		FROM files
+		WHERE owner_pubkey = ?
+		ORDER BY created_at DESC
+	`, pubkey)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query user files: %w", err)
+	}
+	defer rows.Close()
+
+	var files []*FileMetadata
+	for rows.Next() {
+		var metadata FileMetadata
+		err := rows.Scan(
+			&metadata.Hash, &metadata.OriginalName, &metadata.Size,
+			&metadata.ChunkCount, &metadata.ContentType, &metadata.StorageType,
+			&metadata.OwnerPubkey, &metadata.AccessLevel, &metadata.CreatedAt,
+			&metadata.AccessCount, &metadata.LastAccess,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan file metadata: %w", err)
+		}
+		files = append(files, &metadata)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating files: %w", err)
+	}
+
+	return files, nil
+}
+
+// GetUserStats calculates storage statistics for a user
+func (b *Backend) GetUserStats(pubkey string) (int64, int, error) {
+	var storageUsed int64
+	var fileCount int
+
+	err := b.db.QueryRow(`
+		SELECT COALESCE(SUM(size), 0), COUNT(*)
+		FROM files
+		WHERE owner_pubkey = ?
+	`, pubkey).Scan(&storageUsed, &fileCount)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get user stats: %w", err)
+	}
+
+	return storageUsed, fileCount, nil
+}
+
+// DeleteUserFile deletes a file owned by a user
+func (b *Backend) DeleteUserFile(hash, ownerPubkey string) error {
+	// Verify ownership
+	metadata, err := b.GetFileMetadata(hash)
+	if err != nil {
+		return fmt.Errorf("failed to get file metadata: %w", err)
+	}
+	if metadata == nil {
+		return fmt.Errorf("file not found")
+	}
+	if metadata.OwnerPubkey != ownerPubkey {
+		return fmt.Errorf("permission denied: not file owner")
+	}
+
+	// Start transaction
+	tx, err := b.db.Begin()
+	if err != nil {
+		return fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	// Delete chunks if it's a torrent file
+	if metadata.StorageType == "torrent" {
+		// Get chunk hashes for deletion
+		chunkHashes, err := b.GetChunkHashes(hash)
+		if err != nil {
+			return fmt.Errorf("failed to get chunk hashes: %w", err)
+		}
+
+		// Delete chunk files
+		for _, chunkHash := range chunkHashes {
+			chunkPath := filepath.Join(b.chunkDir, chunkHash)
+			if err := os.Remove(chunkPath); err != nil && !os.IsNotExist(err) {
+				log.Printf("Warning: failed to delete chunk file %s: %v", chunkHash, err)
+			}
+		}
+
+		// Delete chunk metadata
+		_, err = tx.Exec(`DELETE FROM chunks WHERE file_hash = ?`, hash)
+		if err != nil {
+			return fmt.Errorf("failed to delete chunk metadata: %w", err)
+		}
+	}
+
+	// Delete blob file if it's a blob
+	if metadata.StorageType == "blob" {
+		blobPath := filepath.Join(b.blobDir, hash)
+		if err := os.Remove(blobPath); err != nil && !os.IsNotExist(err) {
+			log.Printf("Warning: failed to delete blob file %s: %v", hash, err)
+		}
+
+		// Delete blob metadata
+		_, err = tx.Exec(`DELETE FROM blobs WHERE hash = ?`, hash)
+		if err != nil {
+			return fmt.Errorf("failed to delete blob metadata: %w", err)
+		}
+	}
+
+	// Delete file metadata
+	_, err = tx.Exec(`DELETE FROM files WHERE hash = ?`, hash)
+	if err != nil {
+		return fmt.Errorf("failed to delete file metadata: %w", err)
+	}
+
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("failed to commit transaction: %w", err)
+	}
+
+	return nil
+}
+
+// AdminDeleteFile allows admin to delete any file without ownership check
+func (b *Backend) AdminDeleteFile(hash string) error {
+	// Get file metadata
+	metadata, err := b.GetFileMetadata(hash)
+	if err != nil {
+		return fmt.Errorf("failed to get file metadata: %w", err)
+	}
+	if metadata == nil {
+		return fmt.Errorf("file not found")
+	}
+
+	// Start transaction
+	tx, err := b.db.Begin()
+	if err != nil {
+		return fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	// Delete chunks if it's a torrent file
+	if metadata.StorageType == "torrent" {
+		// Get chunk hashes for deletion
+		chunkHashes, err := b.GetChunkHashes(hash)
+		if err != nil {
+			return fmt.Errorf("failed to get chunk hashes: %w", err)
+		}
+
+		// Delete chunk files
+		for _, chunkHash := range chunkHashes {
+			chunkPath := filepath.Join(b.chunkDir, chunkHash)
+			if err := os.Remove(chunkPath); err != nil && !os.IsNotExist(err) {
+				log.Printf("Warning: failed to delete chunk file %s: %v", chunkHash, err)
+			}
+		}
+
+		// Delete chunk metadata
+		_, err = tx.Exec(`DELETE FROM chunks WHERE file_hash = ?`, hash)
+		if err != nil {
+			return fmt.Errorf("failed to delete chunk metadata: %w", err)
+		}
+	} else if metadata.StorageType == "blob" {
+		// Delete blob file
+		blobPath := filepath.Join(b.blobDir, hash)
+		if err := os.Remove(blobPath); err != nil && !os.IsNotExist(err) {
+			log.Printf("Warning: failed to delete blob file %s: %v", hash, err)
+		}
+
+		// Delete blob metadata
+		_, err = tx.Exec(`DELETE FROM blobs WHERE hash = ?`, hash)
+		if err != nil {
+			return fmt.Errorf("failed to delete blob metadata: %w", err)
+		}
+	}
+
+	// Delete content reports
+	_, err = tx.Exec(`DELETE FROM content_reports WHERE file_hash = ?`, hash)
+	if err != nil {
+		return fmt.Errorf("failed to delete content reports: %w", err)
+	}
+
+	// Delete file metadata
+	_, err = tx.Exec(`DELETE FROM files WHERE hash = ?`, hash)
+	if err != nil {
+		return fmt.Errorf("failed to delete file metadata: %w", err)
+	}
+
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("failed to commit transaction: %w", err)
+	}
+
+	return nil
+}
+
+// UpdateFileAccess updates the access level of a file
+func (b *Backend) UpdateFileAccess(hash, ownerPubkey, accessLevel string) error {
+	// Verify ownership
+	metadata, err := b.GetFileMetadata(hash)
+	if err != nil {
+		return fmt.Errorf("failed to get file metadata: %w", err)
+	}
+	if metadata == nil {
+		return fmt.Errorf("file not found")
+	}
+	if metadata.OwnerPubkey != ownerPubkey {
+		return fmt.Errorf("permission denied: not file owner")
+	}
+
+	// Update access level
+	_, err = b.db.Exec(`UPDATE files SET access_level = ? WHERE hash = ?`, accessLevel, hash)
+	if err != nil {
+		return fmt.Errorf("failed to update file access level: %w", err)
+	}
+
+	return nil
+}
+
+// CheckFileAccess checks if a user can access a file based on access level
+func (b *Backend) CheckFileAccess(hash, requestorPubkey string) (bool, error) {
+	metadata, err := b.GetFileMetadata(hash)
+	if err != nil {
+		return false, fmt.Errorf("failed to get file metadata: %w", err)
+	}
+	if metadata == nil {
+		return false, nil
+	}
+
+	switch metadata.AccessLevel {
+	case "public":
+		return true, nil
+	case "private":
+		return metadata.OwnerPubkey == requestorPubkey, nil
+	case "followers":
+		// TODO: Implement follower checking via Nostr social graph
+		// For now, treat as private
+		return metadata.OwnerPubkey == requestorPubkey, nil
+	default:
+		return false, nil
+	}
+}
+
+// GetDB returns the database connection for the auth module
+func (b *Backend) GetDB() *sql.DB {
+	return b.db
+}
+
+// Close closes the storage backend
+func (b *Backend) Close() error {
+	if b.db != nil {
+		return b.db.Close()
+	}
+	return nil
+}
+
+// FileExistsByInfoHash checks if a file exists by its BitTorrent info hash
+func (b *Backend) FileExistsByInfoHash(infoHash string) (bool, error) {
+	var count int
+	err := b.db.QueryRow(`
+		SELECT COUNT(*) FROM files WHERE info_hash = ?
+	`, infoHash).Scan(&count)
+
+	if err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
+// GetFileHashByInfoHash returns the file hash for a given info hash
+func (b *Backend) GetFileHashByInfoHash(infoHash string) (string, error) {
+	var fileHash string
+	err := b.db.QueryRow(`
+		SELECT hash FROM files WHERE info_hash = ?
+	`, infoHash).Scan(&fileHash)
+
+	if err != nil {
+		return "", err
+	}
+
+	return fileHash, nil
+}
+
+// UpdateFileInfoHash updates the info_hash for a file
+func (b *Backend) UpdateFileInfoHash(fileHash, infoHash string) error {
+	_, err := b.db.Exec(`
+		UPDATE files SET info_hash = ? WHERE hash = ?
+	`, infoHash, fileHash)
+
+	if err != nil {
+		return fmt.Errorf("failed to update info_hash: %w", err)
+	}
+
+	return nil
+}
+
+// StoreNostrEvents stores Nostr event IDs for a file
+func (b *Backend) StoreNostrEvents(fileHash, nip71EventID, nip35EventID string) error {
+	_, err := b.db.Exec(`
+		UPDATE files SET nip71_event_id = ?, nip35_event_id = ? WHERE hash = ?
+	`, nip71EventID, nip35EventID, fileHash)
+
+	if err != nil {
+		return fmt.Errorf("failed to store nostr event IDs: %w", err)
+	}
+
+	return nil
+}
+
+// GetNostrEvents retrieves Nostr event IDs for a file
+func (b *Backend) GetNostrEvents(fileHash string) (nip71EventID, nip35EventID string, err error) {
+	err = b.db.QueryRow(`
+		SELECT COALESCE(nip71_event_id, ''), COALESCE(nip35_event_id, '') FROM files WHERE hash = ?
+	`, fileHash).Scan(&nip71EventID, &nip35EventID)
+
+	if err != nil {
+		return "", "", fmt.Errorf("failed to get nostr event IDs: %w", err)
+	}
+
+	return nip71EventID, nip35EventID, nil
+}
+
+// runMigrations applies database schema migrations
+func (b *Backend) runMigrations() error {
+	// Check if info_hash column exists
+	var columnExists bool
+	err := b.db.QueryRow(`
+		SELECT COUNT(*) > 0 FROM pragma_table_info('files') WHERE name = 'info_hash'
+	`).Scan(&columnExists)
+
+	if err != nil {
+		return fmt.Errorf("failed to check info_hash column: %w", err)
+	}
+
+	// Add info_hash column if it doesn't exist
+	if !columnExists {
+		_, err = b.db.Exec(`ALTER TABLE files ADD COLUMN info_hash TEXT`)
+		if err != nil {
+			return fmt.Errorf("failed to add info_hash column: %w", err)
+		}
+		log.Printf("Added info_hash column to files table")
+	}
+
+	// Check if nostr event ID columns exist
+	var nip71ColumnExists, nip35ColumnExists bool
+	err = b.db.QueryRow(`
+		SELECT COUNT(*) > 0 FROM pragma_table_info('files') WHERE name = 'nip71_event_id'
+	`).Scan(&nip71ColumnExists)
+	if err != nil {
+		return fmt.Errorf("failed to check nip71_event_id column: %w", err)
+	}
+
+	err = b.db.QueryRow(`
+		SELECT COUNT(*) > 0 FROM pragma_table_info('files') WHERE name = 'nip35_event_id'
+	`).Scan(&nip35ColumnExists)
+	if err != nil {
+		return fmt.Errorf("failed to check nip35_event_id column: %w", err)
+	}
+
+	// Add nostr event ID columns if they don't exist
+	if !nip71ColumnExists {
+		_, err = b.db.Exec(`ALTER TABLE files ADD COLUMN nip71_event_id TEXT`)
+		if err != nil {
+			return fmt.Errorf("failed to add nip71_event_id column: %w", err)
+		}
+		log.Printf("Added nip71_event_id column to files table")
+	}
+
+	if !nip35ColumnExists {
+		_, err = b.db.Exec(`ALTER TABLE files ADD COLUMN nip35_event_id TEXT`)
+		if err != nil {
+			return fmt.Errorf("failed to add nip35_event_id column: %w", err)
+		}
+		log.Printf("Added nip35_event_id column to files table")
+	}
+
+	return nil
+}
\ No newline at end of file
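A minimal usage sketch for the Backend API above, not part of the diff. The paths, the 2 MiB chunk size, the "npub-example" pubkey, and the demo/main wiring are placeholder assumptions; a real caller would obtain the *config.Config from the project's config loading, which is not shown here.

package main

import (
	"fmt"
	"os"
	"strings"

	"torrentGateway/internal/config"
	"torrentGateway/internal/storage"
)

// demo stores a small file and streams it back out. cfg must be a loaded,
// non-nil *config.Config; constructing one is project-specific and omitted.
func demo(cfg *config.Config) error {
	// 2 MiB chunks; directories and the SQLite schema are created on first use.
	backend, err := storage.NewBackend("data/gateway.db", "data/chunks", "data/blobs", 2<<20, cfg)
	if err != nil {
		return err
	}
	defer backend.Close()

	// Identical content deduplicates: re-uploading returns the existing metadata.
	meta, err := backend.StoreFileWithOwner(
		strings.NewReader("hello world"), "hello.txt", "text/plain",
		"npub-example", "public")
	if err != nil {
		return err
	}
	fmt.Printf("stored %s in %d chunk(s)\n", meta.Hash, meta.ChunkCount)

	// Reassemble the chunked file to any io.Writer.
	return backend.AssembleFile(meta.Hash, os.Stdout)
}

func main() {
	// Placeholder wiring only: demo needs a real *config.Config to run.
	var cfg *config.Config
	if err := demo(cfg); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}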
diff --git a/internal/storage/parallel.go b/internal/storage/parallel.go
new file mode 100644
index 0000000..a280f76
--- /dev/null
+++ b/internal/storage/parallel.go
@@ -0,0 +1,332 @@
+package storage
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"runtime"
+	"sync"
+	"time"
+)
+
+// ParallelChunkProcessor handles parallel chunk operations for improved performance
+type ParallelChunkProcessor struct {
+	backend    *Backend
+	workerPool chan struct{} // Semaphore for limiting concurrent operations
+	maxWorkers int
+}
+
+// NewParallelChunkProcessor creates a new parallel chunk processor
+func NewParallelChunkProcessor(backend *Backend, maxWorkers int) *ParallelChunkProcessor {
+	if maxWorkers <= 0 {
+		maxWorkers = runtime.NumCPU() * 2 // Default to 2x CPU cores
+	}
+
+	return &ParallelChunkProcessor{
+		backend:    backend,
+		workerPool: make(chan struct{}, maxWorkers),
+		maxWorkers: maxWorkers,
+	}
+}
+
+// ChunkResult represents the result of processing a chunk
+type ChunkResult struct {
+	Index int
+	Data  []byte
+	Error error
+}
+
+// AssembleFileParallel reassembles a file using parallel chunk loading
+func (p *ParallelChunkProcessor) AssembleFileParallel(fileHash string, writer io.Writer) error {
+	// Get chunk information
+	chunks, err := p.backend.GetFileChunks(fileHash)
+	if err != nil {
+		return fmt.Errorf("failed to get chunks: %w", err)
+	}
+
+	if len(chunks) == 0 {
+		return fmt.Errorf("no chunks found for file %s", fileHash)
+	}
+
+	// For a small number of chunks, use sequential processing
+	if len(chunks) <= 3 {
+		return p.backend.AssembleFile(fileHash, writer)
+	}
+
+	// Create result channel
+	results := make(chan ChunkResult, len(chunks))
+	var wg sync.WaitGroup
+
+	// Launch workers to fetch chunks in parallel
+	for i, chunk := range chunks {
+		wg.Add(1)
+		go func(index int, chunkInfo ChunkInfo) {
+			defer wg.Done()
+
+			// Acquire worker slot
+			p.workerPool <- struct{}{}
+			defer func() { <-p.workerPool }()
+
+			// Fetch chunk data
+			data, err := p.backend.GetChunkData(chunkInfo.ChunkHash)
+			results <- ChunkResult{
+				Index: index,
+				Data:  data,
+				Error: err,
+			}
+		}(i, chunk)
+	}
+
+	// Close results channel when all workers are done
+	go func() {
+		wg.Wait()
+		close(results)
+	}()
+
+	// Collect results in order
+	chunkData := make([][]byte, len(chunks))
+	for result := range results {
+		if result.Error != nil {
+			return fmt.Errorf("failed to get chunk data for index %d: %w", result.Index, result.Error)
+		}
+		if result.Data == nil {
+			return fmt.Errorf("chunk %d not found", result.Index)
+		}
+		chunkData[result.Index] = result.Data
+	}
+
+	// Write chunks in order
+	for i, data := range chunkData {
+		if _, err := writer.Write(data); err != nil {
+			return fmt.Errorf("failed to write chunk %d: %w", i, err)
+		}
+	}
+
+	// Update access statistics
+	_, err = p.backend.db.Exec(`
+		UPDATE files SET access_count = access_count + 1, last_access = CURRENT_TIMESTAMP
+		WHERE hash = ?
+	`, fileHash)
+	if err != nil {
+		// Log but don't fail on stats update
+		log.Printf("Warning: failed to update access stats for %s: %v", fileHash, err)
+	}
+
+	return nil
+}
+
+// StoreFileParallel stores a file using parallel chunk processing
+func (p *ParallelChunkProcessor) StoreFileParallel(reader io.Reader, originalName, contentType, ownerPubkey, accessLevel string) (*FileMetadata, error) {
+	// Buffer the upload to a temp file so it can be sized, hashed, and read at random offsets
+	tempFile, err := createTempFileFromReader(reader)
+	if err != nil {
+		return nil, err
+	}
+	defer tempFile.cleanup()
+
+	// If the file is small, use regular storage
+	if tempFile.size < int64(p.maxWorkers)*p.backend.chunkSize {
+		return p.backend.StoreFileWithOwner(tempFile.reader(), originalName, contentType, ownerPubkey, accessLevel)
+	}
+
+	// Calculate file hash
+	fileHash := tempFile.hash()
+
+	// Check if file already exists
+	if existing, _ := p.backend.GetFileMetadata(fileHash); existing != nil {
+		return existing, nil
+	}
+
+	// Calculate chunk count
+	chunkCount := int((tempFile.size + p.backend.chunkSize - 1) / p.backend.chunkSize)
+	chunks := make([]ChunkInfo, chunkCount)
+
+	// Process chunks in parallel
+	var wg sync.WaitGroup
+	chunkResults := make(chan struct {
+		index int
+		chunk ChunkInfo
+		err   error
+	}, chunkCount)
+
+	for i := 0; i < chunkCount; i++ {
+		wg.Add(1)
+		go func(chunkIndex int) {
+			defer wg.Done()
+
+			// Acquire worker slot
+			p.workerPool <- struct{}{}
+			defer func() { <-p.workerPool }()
+
+			// Calculate chunk boundaries
+			offset := int64(chunkIndex) * p.backend.chunkSize
+			size := p.backend.chunkSize
+			if offset+size > tempFile.size {
+				size = tempFile.size - offset
+			}
+
+			// Read chunk data
+			chunkData, err := tempFile.readChunk(offset, size)
+			if err != nil {
+				chunkResults <- struct {
+					index int
+					chunk ChunkInfo
+					err   error
+				}{chunkIndex, ChunkInfo{}, err}
+				return
+			}
+
+			// Store chunk
+			chunk, err := p.storeChunk(fileHash, chunkIndex, chunkData, offset)
+			chunkResults <- struct {
+				index int
+				chunk ChunkInfo
+				err   error
+			}{chunkIndex, chunk, err}
+		}(i)
+	}
+
+	// Close results channel when all workers are done
+	go func() {
+		wg.Wait()
+		close(chunkResults)
+	}()
+
+	// Collect results
+	for result := range chunkResults {
+		if result.err != nil {
+			return nil, fmt.Errorf("failed to store chunk %d: %w", result.index, result.err)
+		}
+		chunks[result.index] = result.chunk
+	}
+
+	// Store metadata in database
+	return p.storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel, tempFile.size, chunkCount, chunks)
+}
+
+// Helper functions
+
+type tempFile struct {
+	path string
+	size int64
+	file *os.File
+}
+
+func createTempFileFromReader(reader io.Reader) (*tempFile, error) {
+	file, err := os.CreateTemp("", "parallel_upload_*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	size, err := io.Copy(file, reader)
+	if err != nil {
+		file.Close()
+		os.Remove(file.Name())
+		return nil, fmt.Errorf("failed to copy to temp file: %w", err)
+	}
+
+	return &tempFile{
+		path: file.Name(),
+		size: size,
+		file: file,
+	}, nil
+}
+
+// reader rewinds the temp file and exposes it for sequential re-reading
+func (tf *tempFile) reader() io.Reader {
+	tf.file.Seek(0, io.SeekStart)
+	return tf.file
+}
+
+// hash rewinds the temp file and computes its SHA-256 digest
+func (tf *tempFile) hash() string {
+	tf.file.Seek(0, io.SeekStart)
+	hasher := sha256.New()
+	io.Copy(hasher, tf.file)
+	return hex.EncodeToString(hasher.Sum(nil))
+}
+
+// readChunk reads size bytes at offset; safe for concurrent use via ReadAt
+func (tf *tempFile) readChunk(offset, size int64) ([]byte, error) {
+	data := make([]byte, size)
+	n, err := tf.file.ReadAt(data, offset)
+	if err != nil && err != io.EOF {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (tf *tempFile) cleanup() {
+	if tf.file != nil {
+		tf.file.Close()
+		os.Remove(tf.path)
+	}
+}
+
+func (p *ParallelChunkProcessor) storeChunk(fileHash string, chunkIndex int, data []byte, offset int64) (ChunkInfo, error) {
+	// Calculate chunk hash
+	hasher := sha256.New()
+	hasher.Write(data)
+	chunkHash := hex.EncodeToString(hasher.Sum(nil))
+
+	// Store chunk to disk
+	chunkPath := filepath.Join(p.backend.chunkDir, chunkHash)
+	if err := os.WriteFile(chunkPath, data, 0644); err != nil {
+		return ChunkInfo{}, fmt.Errorf("failed to write chunk: %w", err)
+	}
+
+	return ChunkInfo{
+		FileHash:   fileHash,
+		ChunkIndex: chunkIndex,
+		ChunkHash:  chunkHash,
+		Size:       int64(len(data)),
+		Offset:     offset,
+	}, nil
+}
+
+func (p *ParallelChunkProcessor) storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel string, size int64, chunkCount int, chunks []ChunkInfo) (*FileMetadata, error) {
+	// Start transaction
+	tx, err := p.backend.db.Begin()
+	if err != nil {
+		return nil, fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	// Insert file metadata
+	_, err = tx.Exec(`
+		INSERT INTO files (hash, original_name, size, chunk_count, content_type, storage_type, owner_pubkey, access_level)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+	`, fileHash, originalName, size, chunkCount, contentType, "torrent", ownerPubkey, accessLevel)
+	if err != nil {
+		return nil, fmt.Errorf("failed to insert file metadata: %w", err)
+	}
+
+	// Insert chunk metadata
+	for _, chunk := range chunks {
+		_, err = tx.Exec(`
+			INSERT INTO chunks (file_hash, chunk_index, chunk_hash, size, offset)
+			VALUES (?, ?, ?, ?, ?)
+		`, chunk.FileHash, chunk.ChunkIndex, chunk.ChunkHash, chunk.Size, chunk.Offset)
+		if err != nil {
+			return nil, fmt.Errorf("failed to insert chunk metadata: %w", err)
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf("failed to commit transaction: %w", err)
+	}
+
+	return &FileMetadata{
+		Hash:         fileHash,
+		OriginalName: originalName,
+		Size:         size,
+		ChunkCount:   chunkCount,
+		ContentType:  contentType,
+		StorageType:  "torrent",
+		OwnerPubkey:  ownerPubkey,
+		AccessLevel:  accessLevel,
+		CreatedAt:    time.Now(),
+		AccessCount:  0,
+		LastAccess:   time.Now(),
+	}, nil
+}
\ No newline at end of file
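A companion sketch for the parallel path, also not part of the diff. It assumes a *storage.Backend constructed as in the previous example; the worker count, payload size, and names are illustrative only.

package main

import (
	"bytes"
	"fmt"
	"os"

	"torrentGateway/internal/storage"
)

// demoParallel uploads a payload large enough to take the parallel path
// (at least maxWorkers * chunkSize) and then reassembles it concurrently.
func demoParallel(backend *storage.Backend) error {
	// Cap concurrent chunk operations at 8 workers (<= 0 defaults to 2x CPU cores).
	proc := storage.NewParallelChunkProcessor(backend, 8)

	payload := bytes.Repeat([]byte("x"), 64<<20) // 64 MiB of dummy data
	meta, err := proc.StoreFileParallel(
		bytes.NewReader(payload), "big.bin", "application/octet-stream",
		"npub-example", "private")
	if err != nil {
		return err
	}
	fmt.Printf("stored %s (%d chunks)\n", meta.Hash, meta.ChunkCount)

	// Chunks are fetched in parallel but written to the writer in index order.
	return proc.AssembleFileParallel(meta.Hash, os.Stdout)
}

func main() {
	// Obtaining the backend is shown in the previous sketch; omitted here.
}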