From 6731bdfb43d289ffafb3cbf0a92ab1fc2dedc862 Mon Sep 17 00:00:00 2001 From: Omri SirComp Date: Thu, 21 May 2026 16:22:37 +0300 Subject: [PATCH] fix(analyzer): bound file analysis workers --- pkg/analyzer/analyzer.go | 60 +++++++++++++++++++++++++++-------- pkg/analyzer/analyzer_test.go | 13 ++++++++ 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index 83ac9f8a359..949ee05226c 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "sort" "strings" "sync" @@ -37,6 +38,8 @@ const ( crossplane = "crossplane" knative = "knative" sizeMb = 1048576 + + maxAnalyzerWorkers = 128 ) // move the openApi regex to public to be used on file.go @@ -373,19 +376,34 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) { a.Types, a.ExcludeTypes = typeLower(a.Types, a.ExcludeTypes) - // Start the workers - for _, file := range files { - wg.Add(1) - // analyze the files concurrently - a := &analyzerInfo{ - typesFlag: a.Types, - excludeTypesFlag: a.ExcludeTypes, - filePath: file, - fallbackMinifiedFileLOC: a.FallbackMinifiedFileLOC, - } - go a.worker(results, unwanted, locCount, fileInfo, &wg) + // Start a bounded worker pool. Large repositories can contain tens of + // thousands of candidate files, so one goroutine per file can exhaust + // runtime threads while workers are blocked on file I/O. + filesToAnalyze := make(chan string) + workerCount := analyzerWorkerCount(len(files)) + wg.Add(workerCount) + for range workerCount { + go func() { + defer wg.Done() + for file := range filesToAnalyze { + fileAnalyzer := &analyzerInfo{ + typesFlag: a.Types, + excludeTypesFlag: a.ExcludeTypes, + filePath: file, + fallbackMinifiedFileLOC: a.FallbackMinifiedFileLOC, + } + fileAnalyzer.worker(results, unwanted, locCount, fileInfo) + } + }() } + go func() { + for _, file := range files { + filesToAnalyze <- file + } + close(filesToAnalyze) + }() + go func() { // close channel results when the worker has finished writing into it defer func() { @@ -419,14 +437,12 @@ func (a *analyzerInfo) worker( //nolint: gocyclo unwanted chan<- string, locCount chan<- int, fileInfo chan<- fileTypeInfo, - wg *sync.WaitGroup, ) { defer func() { if err := recover(); err != nil { log.Warn().Msgf("Recovered from analyzing panic for file %s with error: %#v", a.filePath, err.(error).Error()) unwanted <- a.filePath } - wg.Done() }() ext, errExt := utils.GetExtension(a.filePath) @@ -521,6 +537,24 @@ func needsOverride(check bool, returnType, key, ext string) bool { return false } +func analyzerWorkerCount(fileCount int) int { + if fileCount < 1 { + return 0 + } + + workers := runtime.GOMAXPROCS(0) * 2 + if workers < 1 { + workers = 1 + } + if workers > maxAnalyzerWorkers { + workers = maxAnalyzerWorkers + } + if fileCount < workers { + return fileCount + } + return workers +} + // checkContent will determine the file type by content when worker was unable to // determine by ext, if no type was determined checkContent adds it to unwanted channel func (a *analyzerInfo) checkContent( diff --git a/pkg/analyzer/analyzer_test.go b/pkg/analyzer/analyzer_test.go index ea7adcf7f69..e6ad5db3cd8 100644 --- a/pkg/analyzer/analyzer_test.go +++ b/pkg/analyzer/analyzer_test.go @@ -2,6 +2,7 @@ package analyzer import ( "path/filepath" + "runtime" "sort" "testing" @@ -734,3 +735,15 @@ type platformFileStats struct { dirCount int totalLOC int } + +func TestAnalyzerWorkerCount(t *testing.T) { + oldMaxProcs := runtime.GOMAXPROCS(1) + defer runtime.GOMAXPROCS(oldMaxProcs) + + require.Equal(t, 0, analyzerWorkerCount(0)) + require.Equal(t, 2, analyzerWorkerCount(10)) + + runtime.GOMAXPROCS(maxAnalyzerWorkers) + require.Equal(t, 5, analyzerWorkerCount(5)) + require.Equal(t, maxAnalyzerWorkers, analyzerWorkerCount(maxAnalyzerWorkers+1)) +}