Files
gibidify/fileproc/backpressure_concurrency_test.go
Ismo Vuorinen 3f65b813bd feat: update go to 1.25, add permissions and envs (#49)
* chore(ci): update go to 1.25, add permissions and envs
* fix(ci): update pr-lint.yml
* chore: update go, fix linting
* fix: tests and linting
* fix(lint): lint fixes, renovate should now pass
* fix: updates, security upgrades
* chore: workflow updates, lint
* fix: more lint, checkmake, and other fixes
* fix: more lint, convert scripts to POSIX compliant
* fix: simplify codeql workflow
* tests: increase test coverage, fix found issues
* fix(lint): editorconfig checking, add to linters
* fix(lint): shellcheck, add to linters
* fix(lint): apply cr comment suggestions
* fix(ci): remove step-security/harden-runner
* fix(lint): remove duplication, apply cr fixes
* fix(ci): tests in CI/CD pipeline
* chore(lint): deduplication of strings
* fix(lint): apply cr comment suggestions
* fix(ci): actionlint
* fix(lint): apply cr comment suggestions
* chore: lint, add deps management
2025-10-10 12:14:42 +03:00

196 lines
4.4 KiB
Go

package fileproc
import (
"context"
"sync"
"testing"
"time"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBackpressureManagerConcurrency(t *testing.T) {
	// Snapshot the viper flag and restore it when the test finishes.
	// NOTE(review): if the key was unset before the test it remains set to
	// true afterwards — viper has no unset; confirm this is acceptable.
	prevEnabled := viper.Get(testBackpressureEnabled)
	t.Cleanup(func() {
		if prevEnabled != nil {
			viper.Set(testBackpressureEnabled, prevEnabled)
		}
	})
	viper.Set(testBackpressureEnabled, true)

	bm := NewBackpressureManager()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup

	// Hammer ShouldApplyBackpressure from several goroutines at once.
	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bm.ShouldApplyBackpressure(ctx)
		}()
	}

	// Concurrent ApplyBackpressure calls.
	for range 5 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bm.ApplyBackpressure(ctx)
		}()
	}

	// Concurrent stats reads.
	for range 5 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bm.GetStats()
		}()
	}

	// Concurrent channel creation. CreateChannels returns fresh channels on
	// every call and transfers ownership to the caller, so collect each pair
	// for later cleanup.
	type createdChannels struct {
		files  chan string
		writes chan WriteRequest
	}
	created := make(chan createdChannels, 3)
	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fc, wc := bm.CreateChannels()
			created <- createdChannels{files: fc, writes: wc}
		}()
	}

	wg.Wait()
	close(created)

	// Every CreateChannels call must have produced usable channels; close
	// them here since this test owns them.
	for pair := range created {
		assert.NotNil(t, pair.files)
		assert.NotNil(t, pair.writes)
		close(pair.files)
		close(pair.writes)
	}

	// The concurrent activity above should be reflected in the counters.
	finalStats := bm.GetStats()
	assert.GreaterOrEqual(t, finalStats.FilesProcessed, int64(10))
}
func TestBackpressureManagerIntegration(t *testing.T) {
	// Snapshot every backpressure-related viper key and restore the non-nil
	// ones afterwards.
	// NOTE(review): keys that were unset beforehand remain set after the
	// test (viper cannot unset a key); confirm this is acceptable.
	keys := []string{
		testBackpressureEnabled,
		testBackpressureMaxFiles,
		testBackpressureMaxWrites,
		testBackpressureMemoryCheck,
		testBackpressureMaxMemory,
	}
	saved := make(map[string]any, len(keys))
	for _, k := range keys {
		saved[k] = viper.Get(k)
	}
	t.Cleanup(func() {
		for k, v := range saved {
			if v != nil {
				viper.Set(k, v)
			}
		}
	})

	viper.Set(testBackpressureEnabled, true)
	viper.Set(testBackpressureMaxFiles, 10)
	viper.Set(testBackpressureMaxWrites, 10)
	viper.Set(testBackpressureMemoryCheck, 10)
	viper.Set(testBackpressureMaxMemory, 100*1024*1024) // 100MB

	bm := NewBackpressureManager()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The test owns the channels returned by CreateChannels and must close
	// them once every sender has stopped.
	fileCh, writeCh := bm.CreateChannels()
	require.NotNil(t, fileCh)
	require.NotNil(t, writeCh)
	require.Greater(t, cap(fileCh), 0, "fileCh should be buffered")
	require.Greater(t, cap(writeCh), 0, "writeCh should be buffered")

	const total = 100

	var wg sync.WaitGroup

	// Producer: pushes file names while honoring backpressure signals.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for sent := 0; sent < total; sent++ {
			if bm.ShouldApplyBackpressure(ctx) {
				bm.ApplyBackpressure(ctx)
			}
			// Block until the channels have room (or the context ends).
			bm.WaitForChannelSpace(ctx, fileCh, writeCh)
			select {
			case fileCh <- "file.txt":
				// Delivered.
			case <-ctx.Done():
				return
			}
		}
	}()

	// Consumer: drains exactly as many items as the producer sends. The
	// manager tracks processed counts itself, so nothing is incremented here.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for received := 0; received < total; received++ {
			select {
			case <-fileCh:
				// Simulated processing.
			case <-ctx.Done():
				return
			}
		}
	}()

	// Bound the whole pipeline with a timeout so a deadlock fails fast
	// instead of hanging the test run.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// Pipeline drained in time.
	case <-time.After(5 * time.Second):
		t.Fatal("Integration test timeout")
	}

	// Emit the manager's final view of the run.
	bm.LogBackpressureInfo()

	stats := bm.GetStats()
	assert.GreaterOrEqual(t, stats.FilesProcessed, int64(total))

	// All goroutines have finished, so closing the owned channels is safe.
	close(fileCh)
	close(writeCh)
}