Add missing internal/storage directory

Enki 2025-08-27 15:49:49 -07:00
parent 13c50cce17
commit 4171b5a48f
2 changed files with 1402 additions and 0 deletions

internal/storage/backend.go (new file, 1070 additions)

File diff suppressed because it is too large

@@ -0,0 +1,332 @@
package storage

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"
)

// ParallelChunkProcessor handles parallel chunk operations for improved performance
type ParallelChunkProcessor struct {
	backend    *Backend
	workerPool chan struct{} // Semaphore for limiting concurrent operations
	maxWorkers int
}

// NewParallelChunkProcessor creates a new parallel chunk processor
func NewParallelChunkProcessor(backend *Backend, maxWorkers int) *ParallelChunkProcessor {
	if maxWorkers <= 0 {
		maxWorkers = runtime.NumCPU() * 2 // Default to 2x CPU cores
	}
	return &ParallelChunkProcessor{
		backend:    backend,
		workerPool: make(chan struct{}, maxWorkers),
		maxWorkers: maxWorkers,
	}
}
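
// Minimal construction sketch (hypothetical caller; assumes a *Backend has
// already been opened elsewhere in this package):
//
//	proc := NewParallelChunkProcessor(backend, 0) // 0 falls back to 2x NumCPU
//	proc8 := NewParallelChunkProcessor(backend, 8) // explicit cap of 8 workers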

// ChunkResult represents the result of processing a chunk
type ChunkResult struct {
	Index int
	Data  []byte
	Error error
}

// AssembleFileParallel reassembles a file using parallel chunk loading
func (p *ParallelChunkProcessor) AssembleFileParallel(fileHash string, writer io.Writer) error {
	// Get chunk information
	chunks, err := p.backend.GetFileChunks(fileHash)
	if err != nil {
		return fmt.Errorf("failed to get chunks: %w", err)
	}
	if len(chunks) == 0 {
		return fmt.Errorf("no chunks found for file %s", fileHash)
	}

	// For a small number of chunks, sequential processing is cheaper
	if len(chunks) <= 3 {
		return p.backend.AssembleFile(fileHash, writer)
	}

	// Create result channel, buffered so workers never block on send
	results := make(chan ChunkResult, len(chunks))
	var wg sync.WaitGroup

	// Launch workers to fetch chunks in parallel
	for i, chunk := range chunks {
		wg.Add(1)
		go func(index int, chunkInfo ChunkInfo) {
			defer wg.Done()

			// Acquire worker slot
			p.workerPool <- struct{}{}
			defer func() { <-p.workerPool }()

			// Fetch chunk data
			data, err := p.backend.GetChunkData(chunkInfo.ChunkHash)
			results <- ChunkResult{
				Index: index,
				Data:  data,
				Error: err,
			}
		}(i, chunk)
	}

	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results, slotting each chunk back into its original position
	chunkData := make([][]byte, len(chunks))
	for result := range results {
		if result.Error != nil {
			return fmt.Errorf("failed to get chunk data for index %d: %w", result.Index, result.Error)
		}
		if result.Data == nil {
			return fmt.Errorf("chunk %d not found", result.Index)
		}
		chunkData[result.Index] = result.Data
	}

	// Write chunks in order
	for i, data := range chunkData {
		if _, err := writer.Write(data); err != nil {
			return fmt.Errorf("failed to write chunk %d: %w", i, err)
		}
	}

	// Update access statistics
	_, err = p.backend.db.Exec(`
		UPDATE files SET access_count = access_count + 1, last_access = CURRENT_TIMESTAMP
		WHERE hash = ?
	`, fileHash)
	if err != nil {
		// Log but don't fail on a stats update
		fmt.Printf("Warning: failed to update access stats for %s: %v\n", fileHash, err)
	}
	return nil
}
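
// Note on the fan-out/fan-in above: the results channel is buffered to
// len(chunks), so every worker can complete its send even when the function
// returns early on the first error, and the workerPool semaphore bounds how
// many fetches run at once. Chunks arrive in arbitrary order and are slotted
// back into place by index before the sequential write pass.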

// StoreFileParallel stores a file using parallel chunk processing
func (p *ParallelChunkProcessor) StoreFileParallel(reader io.Reader, originalName, contentType, ownerPubkey, accessLevel string) (*FileMetadata, error) {
	// Spool the upload to a temp file so it can be sized, hashed, and read concurrently
	tempFile, err := createTempFileFromReader(reader)
	if err != nil {
		return nil, err
	}
	defer tempFile.cleanup()

	// If the file is small, use regular storage
	if tempFile.size < int64(p.maxWorkers)*p.backend.chunkSize {
		return p.backend.StoreFileWithOwner(tempFile.reader(), originalName, contentType, ownerPubkey, accessLevel)
	}

	// Calculate file hash
	fileHash := tempFile.hash()

	// Check if the file already exists
	if existing, _ := p.backend.GetFileMetadata(fileHash); existing != nil {
		return existing, nil
	}

	// Calculate chunk count
	chunkCount := int((tempFile.size + p.backend.chunkSize - 1) / p.backend.chunkSize)
	chunks := make([]ChunkInfo, chunkCount)

	// Process chunks in parallel
	var wg sync.WaitGroup
	chunkResults := make(chan struct {
		index int
		chunk ChunkInfo
		err   error
	}, chunkCount)

	for i := 0; i < chunkCount; i++ {
		wg.Add(1)
		go func(chunkIndex int) {
			defer wg.Done()

			// Acquire worker slot
			p.workerPool <- struct{}{}
			defer func() { <-p.workerPool }()

			// Calculate chunk boundaries
			offset := int64(chunkIndex) * p.backend.chunkSize
			size := p.backend.chunkSize
			if offset+size > tempFile.size {
				size = tempFile.size - offset
			}

			// Read chunk data
			chunkData, err := tempFile.readChunk(offset, size)
			if err != nil {
				chunkResults <- struct {
					index int
					chunk ChunkInfo
					err   error
				}{chunkIndex, ChunkInfo{}, err}
				return
			}

			// Store chunk
			chunk, err := p.storeChunk(fileHash, chunkIndex, chunkData, offset)
			chunkResults <- struct {
				index int
				chunk ChunkInfo
				err   error
			}{chunkIndex, chunk, err}
		}(i)
	}

	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(chunkResults)
	}()

	// Collect results
	for result := range chunkResults {
		if result.err != nil {
			return nil, fmt.Errorf("failed to store chunk %d: %w", result.index, result.err)
		}
		chunks[result.index] = result.chunk
	}

	// Store metadata in database
	return p.storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel, tempFile.size, chunkCount, chunks)
}
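
// Minimal store sketch (hypothetical caller; the file name, content type,
// pubkey, and access level are illustrative only):
//
//	f, err := os.Open("upload.bin")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	meta, err := proc.StoreFileParallel(f, "upload.bin", "application/octet-stream", pubkey, "public")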

// Helper functions

type tempFile struct {
	path string
	size int64
	file *os.File
}

func createTempFileFromReader(reader io.Reader) (*tempFile, error) {
	file, err := os.CreateTemp("", "parallel_upload_*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file: %w", err)
	}
	size, err := io.Copy(file, reader)
	if err != nil {
		file.Close()
		os.Remove(file.Name())
		return nil, fmt.Errorf("failed to copy to temp file: %w", err)
	}
	return &tempFile{
		path: file.Name(),
		size: size,
		file: file,
	}, nil
}

func (tf *tempFile) reader() io.Reader {
	tf.file.Seek(0, io.SeekStart)
	return tf.file
}

func (tf *tempFile) hash() string {
	tf.file.Seek(0, io.SeekStart)
	hasher := sha256.New()
	io.Copy(hasher, tf.file)
	return hex.EncodeToString(hasher.Sum(nil))
}

func (tf *tempFile) readChunk(offset, size int64) ([]byte, error) {
	data := make([]byte, size)
	n, err := tf.file.ReadAt(data, offset)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return data[:n], nil
}

func (tf *tempFile) cleanup() {
	if tf.file != nil {
		tf.file.Close()
		os.Remove(tf.path)
	}
}

func (p *ParallelChunkProcessor) storeChunk(fileHash string, chunkIndex int, data []byte, offset int64) (ChunkInfo, error) {
	// Calculate chunk hash
	hasher := sha256.New()
	hasher.Write(data)
	chunkHash := hex.EncodeToString(hasher.Sum(nil))

	// Store chunk to disk
	chunkPath := filepath.Join(p.backend.chunkDir, chunkHash)
	if err := os.WriteFile(chunkPath, data, 0644); err != nil {
		return ChunkInfo{}, fmt.Errorf("failed to write chunk: %w", err)
	}

	return ChunkInfo{
		FileHash:   fileHash,
		ChunkIndex: chunkIndex,
		ChunkHash:  chunkHash,
		Size:       int64(len(data)),
		Offset:     offset,
	}, nil
}
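
// Chunk files are content-addressed: the on-disk name is the hex SHA-256 of
// the chunk bytes, so writing the same chunk twice is an idempotent overwrite
// and identical chunks are naturally deduplicated across files.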

func (p *ParallelChunkProcessor) storeFileMetadata(fileHash, originalName, contentType, ownerPubkey, accessLevel string, size int64, chunkCount int, chunks []ChunkInfo) (*FileMetadata, error) {
	// Start transaction
	tx, err := p.backend.db.Begin()
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	// Insert file metadata
	_, err = tx.Exec(`
		INSERT INTO files (hash, original_name, size, chunk_count, content_type, storage_type, owner_pubkey, access_level)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
	`, fileHash, originalName, size, chunkCount, contentType, "torrent", ownerPubkey, accessLevel)
	if err != nil {
		return nil, fmt.Errorf("failed to insert file metadata: %w", err)
	}

	// Insert chunk metadata
	for _, chunk := range chunks {
		_, err = tx.Exec(`
			INSERT INTO chunks (file_hash, chunk_index, chunk_hash, size, offset)
			VALUES (?, ?, ?, ?, ?)
		`, chunk.FileHash, chunk.ChunkIndex, chunk.ChunkHash, chunk.Size, chunk.Offset)
		if err != nil {
			return nil, fmt.Errorf("failed to insert chunk metadata: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("failed to commit transaction: %w", err)
	}

	return &FileMetadata{
		Hash:         fileHash,
		OriginalName: originalName,
		Size:         size,
		ChunkCount:   chunkCount,
		ContentType:  contentType,
		StorageType:  "torrent",
		OwnerPubkey:  ownerPubkey,
		AccessLevel:  accessLevel,
		CreatedAt:    time.Now(),
		AccessCount:  0,
		LastAccess:   time.Now(),
	}, nil
}
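
// End-to-end sketch (hypothetical; assumes meta came from an earlier
// StoreFileParallel call against the same Backend):
//
//	out, err := os.Create("restored.bin")
//	if err != nil {
//		return err
//	}
//	defer out.Close()
//	if err := proc.AssembleFileParallel(meta.Hash, out); err != nil {
//		return err
//	}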