package storage

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"
)

// ParallelChunkProcessor handles parallel chunk operations for improved performance
type ParallelChunkProcessor struct {
	backend    *Backend
	workerPool chan struct{} // Semaphore for limiting concurrent operations
	maxWorkers int
}

// NewParallelChunkProcessor creates a new parallel chunk processor
func NewParallelChunkProcessor(backend *Backend, maxWorkers int) *ParallelChunkProcessor {
	if maxWorkers <= 0 {
		maxWorkers = runtime.NumCPU() * 2 // Default to 2x CPU cores
	}
	return &ParallelChunkProcessor{
		backend:    backend,
		workerPool: make(chan struct{}, maxWorkers),
		maxWorkers: maxWorkers,
	}
}

// ChunkResult represents the result of processing a chunk
type ChunkResult struct {
	Index int
	Data  []byte
	Error error
}

// AssembleFileParallel reassembles a file using parallel chunk loading
func (p *ParallelChunkProcessor) AssembleFileParallel(fileHash string, writer io.Writer) error {
	// Get chunk information
	chunks, err := p.backend.GetFileChunks(fileHash)
	if err != nil {
		return fmt.Errorf("failed to get chunks: %w", err)
	}
	if len(chunks) == 0 {
		return fmt.Errorf("no chunks found for file %s", fileHash)
	}

	// For a small number of chunks, the worker overhead isn't worth it;
	// fall back to sequential processing
	if len(chunks) <= 3 {
		return p.backend.AssembleFile(fileHash, writer)
	}

	// Buffer the channel to len(chunks) so workers never block on send,
	// even if we return early on an error below
	results := make(chan ChunkResult, len(chunks))
	var wg sync.WaitGroup

	// Launch workers to fetch chunks in parallel
	for i, chunk := range chunks {
		wg.Add(1)
		go func(index int, chunkInfo ChunkInfo) {
			defer wg.Done()

			// Acquire worker slot
			p.workerPool <- struct{}{}
			defer func() { <-p.workerPool }()

			// Fetch chunk data
			data, err := p.backend.GetChunkData(chunkInfo.ChunkHash)
			results <- ChunkResult{
				Index: index,
				Data:  data,
				Error: err,
			}
		}(i, chunk)
	}

	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results; completion order is arbitrary, so slot each chunk
	// back into its original position by index
	chunkData := make([][]byte, len(chunks))
	for result := range results {
		if result.Error != nil {
			return fmt.Errorf("failed to get chunk data for index %d: %w", result.Index, result.Error)
		}
		if result.Data == nil {
			return fmt.Errorf("chunk %d not found", result.Index)
		}
		chunkData[result.Index] = result.Data
	}

	// Write chunks in order
	for i, data := range chunkData {
		if _, err := writer.Write(data); err != nil {
			return fmt.Errorf("failed to write chunk %d: %w", i, err)
		}
	}

	// Update access statistics
	_, err = p.backend.db.Exec(`
		UPDATE files SET access_count = access_count + 1, last_access = CURRENT_TIMESTAMP
		WHERE hash = ?
	`, fileHash)
	if err != nil {
		// Log but don't fail on stats update
		fmt.Printf("Warning: failed to update access stats for %s: %v\n", fileHash, err)
	}

	return nil
}
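// Usage sketch for parallel assembly. Illustrative only: it assumes a
// *Backend named "backend" has been constructed elsewhere and that fileHash
// refers to a previously stored file.
//
//	proc := NewParallelChunkProcessor(backend, 0) // 0 => default of 2x NumCPU
//	out, err := os.Create("restored.bin")
//	if err != nil {
//		// handle error
//	}
//	defer out.Close()
//	if err := proc.AssembleFileParallel(fileHash, out); err != nil {
//		// handle error
//	}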
// StoreFileParallel stores a file using parallel chunk processing
func (p *ParallelChunkProcessor) StoreFileParallel(reader io.Reader, originalName, contentType, ownerPubkey, accessLevel string) (*FileMetadata, error) {
	// Spool the reader to a temp file so we know its size and can read
	// chunks concurrently with ReadAt
	tempFile, err := createTempFileFromReader(reader)
	if err != nil {
		return nil, err
	}
	defer tempFile.cleanup()

	// Files smaller than maxWorkers full chunks won't benefit from
	// parallelism; use regular storage
	if tempFile.size < int64(p.maxWorkers)*p.backend.chunkSize {
		return p.backend.StoreFileWithOwner(tempFile.reader(), originalName, contentType, ownerPubkey, accessLevel)
	}

	// Calculate file hash
	fileHash, err := tempFile.hash()
	if err != nil {
		return nil, fmt.Errorf("failed to hash file: %w", err)
	}

	// Check if file already exists
	if existing, _ := p.backend.GetFileMetadata(fileHash); existing != nil {
		return existing, nil
	}

	// Calculate chunk count (ceiling division)
	chunkCount := int((tempFile.size + p.backend.chunkSize - 1) / p.backend.chunkSize)
	chunks := make([]ChunkInfo, chunkCount)

	// Process chunks in parallel
	var wg sync.WaitGroup
	chunkResults := make(chan struct {
		index int
		chunk ChunkInfo
		err   error
	}, chunkCount)

	for i := 0; i < chunkCount; i++ {
		wg.Add(1)
		go func(chunkIndex int) {
			defer wg.Done()

			// Acquire worker slot
			p.workerPool <- struct{}{}
			defer func() { <-p.workerPool }()

			// Calculate chunk boundaries; the final chunk may be short
			offset := int64(chunkIndex) * p.backend.chunkSize
			size := p.backend.chunkSize
			if offset+size > tempFile.size {
				size = tempFile.size - offset
			}

			// Read chunk data
			chunkData, err := tempFile.readChunk(offset, size)
			if err != nil {
				chunkResults <- struct {
					index int
					chunk ChunkInfo
					err   error
				}{chunkIndex, ChunkInfo{}, err}
				return
			}

			// Store chunk
			chunk, err := p.storeChunk(fileHash, chunkIndex, chunkData, offset)
			chunkResults <- struct {
				index int
				chunk ChunkInfo
				err   error
			}{chunkIndex, chunk, err}
		}(i)
	}

	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(chunkResults)
	}()

	// Collect results
	for result := range chunkResults {
		if result.err != nil {
			return nil, fmt.Errorf("failed to store chunk %d: %w", result.index, result.err)
		}
		chunks[result.index] = result.chunk
	}

	// Store metadata in database
	return p.storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel, tempFile.size, chunkCount, chunks)
}
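// Usage sketch for parallel storage. Illustrative only: "proc" and "pubkey"
// are assumed to exist; any content type and access level the schema accepts
// can be passed through.
//
//	f, err := os.Open("large.bin")
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//	meta, err := proc.StoreFileParallel(f, "large.bin", "application/octet-stream", pubkey, "public")
//	if err != nil {
//		// handle error
//	}
//	fmt.Println("stored:", meta.Hash)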
// Helper functions

type tempFile struct {
	path string
	size int64
	file *os.File
}

func createTempFileFromReader(reader io.Reader) (*tempFile, error) {
	file, err := os.CreateTemp("", "parallel_upload_*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file: %w", err)
	}

	size, err := io.Copy(file, reader)
	if err != nil {
		file.Close()
		os.Remove(file.Name())
		return nil, fmt.Errorf("failed to copy to temp file: %w", err)
	}

	return &tempFile{
		path: file.Name(),
		size: size,
		file: file,
	}, nil
}

func (tf *tempFile) reader() io.Reader {
	tf.file.Seek(0, io.SeekStart)
	return tf.file
}

// hash computes the SHA-256 of the whole temp file. It rewinds first and
// reports I/O errors rather than silently hashing a partial read.
func (tf *tempFile) hash() (string, error) {
	if _, err := tf.file.Seek(0, io.SeekStart); err != nil {
		return "", err
	}
	hasher := sha256.New()
	if _, err := io.Copy(hasher, tf.file); err != nil {
		return "", err
	}
	return hex.EncodeToString(hasher.Sum(nil)), nil
}

// readChunk reads size bytes at offset. ReadAt is safe for concurrent use,
// so workers can read different chunks of the same file in parallel.
func (tf *tempFile) readChunk(offset, size int64) ([]byte, error) {
	data := make([]byte, size)
	n, err := tf.file.ReadAt(data, offset)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return data[:n], nil
}

func (tf *tempFile) cleanup() {
	if tf.file != nil {
		tf.file.Close()
		os.Remove(tf.path)
	}
}

func (p *ParallelChunkProcessor) storeChunk(fileHash string, chunkIndex int, data []byte, offset int64) (ChunkInfo, error) {
	// Calculate chunk hash
	hasher := sha256.New()
	hasher.Write(data)
	chunkHash := hex.EncodeToString(hasher.Sum(nil))

	// Store chunk to disk, content-addressed by its hash
	chunkPath := filepath.Join(p.backend.chunkDir, chunkHash)
	if err := os.WriteFile(chunkPath, data, 0644); err != nil {
		return ChunkInfo{}, fmt.Errorf("failed to write chunk: %w", err)
	}

	return ChunkInfo{
		FileHash:   fileHash,
		ChunkIndex: chunkIndex,
		ChunkHash:  chunkHash,
		Size:       int64(len(data)),
		Offset:     offset,
	}, nil
}

func (p *ParallelChunkProcessor) storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel string, size int64, chunkCount int, chunks []ChunkInfo) (*FileMetadata, error) {
	// Start transaction
	tx, err := p.backend.db.Begin()
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	// Insert file metadata
	_, err = tx.Exec(`
		INSERT INTO files (hash, original_name, size, chunk_count, content_type, storage_type, owner_pubkey, access_level)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
	`, fileHash, originalName, size, chunkCount, contentType, "torrent", ownerPubkey, accessLevel)
	if err != nil {
		return nil, fmt.Errorf("failed to insert file metadata: %w", err)
	}

	// Insert chunk metadata
	for _, chunk := range chunks {
		_, err = tx.Exec(`
			INSERT INTO chunks (file_hash, chunk_index, chunk_hash, size, offset)
			VALUES (?, ?, ?, ?, ?)
		`, chunk.FileHash, chunk.ChunkIndex, chunk.ChunkHash, chunk.Size, chunk.Offset)
		if err != nil {
			return nil, fmt.Errorf("failed to insert chunk metadata: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("failed to commit transaction: %w", err)
	}

	return &FileMetadata{
		Hash:         fileHash,
		OriginalName: originalName,
		Size:         size,
		ChunkCount:   chunkCount,
		ContentType:  contentType,
		StorageType:  "torrent",
		OwnerPubkey:  ownerPubkey,
		AccessLevel:  accessLevel,
		CreatedAt:    time.Now(),
		AccessCount:  0,
		LastAccess:   time.Now(),
	}, nil
}