Files
gibidify/fileproc/processor.go
Ismo Vuorinen 3f65b813bd feat: update go to 1.25, add permissions and envs (#49)
* chore(ci): update go to 1.25, add permissions and envs
* fix(ci): update pr-lint.yml
* chore: update go, fix linting
* fix: tests and linting
* fix(lint): lint fixes, renovate should now pass
* fix: updates, security upgrades
* chore: workflow updates, lint
* fix: more lint, checkmake, and other fixes
* fix: more lint, convert scripts to POSIX compliant
* fix: simplify codeql workflow
* tests: increase test coverage, fix found issues
* fix(lint): editorconfig checking, add to linters
* fix(lint): shellcheck, add to linters
* fix(lint): apply cr comment suggestions
* fix(ci): remove step-security/harden-runner
* fix(lint): remove duplication, apply cr fixes
* fix(ci): tests in CI/CD pipeline
* chore(lint): deduplication of strings
* fix(lint): apply cr comment suggestions
* fix(ci): actionlint
* fix(lint): apply cr comment suggestions
* chore: lint, add deps management
2025-10-10 12:14:42 +03:00

373 lines
10 KiB
Go

// Package fileproc provides functions for processing files.
package fileproc
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/ivuorinen/gibidify/config"
"github.com/ivuorinen/gibidify/gibidiutils"
)
// Size thresholds that govern how file content is read and buffered.
const (
	// StreamChunkSize is the chunk size used when streaming large files (64 KiB).
	StreamChunkSize = 64 * 1024
	// StreamThreshold is the file size above which streaming is used (1 MiB).
	StreamThreshold = 1024 * 1024
	// MaxMemoryBuffer is the maximum memory to use for buffering content (10 MiB).
	MaxMemoryBuffer = 10 * 1024 * 1024
)
// WriteRequest represents the content to be written.
// A request carries either fully formatted content (Content) or a streaming
// reader (Reader); IsStream tells which of the two is populated.
type WriteRequest struct {
// Path identifies the file; the senders in this file set it to the path
// relative to the processor's root.
Path string
// Content is the formatted header plus file body for in-memory requests.
// The streaming sender in this file leaves it empty.
Content string
// IsStream is true when the content must be read from Reader instead of Content.
IsStream bool
// Reader supplies the header followed by the file body for streaming requests.
// NOTE(review): the reader built in this file also implements io.Closer;
// presumably the consumer is expected to close it — confirm against the writer.
Reader io.Reader
}
// multiReaderCloser pairs an io.Reader with a set of io.Closers so that the
// resources behind the reader can be released through a single Close call.
type multiReaderCloser struct {
	reader  io.Reader
	closers []io.Closer
}

// Read delegates to the wrapped reader.
func (m *multiReaderCloser) Read(buf []byte) (int, error) {
	return m.reader.Read(buf)
}

// Close closes every registered closer, in order, even if an earlier one
// fails. It returns the first error encountered, or nil if all succeeded.
func (m *multiReaderCloser) Close() error {
	var result error
	for i := range m.closers {
		closeErr := m.closers[i].Close()
		if result == nil {
			// Only the first failure is reported; later ones are dropped.
			result = closeErr
		}
	}
	return result
}
// FileProcessor handles file processing operations.
// It reads a file, either fully in memory or as a stream, and emits the
// result as a WriteRequest.
type FileProcessor struct {
// rootPath is the base directory against which output paths are made relative.
rootPath string
// sizeLimit is the maximum file size in bytes; larger files are skipped.
// The constructors in this file populate it from config.GetFileSizeLimit().
sizeLimit int64
// resourceMonitor provides the rate limiting, read slots, memory checks and
// processing statistics used while processing a file.
resourceMonitor *ResourceMonitor
}
// NewFileProcessor builds a FileProcessor rooted at rootPath. The size limit
// is read from the configuration and a fresh ResourceMonitor is created for
// this processor.
func NewFileProcessor(rootPath string) *FileProcessor {
	limit := config.GetFileSizeLimit()
	monitor := NewResourceMonitor()

	return &FileProcessor{
		rootPath:        rootPath,
		sizeLimit:       limit,
		resourceMonitor: monitor,
	}
}
// NewFileProcessorWithMonitor builds a FileProcessor rooted at rootPath that
// uses the supplied monitor instead of creating its own, so several
// processors can share one ResourceMonitor. The size limit is read from the
// configuration.
func NewFileProcessorWithMonitor(rootPath string, monitor *ResourceMonitor) *FileProcessor {
	fp := &FileProcessor{
		rootPath:        rootPath,
		resourceMonitor: monitor,
	}
	fp.sizeLimit = config.GetFileSizeLimit()
	return fp
}
// checkContextCancellation reports whether ctx is already done.
// When it is, a structured error is logged for filePath; a non-empty stage is
// appended to the log message after a single space. It returns true when the
// context is cancelled and false otherwise. The check never blocks.
func (p *FileProcessor) checkContextCancellation(ctx context.Context, filePath, stage string) bool {
	select {
	case <-ctx.Done():
		// Cancelled: continue to the logging below.
	default:
		return false
	}

	// Prefix the stage with a space so it can be appended directly to the message.
	suffix := ""
	if stage != "" {
		suffix = " " + stage
	}

	cancelErr := gibidiutils.NewStructuredError(
		gibidiutils.ErrorTypeValidation,
		gibidiutils.CodeResourceLimitTimeout,
		fmt.Sprintf("file processing cancelled%s", suffix),
		filePath,
		nil,
	)
	gibidiutils.LogErrorf(cancelErr, "File processing cancelled%s: %s", suffix, filePath)
	return true
}
// ProcessFile processes the file at filePath and sends the formatted result
// to outCh. It creates a new FileProcessor for rootPath and runs it with a
// background context; the processor picks in-memory loading or streaming
// based on the file size.
func ProcessFile(filePath string, outCh chan<- WriteRequest, rootPath string) {
	NewFileProcessor(rootPath).ProcessWithContext(context.Background(), filePath, outCh)
}
// ProcessFileWithMonitor processes the file at filePath under ctx and sends
// the result to outCh, using the caller-supplied monitor rather than a
// per-call ResourceMonitor.
func ProcessFileWithMonitor(
	ctx context.Context,
	filePath string,
	outCh chan<- WriteRequest,
	rootPath string,
	monitor *ResourceMonitor,
) {
	NewFileProcessorWithMonitor(rootPath, monitor).ProcessWithContext(ctx, filePath, outCh)
}
// Process runs ProcessWithContext for filePath with a background context,
// sending the result to outCh.
func (p *FileProcessor) Process(filePath string, outCh chan<- WriteRequest) {
	p.ProcessWithContext(context.Background(), filePath, outCh)
}
// ProcessWithContext runs the full pipeline for a single file: rate limiting,
// validation, read-slot acquisition, a hard memory check, and finally an
// in-memory or streaming read whose result is sent to outCh.
// Failures are logged and the file is skipped; nothing is returned.
func (p *FileProcessor) ProcessWithContext(ctx context.Context, filePath string, outCh chan<- WriteRequest) {
	// Derive the per-file processing context from the resource monitor.
	procCtx, cancel := p.resourceMonitor.CreateFileProcessingContext(ctx)
	defer cancel()

	// Honour the rate limiter before touching the file. Only deadline
	// expiry is logged; any other error skips the file silently.
	if waitErr := p.resourceMonitor.WaitForRateLimit(procCtx); waitErr != nil {
		if errors.Is(waitErr, context.DeadlineExceeded) {
			timeoutErr := gibidiutils.NewStructuredError(
				gibidiutils.ErrorTypeValidation,
				gibidiutils.CodeResourceLimitTimeout,
				"file processing timeout during rate limiting",
				filePath,
				nil,
			)
			gibidiutils.LogErrorf(timeoutErr, "File processing timeout during rate limiting: %s", filePath)
		}
		return
	}

	// Stat the file and check size/resource limits.
	fileInfo, err := p.validateFileWithLimits(procCtx, filePath)
	if err != nil {
		return
	}

	// Reserve a concurrent-read slot; it is released when this function returns.
	if slotErr := p.resourceMonitor.AcquireReadSlot(procCtx); slotErr != nil {
		if errors.Is(slotErr, context.DeadlineExceeded) {
			timeoutErr := gibidiutils.NewStructuredError(
				gibidiutils.ErrorTypeValidation,
				gibidiutils.CodeResourceLimitTimeout,
				"file processing timeout waiting for read slot",
				filePath,
				nil,
			)
			gibidiutils.LogErrorf(timeoutErr, "File processing timeout waiting for read slot: %s", filePath)
		}
		return
	}
	defer p.resourceMonitor.ReleaseReadSlot()

	// Refuse to read anything when the hard memory limit check fails.
	if memErr := p.resourceMonitor.CheckHardMemoryLimit(); memErr != nil {
		gibidiutils.LogErrorf(memErr, "Hard memory limit check failed for file: %s", filePath)
		return
	}

	relPath := p.getRelativePath(filePath)
	size := fileInfo.Size()

	started := time.Now()
	defer func() {
		// Runs on every exit from this point on, whatever the outcome of
		// the read below: records the file size and logs the elapsed time.
		p.resourceMonitor.RecordFileProcessed(size)
		logrus.Debugf("File processed in %v: %s", time.Since(started), filePath)
	}()

	// Files above the threshold are streamed; everything else is loaded whole.
	if size > StreamThreshold {
		p.processStreamingWithContext(procCtx, filePath, relPath, outCh)
		return
	}
	p.processInMemoryWithContext(procCtx, filePath, relPath, outCh)
}
// validateFileWithLimits stats filePath and checks it against the configured
// size limit and the resource monitor's limits.
//
// On success it returns the file's os.FileInfo and a nil error. On failure it
// returns a nil FileInfo and a non-nil error:
//   - ctx.Err() when ctx is already done (this case is not logged here);
//   - the structured stat error when os.Stat fails;
//   - the structured size error when the file exceeds p.sizeLimit;
//   - the resource monitor's error when its validation rejects the file.
//
// All failures other than context cancellation are logged before returning.
func (p *FileProcessor) validateFileWithLimits(ctx context.Context, filePath string) (os.FileInfo, error) {
	// Bail out early when the context is already done.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	fileInfo, err := os.Stat(filePath)
	if err != nil {
		structErr := gibidiutils.WrapError(
			err, gibidiutils.ErrorTypeFileSystem, gibidiutils.CodeFSAccess,
			"failed to stat file",
		).WithFilePath(filePath)
		gibidiutils.LogErrorf(structErr, "Failed to stat file %s", filePath)
		return nil, structErr
	}

	// Check the configured per-file size limit.
	if size := fileInfo.Size(); size > p.sizeLimit {
		sizeErr := gibidiutils.NewStructuredError(
			gibidiutils.ErrorTypeValidation,
			gibidiutils.CodeValidationSize,
			fmt.Sprintf("file size (%d bytes) exceeds limit (%d bytes)", size, p.sizeLimit),
			filePath,
			map[string]any{
				"file_size":  size,
				"size_limit": p.sizeLimit,
			},
		)
		gibidiutils.LogErrorf(sizeErr, "Skipping large file %s", filePath)
		// Return the same structured error that was logged, as the stat
		// branch does, so callers get the code, path and sizes rather
		// than an opaque "file too large" string.
		return nil, sizeErr
	}

	// Check the resource monitor's limits.
	if err := p.resourceMonitor.ValidateFileProcessing(filePath, fileInfo.Size()); err != nil {
		gibidiutils.LogErrorf(err, "Resource limit validation failed for file: %s", filePath)
		return nil, err
	}

	return fileInfo, nil
}
// getRelativePath returns filePath expressed relative to p.rootPath. If no
// relative path can be computed, filePath is returned unchanged.
func (p *FileProcessor) getRelativePath(filePath string) string {
	if rel, err := filepath.Rel(p.rootPath, filePath); err == nil {
		return rel
	}
	// Fallback: keep the path exactly as given.
	return filePath
}
// processInMemoryWithContext reads the whole file at filePath into memory,
// formats it with a header for relPath, and sends it to outCh as a
// non-streaming WriteRequest.
//
// The context is checked before the read, after the read, and while waiting
// for outCh to accept the request. If ctx is done at any of those points the
// cancellation is logged and nothing is sent. Read failures are logged and
// the file is skipped.
func (p *FileProcessor) processInMemoryWithContext(
	ctx context.Context,
	filePath, relPath string,
	outCh chan<- WriteRequest,
) {
	// Check context before reading
	if p.checkContextCancellation(ctx, filePath, "") {
		return
	}

	// #nosec G304 - filePath is validated by walker
	content, err := os.ReadFile(filePath)
	if err != nil {
		structErr := gibidiutils.WrapError(
			err, gibidiutils.ErrorTypeProcessing, gibidiutils.CodeProcessingFileRead,
			"failed to read file",
		).WithFilePath(filePath)
		gibidiutils.LogErrorf(structErr, "Failed to read file %s", filePath)
		return
	}

	// Check context again after reading
	if p.checkContextCancellation(ctx, filePath, "after read") {
		return
	}

	req := WriteRequest{
		Path:     relPath,
		Content:  p.formatContent(relPath, string(content)),
		IsStream: false,
	}

	// A bare send would block forever if the context ends while the consumer
	// is no longer receiving, so race the send against cancellation.
	select {
	case outCh <- req:
	case <-ctx.Done():
		// ctx is done here, so this call only logs the cancellation.
		p.checkContextCancellation(ctx, filePath, "before output")
	}
}
// processStreamingWithContext opens filePath as a stream (header followed by
// file content) and sends it to outCh as a streaming WriteRequest.
//
// The context is checked before the reader is created, before the send, and
// while waiting for outCh to accept the request. Once the request has been
// delivered the receiver owns the reader. On every path where it is not
// delivered, the reader is closed here so the open file does not leak.
func (p *FileProcessor) processStreamingWithContext(
	ctx context.Context,
	filePath, relPath string,
	outCh chan<- WriteRequest,
) {
	// Check context before creating reader
	if p.checkContextCancellation(ctx, filePath, "before streaming") {
		return
	}

	reader := p.createStreamReaderWithContext(ctx, filePath, relPath)
	if reader == nil {
		return // Error already logged
	}

	// Check context before sending output
	if !p.checkContextCancellation(ctx, filePath, "before streaming output") {
		req := WriteRequest{
			Path:     relPath,
			Content:  "", // Empty since content is in Reader
			IsStream: true,
			Reader:   reader,
		}

		// A bare send would block forever if the context ends while the
		// consumer is no longer receiving, so race it against cancellation.
		select {
		case outCh <- req:
			return // Delivered: the receiver is now responsible for the reader.
		case <-ctx.Done():
			// ctx is done here, so this call only logs the cancellation.
			p.checkContextCancellation(ctx, filePath, "before streaming output")
		}
	}

	// Not delivered: close the reader to prevent a file descriptor leak.
	if closer, ok := reader.(io.Closer); ok {
		// Best-effort cleanup; the request is being dropped anyway.
		_ = closer.Close()
	}
}
// createStreamReaderWithContext opens filePath and returns a reader that
// yields the formatted header for relPath followed by the file's content.
// It returns nil, after logging, when ctx is already done or the file cannot
// be opened. The returned reader also implements io.Closer; closing it closes
// the underlying file.
func (p *FileProcessor) createStreamReaderWithContext(ctx context.Context, filePath, relPath string) io.Reader {
	// Do not open the file at all if processing was already cancelled.
	if cancelled := p.checkContextCancellation(ctx, filePath, "before opening file"); cancelled {
		return nil
	}

	// #nosec G304 - filePath is validated by walker
	src, openErr := os.Open(filePath)
	if openErr != nil {
		wrapped := gibidiutils.WrapError(
			openErr, gibidiutils.ErrorTypeProcessing, gibidiutils.CodeProcessingFileRead,
			"failed to open file for streaming",
		).WithFilePath(filePath)
		gibidiutils.LogErrorf(wrapped, "Failed to open file for streaming %s", filePath)
		return nil
	}

	// Header first, then the file body; the file is registered as a closer
	// so whoever holds the reader can release it.
	combined := io.MultiReader(p.formatHeader(relPath), src)
	return &multiReaderCloser{
		reader:  combined,
		closers: []io.Closer{src},
	}
}
// formatContent returns content preceded by a separator line and relPath,
// and followed by a trailing newline.
func (p *FileProcessor) formatContent(relPath, content string) string {
	header := fmt.Sprintf("\n---\n%s\n", relPath)
	return header + content + "\n"
}
// formatHeader returns a reader over the header text for relPath: a separator
// line followed by the path and a newline.
func (p *FileProcessor) formatHeader(relPath string) io.Reader {
	header := fmt.Sprintf("\n---\n%s\n", relPath)
	return strings.NewReader(header)
}