mirror of
https://github.com/ivuorinen/gibidify.git
synced 2026-01-26 03:24:05 +00:00
chore: tweaks, simplification, tests
This commit is contained in:
314
cli/processor.go
314
cli/processor.go
@@ -1,314 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ivuorinen/gibidify/config"
|
||||
"github.com/ivuorinen/gibidify/fileproc"
|
||||
"github.com/ivuorinen/gibidify/utils"
|
||||
)
|
||||
|
||||
// Processor handles the main file processing logic.
|
||||
type Processor struct {
|
||||
flags *Flags
|
||||
backpressure *fileproc.BackpressureManager
|
||||
resourceMonitor *fileproc.ResourceMonitor
|
||||
ui *UIManager
|
||||
}
|
||||
|
||||
// NewProcessor creates a new processor with the given flags.
|
||||
func NewProcessor(flags *Flags) *Processor {
|
||||
ui := NewUIManager()
|
||||
|
||||
// Configure UI based on flags
|
||||
ui.SetColorOutput(!flags.NoColors)
|
||||
ui.SetProgressOutput(!flags.NoProgress)
|
||||
|
||||
return &Processor{
|
||||
flags: flags,
|
||||
backpressure: fileproc.NewBackpressureManager(),
|
||||
resourceMonitor: fileproc.NewResourceMonitor(),
|
||||
ui: ui,
|
||||
}
|
||||
}
|
||||
|
||||
// Process executes the main file processing workflow.
|
||||
func (p *Processor) Process(ctx context.Context) error {
|
||||
// Create overall processing context with timeout
|
||||
overallCtx, overallCancel := p.resourceMonitor.CreateOverallProcessingContext(ctx)
|
||||
defer overallCancel()
|
||||
|
||||
// Configure file type registry
|
||||
p.configureFileTypes()
|
||||
|
||||
// Print startup info with colors
|
||||
p.ui.PrintHeader("🚀 Starting gibidify")
|
||||
p.ui.PrintInfo("Format: %s", p.flags.Format)
|
||||
p.ui.PrintInfo("Source: %s", p.flags.SourceDir)
|
||||
p.ui.PrintInfo("Destination: %s", p.flags.Destination)
|
||||
p.ui.PrintInfo("Workers: %d", p.flags.Concurrency)
|
||||
|
||||
// Log resource monitoring configuration
|
||||
p.resourceMonitor.LogResourceInfo()
|
||||
p.backpressure.LogBackpressureInfo()
|
||||
|
||||
// Collect files with progress indication
|
||||
p.ui.PrintInfo("📁 Collecting files...")
|
||||
files, err := p.collectFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Show collection results
|
||||
p.ui.PrintSuccess("Found %d files to process", len(files))
|
||||
|
||||
// Pre-validate file collection against resource limits
|
||||
if err := p.validateFileCollection(files); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Process files with overall timeout
|
||||
return p.processFiles(overallCtx, files)
|
||||
}
|
||||
|
||||
// configureFileTypes configures the file type registry.
|
||||
func (p *Processor) configureFileTypes() {
|
||||
if config.GetFileTypesEnabled() {
|
||||
fileproc.ConfigureFromSettings(
|
||||
config.GetCustomImageExtensions(),
|
||||
config.GetCustomBinaryExtensions(),
|
||||
config.GetCustomLanguages(),
|
||||
config.GetDisabledImageExtensions(),
|
||||
config.GetDisabledBinaryExtensions(),
|
||||
config.GetDisabledLanguageExtensions(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// collectFiles collects all files to be processed.
|
||||
func (p *Processor) collectFiles() ([]string, error) {
|
||||
files, err := fileproc.CollectFiles(p.flags.SourceDir)
|
||||
if err != nil {
|
||||
return nil, utils.WrapError(err, utils.ErrorTypeProcessing, utils.CodeProcessingCollection, "error collecting files")
|
||||
}
|
||||
logrus.Infof("Found %d files to process", len(files))
|
||||
return files, nil
|
||||
}
|
||||
|
||||
// validateFileCollection validates the collected files against resource limits.
|
||||
func (p *Processor) validateFileCollection(files []string) error {
|
||||
if !config.GetResourceLimitsEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check file count limit
|
||||
maxFiles := config.GetMaxFiles()
|
||||
if len(files) > maxFiles {
|
||||
return utils.NewStructuredError(
|
||||
utils.ErrorTypeValidation,
|
||||
utils.CodeResourceLimitFiles,
|
||||
fmt.Sprintf("file count (%d) exceeds maximum limit (%d)", len(files), maxFiles),
|
||||
"",
|
||||
map[string]interface{}{
|
||||
"file_count": len(files),
|
||||
"max_files": maxFiles,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// Check total size limit (estimate)
|
||||
maxTotalSize := config.GetMaxTotalSize()
|
||||
totalSize := int64(0)
|
||||
oversizedFiles := 0
|
||||
|
||||
for _, filePath := range files {
|
||||
if fileInfo, err := os.Stat(filePath); err == nil {
|
||||
totalSize += fileInfo.Size()
|
||||
if totalSize > maxTotalSize {
|
||||
return utils.NewStructuredError(
|
||||
utils.ErrorTypeValidation,
|
||||
utils.CodeResourceLimitTotalSize,
|
||||
fmt.Sprintf("total file size (%d bytes) would exceed maximum limit (%d bytes)", totalSize, maxTotalSize),
|
||||
"",
|
||||
map[string]interface{}{
|
||||
"total_size": totalSize,
|
||||
"max_total_size": maxTotalSize,
|
||||
"files_checked": len(files),
|
||||
},
|
||||
)
|
||||
}
|
||||
} else {
|
||||
oversizedFiles++
|
||||
}
|
||||
}
|
||||
|
||||
if oversizedFiles > 0 {
|
||||
logrus.Warnf("Could not stat %d files during pre-validation", oversizedFiles)
|
||||
}
|
||||
|
||||
logrus.Infof("Pre-validation passed: %d files, %d MB total", len(files), totalSize/1024/1024)
|
||||
return nil
|
||||
}
|
||||
|
||||
// processFiles processes the collected files.
|
||||
func (p *Processor) processFiles(ctx context.Context, files []string) error {
|
||||
outFile, err := p.createOutputFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
utils.LogError("Error closing output file", outFile.Close())
|
||||
}()
|
||||
|
||||
// Initialize back-pressure and channels
|
||||
p.ui.PrintInfo("⚙️ Initializing processing...")
|
||||
p.backpressure.LogBackpressureInfo()
|
||||
fileCh, writeCh := p.backpressure.CreateChannels()
|
||||
writerDone := make(chan struct{})
|
||||
|
||||
// Start writer
|
||||
go fileproc.StartWriter(outFile, writeCh, writerDone, p.flags.Format, p.flags.Prefix, p.flags.Suffix)
|
||||
|
||||
// Start workers
|
||||
var wg sync.WaitGroup
|
||||
p.startWorkers(ctx, &wg, fileCh, writeCh)
|
||||
|
||||
// Start progress bar
|
||||
p.ui.StartProgress(len(files), "📝 Processing files")
|
||||
|
||||
// Send files to workers
|
||||
if err := p.sendFiles(ctx, files, fileCh); err != nil {
|
||||
p.ui.FinishProgress()
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for completion
|
||||
p.waitForCompletion(&wg, writeCh, writerDone)
|
||||
p.ui.FinishProgress()
|
||||
|
||||
p.logFinalStats()
|
||||
p.ui.PrintSuccess("Processing completed. Output saved to %s", p.flags.Destination)
|
||||
return nil
|
||||
}
|
||||
|
||||
// createOutputFile creates the output file.
|
||||
func (p *Processor) createOutputFile() (*os.File, error) {
|
||||
// Destination path has been validated in CLI flags validation for path traversal attempts
|
||||
outFile, err := os.Create(p.flags.Destination) // #nosec G304 - destination is validated in flags.validate()
|
||||
if err != nil {
|
||||
return nil, utils.WrapError(err, utils.ErrorTypeIO, utils.CodeIOFileCreate, "failed to create output file").WithFilePath(p.flags.Destination)
|
||||
}
|
||||
return outFile, nil
|
||||
}
|
||||
|
||||
// startWorkers starts the worker goroutines.
|
||||
func (p *Processor) startWorkers(ctx context.Context, wg *sync.WaitGroup, fileCh chan string, writeCh chan fileproc.WriteRequest) {
|
||||
for range p.flags.Concurrency {
|
||||
wg.Add(1)
|
||||
go p.worker(ctx, wg, fileCh, writeCh)
|
||||
}
|
||||
}
|
||||
|
||||
// worker is the worker goroutine function.
|
||||
func (p *Processor) worker(ctx context.Context, wg *sync.WaitGroup, fileCh chan string, writeCh chan fileproc.WriteRequest) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case filePath, ok := <-fileCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.processFile(ctx, filePath, writeCh)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processFile processes a single file with resource monitoring.
|
||||
func (p *Processor) processFile(ctx context.Context, filePath string, writeCh chan fileproc.WriteRequest) {
|
||||
// Check for emergency stop
|
||||
if p.resourceMonitor.IsEmergencyStopActive() {
|
||||
logrus.Warnf("Emergency stop active, skipping file: %s", filePath)
|
||||
return
|
||||
}
|
||||
|
||||
absRoot, err := utils.GetAbsolutePath(p.flags.SourceDir)
|
||||
if err != nil {
|
||||
utils.LogError("Failed to get absolute path", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Use the resource monitor-aware processing
|
||||
fileproc.ProcessFileWithMonitor(ctx, filePath, writeCh, absRoot, p.resourceMonitor)
|
||||
|
||||
// Update progress bar
|
||||
p.ui.UpdateProgress(1)
|
||||
}
|
||||
|
||||
// sendFiles sends files to the worker channels with back-pressure handling.
|
||||
func (p *Processor) sendFiles(ctx context.Context, files []string, fileCh chan string) error {
|
||||
defer close(fileCh)
|
||||
|
||||
for _, fp := range files {
|
||||
// Check if we should apply back-pressure
|
||||
if p.backpressure.ShouldApplyBackpressure(ctx) {
|
||||
p.backpressure.ApplyBackpressure(ctx)
|
||||
}
|
||||
|
||||
// Wait for channel space if needed
|
||||
p.backpressure.WaitForChannelSpace(ctx, fileCh, nil)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case fileCh <- fp:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForCompletion waits for all workers to complete.
|
||||
func (p *Processor) waitForCompletion(wg *sync.WaitGroup, writeCh chan fileproc.WriteRequest, writerDone chan struct{}) {
|
||||
wg.Wait()
|
||||
close(writeCh)
|
||||
<-writerDone
|
||||
}
|
||||
|
||||
// logFinalStats logs the final back-pressure and resource monitoring statistics.
|
||||
func (p *Processor) logFinalStats() {
|
||||
// Log back-pressure stats
|
||||
backpressureStats := p.backpressure.GetStats()
|
||||
if backpressureStats.Enabled {
|
||||
logrus.Infof("Back-pressure stats: processed=%d files, memory=%dMB/%dMB",
|
||||
backpressureStats.FilesProcessed, backpressureStats.CurrentMemoryUsage/1024/1024, backpressureStats.MaxMemoryUsage/1024/1024)
|
||||
}
|
||||
|
||||
// Log resource monitoring stats
|
||||
resourceStats := p.resourceMonitor.GetMetrics()
|
||||
if config.GetResourceLimitsEnabled() {
|
||||
logrus.Infof("Resource stats: processed=%d files, totalSize=%dMB, avgFileSize=%.2fKB, rate=%.2f files/sec",
|
||||
resourceStats.FilesProcessed, resourceStats.TotalSizeProcessed/1024/1024,
|
||||
resourceStats.AverageFileSize/1024, resourceStats.ProcessingRate)
|
||||
|
||||
if len(resourceStats.ViolationsDetected) > 0 {
|
||||
logrus.Warnf("Resource violations detected: %v", resourceStats.ViolationsDetected)
|
||||
}
|
||||
|
||||
if resourceStats.DegradationActive {
|
||||
logrus.Warnf("Processing completed with degradation mode active")
|
||||
}
|
||||
|
||||
if resourceStats.EmergencyStopActive {
|
||||
logrus.Errorf("Processing completed with emergency stop active")
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up resource monitor
|
||||
p.resourceMonitor.Close()
|
||||
}
|
||||
77
cli/processor_collection.go
Normal file
77
cli/processor_collection.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ivuorinen/gibidify/config"
|
||||
"github.com/ivuorinen/gibidify/fileproc"
|
||||
"github.com/ivuorinen/gibidify/utils"
|
||||
)
|
||||
|
||||
// collectFiles collects all files to be processed.
|
||||
func (p *Processor) collectFiles() ([]string, error) {
|
||||
files, err := fileproc.CollectFiles(p.flags.SourceDir)
|
||||
if err != nil {
|
||||
return nil, utils.WrapError(err, utils.ErrorTypeProcessing, utils.CodeProcessingCollection, "error collecting files")
|
||||
}
|
||||
logrus.Infof("Found %d files to process", len(files))
|
||||
return files, nil
|
||||
}
|
||||
|
||||
// validateFileCollection validates the collected files against resource limits.
|
||||
func (p *Processor) validateFileCollection(files []string) error {
|
||||
if !config.GetResourceLimitsEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check file count limit
|
||||
maxFiles := config.GetMaxFiles()
|
||||
if len(files) > maxFiles {
|
||||
return utils.NewStructuredError(
|
||||
utils.ErrorTypeValidation,
|
||||
utils.CodeResourceLimitFiles,
|
||||
fmt.Sprintf("file count (%d) exceeds maximum limit (%d)", len(files), maxFiles),
|
||||
"",
|
||||
map[string]interface{}{
|
||||
"file_count": len(files),
|
||||
"max_files": maxFiles,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// Check total size limit (estimate)
|
||||
maxTotalSize := config.GetMaxTotalSize()
|
||||
totalSize := int64(0)
|
||||
oversizedFiles := 0
|
||||
|
||||
for _, filePath := range files {
|
||||
if fileInfo, err := os.Stat(filePath); err == nil {
|
||||
totalSize += fileInfo.Size()
|
||||
if totalSize > maxTotalSize {
|
||||
return utils.NewStructuredError(
|
||||
utils.ErrorTypeValidation,
|
||||
utils.CodeResourceLimitTotalSize,
|
||||
fmt.Sprintf("total file size (%d bytes) would exceed maximum limit (%d bytes)", totalSize, maxTotalSize),
|
||||
"",
|
||||
map[string]interface{}{
|
||||
"total_size": totalSize,
|
||||
"max_total_size": maxTotalSize,
|
||||
"files_checked": len(files),
|
||||
},
|
||||
)
|
||||
}
|
||||
} else {
|
||||
oversizedFiles++
|
||||
}
|
||||
}
|
||||
|
||||
if oversizedFiles > 0 {
|
||||
logrus.Warnf("Could not stat %d files during pre-validation", oversizedFiles)
|
||||
}
|
||||
|
||||
logrus.Infof("Pre-validation passed: %d files, %d MB total", len(files), totalSize/1024/1024)
|
||||
return nil
|
||||
}
|
||||
100
cli/processor_processing.go
Normal file
100
cli/processor_processing.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/ivuorinen/gibidify/fileproc"
|
||||
"github.com/ivuorinen/gibidify/utils"
|
||||
)
|
||||
|
||||
// Process executes the main file processing workflow.
|
||||
func (p *Processor) Process(ctx context.Context) error {
|
||||
// Create overall processing context with timeout
|
||||
overallCtx, overallCancel := p.resourceMonitor.CreateOverallProcessingContext(ctx)
|
||||
defer overallCancel()
|
||||
|
||||
// Configure file type registry
|
||||
p.configureFileTypes()
|
||||
|
||||
// Print startup info with colors
|
||||
p.ui.PrintHeader("🚀 Starting gibidify")
|
||||
p.ui.PrintInfo("Format: %s", p.flags.Format)
|
||||
p.ui.PrintInfo("Source: %s", p.flags.SourceDir)
|
||||
p.ui.PrintInfo("Destination: %s", p.flags.Destination)
|
||||
p.ui.PrintInfo("Workers: %d", p.flags.Concurrency)
|
||||
|
||||
// Log resource monitoring configuration
|
||||
p.resourceMonitor.LogResourceInfo()
|
||||
p.backpressure.LogBackpressureInfo()
|
||||
|
||||
// Collect files with progress indication
|
||||
p.ui.PrintInfo("📁 Collecting files...")
|
||||
files, err := p.collectFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Show collection results
|
||||
p.ui.PrintSuccess("Found %d files to process", len(files))
|
||||
|
||||
// Pre-validate file collection against resource limits
|
||||
if err := p.validateFileCollection(files); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Process files with overall timeout
|
||||
return p.processFiles(overallCtx, files)
|
||||
}
|
||||
|
||||
// processFiles processes the collected files.
|
||||
func (p *Processor) processFiles(ctx context.Context, files []string) error {
|
||||
outFile, err := p.createOutputFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
utils.LogError("Error closing output file", outFile.Close())
|
||||
}()
|
||||
|
||||
// Initialize back-pressure and channels
|
||||
p.ui.PrintInfo("⚙️ Initializing processing...")
|
||||
p.backpressure.LogBackpressureInfo()
|
||||
fileCh, writeCh := p.backpressure.CreateChannels()
|
||||
writerDone := make(chan struct{})
|
||||
|
||||
// Start writer
|
||||
go fileproc.StartWriter(outFile, writeCh, writerDone, p.flags.Format, p.flags.Prefix, p.flags.Suffix)
|
||||
|
||||
// Start workers
|
||||
var wg sync.WaitGroup
|
||||
p.startWorkers(ctx, &wg, fileCh, writeCh)
|
||||
|
||||
// Start progress bar
|
||||
p.ui.StartProgress(len(files), "📝 Processing files")
|
||||
|
||||
// Send files to workers
|
||||
if err := p.sendFiles(ctx, files, fileCh); err != nil {
|
||||
p.ui.FinishProgress()
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for completion
|
||||
p.waitForCompletion(&wg, writeCh, writerDone)
|
||||
p.ui.FinishProgress()
|
||||
|
||||
p.logFinalStats()
|
||||
p.ui.PrintSuccess("Processing completed. Output saved to %s", p.flags.Destination)
|
||||
return nil
|
||||
}
|
||||
|
||||
// createOutputFile creates the output file.
|
||||
func (p *Processor) createOutputFile() (*os.File, error) {
|
||||
// Destination path has been validated in CLI flags validation for path traversal attempts
|
||||
outFile, err := os.Create(p.flags.Destination) // #nosec G304 - destination is validated in flags.validate()
|
||||
if err != nil {
|
||||
return nil, utils.WrapError(err, utils.ErrorTypeIO, utils.CodeIOFileCreate, "failed to create output file").WithFilePath(p.flags.Destination)
|
||||
}
|
||||
return outFile, nil
|
||||
}
|
||||
40
cli/processor_stats.go
Normal file
40
cli/processor_stats.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ivuorinen/gibidify/config"
|
||||
)
|
||||
|
||||
// logFinalStats logs the final back-pressure and resource monitoring statistics.
|
||||
func (p *Processor) logFinalStats() {
|
||||
// Log back-pressure stats
|
||||
backpressureStats := p.backpressure.GetStats()
|
||||
if backpressureStats.Enabled {
|
||||
logrus.Infof("Back-pressure stats: processed=%d files, memory=%dMB/%dMB",
|
||||
backpressureStats.FilesProcessed, backpressureStats.CurrentMemoryUsage/1024/1024, backpressureStats.MaxMemoryUsage/1024/1024)
|
||||
}
|
||||
|
||||
// Log resource monitoring stats
|
||||
resourceStats := p.resourceMonitor.GetMetrics()
|
||||
if config.GetResourceLimitsEnabled() {
|
||||
logrus.Infof("Resource stats: processed=%d files, totalSize=%dMB, avgFileSize=%.2fKB, rate=%.2f files/sec",
|
||||
resourceStats.FilesProcessed, resourceStats.TotalSizeProcessed/1024/1024,
|
||||
resourceStats.AverageFileSize/1024, resourceStats.ProcessingRate)
|
||||
|
||||
if len(resourceStats.ViolationsDetected) > 0 {
|
||||
logrus.Warnf("Resource violations detected: %v", resourceStats.ViolationsDetected)
|
||||
}
|
||||
|
||||
if resourceStats.DegradationActive {
|
||||
logrus.Warnf("Processing completed with degradation mode active")
|
||||
}
|
||||
|
||||
if resourceStats.EmergencyStopActive {
|
||||
logrus.Errorf("Processing completed with emergency stop active")
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up resource monitor
|
||||
p.resourceMonitor.Close()
|
||||
}
|
||||
44
cli/processor_types.go
Normal file
44
cli/processor_types.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"github.com/ivuorinen/gibidify/config"
|
||||
"github.com/ivuorinen/gibidify/fileproc"
|
||||
)
|
||||
|
||||
// Processor handles the main file processing logic.
|
||||
type Processor struct {
|
||||
flags *Flags
|
||||
backpressure *fileproc.BackpressureManager
|
||||
resourceMonitor *fileproc.ResourceMonitor
|
||||
ui *UIManager
|
||||
}
|
||||
|
||||
// NewProcessor creates a new processor with the given flags.
|
||||
func NewProcessor(flags *Flags) *Processor {
|
||||
ui := NewUIManager()
|
||||
|
||||
// Configure UI based on flags
|
||||
ui.SetColorOutput(!flags.NoColors)
|
||||
ui.SetProgressOutput(!flags.NoProgress)
|
||||
|
||||
return &Processor{
|
||||
flags: flags,
|
||||
backpressure: fileproc.NewBackpressureManager(),
|
||||
resourceMonitor: fileproc.NewResourceMonitor(),
|
||||
ui: ui,
|
||||
}
|
||||
}
|
||||
|
||||
// configureFileTypes configures the file type registry.
|
||||
func (p *Processor) configureFileTypes() {
|
||||
if config.GetFileTypesEnabled() {
|
||||
fileproc.ConfigureFromSettings(
|
||||
config.GetCustomImageExtensions(),
|
||||
config.GetCustomBinaryExtensions(),
|
||||
config.GetCustomLanguages(),
|
||||
config.GetDisabledImageExtensions(),
|
||||
config.GetDisabledBinaryExtensions(),
|
||||
config.GetDisabledLanguageExtensions(),
|
||||
)
|
||||
}
|
||||
}
|
||||
85
cli/processor_workers.go
Normal file
85
cli/processor_workers.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ivuorinen/gibidify/fileproc"
|
||||
"github.com/ivuorinen/gibidify/utils"
|
||||
)
|
||||
|
||||
// startWorkers starts the worker goroutines.
|
||||
func (p *Processor) startWorkers(ctx context.Context, wg *sync.WaitGroup, fileCh chan string, writeCh chan fileproc.WriteRequest) {
|
||||
for range p.flags.Concurrency {
|
||||
wg.Add(1)
|
||||
go p.worker(ctx, wg, fileCh, writeCh)
|
||||
}
|
||||
}
|
||||
|
||||
// worker is the worker goroutine function.
|
||||
func (p *Processor) worker(ctx context.Context, wg *sync.WaitGroup, fileCh chan string, writeCh chan fileproc.WriteRequest) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case filePath, ok := <-fileCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.processFile(ctx, filePath, writeCh)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processFile processes a single file with resource monitoring.
|
||||
func (p *Processor) processFile(ctx context.Context, filePath string, writeCh chan fileproc.WriteRequest) {
|
||||
// Check for emergency stop
|
||||
if p.resourceMonitor.IsEmergencyStopActive() {
|
||||
logrus.Warnf("Emergency stop active, skipping file: %s", filePath)
|
||||
return
|
||||
}
|
||||
|
||||
absRoot, err := utils.GetAbsolutePath(p.flags.SourceDir)
|
||||
if err != nil {
|
||||
utils.LogError("Failed to get absolute path", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Use the resource monitor-aware processing
|
||||
fileproc.ProcessFileWithMonitor(ctx, filePath, writeCh, absRoot, p.resourceMonitor)
|
||||
|
||||
// Update progress bar
|
||||
p.ui.UpdateProgress(1)
|
||||
}
|
||||
|
||||
// sendFiles sends files to the worker channels with back-pressure handling.
|
||||
func (p *Processor) sendFiles(ctx context.Context, files []string, fileCh chan string) error {
|
||||
defer close(fileCh)
|
||||
|
||||
for _, fp := range files {
|
||||
// Check if we should apply back-pressure
|
||||
if p.backpressure.ShouldApplyBackpressure(ctx) {
|
||||
p.backpressure.ApplyBackpressure(ctx)
|
||||
}
|
||||
|
||||
// Wait for channel space if needed
|
||||
p.backpressure.WaitForChannelSpace(ctx, fileCh, nil)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case fileCh <- fp:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForCompletion waits for all workers to complete.
|
||||
func (p *Processor) waitForCompletion(wg *sync.WaitGroup, writeCh chan fileproc.WriteRequest, writerDone chan struct{}) {
|
||||
wg.Wait()
|
||||
close(writeCh)
|
||||
<-writerDone
|
||||
}
|
||||
Reference in New Issue
Block a user