gibidify/fileproc/backpressure.go
Commit 95b7ef6dd3 by Ismo Vuorinen — chore: modernize workflows, security scanning, and linting configuration (#50)
* build: update to Go 1.25, CI workflows, and build tooling

- Upgrade to Go 1.25
- Add benchmark targets to Makefile
- Implement parallel gosec execution
- Lock tool versions for reproducibility
- Add shellcheck directives to scripts
- Update CI workflows with improved caching

* refactor: migrate from golangci-lint to revive

- Replace golangci-lint with revive for linting
- Configure comprehensive revive rules
- Fix all EditorConfig violations
- Add yamllint and yamlfmt support
- Remove deprecated .golangci.yml

* refactor: rename utils to shared and deduplicate code

- Rename utils package to shared
- Add shared constants package
- Deduplicate constants across packages
- Address CodeRabbit review feedback

* fix: resolve SonarQube issues and add safety guards

- Fix all 73 SonarQube OPEN issues
- Add nil guards for resourceMonitor, backpressure, metricsCollector (see the sketch after this list)
- Implement io.Closer for headerFileReader
- Propagate errors from processing helpers
- Add metrics and templates packages
- Improve error handling across codebase
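
A sketch of the guard pattern these bullets describe (the receiver `p` and
the call site are hypothetical; the field names come from the bullet above,
and the method names appear in the file below):

    // Skip back-pressure handling entirely when the manager was never wired up.
    if p.backpressure != nil && p.backpressure.ShouldApplyBackpressure(ctx) {
        p.backpressure.ApplyBackpressure(ctx)
    }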

* test: improve test infrastructure and coverage

- Add benchmarks for cli, fileproc, metrics
- Improve test coverage for cli, fileproc, config
- Refactor tests with helper functions
- Add shared test constants
- Fix test function naming conventions
- Reduce cognitive complexity in benchmark tests

* docs: update documentation and configuration examples

- Update CLAUDE.md with current project state
- Refresh README with new features
- Add usage and configuration examples
- Add SonarQube project configuration
- Consolidate config.example.yaml

* fix: resolve shellcheck warnings in scripts

- Use ./*.go instead of *.go to prevent dash-prefixed filenames
  from being interpreted as options (SC2035)
- Remove unreachable return statement after exit (SC2317)
- Remove obsolete gibidiutils/ directory reference

* chore(deps): upgrade go dependencies

* chore(lint): megalinter fixes

* fix: improve test coverage and fix file descriptor leaks

- Add defer r.Close() to fix pipe file descriptor leaks in benchmark tests (sketched after this list)
- Refactor TestProcessorConfigureFileTypes with helper functions and assertions
- Refactor TestProcessorLogFinalStats with output capture and keyword verification
- Use shared constants instead of literal strings (TestFilePNG, FormatMarkdown, etc.)
- Reduce cognitive complexity by extracting helper functions
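
A sketch of the descriptor-leak fix (hypothetical benchmark shape; the
defer lines are the substance of the fix):

    r, w, err := os.Pipe()
    if err != nil {
        b.Fatal(err)
    }
    defer r.Close() // previously only w was closed, leaking the read end
    defer w.Close()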

* fix: align test comments with function names

Remove underscores from test comments to match actual function names:
- benchmark/benchmark_test.go (2 fixes)
- fileproc/filetypes_config_test.go (4 fixes)
- fileproc/filetypes_registry_test.go (6 fixes)
- fileproc/processor_test.go (6 fixes)
- fileproc/resource_monitor_types_test.go (4 fixes)
- fileproc/writer_test.go (3 fixes)

* fix: various test improvements and bug fixes

- Remove duplicate maxCacheSize check in filetypes_registry_test.go
- Shorten long comment in processor_test.go to stay under 120 chars
- Remove flaky time.Sleep in collector_test.go, use >= 0 assertion
- Close pipe reader in benchmark_test.go to fix file descriptor leak
- Use ContinueOnError in flags_test.go to match ResetFlags behavior
- Add nil check for p.ui in processor_workers.go before UpdateProgress
- Fix resource_monitor_validation_test.go by setting hardMemoryLimitBytes directly

* chore(yaml): add missing document start markers

Add --- document start to YAML files to satisfy yamllint:
- .github/workflows/codeql.yml
- .github/workflows/build-test-publish.yml
- .github/workflows/security.yml
- .github/actions/setup/action.yml

* fix: guard nil resourceMonitor and fix test deadlock

- Guard resourceMonitor before CreateFileProcessingContext call
- Add ui.UpdateProgress on emergency stop and path error returns
- Fix potential deadlock in TestProcessFile using wg.Go with defer close (see the sketch below)
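
A sketch of the deadlock fix (hypothetical channel and producer; wg.Go is
the sync.WaitGroup.Go helper added in Go 1.25):

    var wg sync.WaitGroup
    results := make(chan string)
    wg.Go(func() {
        defer close(results) // close on every return path so the range below cannot block forever
        produce(results)
    })
    for r := range results {
        use(r)
    }
    wg.Wait()
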
Committed 2025-12-10 19:07:11 +02:00


// Package fileproc provides back-pressure management for memory optimization.
package fileproc

import (
	"context"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ivuorinen/gibidify/config"
	"github.com/ivuorinen/gibidify/shared"
)

// BackpressureManager manages memory usage and applies back-pressure when needed.
type BackpressureManager struct {
	enabled             bool
	maxMemoryUsage      int64
	memoryCheckInterval int
	maxPendingFiles     int
	maxPendingWrites    int
	filesProcessed      int64
	mu                  sync.RWMutex
	memoryWarningLogged bool
	lastMemoryCheck     time.Time
}

// NewBackpressureManager creates a new back-pressure manager with configuration.
func NewBackpressureManager() *BackpressureManager {
	return &BackpressureManager{
		enabled:             config.BackpressureEnabled(),
		maxMemoryUsage:      config.MaxMemoryUsage(),
		memoryCheckInterval: config.MemoryCheckInterval(),
		maxPendingFiles:     config.MaxPendingFiles(),
		maxPendingWrites:    config.MaxPendingWrites(),
		lastMemoryCheck:     time.Now(),
	}
}

// CreateChannels creates properly sized channels based on back-pressure configuration.
func (bp *BackpressureManager) CreateChannels() (chan string, chan WriteRequest) {
	var fileCh chan string
	var writeCh chan WriteRequest
	logger := shared.GetLogger()
	if bp.enabled {
		// Use buffered channels with configured limits
		fileCh = make(chan string, bp.maxPendingFiles)
		writeCh = make(chan WriteRequest, bp.maxPendingWrites)
		logger.Debugf("Created buffered channels: files=%d, writes=%d", bp.maxPendingFiles, bp.maxPendingWrites)
	} else {
		// Use unbuffered channels (default behavior)
		fileCh = make(chan string)
		writeCh = make(chan WriteRequest)
		logger.Debug("Created unbuffered channels (back-pressure disabled)")
	}
	return fileCh, writeCh
}
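
// A minimal caller sketch (illustrative only; ctx, paths, and the goroutines
// that drain fileCh and writeCh are assumptions, not part of this package):
//
//	bp := NewBackpressureManager()
//	fileCh, writeCh := bp.CreateChannels()
//	go func() {
//		defer close(fileCh)
//		for _, path := range paths {
//			bp.WaitForChannelSpace(ctx, fileCh, writeCh)
//			fileCh <- path
//		}
//	}()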

// ShouldApplyBackpressure checks if back-pressure should be applied.
func (bp *BackpressureManager) ShouldApplyBackpressure(ctx context.Context) bool {
	// Check for context cancellation first
	select {
	case <-ctx.Done():
		return false // No need for back-pressure if canceled
	default:
	}
	if !bp.enabled {
		return false
	}
	// Check if we should evaluate memory usage
	filesProcessed := atomic.AddInt64(&bp.filesProcessed, 1)
	// Guard against zero or negative interval to avoid modulo-by-zero panic
	interval := bp.memoryCheckInterval
	if interval <= 0 {
		interval = 1
	}
	if int(filesProcessed)%interval != 0 {
		return false
	}
	// Get current memory usage
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	currentMemory := shared.SafeUint64ToInt64WithDefault(m.Alloc, 0)
	bp.mu.Lock()
	defer bp.mu.Unlock()
	bp.lastMemoryCheck = time.Now()
	// Check if we're over the memory limit
	logger := shared.GetLogger()
	if currentMemory > bp.maxMemoryUsage {
		if !bp.memoryWarningLogged {
			logger.Warnf(
				"Memory usage (%d bytes) exceeds limit (%d bytes), applying back-pressure",
				currentMemory, bp.maxMemoryUsage,
			)
			bp.memoryWarningLogged = true
		}
		return true
	}
	// Reset warning flag if we're back under the limit
	if bp.memoryWarningLogged && currentMemory < bp.maxMemoryUsage*8/10 { // 80% of limit
		logger.Infof("Memory usage normalized (%d bytes), removing back-pressure", currentMemory)
		bp.memoryWarningLogged = false
	}
	return false
}

// ApplyBackpressure applies back-pressure by triggering garbage collection and adding delay.
func (bp *BackpressureManager) ApplyBackpressure(ctx context.Context) {
	if !bp.enabled {
		return
	}
	// Force garbage collection to free up memory
	runtime.GC()
	// Add a small delay to allow memory to be freed
	select {
	case <-ctx.Done():
		return
	case <-time.After(10 * time.Millisecond):
		// Small delay to allow GC to complete
	}
	// Log memory usage after GC
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	logger := shared.GetLogger()
	logger.Debugf("Applied back-pressure: memory after GC = %d bytes", m.Alloc)
}
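
// ShouldApplyBackpressure and ApplyBackpressure are intended to be paired in
// a worker loop; a sketch (hypothetical loop; process is an assumption):
//
//	for path := range fileCh {
//		if bp.ShouldApplyBackpressure(ctx) {
//			bp.ApplyBackpressure(ctx)
//		}
//		process(path)
//	}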

// Stats returns current back-pressure statistics.
func (bp *BackpressureManager) Stats() BackpressureStats {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return BackpressureStats{
		Enabled:             bp.enabled,
		FilesProcessed:      atomic.LoadInt64(&bp.filesProcessed),
		CurrentMemoryUsage:  shared.SafeUint64ToInt64WithDefault(m.Alloc, 0),
		MaxMemoryUsage:      bp.maxMemoryUsage,
		MemoryWarningActive: bp.memoryWarningLogged,
		LastMemoryCheck:     bp.lastMemoryCheck,
		MaxPendingFiles:     bp.maxPendingFiles,
		MaxPendingWrites:    bp.maxPendingWrites,
	}
}

// BackpressureStats represents back-pressure manager statistics.
type BackpressureStats struct {
	Enabled             bool      `json:"enabled"`
	FilesProcessed      int64     `json:"files_processed"`
	CurrentMemoryUsage  int64     `json:"current_memory_usage"`
	MaxMemoryUsage      int64     `json:"max_memory_usage"`
	MemoryWarningActive bool      `json:"memory_warning_active"`
	LastMemoryCheck     time.Time `json:"last_memory_check"`
	MaxPendingFiles     int       `json:"max_pending_files"`
	MaxPendingWrites    int       `json:"max_pending_writes"`
}
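
// With the json tags above, BackpressureStats marshals via encoding/json to
// a flat object; an illustrative example with made-up values:
//
//	{"enabled":true,"files_processed":1024,"current_memory_usage":52428800,
//	 "max_memory_usage":104857600,"memory_warning_active":false,
//	 "last_memory_check":"2025-12-10T19:07:11+02:00",
//	 "max_pending_files":100,"max_pending_writes":50}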

// WaitForChannelSpace waits for space in channels if they're getting full.
func (bp *BackpressureManager) WaitForChannelSpace(ctx context.Context, fileCh chan string, writeCh chan WriteRequest) {
	if !bp.enabled {
		return
	}
	logger := shared.GetLogger()
	// Check if file channel is getting full (>90% capacity)
	fileCap := cap(fileCh)
	if fileCap > 0 && len(fileCh) > fileCap*9/10 {
		logger.Debugf("File channel is %d%% full, waiting for space", len(fileCh)*100/fileCap)
		// Wait a bit for the channel to drain
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Millisecond):
		}
	}
	// Check if write channel is getting full (>90% capacity)
	writeCap := cap(writeCh)
	if writeCap > 0 && len(writeCh) > writeCap*9/10 {
		logger.Debugf("Write channel is %d%% full, waiting for space", len(writeCh)*100/writeCap)
		// Wait a bit for the channel to drain
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Millisecond):
		}
	}
}

// LogBackpressureInfo logs back-pressure configuration and status.
func (bp *BackpressureManager) LogBackpressureInfo() {
	logger := shared.GetLogger()
	if bp.enabled {
		logger.Infof(
			"Back-pressure enabled: maxMemory=%dMB, fileBuffer=%d, writeBuffer=%d, checkInterval=%d",
			bp.maxMemoryUsage/int64(shared.BytesPerMB), bp.maxPendingFiles, bp.maxPendingWrites, bp.memoryCheckInterval,
		)
	} else {
		logger.Info("Back-pressure disabled")
	}
}