Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 170 additions & 61 deletions common/rst/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/thinkparq/beegfs-go/common/filesystem"
Expand Down Expand Up @@ -84,8 +85,7 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq
// Each client should at least have some input since there may be costs associated with the
// requests as in s3.
maxRequests := 1000

walkChanSize := cap(jobSubmissionChan)
walkChanSize := min(cap(jobSubmissionChan), maxRequests+1) // maxRequests +1 is for ResumeToken when there is more work
var walkChan <-chan *filesystem.StreamPathResult
walkPaths := filesystem.StreamPathsLexicographically
if cfg.GetUpdate() {
Expand Down Expand Up @@ -125,7 +125,15 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq
return c.executeJobBuilderRequest(ctx, workRequest, walkChan, jobSubmissionChan, cfg)
}

func (r *JobBuilderClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) {
func (c *JobBuilderClient) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) bool {
return false
}

func (c *JobBuilderClient) BuildBulkRequest(ctx context.Context, emit EmitBulkRequestFn) (submitBulkRequest SubmitBulkRequestFn, appendBulkRequest AppendBulkRequestFn, err error) {
return nil, nil, ErrUnsupportedOpForRST
}

func (c *JobBuilderClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) {
return true, 0, nil
}

Expand Down Expand Up @@ -170,32 +178,44 @@ func (c *JobBuilderClient) executeJobBuilderRequest(
jobSubmissionChan chan<- *beeremote.JobRequest,
cfg *flex.JobRequestCfg,
) (bool, error) {
builder := request.GetBuilder()

var walkingLocalPath bool
var remotePathDir string
var remotePathIsGlob bool
var isPathDir bool
if cfg.Download {
walkingLocalPath = walkLocalPathInsteadOfRemote(cfg)
remotePathDir, remotePathIsGlob = GetDownloadRemotePathDirectory(cfg.RemotePath)
stat, err := c.mountPoint.Lstat(cfg.Path)
isPathDir = err == nil && stat.IsDir()
isStatusError := func(status *beeremote.JobRequest_GenerationStatus) bool {
return status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION)
}

type bulkRequestState struct {
submit SubmitBulkRequestFn
append AppendBulkRequestFn
appendMu sync.Mutex
jobsWithErrors atomic.Int32
jobsSubmitted atomic.Int32
}
builder := request.GetBuilder()
walkLocal := walkLocalPathInsteadOfRemote(cfg)
getPaths := c.getPathsFn(cfg, walkLocal)
reschedule := false
builderStateMu := sync.Mutex{}
bulkRequestStates := make(map[uint32]*bulkRequestState)
bulkRequestStatesMu := sync.Mutex{}

g, walkCtx := errgroup.WithContext(ctx)
maxWorkers := runtime.GOMAXPROCS(0)
walkDoneChan := make(chan struct{}, maxWorkers)
defer close(walkDoneChan)
createJobRequests := func() error {
var err error
var inMountPath string
var remotePath string
jobsWithErrors := int32(0)
jobsSubmitted := int32(0)
defer func() {
builderStateMu.Lock()
builder.Submitted += jobsSubmitted
builder.Errors += jobsWithErrors
builderStateMu.Unlock()
}()

var walkPath string
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-walkCtx.Done():
return walkCtx.Err()
case walkResp, ok := <-walkChan:
if !ok {
select {
Expand All @@ -216,84 +236,113 @@ func (c *JobBuilderClient) executeJobBuilderRequest(
builderStateMu.Unlock()
return nil
}
walkPath = walkResp.Path
}

if cfg.Download {
if walkingLocalPath {
// Walking cfg.Path to support stub file download and files with a defined rst.
inMountPath = walkResp.Path
} else {
remotePath = walkResp.Path
inMountPath, err = GetDownloadInMountPath(cfg.Path, remotePath, remotePathDir, remotePathIsGlob, isPathDir, cfg.Flatten)
if err != nil {
// This should never happen since both remotePath and remotePathDir
// come directly from cfg.RemotePath, so any error here indicates a
// bug in the walking logic.
return err
}

// Ensure the local directory structure supports the object downloads
if err := c.mountPoint.CreateDir(filepath.Dir(inMountPath), 0755); err != nil {
return err
}
}
} else {
inMountPath = walkResp.Path
remotePath = inMountPath
}
inMountPath, remotePath, err := getPaths(walkPath)
if err != nil {
return err
}

if cfg.GetUpdate() {
if stat, statErr := c.mountPoint.Lstat(inMountPath); statErr == nil && stat.IsDir() {
dirErr := updateDirRstConfig(ctx, cfg.RemoteStorageTarget, inMountPath)
builderStateMu.Lock()
builder.Submitted++
dirErr := updateDirRstConfig(walkCtx, cfg.RemoteStorageTarget, inMountPath)
jobsSubmitted++
if dirErr != nil {
builder.Errors++
jobsWithErrors++
}
builderStateMu.Unlock()
continue
}
}

jobRequests, err := BuildJobRequests(ctx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg)
jobRequests, err := BuildJobRequests(walkCtx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg)
if err != nil {
// BuildJobRequest should only return fatal errors, or if there are no RSTs
// specified/configured on an entry and there is no other way to return the
// error other then aborting the builder job entirely.
return err
}

errorCount := 0
for _, jobRequest := range jobRequests {
jobWithError := false
status := jobRequest.GetGenerationStatus()
if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) {
errorCount++
if isStatusError(status) {
jobWithError = true
} else if status == nil || status.State == beeremote.JobRequest_GenerationStatus_UNSPECIFIED {
client := c.rstMap[jobRequest.GetRemoteStorageTarget()]
if client.IncludeInBulkRequest(walkCtx, jobRequest) {
rstId := jobRequest.GetRemoteStorageTarget()

bulkRequestStatesMu.Lock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): to avoid lock contention in a hot path consider making this a RWMutex as after the initial population of bulkRequestStates the map is only read.

Assisted-by: Claude:claude-opus-4-6

state, ok := bulkRequestStates[rstId]
if !ok {

emit := func(ctx context.Context, request *beeremote.JobRequest) {
jobWithError := false
status := request.GetGenerationStatus()
if isStatusError(status) {
jobWithError = true
}
select {
case <-ctx.Done():
return
case jobSubmissionChan <- request:
if jobWithError {
state.jobsWithErrors.Add(1)
}
state.jobsSubmitted.Add(1)
}
}

submitBulkRequest, appendBulkRequest, err := client.BuildBulkRequest(walkCtx, emit)
if err != nil {
bulkRequestStatesMu.Unlock()
return fmt.Errorf("job builder request was aborted: %w", err)
}
if submitBulkRequest == nil || appendBulkRequest == nil {
bulkRequestStatesMu.Unlock()
return fmt.Errorf("job builder request was aborted: RST client %T for target %d returned invalid bulk request callbacks: submit and append functions must both be non-nil", client, rstId)
}

state = &bulkRequestState{
submit: submitBulkRequest,
append: appendBulkRequest,
}
bulkRequestStates[rstId] = state
}
bulkRequestStatesMu.Unlock()

state.appendMu.Lock()
state.append(walkCtx, jobRequest)
state.appendMu.Unlock()
continue
}
}

select {
case <-ctx.Done():
case <-walkCtx.Done():
return walkCtx.Err()
case jobSubmissionChan <- jobRequest:
if jobWithError {
jobsWithErrors++
}
jobsSubmitted++
}
}

builderStateMu.Lock()
builder.Submitted += int32(len(jobRequests))
builder.Errors += int32(errorCount)
builderStateMu.Unlock()
}
}

// Start worker(s) that process walk paths and enqueue job requests. Begin with one and add more
// (up to GOMAXPROCS) when the job submission channel stays near empty, indicating the consumer is
// draining faster than we can fill it. This keeps throughput balanced without over saturating
// the system.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
workers := 1
lowThresholdTicks := 0
g.Go(createJobRequests)
for {
select {
case <-ctx.Done():
case <-walkCtx.Done():
return nil
case <-walkDoneChan:
return nil
Expand All @@ -317,9 +366,35 @@ func (c *JobBuilderClient) executeJobBuilderRequest(
}
}
})
if err := g.Wait(); err != nil {

err := g.Wait()

if len(bulkRequestStates) > 0 {
// Finalize and submit provider bulk requests.
wg := sync.WaitGroup{}
for _, state := range bulkRequestStates {
wg.Go(func() {
state.submit(ctx)
})
}
wg.Wait()

jobsWithErrors := int32(0)
jobsSubmitted := int32(0)
for _, state := range bulkRequestStates {
jobsWithErrors += state.jobsWithErrors.Load()
jobsSubmitted += state.jobsSubmitted.Load()
}
builderStateMu.Lock()
builder.Submitted += jobsSubmitted
builder.Errors += jobsWithErrors
builderStateMu.Unlock()
}

if err != nil {
return false, fmt.Errorf("job builder request was aborted: %w", err)
}

if reschedule {
return true, nil
}
Expand All @@ -329,7 +404,7 @@ func (c *JobBuilderClient) executeJobBuilderRequest(
totalErrors := builder.GetErrors()
if totalSubmitted == 0 {
if cfg.Download {
if walkingLocalPath {
if walkLocal {
errMessage = fmt.Sprintf("walking local path since --%s was not provided; No matches found in path: %s", RemotePathFlag, cfg.Path)
} else {
errMessage = fmt.Sprintf("no matches found in remote path: %s", cfg.RemotePath)
Expand All @@ -353,3 +428,37 @@ func (c *JobBuilderClient) executeJobBuilderRequest(
func walkLocalPathInsteadOfRemote(cfg *flex.JobRequestCfg) bool {
return cfg.RemotePath == ""
}

func (c *JobBuilderClient) getPathsFn(cfg *flex.JobRequestCfg, walkingLocalPath bool) func(string) (string, string, error) {
var remotePathDir string
var remotePathIsGlob bool
var isPathDir bool
if cfg.Download {
remotePathDir, remotePathIsGlob = GetDownloadRemotePathDirectory(cfg.RemotePath)
stat, err := c.mountPoint.Lstat(cfg.Path)
isPathDir = err == nil && stat.IsDir()
}

return func(walkPath string) (inMountPath, remotePath string, err error) {
if cfg.Download {
if walkingLocalPath {
// Walking cfg.Path to support stub file download and files with a defined rst.
inMountPath = walkPath
} else {
remotePath = walkPath
// GetDownloadInMountPath should never return an error happen since remotePath and
// remotePathDir are derived from cfg.RemotePath, so any error here indicates a bug
// in the walking logic.
inMountPath, err = GetDownloadInMountPath(cfg.Path, remotePath, remotePathDir, remotePathIsGlob, isPathDir, cfg.Flatten)
if err == nil {
// Ensure the local directory structure supports the object downloads
err = c.mountPoint.CreateDir(filepath.Dir(inMountPath), 0755)
}
}
} else {
inMountPath = walkPath
remotePath = inMountPath
}
return
}
}
Loading
Loading