Files
gibidify/cli/processor_workers.go
Ismo Vuorinen 95b7ef6dd3 chore: modernize workflows, security scanning, and linting configuration (#50)
* build: update Go 1.25, CI workflows, and build tooling

- Upgrade to Go 1.25
- Add benchmark targets to Makefile
- Implement parallel gosec execution
- Lock tool versions for reproducibility
- Add shellcheck directives to scripts
- Update CI workflows with improved caching

* refactor: migrate from golangci-lint to revive

- Replace golangci-lint with revive for linting
- Configure comprehensive revive rules
- Fix all EditorConfig violations
- Add yamllint and yamlfmt support
- Remove deprecated .golangci.yml

* refactor: rename utils to shared and deduplicate code

- Rename utils package to shared
- Add shared constants package
- Deduplicate constants across packages
- Address CodeRabbit review feedback

* fix: resolve SonarQube issues and add safety guards

- Fix all 73 SonarQube OPEN issues
- Add nil guards for resourceMonitor, backpressure, metricsCollector
- Implement io.Closer for headerFileReader
- Propagate errors from processing helpers
- Add metrics and templates packages
- Improve error handling across codebase

* test: improve test infrastructure and coverage

- Add benchmarks for cli, fileproc, metrics
- Improve test coverage for cli, fileproc, config
- Refactor tests with helper functions
- Add shared test constants
- Fix test function naming conventions
- Reduce cognitive complexity in benchmark tests

* docs: update documentation and configuration examples

- Update CLAUDE.md with current project state
- Refresh README with new features
- Add usage and configuration examples
- Add SonarQube project configuration
- Consolidate config.example.yaml

* fix: resolve shellcheck warnings in scripts

- Use ./*.go instead of *.go to prevent dash-prefixed filenames
  from being interpreted as options (SC2035)
- Remove unreachable return statement after exit (SC2317)
- Remove obsolete gibidiutils/ directory reference

* chore(deps): upgrade go dependencies

* chore(lint): megalinter fixes

* fix: improve test coverage and fix file descriptor leaks

- Add defer r.Close() to fix pipe file descriptor leaks in benchmark tests
- Refactor TestProcessorConfigureFileTypes with helper functions and assertions
- Refactor TestProcessorLogFinalStats with output capture and keyword verification
- Use shared constants instead of literal strings (TestFilePNG, FormatMarkdown, etc.)
- Reduce cognitive complexity by extracting helper functions

* fix: align test comments with function names

Remove underscores from test comments to match actual function names:
- benchmark/benchmark_test.go (2 fixes)
- fileproc/filetypes_config_test.go (4 fixes)
- fileproc/filetypes_registry_test.go (6 fixes)
- fileproc/processor_test.go (6 fixes)
- fileproc/resource_monitor_types_test.go (4 fixes)
- fileproc/writer_test.go (3 fixes)

* fix: various test improvements and bug fixes

- Remove duplicate maxCacheSize check in filetypes_registry_test.go
- Shorten long comment in processor_test.go to stay under 120 chars
- Remove flaky time.Sleep in collector_test.go, use >= 0 assertion
- Close pipe reader in benchmark_test.go to fix file descriptor leak
- Use ContinueOnError in flags_test.go to match ResetFlags behavior
- Add nil check for p.ui in processor_workers.go before UpdateProgress
- Fix resource_monitor_validation_test.go by setting hardMemoryLimitBytes directly

* chore(yaml): add missing document start markers

Add --- document start to YAML files to satisfy yamllint:
- .github/workflows/codeql.yml
- .github/workflows/build-test-publish.yml
- .github/workflows/security.yml
- .github/actions/setup/action.yml

* fix: guard nil resourceMonitor and fix test deadlock

- Guard resourceMonitor before CreateFileProcessingContext call
- Add ui.UpdateProgress on emergency stop and path error returns
- Fix potential deadlock in TestProcessFile using wg.Go with defer close
2025-12-10 19:07:11 +02:00

221 lines
5.6 KiB
Go

// Package cli provides command-line interface functionality for gibidify.
package cli
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/ivuorinen/gibidify/fileproc"
"github.com/ivuorinen/gibidify/metrics"
"github.com/ivuorinen/gibidify/shared"
)
// startWorkers launches p.flags.Concurrency worker goroutines that consume
// file paths from fileCh and emit write requests on writeCh. Each worker is
// registered on wg before it starts so callers can wg.Wait for all of them.
func (p *Processor) startWorkers(
	ctx context.Context,
	wg *sync.WaitGroup,
	fileCh chan string,
	writeCh chan fileproc.WriteRequest,
) {
	for i := 0; i < p.flags.Concurrency; i++ {
		wg.Add(1)
		go p.worker(ctx, wg, fileCh, writeCh)
	}
}
// worker is the body of one worker goroutine. It drains fileCh, processing
// each path, and exits when the context is canceled or fileCh is closed.
// wg.Done is deferred so the caller's WaitGroup is always released.
func (p *Processor) worker(
	ctx context.Context,
	wg *sync.WaitGroup,
	fileCh chan string,
	writeCh chan fileproc.WriteRequest,
) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			// Cancellation wins over any remaining queued work.
			return
		case path, open := <-fileCh:
			if !open {
				// Channel closed: the sender has no more files for us.
				return
			}
			p.processFile(ctx, path, writeCh)
		}
	}
}
// processFile runs one file through the pipeline: it derives a per-file
// context, tracks concurrency metrics, honors emergency stop, resolves the
// source root, delegates to processFileWithMetrics, and records the outcome.
// Every exit path advances the progress UI by one file.
func (p *Processor) processFile(ctx context.Context, filePath string, writeCh chan fileproc.WriteRequest) {
	// Derive a per-file context with timeout; without a resource monitor we
	// fall back to the parent context and a no-op cancel.
	procCtx, cancel := ctx, func() {}
	if p.resourceMonitor != nil {
		procCtx, cancel = p.resourceMonitor.CreateFileProcessingContext(ctx)
	}
	defer cancel()

	// Concurrency gauge; skipped entirely when metrics are disabled.
	if p.metricsCollector != nil {
		p.metricsCollector.IncrementConcurrency()
		defer p.metricsCollector.DecrementConcurrency()
	}

	// advance is invoked on every return path so the progress bar stays in
	// step with the number of files handed to this method.
	advance := func() {
		if p.ui != nil {
			p.ui.UpdateProgress(1)
		}
	}

	// Emergency stop: record the file as skipped and bail out.
	if p.resourceMonitor != nil && p.resourceMonitor.IsEmergencyStopActive() {
		shared.GetLogger().Warnf("Emergency stop active, skipping file: %s", filePath)
		p.recordFileResult(filePath, 0, "", false, true, "emergency stop active", nil)
		advance()
		return
	}

	absRoot, err := shared.AbsolutePath(p.flags.SourceDir)
	if err != nil {
		shared.LogError("Failed to get absolute path", err)
		p.recordFileResult(filePath, 0, "", false, false, "", err)
		advance()
		return
	}

	// Resource monitor-aware processing with metrics tracking.
	size, format, ok, procErr := p.processFileWithMetrics(procCtx, filePath, writeCh, absRoot)
	// processFileWithMetrics never skips, hence skipped=false, skipReason="".
	p.recordFileResult(filePath, size, format, ok, false, "", procErr)
	advance()

	// In verbose mode, log a progress report every 10th processed file.
	if p.flags.Verbose && p.metricsCollector != nil {
		current := p.metricsCollector.CurrentMetrics()
		if current.ProcessedFiles%10 == 0 && p.metricsReporter != nil {
			shared.GetLogger().Info(p.metricsReporter.ReportProgress())
		}
	}
}
// sendFiles feeds the discovered file paths into fileCh, applying
// back-pressure when the pipeline is congested, and closes fileCh when done
// (or on error) so workers terminate. It returns a non-nil error only when
// ctx is canceled before all files are queued.
func (p *Processor) sendFiles(ctx context.Context, files []string, fileCh chan string) error {
	defer close(fileCh)
	for _, fp := range files {
		// Back-pressure handling. p.backpressure may be nil — guard it, the
		// same way processFile guards resourceMonitor and metricsCollector;
		// otherwise a nil backpressure panics here.
		if p.backpressure != nil {
			if p.backpressure.ShouldApplyBackpressure(ctx) {
				p.backpressure.ApplyBackpressure(ctx)
			}
			// Wait for channel space if needed.
			p.backpressure.WaitForChannelSpace(ctx, fileCh, nil)
		}
		if err := shared.CheckContextCancellation(ctx, shared.CLIMsgFileProcessingWorker); err != nil {
			return fmt.Errorf("context check failed: %w", err)
		}
		select {
		case fileCh <- fp:
		case <-ctx.Done():
			// Prefer the contextual error; fall back to a generic one in the
			// (unexpected) case CheckContextCancellation reports nil.
			if err := shared.CheckContextCancellation(ctx, shared.CLIMsgFileProcessingWorker); err != nil {
				return fmt.Errorf("context cancellation during channel send: %w", err)
			}
			return errors.New("context canceled during channel send")
		}
	}
	return nil
}
// processFileWithMetrics stats the file, derives a format label from its
// extension (without the leading dot), and delegates to the resource
// monitor-aware fileproc processing. It reports the file size, format,
// whether processing succeeded, and any wrapped error; cancellation of ctx
// takes precedence over the processing result.
func (p *Processor) processFileWithMetrics(
	ctx context.Context,
	filePath string,
	writeCh chan fileproc.WriteRequest,
	absRoot string,
) (fileSize int64, format string, success bool, err error) {
	info, statErr := os.Stat(filePath)
	if statErr != nil {
		return 0, "", false, fmt.Errorf("getting file info for %s: %w", filePath, statErr)
	}
	fileSize = info.Size()

	// filepath.Ext yields "" or a string starting with '.'; strip the dot.
	if ext := filepath.Ext(filePath); ext != "" {
		format = ext[1:]
	}

	// Resource monitor-aware processing (monitor may be nil; fileproc guards).
	err = fileproc.ProcessFileWithMonitor(ctx, filePath, writeCh, absRoot, p.resourceMonitor)

	// A canceled context overrides whatever the processor returned.
	if ctx.Err() != nil {
		return fileSize, format, false, fmt.Errorf("file processing worker canceled: %w", ctx.Err())
	}
	if err != nil {
		return fileSize, format, false, fmt.Errorf("processing file %s: %w", filePath, err)
	}
	return fileSize, format, true, nil
}
// recordFileResult forwards one file's processing outcome to the metrics
// collector. It is a no-op when metrics collection is disabled (nil
// collector), so callers may invoke it unconditionally.
func (p *Processor) recordFileResult(
	filePath string,
	fileSize int64,
	format string,
	success bool,
	skipped bool,
	skipReason string,
	err error,
) {
	if p.metricsCollector == nil {
		// Nothing to record without a collector.
		return
	}
	p.metricsCollector.RecordFileProcessed(metrics.FileProcessingResult{
		FilePath:   filePath,
		FileSize:   fileSize,
		Format:     format,
		Success:    success,
		Error:      err,
		Skipped:    skipped,
		SkipReason: skipReason,
	})
}
// waitForCompletion blocks until the whole pipeline drains: first every
// worker registered on wg finishes, then writeCh is closed to signal the
// writer that no more requests will arrive, and finally the writer's done
// channel is awaited. The order is load-bearing: writeCh must only be
// closed once no worker can still send on it.
func (p *Processor) waitForCompletion(
	wg *sync.WaitGroup,
	writeCh chan fileproc.WriteRequest,
	writerDone chan struct{},
) {
	// All workers must exit before the send side of writeCh is closed.
	wg.Wait()
	close(writeCh)
	// Block until the writer has flushed everything and exited.
	<-writerDone
}