diff --git a/common/rst/builder.go b/common/rst/builder.go index 7372c1c09..6d6b50485 100644 --- a/common/rst/builder.go +++ b/common/rst/builder.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "path" "path/filepath" "runtime" "sync" @@ -59,7 +60,7 @@ func (c *JobBuilderClient) GenerateWorkRequests(ctx context.Context, lastJob *be return workRequests, nil } -func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, err error) { +func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) { if !workRequest.HasBuilder() { err = ErrReqAndRSTTypeMismatch return @@ -68,6 +69,9 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq builder := workRequest.GetBuilder() cfg := builder.GetCfg() resumeToken := workRequest.GetExternalId() + if isWalkCompleteSentinel(resumeToken, workRequest.JobId) { + return c.executeJobBuilderRequest(ctx, workRequest, nil, jobSubmissionChan, cfg) + } var filter filesystem.FileInfoFilter filterExpr := cfg.GetFilterExpr() @@ -84,8 +88,7 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq // Each client should at least have some input since there may be costs associated with the // requests as in s3. 
maxRequests := 1000 - - walkChanSize := cap(jobSubmissionChan) + walkChanSize := min(cap(jobSubmissionChan), maxRequests+1) // maxRequests +1 is for ResumeToken when there is more work var walkChan <-chan *filesystem.StreamPathResult walkPaths := filesystem.StreamPathsLexicographically if cfg.GetUpdate() { @@ -94,7 +97,7 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq if cfg.Download { if filter != nil { - return false, fmt.Errorf("filter expressions (--%s) are not supported for downloads yet", filesystem.FilterExprFlag) + return false, 0, fmt.Errorf("filter expressions (--%s) are not supported for downloads yet", filesystem.FilterExprFlag) } if walkLocalPathInsteadOfRemote(cfg) { @@ -125,7 +128,23 @@ func (c *JobBuilderClient) ExecuteJobBuilderRequest(ctx context.Context, workReq return c.executeJobBuilderRequest(ctx, workRequest, walkChan, jobSubmissionChan, cfg) } -func (r *JobBuilderClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) { +func (c *JobBuilderClient) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) { + return false, "" +} + +func (c *JobBuilderClient) ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) { + return false, 0, ErrUnsupportedOpForRST +} + +func (c *JobBuilderClient) CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error { + return ErrUnsupportedOpForRST +} + +func (c *JobBuilderClient) CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error { + return ErrUnsupportedOpForRST +} + +func (c *JobBuilderClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) { return true, 0, nil } @@ -163,173 +182,260 @@ func (c *JobBuilderClient) GenerateExternalId(ctx 
context.Context, cfg *flex.Job return "", ErrUnsupportedOpForRST } +func isStatusError(status *beeremote.JobRequest_GenerationStatus) bool { + return status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) +} + func (c *JobBuilderClient) executeJobBuilderRequest( ctx context.Context, request *flex.WorkRequest, walkChan <-chan *filesystem.StreamPathResult, jobSubmissionChan chan<- *beeremote.JobRequest, cfg *flex.JobRequestCfg, -) (bool, error) { - builder := request.GetBuilder() +) (bool, time.Duration, error) { - var walkingLocalPath bool - var remotePathDir string - var remotePathIsGlob bool - var isPathDir bool - if cfg.Download { - walkingLocalPath = walkLocalPathInsteadOfRemote(cfg) - remotePathDir, remotePathIsGlob = GetDownloadRemotePathDirectory(cfg.RemotePath) - stat, err := c.mountPoint.Lstat(cfg.Path) - isPathDir = err == nil && stat.IsDir() + builder := request.GetBuilder() + walkLocal := walkLocalPathInsteadOfRemote(cfg) + builderStateMu := sync.Mutex{} + reschedule := false + var rescheduleDelay time.Duration + + bulkManagers := make(map[string]*bulkManager) + bulkManagersMu := sync.Mutex{} + for _, bulkOperation := range builder.GetBulkOperations() { + key := fmt.Sprintf("%d-%s", bulkOperation.RstId, bulkOperation.Operation) + manager := newBulkManager(request.JobId, bulkOperation) + bulkManagers[key] = manager + } + abortBuilderJob := func(reason error) (bool, time.Duration, error) { + var bulkManagerErrs []error + for _, manager := range bulkManagers { + client := c.rstMap[manager.rstId] + bulkManagerErrs = append(bulkManagerErrs, client.CancelBulkRequest(ctx, manager.StateMountPath, manager.Operation, reason)) + } + err := errors.Join(reason, errors.Join(bulkManagerErrs...)) + err = fmt.Errorf("job builder request was aborted: %w", err) + return false, 0, err } - reschedule := false - builderStateMu := sync.Mutex{} - maxWorkers := 
runtime.GOMAXPROCS(0) - walkDoneChan := make(chan struct{}, maxWorkers) - defer close(walkDoneChan) - createJobRequests := func() error { - var err error - var inMountPath string - var remotePath string - for { - select { - case <-ctx.Done(): - return ctx.Err() - case walkResp, ok := <-walkChan: - if !ok { - select { - case walkDoneChan <- struct{}{}: - default: + if walkChan != nil { + getPaths := c.getPathsFn(cfg, walkLocal) + + g, walkCtx := errgroup.WithContext(ctx) + maxWorkers := runtime.GOMAXPROCS(0) + walkDoneChan := make(chan struct{}, maxWorkers) + defer close(walkDoneChan) + createJobRequests := func() error { + jobsWithErrors := int32(0) + jobsSubmitted := int32(0) + defer func() { + builderStateMu.Lock() + builder.Submitted += jobsSubmitted + builder.Errors += jobsWithErrors + builderStateMu.Unlock() + }() + + var walkPath string + for { + select { + case <-walkCtx.Done(): + return walkCtx.Err() + case walkResp, ok := <-walkChan: + if !ok { + select { + case walkDoneChan <- struct{}{}: + default: + } + return nil } - return nil + + if walkResp.Err != nil { + return walkResp.Err + } + + if walkResp.ResumeToken != "" { + builderStateMu.Lock() + reschedule = true + rescheduleDelay = time.Millisecond + request.SetExternalId(walkResp.ResumeToken) + builderStateMu.Unlock() + return nil + } + walkPath = walkResp.Path } - if walkResp.Err != nil { - return walkResp.Err + inMountPath, remotePath, err := getPaths(walkPath) + if err != nil { + return err } - if walkResp.ResumeToken != "" { - builderStateMu.Lock() - reschedule = true - request.SetExternalId(walkResp.ResumeToken) - builderStateMu.Unlock() - return nil + if cfg.GetUpdate() { + if stat, statErr := c.mountPoint.Lstat(inMountPath); statErr == nil && stat.IsDir() { + dirErr := updateDirRstConfig(walkCtx, cfg.RemoteStorageTarget, inMountPath) + jobsSubmitted++ + if dirErr != nil { + jobsWithErrors++ + } + continue + } } - if cfg.Download { - if walkingLocalPath { - // Walking cfg.Path to support stub file 
download and files with a defined rst. - inMountPath = walkResp.Path - } else { - remotePath = walkResp.Path - inMountPath, err = GetDownloadInMountPath(cfg.Path, remotePath, remotePathDir, remotePathIsGlob, isPathDir, cfg.Flatten) - if err != nil { - // This should never happen since both remotePath and remotePathDir - // come directly from cfg.RemotePath, so any error here indicates a - // bug in the walking logic. - return err + jobRequests, err := BuildJobRequests(walkCtx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg) + if err != nil { + // BuildJobRequests should only return fatal errors, or if there are no RSTs + // specified/configured on an entry and there is no other way to return the + // error other than aborting the builder job entirely. + return err + } + + for _, jobRequest := range jobRequests { + jobWithError := false + status := jobRequest.GetGenerationStatus() + if isStatusError(status) { + jobWithError = true + } else if status == nil { + client := c.rstMap[jobRequest.GetRemoteStorageTarget()] + include, operation := client.IncludeInBulkRequest(walkCtx, jobRequest) + if include { + managerKey := fmt.Sprintf("%d-%s", jobRequest.RemoteStorageTarget, operation) + + bulkManagersMu.Lock() + manager, ok := bulkManagers[managerKey] + if !ok { + manager = newBulkManager(request.JobId, &flex.BuilderJob_BulkOperation{ + RstId: jobRequest.RemoteStorageTarget, + Operation: operation, + }) + bulkManagers[managerKey] = manager + } + bulkManagersMu.Unlock() + + manager.AddRequest(jobRequest) + continue } + } - // Ensure the local directory structure supports the object downloads - if err := c.mountPoint.CreateDir(filepath.Dir(inMountPath), 0755); err != nil { - return err + select { + case <-walkCtx.Done(): + return walkCtx.Err() + case jobSubmissionChan <- jobRequest: + if jobWithError { + jobsWithErrors++ } + jobsSubmitted++ } - } else { - inMountPath = walkResp.Path - remotePath = inMountPath } } + } + + // Start worker(s) that process walk paths and 
enqueue job requests. Begin with one and add more + // (up to GOMAXPROCS) when the job submission channel stays near empty, indicating the consumer is + // draining faster than we can fill it. This keeps throughput balanced without over saturating + // the system. + g.Go(func() error { + workers := 1 + lowThresholdTicks := 0 + g.Go(createJobRequests) + for { + select { + case <-walkCtx.Done(): + return nil + case <-walkDoneChan: + return nil + case <-time.After(100 * time.Millisecond): + size := len(jobSubmissionChan) + if workers < maxWorkers && size <= 2*workers { + if size <= workers { + lowThresholdTicks += 3 + } else { + lowThresholdTicks++ + } - if cfg.GetUpdate() { - if stat, statErr := c.mountPoint.Lstat(inMountPath); statErr == nil && stat.IsDir() { - dirErr := updateDirRstConfig(ctx, cfg.RemoteStorageTarget, inMountPath) - builderStateMu.Lock() - builder.Submitted++ - if dirErr != nil { - builder.Errors++ + if lowThresholdTicks >= 3 { + g.Go(createJobRequests) + workers++ + lowThresholdTicks = 0 + } + } else { + lowThresholdTicks = 0 } - builderStateMu.Unlock() - continue } } + }) + + err := g.Wait() + if err != nil { + return abortBuilderJob(err) + } + + if !reschedule { + request.SetExternalId(makeWalkCompleteSentinel(request.JobId)) + } + } - jobRequests, err := BuildJobRequests(ctx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg) + bulkOperations := []*flex.BuilderJob_BulkOperation{} + g, bulkCtx := errgroup.WithContext(ctx) + for _, manager := range bulkManagers { + g.Go(func() error { + client := c.rstMap[manager.rstId] + // bulkReschedule, bulkDelay, err := client.ManageBulkRequest(bulkCtx, request.JobId, manager.operation, manager.jobRequests) + bulkReschedule, bulkDelay, err := client.ExecuteBulkRequest(bulkCtx, manager.StateMountPath, manager.Operation, manager.JobRequests) if err != nil { - // BuildJobRequest should only return fatal errors, or if there are no RSTs - // specified/configured on an entry and there is no other way to return 
the - // error other then aborting the builder job entirely. return err } - errorCount := 0 - for _, jobRequest := range jobRequests { - status := jobRequest.GetGenerationStatus() - if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) { - errorCount++ - } - select { - case <-ctx.Done(): - case jobSubmissionChan <- jobRequest: - } - } + jobsSubmitted, jobsWithErrors := manager.SubmitJobRequests(bulkCtx, jobSubmissionChan) builderStateMu.Lock() - builder.Submitted += int32(len(jobRequests)) - builder.Errors += int32(errorCount) - builderStateMu.Unlock() - } - } - - // Start worker(s) that process walk paths and enqueue job requests. Begin with one and add more - // (up to GOMAXPROCS) when the job submission channel stays near empty, indicating the consumer is - // draining faster than we can fill it. This keeps throughput balanced without over saturating - // the system. - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - workers := 1 - lowThresholdTicks := 0 - g.Go(createJobRequests) - for { - select { - case <-ctx.Done(): - return nil - case <-walkDoneChan: - return nil - case <-time.After(100 * time.Millisecond): - size := len(jobSubmissionChan) - if workers < maxWorkers && size <= 2*workers { - if size <= workers { - lowThresholdTicks += 3 - } else { - lowThresholdTicks++ - } - - if lowThresholdTicks >= 3 { - g.Go(createJobRequests) - workers++ - lowThresholdTicks = 0 - } + builder.Submitted += jobsSubmitted + builder.Errors += jobsWithErrors + if bulkReschedule { + reschedule = true + if rescheduleDelay == 0 { + rescheduleDelay = bulkDelay } else { - lowThresholdTicks = 0 + rescheduleDelay = min(rescheduleDelay, bulkDelay) } } - } - }) - if err := g.Wait(); err != nil { - return false, fmt.Errorf("job builder request was aborted: %w", err) + builderStateMu.Unlock() + + return nil + }) + bulkOperations = append(bulkOperations, 
manager.GetBulkOperation()) } + err := g.Wait() + builder.SetBulkOperations(bulkOperations) + if err != nil { + return abortBuilderJob(err) + } + if reschedule { - return true, nil + return true, rescheduleDelay, nil + } + + managerErrIndex := 0 + managerErrs := make([]error, len(bulkManagers)) + wg := sync.WaitGroup{} + for _, manager := range bulkManagers { + index := managerErrIndex + mgr := manager + wg.Go(func() { + client := c.rstMap[mgr.rstId] + if err := client.CompleteBulkRequest(ctx, mgr.StateMountPath, mgr.Operation); err != nil { + managerErrs[index] = err + } + }) + managerErrIndex++ } + wg.Wait() + bulkManagerErr := errors.Join(managerErrs...) var errMessage string totalSubmitted := builder.GetSubmitted() totalErrors := builder.GetErrors() if totalSubmitted == 0 { if cfg.Download { - if walkingLocalPath { + if walkLocal { errMessage = fmt.Sprintf("walking local path since --%s was not provided; No matches found in path: %s", RemotePathFlag, cfg.Path) } else { errMessage = fmt.Sprintf("no matches found in remote path: %s", cfg.RemotePath) @@ -341,15 +447,135 @@ func (c *JobBuilderClient) executeJobBuilderRequest( errMessage = fmt.Sprintf("%d of %d requests were submitted with errors", totalErrors, totalSubmitted) } + if bulkManagerErr != nil { + if errMessage != "" { + errMessage += fmt.Sprintf("; %s", bulkManagerErr) + } else { + errMessage = bulkManagerErr.Error() + } + } + if errMessage != "" { if !IsValidRstId(cfg.RemoteStorageTarget) { errMessage += fmt.Sprintf("; --%s was not provided so relying on configured rstIds and stub urls", RemoteTargetFlag) } - return false, errors.New(errMessage) + return false, 0, errors.New(errMessage) } - return false, nil + return false, 0, nil } func walkLocalPathInsteadOfRemote(cfg *flex.JobRequestCfg) bool { return cfg.RemotePath == "" } + +const builderWalkCompletePrefix = "builder:walk-complete:" + +func makeWalkCompleteSentinel(jobID string) string { + return builderWalkCompletePrefix + jobID +} + +func 
isWalkCompleteSentinel(externalID, jobID string) bool { + return externalID == makeWalkCompleteSentinel(jobID) +} + +func (c *JobBuilderClient) getPathsFn(cfg *flex.JobRequestCfg, walkingLocalPath bool) func(string) (string, string, error) { + var remotePathDir string + var remotePathIsGlob bool + var isPathDir bool + if cfg.Download { + remotePathDir, remotePathIsGlob = GetDownloadRemotePathDirectory(cfg.RemotePath) + stat, err := c.mountPoint.Lstat(cfg.Path) + isPathDir = err == nil && stat.IsDir() + } + + return func(walkPath string) (inMountPath, remotePath string, err error) { + if cfg.Download { + if walkingLocalPath { + // Walking cfg.Path to support stub file download and files with a defined rst. + inMountPath = walkPath + } else { + remotePath = walkPath + // GetDownloadInMountPath should never return an error since remotePath and + // remotePathDir are derived from cfg.RemotePath, so any error here indicates a bug + // in the walking logic. + inMountPath, err = GetDownloadInMountPath(cfg.Path, remotePath, remotePathDir, remotePathIsGlob, isPathDir, cfg.Flatten) + if err == nil { + // Ensure the local directory structure supports the object downloads + err = c.mountPoint.CreateDir(filepath.Dir(inMountPath), 0755) + } + } + } else { + inMountPath = walkPath + remotePath = inMountPath + } + return + } +} + +const ( + bulkManagerPath = ".beegfs-builder-job/bulkManager" +) + +type bulkManager struct { + StateMountPath string + rstId uint32 + Operation string + JobRequests []*beeremote.JobRequest + nextJobIndex int64 + mu sync.Mutex +} + +func newBulkManager(jobId string, bulkOperation *flex.BuilderJob_BulkOperation) *bulkManager { + stateMountPath := path.Join(bulkManagerPath, jobId, fmt.Sprint(bulkOperation.RstId)) + return &bulkManager{ + StateMountPath: stateMountPath, + rstId: bulkOperation.RstId, + Operation: bulkOperation.Operation, + nextJobIndex: bulkOperation.NextRequestId, + JobRequests: []*beeremote.JobRequest{}, + } +} + +func (m 
*bulkManager) GetBulkOperation() *flex.BuilderJob_BulkOperation { + return &flex.BuilderJob_BulkOperation{ + RstId: m.rstId, + Operation: m.Operation, + NextRequestId: m.nextJobIndex, + } +} + +func (m *bulkManager) AddRequest(request *beeremote.JobRequest) { + m.mu.Lock() + defer m.mu.Unlock() + + request.SetBulkInfo(&flex.BulkJobRequestInfo{ + StateMountPath: m.StateMountPath, + Operation: m.Operation, + JobIndex: m.nextJobIndex, + }) + m.nextJobIndex++ + + m.JobRequests = append(m.JobRequests, request) +} + +func (m *bulkManager) SubmitJobRequests(ctx context.Context, jobSubmissionChan chan<- *beeremote.JobRequest) (jobsSubmitted int32, jobsWithErrors int32) { + for _, request := range m.JobRequests { + jobWithError := false + status := request.GetGenerationStatus() + if isStatusError(status) { + jobWithError = true + } + + select { + case <-ctx.Done(): + return + case jobSubmissionChan <- request: + if jobWithError { + jobsWithErrors++ + } + jobsSubmitted++ + } + } + + return +} diff --git a/common/rst/mock.go b/common/rst/mock.go index f404b4983..a02119cf0 100644 --- a/common/rst/mock.go +++ b/common/rst/mock.go @@ -49,11 +49,11 @@ type MockClient struct { var _ Provider = &MockClient{} -func (r *MockClient) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest { +func (m *MockClient) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest { return nil } -func (rst *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beeremote.Job, job *beeremote.Job, availableWorkers int) (requests []*flex.WorkRequest, err error) { +func (m *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beeremote.Job, job *beeremote.Job, availableWorkers int) (requests []*flex.WorkRequest, err error) { if job.Request.GetMock() != nil { if job.Request.GetMock().ShouldFail { @@ -64,14 +64,14 @@ func (rst *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beerem return workRequests, nil } - args := rst.Called(job, availableWorkers) + args := 
m.Called(job, availableWorkers) if args.Error(2) != nil { return nil, args.Error(2) } return args.Get(0).([]*flex.WorkRequest), nil } -func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error { +func (m *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error { if request.GetMock() != nil { if request.GetMock().ShouldFail { @@ -81,7 +81,7 @@ func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex return nil } - args := rst.Called(ctx, request, part) + args := m.Called(ctx, request, part) err := args.Error(0) if err == nil { part.Completed = true @@ -90,11 +90,27 @@ func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex } // ExecuteJobBuilderRequest is not implemented and should never be called. -func (r *MockClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, error) { - return false, ErrUnsupportedOpForRST +func (m *MockClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, time.Duration, error) { + return false, 0, ErrUnsupportedOpForRST } -func (rst *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote.Job, workResults []*flex.Work, abort bool) error { +func (m *MockClient) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) { + return false, "" +} + +func (m *MockClient) ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) { + return false, 0, ErrUnsupportedOpForRST +} + +func (m *MockClient) CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error { + return nil +} + +func (m *MockClient) CancelBulkRequest(ctx 
context.Context, stateMountPath string, operation string, reason error) error { + return nil +} + +func (m *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote.Job, workResults []*flex.Work, abort bool) error { if job.Request.GetMock() != nil { if job.Request.GetMock().ShouldFail { @@ -103,32 +119,32 @@ func (rst *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote. return nil } - args := rst.Called(job, workResults, abort) + args := m.Called(job, workResults, abort) return args.Error(0) } -func (rst *MockClient) GetConfig() *flex.RemoteStorageTarget { - args := rst.Called() +func (m *MockClient) GetConfig() *flex.RemoteStorageTarget { + args := m.Called() return args.Get(0).(*flex.RemoteStorageTarget) } -func (r *MockClient) GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error) { +func (m *MockClient) GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error) { return nil, ErrUnsupportedOpForRST } -func (r *MockClient) SanitizeRemotePath(remotePath string) string { +func (m *MockClient) SanitizeRemotePath(remotePath string) string { return remotePath } -func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) { +func (m *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) { return 0, time.Time{}, false, false, ErrUnsupportedOpForRST } -func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) { +func (m *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) { return "", ErrUnsupportedOpForRST } -func (r *MockClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) { - args := r.Called(request) +func (m *MockClient) 
IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) { + args := m.Called(request) return args.Bool(0), args.Get(1).(time.Duration), args.Error(2) } diff --git a/common/rst/rst.go b/common/rst/rst.go index e2216c181..49e28aa7c 100644 --- a/common/rst/rst.go +++ b/common/rst/rst.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -59,11 +60,19 @@ const ( // initialized but empty struct of the correct type. var SupportedRSTTypes = map[string]func() (any, any){ "s3": func() (any, any) { t := new(flex.RemoteStorageTarget_S3_); return t, &t.S3 }, + // XtreemStore is S3-compatible and uses the existing S3 implementation. + "xtreemstore": func() (any, any) { + t := &flex.RemoteStorageTarget_Xtreemstore{Xtreemstore: &flex.RemoteStorageTarget_XtreemStore{}} + return t, &t.Xtreemstore.S3 + }, // Azure is not currently supported, but this is how an Azure type could be added: // "azure": func() (any, any) { t := new(flex.RemoteStorageTarget_Azure_); return t, &t.Azure }, // Mock could be included here if it ever made sense to allow configuration using a file. } +type SubmitBulkRequestFn func(ctx context.Context) +type EmitBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest) +type AppendBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest) type Provider interface { // GetJobRequest builds a provider-specific job request. GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest @@ -78,7 +87,7 @@ type Provider interface { // any new requests into jobSubmissionChan. If building jobs is long running, return // rescheduled==true to reschedule the remaining work for later which allows other work time to // complete. 
- ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, err error) + ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) // ExecuteWorkRequestPart accepts a request and which part of the request it should carry out. // It blocks until the request is complete, but the caller can cancel the provided context to // return early. It determines and executes the requested operation (if supported) then directly @@ -121,6 +130,40 @@ type Provider interface { // start work requests that have been placed into a wait queue. This is useful for providers // that need the ability to wait for resources to be made available before continuing. IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (ready bool, delay time.Duration, err error) + + // IncludeInBulkRequest indicates whether the request should be included in a provider-defined + // bulk operation. operation is an arbitrary provider-defined identifier that groups compatible + // requests within a provider bulk request. + IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) + // ExecuteBulkRequest manages the lifecycle of a provider-defined bulk operation for the specified + // stateMountPath and operation. + // + // requests contains the requests selected for this bulk operation in the current builder pass. + // These requests will be submitted immediately after ExecuteBulkRequest returns. On later builder + // passes after a reschedule, requests may be empty if the walk has already completed or there + // are no newly discovered requests to add to the bulk operation. 
+ // + // Providers are responsible for persisting and recovering any provider-side state needed to + // resume ExecuteBulkRequest or later complete/cancel the bulk operation after the builder job is + // rescheduled. stateMountPath, operation, and the provider itself identify the bulk operation. + // + // stateMountPath is builder-maintained workflow metadata. It is a provider-specific in-mount + // path that identifies where provider-side bulk state should be stored and later recovered from + // during rescheduled builder passes. + // + // Use reschedule and delay to reschedule the builder job while bulk work continues. If there are + // multiple bulk operations associated with a single builder job then the shortest delay will be + // chosen. + // + // Errors should only be returned to abort the builder job. Report all non-fatal errors through + // the included job requests. + ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) + // CompleteBulkRequest completes any final provider-side actions needed to finish the bulk + // operation identified by stateMountPath and operation. + CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error + // CancelBulkRequest cancels the remaining provider-side work for the bulk operation identified by + // stateMountPath and operation and signals the included job requests to abort if needed. + CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error } // New initializes a provider client based on the provided config. 
It accepts a context that can be @@ -135,6 +178,8 @@ func New(ctx context.Context, config *flex.RemoteStorageTarget, mountPoint files switch config.Type.(type) { case *flex.RemoteStorageTarget_S3_: return newS3(ctx, config, mountPoint) + case *flex.RemoteStorageTarget_Xtreemstore: + return newXtreemstore(ctx, config, mountPoint) case *flex.RemoteStorageTarget_Mock: // This handles setting up a Mock RST for testing from external packages like WorkerMgr. See // the documentation ion `MockClient` in mock.go for how to setup expectations. @@ -167,6 +212,7 @@ func New(ctx context.Context, config *flex.RemoteStorageTarget, mountPoint files // request IDs. func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segment) (requests []*flex.WorkRequest) { request := job.GetRequest() + delayExecution := job.Request.GetDelayExecution() // Ensure when adding new fields that all reference types are cloned to ensure WRs are // initialized properly and don't share references with anything else. Otherwise this can lead @@ -188,6 +234,10 @@ func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segme Type: &flex.WorkRequest_Builder{Builder: proto.Clone(request.GetBuilder()).(*flex.BuilderJob)}, Priority: new(request.GetPriority()), } + if delayExecution != nil { + jobBuilderWorkRequest.SetDelayExecution(proto.Clone(delayExecution).(*durationpb.Duration)) + } + return []*flex.WorkRequest{jobBuilderWorkRequest} } @@ -206,6 +256,9 @@ func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segme StubLocal: request.GetStubLocal(), Priority: new(request.GetPriority()), } + if delayExecution != nil { + wr.SetDelayExecution(proto.Clone(delayExecution).(*durationpb.Duration)) + } switch request.WhichType() { case beeremote.JobRequest_Sync_case: @@ -258,7 +311,14 @@ func generateSegments(fileSize int64, segCount int64, partsPerSegment int32) []* // // A returned error indicates that one or more job request were not able to be built. 
However, if // a request was able to be built, the error will be specified in the request's GenerationStatus. -func BuildJobRequests(ctx context.Context, rstMap map[uint32]Provider, mountPoint filesystem.Provider, inMountPath string, remotePath string, cfg *flex.JobRequestCfg) ([]*beeremote.JobRequest, error) { +func BuildJobRequests( + ctx context.Context, + rstMap map[uint32]Provider, + mountPoint filesystem.Provider, + inMountPath string, + remotePath string, + cfg *flex.JobRequestCfg, +) ([]*beeremote.JobRequest, error) { keepLock := false lockedInfo, writeLockSet, rstIds, entryInfoMsg, ownerNode, err := GetLockedInfo(ctx, mountPoint, cfg, inMountPath, false) @@ -339,7 +399,6 @@ func BuildJobRequests(ctx context.Context, rstMap map[uint32]Provider, mountPoin keepLock = true } } // If we couldn't build a runnable job request, there would be no active job to drive the normal unlock path so don't keep the lock. - requests = append(requests, request) } diff --git a/common/rst/s3.go b/common/rst/s3.go index 88ebd1522..4613148c3 100644 --- a/common/rst/s3.go +++ b/common/rst/s3.go @@ -35,6 +35,87 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +// s3ApiClient is the low-level s3 transport layer used by S3Client. Provider specific wrappers can +// customize SDK calls here without reimplementing the higher-level RST behavior. 
+type s3ApiClient interface { + ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) + ListObjectsV2Pages(ctx context.Context, params *s3.ListObjectsV2Input, pageFn func(*s3.ListObjectsV2Output) (bool, error)) error + RestoreObject(ctx context.Context, params *s3.RestoreObjectInput, optFns ...func(*s3.Options)) (*s3.RestoreObjectOutput, error) + HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) + CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) + AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) + CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) + PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) + UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + +// defaultS3ApiClient is the default s3ApiClient backed by the AWS SDK's s3 client. +type defaultS3ApiClient struct { + client *s3.Client +} + +var _ s3ApiClient = &defaultS3ApiClient{} + +func (d *defaultS3ApiClient) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + return d.client.ListObjectsV2(ctx, params, optFns...) 
+} + +func (d *defaultS3ApiClient) ListObjectsV2Pages(ctx context.Context, params *s3.ListObjectsV2Input, pageFn func(*s3.ListObjectsV2Output) (bool, error)) error { + paginator := s3.NewListObjectsV2Paginator(d.client, params) + for paginator.HasMorePages() { + output, err := paginator.NextPage(ctx) + if err != nil { + return err + } + cont, err := pageFn(output) + if err != nil { + return err + } + if !cont { + return nil + } + } + return nil +} + +func (d *defaultS3ApiClient) RestoreObject(ctx context.Context, params *s3.RestoreObjectInput, optFns ...func(*s3.Options)) (*s3.RestoreObjectOutput, error) { + return d.client.RestoreObject(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return d.client.HeadObject(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + return d.client.CreateMultipartUpload(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + return d.client.AbortMultipartUpload(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + return d.client.CompleteMultipartUpload(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + return d.client.PutObject(ctx, params, optFns...) 
+} + +func (d *defaultS3ApiClient) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + return d.client.UploadPart(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return d.client.GetObject(ctx, params, optFns...) +} + +func (d *defaultS3ApiClient) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return d.client.DeleteObject(ctx, params, optFns...) +} + type S3StorageClass struct { retrievalTier types.Tier archival bool @@ -44,11 +125,14 @@ type S3StorageClass struct { autoRestore bool // defines whether archived objects should be permitted to be restored. } +// S3Client implements the shared Provider behavior for s3 compatible backends and uses s3ApiClient +// to perform the low-level s3 operations. type S3Client struct { config *flex.RemoteStorageTarget - // TODO: https://github.com/thinkparq/gobee/issues/28 - // Rework client into an `s3Provider` interface type. - client *s3.Client + // s3Config holds provider-specific S3 options used by this client. This allows providers such + // as xtreemstore to reuse the S3 implementation while keeping their own top-level RST type. 
+ s3Config *flex.RemoteStorageTarget_S3 + apiClient s3ApiClient mountPoint filesystem.Provider storageClasses map[types.StorageClass]S3StorageClass isListStartAfterKeySupported *bool @@ -58,15 +142,61 @@ type S3Client struct { var _ Provider = &S3Client{} func newS3(ctx context.Context, rstConfig *flex.RemoteStorageTarget, mountPoint filesystem.Provider) (Provider, error) { - s3Provider := rstConfig.GetS3() + return newS3WithOptions(ctx, rstConfig, rstConfig.GetS3(), mountPoint) +} + +type s3ProviderOption func(*s3ProviderBuildCfg) +type s3ProviderBuildCfg struct { + apiClient func(base s3ApiClient) s3ApiClient + s3Options []func(*s3.Options) +} + +func defaultS3ProviderBuildCfg() s3ProviderBuildCfg { + return s3ProviderBuildCfg{ + apiClient: func(base s3ApiClient) s3ApiClient { return base }, + } +} + +func withS3ApiClient(fn func(s3ApiClient) s3ApiClient) s3ProviderOption { + return func(cfg *s3ProviderBuildCfg) { + if fn != nil { + cfg.apiClient = fn + } + } +} + +func withS3Options(optFns ...func(*s3.Options)) s3ProviderOption { + return func(cfg *s3ProviderBuildCfg) { + for _, optFn := range optFns { + if optFn != nil { + cfg.s3Options = append(cfg.s3Options, optFn) + } + } + } +} + +// newS3WithOptions constructs an S3Client. withS3ApiClient is applied before the client is +// created, so shared wrapper state can rely on apiClient already being populated. 
+func newS3WithOptions(ctx context.Context, rstConfig *flex.RemoteStorageTarget, s3Config *flex.RemoteStorageTarget_S3, mountPoint filesystem.Provider, opts ...s3ProviderOption) (Provider, error) { + if s3Config == nil { + return nil, fmt.Errorf("s3 configuration must be specified") + } + + buildCfg := defaultS3ProviderBuildCfg() + for _, opt := range opts { + if opt != nil { + opt(&buildCfg) + } + } + awsCfg, err := awsConfig.LoadDefaultConfig( ctx, - awsConfig.WithBaseEndpoint(s3Provider.GetEndpointUrl()), - awsConfig.WithRegion(s3Provider.GetRegion()), + awsConfig.WithBaseEndpoint(s3Config.GetEndpointUrl()), + awsConfig.WithRegion(s3Config.GetRegion()), awsConfig.WithCredentialsProvider( credentials.NewStaticCredentialsProvider( - s3Provider.GetAccessKey(), - s3Provider.GetSecretKey(), + s3Config.GetAccessKey(), + s3Config.GetSecretKey(), "", // session token ), ), @@ -79,26 +209,34 @@ func newS3(ctx context.Context, rstConfig *flex.RemoteStorageTarget, mountPoint // was deprecated for new regions in 2020. So, check whether the provided endpoint url starts // with the bucket as part of the hostname. Otherwise, use the path-style. 
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html - endpointUrl, err := url.Parse(s3Provider.GetEndpointUrl()) + endpointUrl, err := url.Parse(s3Config.GetEndpointUrl()) if err != nil { return nil, fmt.Errorf("unable to parse s3 end-point: %w", err) } - bucket := s3Provider.GetBucket() + bucket := s3Config.GetBucket() host := endpointUrl.Hostname() usePathStyle := !strings.HasPrefix(host, bucket+".") - client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + awsClient := s3.NewFromConfig(awsCfg, func(o *s3.Options) { o.UsePathStyle = usePathStyle + for _, optFn := range buildCfg.s3Options { + optFn(o) + } }) + apiClient := buildCfg.apiClient(&defaultS3ApiClient{client: awsClient}) + if apiClient == nil { + return nil, fmt.Errorf("s3 api client wrapper returned nil") + } s3Client := &S3Client{ config: rstConfig, - client: client, + s3Config: s3Config, + apiClient: apiClient, mountPoint: mountPoint, storageClasses: make(map[types.StorageClass]S3StorageClass), isListStartAfterKeySupportedMu: sync.Mutex{}, } - for _, class := range s3Provider.StorageClass { + for _, class := range s3Config.StorageClass { name := types.StorageClass(class.GetName()) if name == "" { return nil, fmt.Errorf("storage class must specify a valid storage class name") @@ -148,12 +286,12 @@ func (s *S3Client) checkStartAfterSupport(ctx context.Context) error { } input := &s3.ListObjectsV2Input{ - Bucket: aws.String(s.config.GetS3().Bucket), + Bucket: aws.String(s.s3Config.Bucket), StartAfter: aws.String("-"), MaxKeys: aws.Int32(0), } - if _, err := s.client.ListObjectsV2(ctx, input); err != nil { + if _, err := s.apiClient.ListObjectsV2(ctx, input); err != nil { var apiErr smithy.APIError if errors.As(err, &apiErr) && apiErr.ErrorCode() == "InvalidArgument" && strings.Contains(strings.ToLower(apiErr.ErrorMessage()), "startafter") { s.isListStartAfterKeySupported = new(bool) @@ -266,8 +404,24 @@ func (r *S3Client) GenerateWorkRequests(ctx context.Context, lastJob 
*beeremote. } // ExecuteJobBuilderRequest is not implemented and should never be called. -func (r *S3Client) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, error) { - return false, ErrUnsupportedOpForRST +func (r *S3Client) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, time.Duration, error) { + return false, 0, ErrUnsupportedOpForRST +} + +func (r *S3Client) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) { + return false, "" +} + +func (r *S3Client) ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) { + return false, 0, ErrUnsupportedOpForRST +} + +func (r *S3Client) CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error { + return ErrUnsupportedOpForRST +} + +func (r *S3Client) CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error { + return ErrUnsupportedOpForRST } func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) { @@ -294,7 +448,7 @@ func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkReq } restoreObjectInput := &s3.RestoreObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(sync.RemotePath), RestoreRequest: restoreRequest, } @@ -302,7 +456,7 @@ func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkReq // Multiple workers may attempt to restore the same object concurrently. In that // case, a RestoreAlreadyInProgress error can occur and should be ignored. If the // restore has already completed, subsequent requests will succeed with HTTP 200 OK. 
- if _, err := r.client.RestoreObject(ctx, restoreObjectInput); err != nil { + if _, err := r.apiClient.RestoreObject(ctx, restoreObjectInput); err != nil { var apiErr smithy.APIError if !errors.As(err, &apiErr) || apiErr.ErrorCode() != "RestoreAlreadyInProgress" { return false, 0, err @@ -419,7 +573,7 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res } input := &s3.ListObjectsV2Input{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Prefix: aws.String(prefixWithoutPattern), MaxKeys: aws.Int32(int32(maxKeysPerPage)), } @@ -436,18 +590,8 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res var key string var lastKey string - objectPaginator := s3.NewListObjectsV2Paginator(r.client, input) - keysFound = objectPaginator.HasMorePages() - for objectPaginator.HasMorePages() { - output, err := objectPaginator.NextPage(ctx) - if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - send(&filesystem.StreamPathResult{Err: fmt.Errorf("prefix walk was cancelled: %w", err)}) - } else { - send(&filesystem.StreamPathResult{Err: fmt.Errorf("prefix walk failed: %w", err)}) - } - return - } + pageFn := func(output *s3.ListObjectsV2Output) (bool, error) { + keysFound = true // When resuming with s3ResumeToken ContinuationToken and ContinuationStartKey, // search for ContinuationStartKey on the page. If it does not exist and there's a @@ -470,7 +614,7 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res if len(filteredContents) == 0 { if nextGreaterKeyIndex == -1 { // There were no greater keys on the current page. So check the next page. - continue + return true, nil } continuationFindStart = false filteredContents = append(filteredContents, output.Contents[nextGreaterKeyIndex:]...) 
@@ -495,7 +639,7 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res } else { send(&filesystem.StreamPathResult{ResumeToken: token}) } - return + return false, nil } rt := s3ResumeToken{ContinuationToken: aws.ToString(output.ContinuationToken), ContinuationStartKey: key} @@ -504,11 +648,11 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res } else { send(&filesystem.StreamPathResult{ResumeToken: token}) } - return + return false, nil } if !send(&filesystem.StreamPathResult{Path: key}) { - return + return false, nil } lastKey = key @@ -516,15 +660,24 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int, res maxKeys-- } } + + return true, nil + } + + err := r.apiClient.ListObjectsV2Pages(ctx, input, pageFn) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + send(&filesystem.StreamPathResult{Err: fmt.Errorf("prefix walk was cancelled: %w", err)}) + } else { + send(&filesystem.StreamPathResult{Err: fmt.Errorf("prefix walk failed: %w", err)}) + } + return } return } if isKey { - _, err := r.client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), - Key: aws.String(prefix), - }) + _, err := r.headObject(ctx, prefix) if err != nil { var apiErr smithy.APIError if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotFound" { @@ -820,6 +973,14 @@ func (r *S3Client) archiveStatus(storageClass types.StorageClass, restoreMsg *st return status } +func (r *S3Client) headObject(ctx context.Context, key string) (*s3.HeadObjectOutput, error) { + input := &s3.HeadObjectInput{ + Bucket: aws.String(r.s3Config.Bucket), + Key: aws.String(key), + } + return r.apiClient.HeadObject(ctx, input) +} + // getObjectMetadata returns the object's size in bytes, modification time if it exists. 
func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExist bool) (int64, time.Time, *s3ArchiveInfo, error) { if key == "" { @@ -829,15 +990,9 @@ func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExi return 0, time.Time{}, nil, nil } - headObjectInput := &s3.HeadObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), - Key: aws.String(key), - } - - resp, err := r.client.HeadObject(ctx, headObjectInput) + resp, err := r.headObject(ctx, key) if err != nil { - var apiErr smithy.APIError - if errors.As(err, &apiErr) { + if apiErr, ok := errors.AsType[smithy.APIError](err); ok { if apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey" { return 0, time.Time{}, nil, os.ErrNotExist } @@ -871,7 +1026,7 @@ func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Tim } createMultipartUploadInput := &s3.CreateMultipartUploadInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(path), Metadata: metadata, Tagging: tagging, @@ -880,7 +1035,7 @@ func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Tim createMultipartUploadInput.StorageClass = types.StorageClass(*storageClass) } - result, err := r.client.CreateMultipartUpload(ctx, createMultipartUploadInput) + result, err := r.apiClient.CreateMultipartUpload(ctx, createMultipartUploadInput) if err != nil { return "", err } @@ -890,11 +1045,11 @@ func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Tim func (r *S3Client) abortUpload(ctx context.Context, uploadID string, remotePath string) error { abortMultipartUploadInput := &s3.AbortMultipartUploadInput{ UploadId: aws.String(uploadID), - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(remotePath), } - _, err := r.client.AbortMultipartUpload(ctx, abortMultipartUploadInput) + _, err := r.apiClient.AbortMultipartUpload(ctx, 
abortMultipartUploadInput) return err } @@ -915,7 +1070,7 @@ func (r *S3Client) finishUpload(ctx context.Context, uploadID string, remotePath }) completeMultipartUploadInput := &s3.CompleteMultipartUploadInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(remotePath), UploadId: aws.String(uploadID), MultipartUpload: &types.CompletedMultipartUpload{ @@ -923,7 +1078,7 @@ func (r *S3Client) finishUpload(ctx context.Context, uploadID string, remotePath }, } - _, err := r.client.CompleteMultipartUpload(ctx, completeMultipartUploadInput) + _, err := r.apiClient.CompleteMultipartUpload(ctx, completeMultipartUploadInput) return err } @@ -976,7 +1131,7 @@ func (r *S3Client) upload( } input := &s3.PutObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(remotePath), Body: filePart, ChecksumSHA256: aws.String(part.ChecksumSha256), @@ -989,7 +1144,7 @@ func (r *S3Client) upload( input.StorageClass = types.StorageClass(*storageClass) } - resp, err := r.client.PutObject(ctx, input) + resp, err := r.apiClient.PutObject(ctx, input) if err != nil { return err @@ -999,7 +1154,7 @@ func (r *S3Client) upload( } uploadPartReq := &s3.UploadPartInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(remotePath), UploadId: aws.String(uploadID), PartNumber: aws.Int32(part.PartNumber), @@ -1007,7 +1162,7 @@ func (r *S3Client) upload( ChecksumSHA256: aws.String(part.ChecksumSha256), } - resp, err := r.client.UploadPart(ctx, uploadPartReq) + resp, err := r.apiClient.UploadPart(ctx, uploadPartReq) if err != nil { return err } @@ -1031,12 +1186,12 @@ func (r *S3Client) download(ctx context.Context, path string, remotePath string, defer filePart.Close() getObjectInput := &s3.GetObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(r.s3Config.Bucket), Key: aws.String(remotePath), Range: 
aws.String(fmt.Sprintf("bytes=%d-%d", part.OffsetStart, part.OffsetStop)), } - resp, err := r.client.GetObject(ctx, getObjectInput) + resp, err := r.apiClient.GetObject(ctx, getObjectInput) if err != nil { return err } diff --git a/common/rst/s3_test.go b/common/rst/s3_test.go index 107b1de29..591e31624 100644 --- a/common/rst/s3_test.go +++ b/common/rst/s3_test.go @@ -22,6 +22,9 @@ var testS3Client = &S3Client{ config: &flex.RemoteStorageTarget{ Policies: &flex.RemoteStorageTarget_Policies{}, }, + s3Config: &flex.RemoteStorageTarget_S3{ + Bucket: "test-bucket", + }, } func TestGenerateWorkRequests(t *testing.T) { diff --git a/common/rst/xtreemstore.go b/common/rst/xtreemstore.go new file mode 100644 index 000000000..635e7d992 --- /dev/null +++ b/common/rst/xtreemstore.go @@ -0,0 +1,915 @@ +package rst + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "path" + "sort" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + smithyhttp "github.com/aws/smithy-go/transport/http" + "github.com/thinkparq/beegfs-go/common/filesystem" + "github.com/thinkparq/protobuf/go/beeremote" + "github.com/thinkparq/protobuf/go/flex" +) + +const ( + XTS_SYSTEM = ".xts-system" + XTS_SYSTEM_ERRORS = XTS_SYSTEM + "/errors" + XTS_SYSTEM_RETRIEVE_SESSION = XTS_SYSTEM + "/retrieve-session.json" + XTS_SYSTEM_RETRIEVE_BATCH_LIST = XTS_SYSTEM + "/retrieve-batch-list.json" + XTS_SYSTEM_RETRIEVE_BATCH_FMT = XTS_SYSTEM + "/retrieve-batch-%d.json" +) + +type xtreemstoreS3Provider struct { + Provider + s3ApiClient + mountPoint filesystem.Provider +} + +var _ Provider = &xtreemstoreS3Provider{} + +type xtreemstoreS3BulkOperation byte + +const ( + xtreemstoreS3BulkOperationUnknown xtreemstoreS3BulkOperation = iota + xtreemstoreS3BulkOperationRetrieve +) + +func (o xtreemstoreS3BulkOperation) String() string { + switch o { + case 
xtreemstoreS3BulkOperationRetrieve: + return "bulk-retrieve" + default: + return "unknown" + } +} + +func parseBulkOperation(operation string) xtreemstoreS3BulkOperation { + switch operation { + case "bulk-retrieve": + return xtreemstoreS3BulkOperationRetrieve + default: + return xtreemstoreS3BulkOperationUnknown + } +} + +const ( + xtreemstoreS3BulkRequestInitialized byte = iota + xtreemstoreS3BulkRequestComplete +) + +const xtreemstoreS3BulkRetrieveBatchFileVersion = 1 + +var ( + ErrActiveRetrieveSessionAlreadyExists = errors.New("active retrieve-session already exists") +) + +type xtreemstoreS3BulkRetrieveManager struct { + s3ApiClient s3ApiClient + bucket string + operation string + stateMountPath string + state *xtreemstoreS3BulkRetrieveManagerState +} + +type xtreemstoreS3BulkRetrieveSessionInfo struct { + Active bool `json:"active"` + RetrieveId string `json:"retrieve-id"` + Started time.Time `json:"started"` +} + +type xtreemstoreS3BulkRetrieveManagerState struct { + IncludedJobs int64 `json:"included-jobs"` + ActiveRetrieveId string `json:"active-retrieve-id"` + ActiveJobStart int64 `json:"active-job-start"` + ActiveJobEnd int64 `json:"active-job-end"` +} + +type xtreemstoreS3BulkRetrieveBatchInfo struct { + Number int64 `json:"number"` + Objects int64 `json:"objects"` + Size int64 `json:"size"` +} + +type xtreemstoreS3BulkRetrieveRequest struct { + Ids []string `json:"ids,omitempty"` + BucketRetrieve bool `json:"bucket-retrieve,omitempty"` +} + +type xtreemstoreS3BulkRetrieveBatchFile struct { + Version int `json:"version"` + BatchInfo xtreemstoreS3BulkRetrieveBatchInfo + ActiveRetrieveId string `json:"active-retrieve-id"` + ActiveJobStart int64 `json:"active-job-start"` + ActiveJobEnd int64 `json:"active-job-end"` // exclusive + ActiveJobReferences []uint64 `json:"active-job-references"` +} + +type activeSessionStatuses struct { + jobStatuses []byte + jobCount uint64 + offset uint64 // this will correspond to the
xtreemstoreS3BulkRetrieveManager.state.ActiveJobStart at the time retrieved +} + +func (s *activeSessionStatuses) IsComplete(jobIndex uint64) (bool, error) { + if jobIndex < s.offset { + return false, fmt.Errorf("invalid index for active session") + } + statusesJobIndex := jobIndex - s.offset + if statusesJobIndex >= uint64(len(s.jobStatuses)) { + return false, fmt.Errorf("invalid index for active session") + } + return s.jobStatuses[statusesJobIndex] == xtreemstoreS3BulkRequestComplete, nil +} + +// newXtreemstore initializes an xtreemstore provider by reusing the S3 client implementation. +func newXtreemstore(ctx context.Context, rstConfig *flex.RemoteStorageTarget, mountPoint filesystem.Provider) (Provider, error) { + xtreemstore := rstConfig.GetXtreemstore() + if xtreemstore == nil || xtreemstore.GetS3() == nil { + return nil, fmt.Errorf("xtreemstore configuration must include s3 settings") + } + + wrapper := &xtreemstoreS3Provider{ + mountPoint: mountPoint, + } + s3Client, err := newS3WithOptions(ctx, rstConfig, xtreemstore.GetS3(), mountPoint, + withS3ApiClient(func(base s3ApiClient) s3ApiClient { + wrapper.s3ApiClient = base + return wrapper + }), + ) + if err != nil { + return nil, fmt.Errorf("unable to initialize xtreemstore provider: %w", err) + } + + wrapper.Provider = s3Client + return wrapper, nil +} + +func newXtreemstoreS3BulkRetrieveManager(s3ApiClient s3ApiClient, bucket string, stateMountPath string, operation string) *xtreemstoreS3BulkRetrieveManager { + stateMountPath = path.Join(stateMountPath, operation) + return &xtreemstoreS3BulkRetrieveManager{ + s3ApiClient: s3ApiClient, + bucket: bucket, + operation: operation, + stateMountPath: stateMountPath, + state: &xtreemstoreS3BulkRetrieveManagerState{}, + } +} + +func xtreemstoreS3BulkMarkRequestComplete(mountPath string, bulkInfo *flex.BulkJobRequestInfo) (err error) { + stateMountPath := path.Join(mountPath, bulkInfo.StateMountPath, bulkInfo.Operation) + manager := 
&xtreemstoreS3BulkRetrieveManager{ + operation: bulkInfo.Operation, + stateMountPath: stateMountPath, + } + + var f *os.File + if f, err = os.OpenFile(manager.getStatusPath(), os.O_WRONLY, os.FileMode(0600)); err != nil { + return + } + + defer func() { + if closeErr := f.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + _, err = f.WriteAt([]byte{xtreemstoreS3BulkRequestComplete}, bulkInfo.JobIndex) + return + +} + +func (x *xtreemstoreS3Provider) HeadObject(ctx context.Context, in *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + // Update xtreemstore head-object api request headers to include storage details. + optFns = append(optFns, func(options *s3.Options) { + options.APIOptions = append(options.APIOptions, smithyhttp.AddHeaderValue("x-amz-meta-xts-request-storage-details", "true")) + options.APIOptions = append(options.APIOptions, smithyhttp.AddHeaderValue("x-amz-optional-object-attributes", "RestoreStatus")) + }) + + return x.s3ApiClient.HeadObject(ctx, in, optFns...) 
+} + +func (x *xtreemstoreS3Provider) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (ready bool, delay time.Duration, err error) { + if !request.HasSync() { + return false, 0, ErrReqAndRSTTypeMismatch + } + + ready, delay, err = x.Provider.IsWorkRequestReady(ctx, request) + if ready { + if request.HasBulkInfo() { + err = xtreemstoreS3BulkMarkRequestComplete(x.mountPoint.GetMountPath(), request.GetBulkInfo()) + if err != nil { + return false, 0, fmt.Errorf("failed to mark bulk request complete: %w", err) + } + } + } + + return +} + +func (x *xtreemstoreS3Provider) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) { + if !request.HasSync() { + return + } + + sync := request.GetSync() + lockedInfo := sync.GetLockedInfo() + if lockedInfo == nil { + return + } + + if lockedInfo.IsArchived { + include = true + operation = xtreemstoreS3BulkOperationRetrieve.String() + } + + return +} + +func (x *xtreemstoreS3Provider) ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) { + bucket := x.GetConfig().GetXtreemstore().S3.Bucket + path := path.Join(x.mountPoint.GetMountPath(), stateMountPath) + + switch parseBulkOperation(operation) { + case xtreemstoreS3BulkOperationRetrieve: + manager := newXtreemstoreS3BulkRetrieveManager(x.s3ApiClient, bucket, path, operation) + if err = manager.LoadManagerState(); err != nil { + return false, 0, fmt.Errorf("failed to load manager state: %w", err) + } + if err := manager.AddRequests(requests); err != nil { + return false, 0, err + } + return manager.Execute(ctx) + default: + return false, 0, ErrUnsupportedOpForRST + } +} + +func (x *xtreemstoreS3Provider) CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error { + return x.deleteBulkStateFiles(stateMountPath, operation) +} + +func (x *xtreemstoreS3Provider) 
CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error { + bucket := x.GetConfig().GetXtreemstore().S3.Bucket + path := path.Join(x.mountPoint.GetMountPath(), stateMountPath) + + switch parseBulkOperation(operation) { + case xtreemstoreS3BulkOperationRetrieve: + manager := newXtreemstoreS3BulkRetrieveManager(x.s3ApiClient, bucket, path, operation) + if err := manager.LoadManagerState(); err != nil { + return fmt.Errorf("failed to load manager state: %w", err) + } + + var errs []error + + // TODO: Verify whether deleting the batch files is desired. Is it enough just to destroy the session? + paths, err := manager.getSortedBatchReferencePaths() + if err != nil { + errs = append(errs, err) + } else { + errs = append(errs, manager.removeBatchReferenceFiles(paths)) + } + + errs = append(errs, manager.destroyRetrieveSession(ctx)) + errs = append(errs, x.deleteBulkStateFiles(stateMountPath, operation)) + + return errors.Join(errs...) + default: + return ErrUnsupportedOpForRST + } +} + +func (x *xtreemstoreS3BulkRetrieveManager) Execute(ctx context.Context) (reschedule bool, delay time.Duration, err error) { + sessionInfo, err := x.getSessionInfo(ctx) + if err != nil { + return false, 0, fmt.Errorf("unable to determine whether retrieve-session is active: %w", err) + } + + if sessionInfo.Active { + if sessionInfo.RetrieveId != x.state.ActiveRetrieveId { + // Another session is active so reschedule + return true, 5 * time.Second, nil + } + } else { + if err = x.StartSession(ctx); err != nil { + if errors.Is(err, ErrActiveRetrieveSessionAlreadyExists) { + return true, 5 * time.Second, nil + } + return false, 0, fmt.Errorf("failed to start retrieve-session: %w", err) + } + } + + batchReferencePaths, err := x.ensureBatchReferenceFiles(ctx) + if err != nil { + return false, 0, fmt.Errorf("failed to create batch reference files for the active retrieve-session: %w", err) + } + + statuses, err := x.getActiveSessionStatuses() + if err !=
nil { + return false, 0, fmt.Errorf("failed to get bulk retrieve entry states: %w", err) + } + + // Review the remaining batches removing any that are complete. Reschedule the work request if a + // batch is still incomplete. + for _, path := range batchReferencePaths { + complete, err := x.checkBulkBatch(path, statuses) + if err != nil { + return false, 0, fmt.Errorf("failed to create bulk batch reference files: %w", err) + } + + if complete { + if err = os.Remove(path); err != nil { + return false, 0, fmt.Errorf("failed to remove batch file! Path: %q, Error: %q", path, err) + } + } else { + return true, 5 * time.Second, nil + } + } + + if err = x.destroyRetrieveSession(ctx); err != nil { + return false, 0, fmt.Errorf("retrieve-session completed successfully, but the active session could not be destroyed and manual intervention is required: %w", err) + } + return false, 0, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) AddRequests(requests []*beeremote.JobRequest) (err error) { + if err = os.MkdirAll(x.stateMountPath, os.FileMode(0700)); err != nil { + return + } + + queueStatusPath := x.getStatusPath() + var info os.FileInfo + if info, err = os.Stat(queueStatusPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return + } + x.state.IncludedJobs = 0 + } else { + x.state.IncludedJobs = info.Size() + } + + var status *os.File + if status, err = os.OpenFile(queueStatusPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.FileMode(0600)); err != nil { + return err + } + + defer func() { + if closeErr := status.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + var record *os.File + if record, err = os.OpenFile(x.getRecordPath(), os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.FileMode(0600)); err != nil { + return err + } + defer func() { + if closeErr := record.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + for _, request := range requests { + if err = x.insertRequest(status, record, request); err != nil { + return 
err + } + } + return nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) LoadManagerState() error { + f, err := os.OpenFile(x.getManagerPath(), os.O_RDONLY, os.FileMode(0600)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + defer f.Close() + + return json.NewDecoder(f).Decode(x.state) +} + +func (x *xtreemstoreS3BulkRetrieveManager) saveManagerState() (err error) { + var f *os.File + if f, err = os.OpenFile(x.getManagerPath(), os.O_WRONLY|os.O_CREATE, os.FileMode(0600)); err != nil { + return + } + + defer func() { + if closeErr := f.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + err = json.NewEncoder(f).Encode(x.state) + return +} + +func (x *xtreemstoreS3BulkRetrieveManager) getStatusPath() string { + return path.Join(x.stateMountPath, "status") +} + +func (x *xtreemstoreS3BulkRetrieveManager) getRecordPath() string { + return path.Join(x.stateMountPath, "record") +} + +func (x *xtreemstoreS3BulkRetrieveManager) getManagerPath() string { + return path.Join(x.stateMountPath, "manager.json") +} + +func (x *xtreemstoreS3BulkRetrieveManager) getBatchPath(number int64) string { + return path.Join(x.stateMountPath, fmt.Sprintf("batch.%d", number)) +} + +func (x *xtreemstoreS3BulkRetrieveManager) insertRequest(status *os.File, record *os.File, request *beeremote.JobRequest) (err error) { + if !request.HasSync() { + return ErrReqAndRSTTypeMismatch + } + if !request.HasBulkInfo() { + return fmt.Errorf("missing request bulkInfo") + } + if request.GetBulkInfo().JobIndex != x.state.IncludedJobs { + return fmt.Errorf("unexpected request bulkInfo.JobIndex") + } + + if _, err = status.Write([]byte{xtreemstoreS3BulkRequestInitialized}); err != nil { + return + } + + remotePath := request.GetSync().GetRemotePath() + if x.state.IncludedJobs == 0 { + _, err = record.WriteString(remotePath) + } else { + _, err = record.WriteString("\n" + remotePath) + } + + x.state.IncludedJobs++ + return +} + +func (x 
*xtreemstoreS3Provider) deleteBulkStateFiles(stateMountPath string, operation string) error { + statePath := path.Join(x.mountPoint.GetMountPath(), stateMountPath, operation) + if err := os.RemoveAll(statePath); err != nil { + return fmt.Errorf("delete bulk state files: %w", err) + } + + return nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getSessionInfo(ctx context.Context) (*xtreemstoreS3BulkRetrieveSessionInfo, error) { + getObjectInput := &s3.GetObjectInput{ + Bucket: aws.String(x.bucket), + Key: aws.String(XTS_SYSTEM_RETRIEVE_SESSION), + } + + resp, err := x.s3ApiClient.GetObject(ctx, getObjectInput) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") { + return &xtreemstoreS3BulkRetrieveSessionInfo{}, nil + } + return nil, err + } + defer resp.Body.Close() + + info := &xtreemstoreS3BulkRetrieveSessionInfo{} + if err := json.NewDecoder(resp.Body).Decode(info); err != nil { + return nil, err + } + return info, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getSessionBatchInfo(ctx context.Context) ([]xtreemstoreS3BulkRetrieveBatchInfo, error) { + getObjectInput := &s3.GetObjectInput{ + Bucket: aws.String(x.bucket), + Key: aws.String(XTS_SYSTEM_RETRIEVE_BATCH_LIST), + } + + resp, err := x.s3ApiClient.GetObject(ctx, getObjectInput) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var info []xtreemstoreS3BulkRetrieveBatchInfo + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + return nil, fmt.Errorf("decode retrieve batch info: %w", err) + } + + return info, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getSessionBatchKeys(ctx context.Context, batchInfo xtreemstoreS3BulkRetrieveBatchInfo) ([]string, error) { + getObjectInput := &s3.GetObjectInput{ + Bucket: aws.String(x.bucket), + Key: aws.String(fmt.Sprintf(XTS_SYSTEM_RETRIEVE_BATCH_FMT, batchInfo.Number)), + } + + resp, err := x.s3ApiClient.GetObject(ctx, 
getObjectInput) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var keys []string + if err := json.NewDecoder(resp.Body).Decode(&keys); err != nil { + return nil, fmt.Errorf("decode retrieve batch keys for batch %d: %w", batchInfo.Number, err) + } + + return keys, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) createBatchReferenceFiles(ctx context.Context, batchInfos []xtreemstoreS3BulkRetrieveBatchInfo) (paths []string, err error) { + var keyMap map[string]int64 + if keyMap, err = x.getActiveRecordsMap(); err != nil { + return + } + + for _, batchInfo := range batchInfos { + var keys []string + if keys, err = x.getSessionBatchKeys(ctx, batchInfo); err != nil { + return nil, err + } + if len(keys) != int(batchInfo.Objects) { + err = fmt.Errorf("key count does not match the expected number of objects") + return + } + + references := make([]uint64, 0, len(keys)) + for _, key := range keys { + if index, ok := keyMap[key]; ok { + references = append(references, uint64(index)) + } else { + err = fmt.Errorf("key was not found in the key map") + return + } + } + + path := x.getBatchPath(batchInfo.Number) + var f *os.File + if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(0600)); err != nil { + return + } + + batchFile := &xtreemstoreS3BulkRetrieveBatchFile{ + Version: xtreemstoreS3BulkRetrieveBatchFileVersion, + BatchInfo: batchInfo, + ActiveRetrieveId: x.state.ActiveRetrieveId, + ActiveJobStart: x.state.ActiveJobStart, + ActiveJobEnd: x.state.ActiveJobEnd, + ActiveJobReferences: references, + } + + if err = json.NewEncoder(f).Encode(batchFile); err != nil { + if closeErr := f.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + return + } + if err = f.Close(); err != nil { + return + } + + paths = append(paths, path) + } + + return +} + +func (x *xtreemstoreS3BulkRetrieveManager) ensureBatchReferenceFiles(ctx context.Context) ([]string, error) { + batchInfos, err := x.getSessionBatchInfo(ctx) + if 
err != nil { + return nil, fmt.Errorf("failed to load retrieve-session batch info: %w", err) + } + + paths, err := x.getSortedBatchReferencePaths() + if err != nil { + return nil, err + } + + if err := x.validateBatchReferenceFiles(paths, batchInfos); err == nil { + return paths, nil + } else if len(paths) > 0 { + if removeErr := x.removeBatchReferenceFiles(paths); removeErr != nil { + return nil, fmt.Errorf("failed to reset invalid batch reference files: %w", errors.Join(err, removeErr)) + } + } + + return x.createBatchReferenceFiles(ctx, batchInfos) +} + +func (x *xtreemstoreS3BulkRetrieveManager) StartSession(ctx context.Context) error { + if x.state.IncludedJobs == 0 { + return fmt.Errorf("retrieve-session requires at least one key") + } + + previousState := *x.state + cleanupCreatedSession := func(reason error) error { + *x.state = previousState + if cleanupErr := x.destroyRetrieveSession(ctx); cleanupErr != nil { + return fmt.Errorf("retrieve-session was created but local ownership state could not be persisted, cleanup also failed, and manual intervention is required: %w", errors.Join(reason, cleanupErr)) + } + return reason + } + + x.state.ActiveJobStart = x.state.ActiveJobEnd + x.state.ActiveJobEnd = x.state.IncludedJobs + keys, err := x.getActiveRecords() + if err != nil { + return err + } + + retrieveSessionJson, err := json.Marshal(&xtreemstoreS3BulkRetrieveRequest{Ids: keys}) + if err != nil { + return fmt.Errorf("failed to marshal retrieve-session request: %w", err) + } + + _, err = x.s3ApiClient.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(x.bucket), + Key: aws.String(XTS_SYSTEM_RETRIEVE_SESSION), + Body: bytes.NewReader(retrieveSessionJson), + ContentType: aws.String("application/json"), + }) + if err != nil { + var responseErr *smithyhttp.ResponseError + if errors.As(err, &responseErr) && responseErr.HTTPStatusCode() == http.StatusConflict { + *x.state = previousState + return ErrActiveRetrieveSessionAlreadyExists + } + + return 
fmt.Errorf("failed to start retrieve-session: %w", err) + } + + sessionInfo, err := x.getSessionInfo(ctx) + if err != nil { + return cleanupCreatedSession(fmt.Errorf("failed to recover retrieve-session ownership state after creation: %w", err)) + } + x.state.ActiveRetrieveId = sessionInfo.RetrieveId + + if err = x.saveManagerState(); err != nil { + return cleanupCreatedSession(fmt.Errorf("failed to store retrieve-session ownership state after creation: %w", err)) + } + return nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) destroyRetrieveSession(ctx context.Context) error { + _, err := x.s3ApiClient.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(x.bucket), + Key: aws.String(XTS_SYSTEM_RETRIEVE_SESSION), + }) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") { + return nil + } + return fmt.Errorf("destroy retrieve session: %w", err) + } + + return nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getActiveRecordsMap() (map[string]int64, error) { + f, err := os.OpenFile(x.getRecordPath(), os.O_RDONLY, os.FileMode(0600)) + if err != nil { + return nil, err + } + defer f.Close() + + if x.state.ActiveJobStart < 0 || x.state.ActiveJobEnd < x.state.ActiveJobStart { + return nil, fmt.Errorf("invalid active record range: start=%d end=%d", x.state.ActiveJobStart, x.state.ActiveJobEnd) + } + + keyMap := make(map[string]int64) + scanner := bufio.NewScanner(f) + for index := int64(0); scanner.Scan(); index++ { + if index < x.state.ActiveJobStart { + continue + } + if index >= x.state.ActiveJobEnd { + break + } + keyMap[scanner.Text()] = index + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to read bulk entry keys file: %w", err) + } + + return keyMap, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getActiveRecords() ([]string, error) { + f, err := os.OpenFile(x.getRecordPath(), os.O_RDONLY, os.FileMode(0600)) + if err 
!= nil { + return nil, err + } + defer f.Close() + + if x.state.ActiveJobStart < 0 || x.state.ActiveJobEnd < x.state.ActiveJobStart { + return nil, fmt.Errorf("invalid active record range: start=%d end=%d", x.state.ActiveJobStart, x.state.ActiveJobEnd) + } + + keys := make([]string, 0, max(0, int(x.state.ActiveJobEnd-x.state.ActiveJobStart))) + scanner := bufio.NewScanner(f) + for index := int64(0); scanner.Scan(); index++ { + if index < x.state.ActiveJobStart { + continue + } + if index >= x.state.ActiveJobEnd { + break + } + keys = append(keys, scanner.Text()) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to read queued records: %w", err) + } + + return keys, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) loadBatchReferenceFile(path string) (*xtreemstoreS3BulkRetrieveBatchFile, error) { + f, err := os.OpenFile(path, os.O_RDONLY, os.FileMode(0600)) + if err != nil { + return nil, fmt.Errorf("failed to open batch reference file %q: %w", path, err) + } + defer f.Close() + + batchFile := &xtreemstoreS3BulkRetrieveBatchFile{} + if err := json.NewDecoder(f).Decode(batchFile); err != nil { + return nil, fmt.Errorf("failed to decode batch reference file %q: %w", path, err) + } + if batchFile.Version != xtreemstoreS3BulkRetrieveBatchFileVersion { + return nil, fmt.Errorf("invalid batch reference file %q: unsupported version %d", path, batchFile.Version) + } + + return batchFile, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) validateLoadedBatchReferenceFile(path string, batchFile *xtreemstoreS3BulkRetrieveBatchFile) error { + if batchFile.ActiveRetrieveId != x.state.ActiveRetrieveId { + return fmt.Errorf("invalid batch reference file %q: retrieve-session mismatch", path) + } + if batchFile.ActiveJobStart != x.state.ActiveJobStart || batchFile.ActiveJobEnd != x.state.ActiveJobEnd { + return fmt.Errorf("invalid batch reference file %q: active job range mismatch", path) + } + + return nil +} + +func (x 
*xtreemstoreS3BulkRetrieveManager) validateBatchReferenceFiles(paths []string, batchInfos []xtreemstoreS3BulkRetrieveBatchInfo) error { + if len(paths) != len(batchInfos) { + return fmt.Errorf("unexpected batch reference file count") + } + + expectedByNumber := make(map[int64]xtreemstoreS3BulkRetrieveBatchInfo, len(batchInfos)) + for _, batchInfo := range batchInfos { + expectedByNumber[batchInfo.Number] = batchInfo + } + + seen := make(map[int64]struct{}, len(paths)) + for _, path := range paths { + batchFile, err := x.loadBatchReferenceFile(path) + if err != nil { + return err + } + if err := x.validateLoadedBatchReferenceFile(path, batchFile); err != nil { + return err + } + + expectedBatchInfo, ok := expectedByNumber[batchFile.BatchInfo.Number] + if !ok { + return fmt.Errorf("invalid batch reference file %q: unexpected batch number %d", path, batchFile.BatchInfo.Number) + } + if batchFile.BatchInfo != expectedBatchInfo { + return fmt.Errorf("invalid batch reference file %q: batch metadata mismatch", path) + } + if _, exists := seen[batchFile.BatchInfo.Number]; exists { + return fmt.Errorf("invalid batch reference file %q: duplicate batch number %d", path, batchFile.BatchInfo.Number) + } + seen[batchFile.BatchInfo.Number] = struct{}{} + } + + return nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) removeBatchReferenceFiles(paths []string) error { + errs := make([]error, 0, len(paths)) + for _, path := range paths { + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + errs = append(errs, fmt.Errorf("remove batch reference file %q: %w", path, err)) + } + } + + return errors.Join(errs...) +} + +func (x *xtreemstoreS3BulkRetrieveManager) getSortedBatchReferencePaths() ([]string, error) { + prefix := "batch." 
+ entries, err := os.ReadDir(x.stateMountPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return []string{}, nil + } + return nil, fmt.Errorf("failed to list batch reference files: %w", err) + } + + type batchReference struct { + number int + path string + } + + batches := []batchReference{} + for _, entry := range entries { + if entry.IsDir() { + continue + } + + filename := entry.Name() + if !strings.HasPrefix(filename, prefix) { + continue + } + + suffix := strings.TrimPrefix(filename, prefix) + batchNumber, err := strconv.Atoi(suffix) + if err != nil { + continue + } + + batches = append(batches, batchReference{ + number: batchNumber, + path: path.Join(x.stateMountPath, filename), + }) + } + + sort.Slice(batches, func(i, j int) bool { + return batches[i].number < batches[j].number + }) + + paths := make([]string, 0, len(batches)) + for _, batch := range batches { + paths = append(paths, batch.path) + } + return paths, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) getActiveSessionStatuses() (*activeSessionStatuses, error) { + if x.state.ActiveJobStart < 0 || x.state.ActiveJobEnd < x.state.ActiveJobStart { + return nil, fmt.Errorf("invalid active record range: start=%d end=%d", x.state.ActiveJobStart, x.state.ActiveJobEnd) + } + + activeStatuses := &activeSessionStatuses{offset: uint64(x.state.ActiveJobStart)} + if x.state.ActiveJobEnd == x.state.ActiveJobStart { + return activeStatuses, nil + } + + f, err := os.OpenFile(x.getStatusPath(), os.O_RDONLY, os.FileMode(0600)) + if err != nil { + return nil, err + } + defer f.Close() + + activeStatuses.jobCount = uint64(x.state.ActiveJobEnd - x.state.ActiveJobStart) + activeStatuses.jobStatuses = make([]byte, activeStatuses.jobCount) + if n, err := f.ReadAt(activeStatuses.jobStatuses, int64(activeStatuses.offset)); err != nil { + return nil, fmt.Errorf("failed to read bulk entry state file: %w", err) + } else if n != len(activeStatuses.jobStatuses) { + return nil, fmt.Errorf("invalid bulk entry 
state") + } + + return activeStatuses, nil +} + +func (x *xtreemstoreS3BulkRetrieveManager) checkBulkBatch(path string, activeStatuses *activeSessionStatuses) (bool, error) { + batchFile, err := x.loadBatchReferenceFile(path) + if err != nil { + return false, err + } + if err := x.validateLoadedBatchReferenceFile(path, batchFile); err != nil { + return false, err + } + + for _, index := range batchFile.ActiveJobReferences { + if isComplete, err := activeStatuses.IsComplete(index); err != nil { + return false, err + } else if !isComplete { + return false, nil + } + } + + return true, nil +} diff --git a/common/rst/xtreemstore_test.go b/common/rst/xtreemstore_test.go new file mode 100644 index 000000000..de9378f42 --- /dev/null +++ b/common/rst/xtreemstore_test.go @@ -0,0 +1 @@ +package rst diff --git a/ctl/internal/cmd/rst/list.go b/ctl/internal/cmd/rst/list.go index 7bb6d6742..e24b70ebf 100644 --- a/ctl/internal/cmd/rst/list.go +++ b/ctl/internal/cmd/rst/list.go @@ -52,18 +52,16 @@ func runListCmd(cmd *cobra.Command, cfg rst.GetRSTCfg) error { switch rst.WhichType() { case flex.RemoteStorageTarget_S3_case: - stringBuilder := strings.Builder{} rstType = "s3" - rst.GetS3().ProtoReflect().Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool { - if string(fd.Name()) == "secret_key" && !cfg.ShowSecrets { - stringBuilder.WriteString(fmt.Sprintf("%s: *****, ", fd.Name())) - } else { - stringBuilder.WriteString(fmt.Sprintf("%s: %s, ", fd.Name(), v)) - } - return true - }) - // Get rid of the last comma+space in the printed configuration. 
- rstConfiguration = stringBuilder.String()[:stringBuilder.Len()-2] + rstConfiguration = formatS3RSTConfiguration(rst.GetS3(), cfg.ShowSecrets) + case flex.RemoteStorageTarget_Xtreemstore_case: + rstType = "xtreemstore" + xtreemstoreConfig := rst.GetXtreemstore() + if xtreemstoreConfig == nil { + rstConfiguration = "xtreemstore configuration is not set" + break + } + rstConfiguration = formatS3RSTConfiguration(xtreemstoreConfig.GetS3(), cfg.ShowSecrets) default: if !cfg.ShowSecrets { rstType = "unknown" @@ -85,3 +83,25 @@ func runListCmd(cmd *cobra.Command, cfg rst.GetRSTCfg) error { return nil } + +func formatS3RSTConfiguration(s3Config *flex.RemoteStorageTarget_S3, showSecrets bool) string { + if s3Config == nil { + return "s3 configuration is not set" + } + + stringBuilder := strings.Builder{} + s3Config.ProtoReflect().Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool { + if string(fd.Name()) == "secret_key" && !showSecrets { + fmt.Fprintf(&stringBuilder, "%s: *****, ", fd.Name()) + } else { + fmt.Fprintf(&stringBuilder, "%s: %s, ", fd.Name(), v) + } + return true + }) + + if stringBuilder.Len() == 0 { + return "" + } + // Get rid of the last comma+space in the printed configuration. 
+ return stringBuilder.String()[:stringBuilder.Len()-2] +} diff --git a/rst/remote/build/beegfs-remote.toml b/rst/remote/build/beegfs-remote.toml index 9584d3d02..a0e812c82 100644 --- a/rst/remote/build/beegfs-remote.toml +++ b/rst/remote/build/beegfs-remote.toml @@ -107,6 +107,18 @@ path-db = "/var/lib/beegfs/remote/path.badger" # access-key = "" # secret-key = "" +# [[remote-storage-target]] +# id = "4" +# name = "xtreemstore" +# policies = { FastStartMaxSize = 104857600 } +# +# [remote-storage-target.xtreemstore] +# endpoint-url = "https://:" +# region = "" +# bucket = "" +# access-key = "" +# secret-key = "" + # # --- Section 2: [Command Line Arguments] --- # diff --git a/rst/remote/internal/job/job.go b/rst/remote/internal/job/job.go index e00adfc81..ba98d0039 100644 --- a/rst/remote/internal/job/job.go +++ b/rst/remote/internal/job/job.go @@ -110,7 +110,6 @@ func (j *Job) GenerateSubmission(ctx context.Context, lastJob *Job, rstClient rs seg := proto.Clone(wr.GetSegment()).(*flex.WorkRequest_Segment) j.Segments = append(j.Segments, &Segment{segment: seg}) } - } else { workRequests = rst.RecreateWorkRequests(j.Get(), j.GetSegments()) } diff --git a/rst/remote/internal/job/manager.go b/rst/remote/internal/job/manager.go index 24a4b8712..00e931cb5 100644 --- a/rst/remote/internal/job/manager.go +++ b/rst/remote/internal/job/manager.go @@ -495,22 +495,20 @@ func (m *Manager) SubmitJobRequest(jr *beeremote.JobRequest) (*beeremote.JobResu var jobSubmission workermgr.JobSubmission if jr.GenerationStatus != nil { status := jr.GenerationStatus - if status != nil { - switch status.State { - case beeremote.JobRequest_GenerationStatus_ALREADY_COMPLETE: - // ParseDataTime will return the parsed mtime or a zero-mtime. Either way we should - // mark the job as complete so ignore the err. 
- mtime, _ := time.ParseDateTime(status.Message) - err = rst.GetErrJobAlreadyCompleteWithMtime(mtime) - case beeremote.JobRequest_GenerationStatus_ALREADY_OFFLOADED: - err = rst.ErrJobAlreadyOffloaded - case beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION: - err = fmt.Errorf("%w: %s", rst.ErrJobFailedPrecondition, status.Message) - case beeremote.JobRequest_GenerationStatus_ERROR: - err = errors.New(status.Message) - default: - err = fmt.Errorf("failure occurred while generating job request and the state is unknown: %s", status.Message) - } + switch status.GetState() { + case beeremote.JobRequest_GenerationStatus_ALREADY_COMPLETE: + // ParseDataTime will return the parsed mtime or a zero-mtime. Either way we should + // mark the job as complete so ignore the err. + mtime, _ := time.ParseDateTime(status.Message) + err = rst.GetErrJobAlreadyCompleteWithMtime(mtime) + case beeremote.JobRequest_GenerationStatus_ALREADY_OFFLOADED: + err = rst.ErrJobAlreadyOffloaded + case beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION: + err = fmt.Errorf("%w: %s", rst.ErrJobFailedPrecondition, status.Message) + case beeremote.JobRequest_GenerationStatus_ERROR: + err = errors.New(status.Message) + default: + err = fmt.Errorf("failure occurred while generating job request and the state is unknown: %s", status.Message) } } else { jobSubmission, err = job.GenerateSubmission(m.ctx, lastJob, rstClient) @@ -1024,6 +1022,15 @@ func (m *Manager) UpdateWork(workResult *flex.Work) error { for _, workResult := range job.WorkResults { if !workResult.InTerminalState() && !workResult.RequiresUserIntervention() { // Don't do anything else if all work requests haven't reached a terminal state or aren't failed. + // Reflect active execution once any worker reports progress, but don't finalize job state + // until all work requests have finished or need intervention. 
+ if entryToUpdate.Status().GetState() == flex.Work_RUNNING { + status := job.GetStatus() + status.SetState(beeremote.Job_RUNNING) + status.SetMessage("one or more work requests are in progress") + status.SetUpdated(timestamppb.Now()) + } + return nil } // Verify all work requests have reached the same terminal state. diff --git a/rst/sync/internal/workmgr/manager.go b/rst/sync/internal/workmgr/manager.go index fabfbc3e8..78174e141 100644 --- a/rst/sync/internal/workmgr/manager.go +++ b/rst/sync/internal/workmgr/manager.go @@ -399,6 +399,7 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens *int) (n return } + currentTime := time.Now() var lastSubmissionId string for item != nil && *availableTokens > 0 { submissionId := item.Key @@ -410,18 +411,24 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens *int) (n workRequestID: entry.WorkRequest.RequestId, } - // Check based on the work identifier if there is already an existing workContext in the - // activeWork map. If so skip adding it to the map or queue again as we could block a worker - // when it tries to lock the journal entry if another worker is already handling the WR. - if _, ok := m.activeWork[workId]; !ok { - workCtx, workCtxCancel := context.WithCancel(m.workerCtx) - activeWork := workAssignment{ctx: workCtx, workIdentifier: workId} - m.activeWork[activeWork.workIdentifier] = workContext{ctx: workCtx, cancel: workCtxCancel} - m.activeWorkQueue <- activeWork - *availableTokens -= 1 - m.scheduler.RemoveWorkToken(submissionId) - priority := priorityIdMap[entry.WorkRequest.GetPriority()] - beeSyncQueued.Add(priority, 1) + if currentTime.Before(entry.ExecuteAfter) { + if err = m.rescheduleWork(workId); err != nil { + return + } + } else { + if _, ok := m.activeWork[workId]; !ok { + // Check based on the work identifier if there is already an existing workContext in the + // activeWork map. 
If so skip adding it to the map or queue again as we could block a worker + // when it tries to lock the journal entry if another worker is already handling the WR. + workCtx, workCtxCancel := context.WithCancel(m.workerCtx) + activeWork := workAssignment{ctx: workCtx, workIdentifier: workId} + m.activeWork[activeWork.workIdentifier] = workContext{ctx: workCtx, cancel: workCtxCancel} + m.activeWorkQueue <- activeWork + *availableTokens -= 1 + m.scheduler.RemoveWorkToken(submissionId) + priority := priorityIdMap[entry.WorkRequest.GetPriority()] + beeSyncQueued.Add(priority, 1) + } } item, err = nextItem() @@ -443,6 +450,32 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens *int) (n return nextSubmissionId, nil } +// rescheduleWork moves the work to the wait-queue until executeAfter expires. +func (m *Manager) rescheduleWork(workId workIdentifier) error { + submissionId := workId.submissionID + journalEntry, commitJournalEntry, err := m.workJournal.GetAndLockEntry(submissionId) + if err != nil { + if errors.Is(err, kvstore.ErrEntryAlreadyDeleted) || errors.Is(err, kvstore.ErrEntryNotInDB) { + // If the entry was already deleted or not found, then likely it was cancelled before we + // picked it up. There is nothing more for us to do. Just ignore the reschedule. + return nil + } + return fmt.Errorf("unable to reschedule work: %w", err) + } + + entry := journalEntry.Value + status := entry.WorkResult.GetStatus() + status.SetState(flex.Work_RESCHEDULED) + status.SetMessage("waiting for work request to be ready") + if err := commitJournalEntry(); err != nil { + return err + } + + m.scheduler.RemoveWorkToken(submissionId) + m.scheduler.AddRescheduleWorkToken(submissionId, entry.ExecuteAfter) + return nil +} + // pullInRescheduledWork moves ready rescheduled work from the priority range to the activeWork map // and returns the next the time more rescheduled work will be ready. 
Whenever there's an error, // time.Now().Add(workJournalRetryTime) will be returned as the nextExecuteAfter time. Zero time is @@ -539,8 +572,10 @@ func (m *Manager) initScheduler(priority int, start string, stop string) (entrie return } - var rescheduledCount int var scheduledCount int + var rescheduledCount int + var replayCount int + var unrecoverableCount int var submissionId string isNextSubmissionIdSet := false workRequestPriority := priorityIdMap[int32(priority+1)] @@ -548,7 +583,17 @@ func (m *Manager) initScheduler(priority int, start string, stop string) (entrie submissionId = submission.Key entry := submission.Entry.Value - if entry.ExecuteAfter.IsZero() { + var status *flex.Work_Status + var state flex.Work_State + if entry.WorkResult != nil { + status = entry.WorkResult.GetStatus() + } + if status != nil { + state = status.GetState() + } + + switch state { + case flex.Work_SCHEDULED: m.scheduler.AddWorkToken(submissionId) if !isNextSubmissionIdSet { m.scheduler.SetNextSubmissionId(submissionId, priority) @@ -556,13 +601,30 @@ func (m *Manager) initScheduler(priority int, start string, stop string) (entrie } scheduledCount++ beeSyncNewRequests.Add(workRequestPriority, 1) - } else { + case flex.Work_RESCHEDULED: m.scheduler.AddRescheduleWorkToken(submissionId, entry.ExecuteAfter) rescheduledCount++ beeSyncRescheduled.Add(workRequestPriority, 1) + case flex.Work_RUNNING, flex.Work_COMPLETED: + // Submission has already been scheduled and executed so treat it as rescheduled work + // since this preserves the next submissionId priority scheduler boundary. 
+ m.scheduler.AddRescheduleWorkToken(submissionId, time.Time{}) + replayCount++ + default: + m.log.Warn("skipping unrecoverable work journal entry during scheduler init", + zap.String("submissionId", submissionId), + zap.String("jobId", entry.WorkRequest.GetJobId()), + zap.String("requestId", entry.WorkRequest.GetRequestId()), + zap.String("state", state.String()), + zap.Time("executeAfter", entry.ExecuteAfter), + zap.Bool("hasWorkResult", entry.WorkResult != nil), + zap.Bool("hasStatus", status != nil), + ) + m.scheduler.AddRescheduleWorkToken(submissionId, time.Time{}) + unrecoverableCount++ } - entriesFound++ + entriesFound++ submission, err = nextItem() if err != nil { err = fmt.Errorf("unable to get work journal entry: %w", err) @@ -570,7 +632,8 @@ func (m *Manager) initScheduler(priority int, start string, stop string) (entrie } } - if scheduledCount == 0 && rescheduledCount > 0 { + allRescheduledCount := rescheduledCount + replayCount + if scheduledCount == 0 && allRescheduledCount > 0 { // All recovered were rescheduled work request so increment the last known rescheduled // submissionId to get the nextExpectedSubmissionId. 
nextExpectedSubmissionId, _, err := scheduler.IncrementSubmissionId(submissionId) @@ -581,8 +644,14 @@ func (m *Manager) initScheduler(priority int, start string, stop string) (entrie } } - if scheduledCount > 0 || rescheduledCount > 0 { - m.log.Info(" recovered work requests", zap.String("priority", workRequestPriority), zap.Int("scheduled", scheduledCount), zap.Int("rescheduled", rescheduledCount)) + if scheduledCount > 0 || rescheduledCount > 0 || replayCount > 0 || unrecoverableCount > 0 { + m.log.Info(" recovered work requests", + zap.String("priority", workRequestPriority), + zap.Int("scheduled", scheduledCount), + zap.Int("rescheduled", rescheduledCount), + zap.Int("replayed", replayCount), + zap.Int("unrecoverable", unrecoverableCount), + ) } return } @@ -643,6 +712,10 @@ func (m *Manager) SubmitWorkRequest(wr *flex.WorkRequest) (*flex.Work, error) { wr.SetPriority(priority) workEntry.Value.WorkRequest = &workRequest{WorkRequest: wr} workResult := newWorkFromRequest(workEntry.Value.WorkRequest) + if wr.GetDelayExecution() != nil { + executeAfter := time.Now().Add(wr.GetDelayExecution().AsDuration()) + workEntry.Value.ExecuteAfter = executeAfter + } workEntry.Value.WorkResult = workResult job.Value[workRequestId] = submissionId diff --git a/rst/sync/internal/workmgr/manager_test.go b/rst/sync/internal/workmgr/manager_test.go index 23bab1836..e537f851e 100644 --- a/rst/sync/internal/workmgr/manager_test.go +++ b/rst/sync/internal/workmgr/manager_test.go @@ -221,6 +221,7 @@ func TestSubmitWorkRequest(t *testing.T) { // First simulate a successful request: mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("0", "0"), mock.Anything).Return(nil).Times(2) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("0", "0")).Return(true, time.Duration(0), nil).Times(1) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("0", "0", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("0", "0", 
flex.Work_COMPLETED)).Return(nil).Times(1) testRequest1 := proto.Clone(baseTestRequest).(*flex.WorkRequest) resp, err := mgr.SubmitWorkRequest(testRequest1) @@ -233,6 +234,7 @@ func TestSubmitWorkRequest(t *testing.T) { // Then simulate the RST returning an error (note if an error happens the state is always failed): mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("1", "0"), mock.Anything).Return(fmt.Errorf("test wants an error")).Times(1) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("1", "0")).Return(true, time.Duration(0), nil).Times(1) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "0", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "0", flex.Work_FAILED)).Return(nil).Times(1) testRequest2 := proto.Clone(baseTestRequest).(*flex.WorkRequest) testRequest2.SetJobId("1") @@ -251,6 +253,7 @@ func TestSubmitWorkRequest(t *testing.T) { // Then simulate the request completing, but it was not able to be sent to BeeRemote. // Also for some "reason" a job ID was skipped, but it should still get picked up. mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("3", "1"), mock.Anything).Return(nil).Times(2) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("3", "1", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("3", "1", flex.Work_COMPLETED)).Return(fmt.Errorf("test requests a failed response from BeeRemote")) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("3", "1")).Return(true, time.Duration(0), nil).Times(1) testRequest3 := proto.Clone(baseTestRequest).(*flex.WorkRequest) @@ -301,6 +304,7 @@ func TestUpdateRequests(t *testing.T) { // send a response to BeeRemote. 
mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("1", "2"), mock.Anything).Return(fmt.Errorf("test wants an error")).Times(1) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("1", "2")).Return(true, time.Duration(0), nil).Times(1) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "2", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "2", flex.Work_FAILED)).Return(fmt.Errorf("test requests a failed response from BeeRemote")) testRequest2 := proto.Clone(baseTestRequest).(*flex.WorkRequest) testRequest2.SetJobId("1") @@ -329,6 +333,7 @@ func TestUpdateRequests(t *testing.T) { // Force the the request to stay active because it can't send a response to BeeRemote. mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("1", "2"), mock.Anything).Return(nil).Times(2) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("1", "2")).Return(true, time.Duration(0), nil).Times(1) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "2", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "2", flex.Work_COMPLETED)).Return(fmt.Errorf("test requests a failed response from BeeRemote")) testRequest2_2 := proto.Clone(baseTestRequest).(*flex.WorkRequest) testRequest2_2.SetJobId("1") @@ -354,6 +359,7 @@ func TestUpdateRequests(t *testing.T) { // send a response to BeeRemote. 
mockRST.On("ExecuteWorkRequestPart", mock.Anything, matchJobAndRequestID("1", "3"), mock.Anything).Return(nil).Times(2) mockRST.On("IsWorkRequestReady", matchJobAndRequestID("1", "3")).Return(true, time.Duration(0), nil).Times(1) + mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "3", flex.Work_RUNNING)).Return(nil).Times(1) mockBeeRemote.On("updateWork", matchRespIDsAndStatus("1", "3", flex.Work_COMPLETED)).Return(fmt.Errorf("test requests a failed response from BeeRemote")) testRequest3 := proto.Clone(baseTestRequest).(*flex.WorkRequest) testRequest3.SetJobId("1") diff --git a/rst/sync/internal/workmgr/work.go b/rst/sync/internal/workmgr/work.go index 6761d3701..1b5e6403d 100644 --- a/rst/sync/internal/workmgr/work.go +++ b/rst/sync/internal/workmgr/work.go @@ -303,10 +303,15 @@ func (w *worker) process(work workAssignment) { return } - // Update the entry in BadgerDB so other goroutines can get read only access to the result. + // Update the entry in BadgerDB so other goroutines can get read only access to the result, then + // make a best-effort, non-blocking attempt to notify BeeRemote that the work request is running. 
status.SetState(flex.Work_RUNNING) status.SetMessage("attempting to carry out the work request") commitJournalEntry(kvstore.WithUpdateOnly(true)) + if _, err := w.beeRemoteClient.UpdateWorkRequest(work.ctx, result.Work); err != nil { + log.Warn("unable to update remote job status to running; continuing work request without retrying", zap.Error(err)) + } + if request.HasBuilder() { cleanupEntries = w.processBuilder(work, client, entry) } else { @@ -394,11 +399,12 @@ func (w *worker) processBuilder(work workAssignment, client rst.Provider, entry mappedPriorityId := priorityIdMap[request.GetPriority()] var reschedule bool + var rescheduleDelay time.Duration var err error jobSubmissionChan := make(chan *pbr.JobRequest, 2048) go func() { defer close(jobSubmissionChan) - reschedule, err = client.ExecuteJobBuilderRequest(work.ctx, request.WorkRequest, jobSubmissionChan) + reschedule, rescheduleDelay, err = client.ExecuteJobBuilderRequest(work.ctx, request.WorkRequest, jobSubmissionChan) }() total := 0 @@ -437,7 +443,7 @@ processJobs: message = fmt.Sprintf("%s: %d job request(s) failed! See `beegfs remote status/job list` for details", message, totalErrors) } status.SetMessage(message) - entry.ExecuteAfter = time.Now() + entry.ExecuteAfter = time.Now().Add(rescheduleDelay) w.sendWorkResult(work, result.Work) w.rescheduleWork(work.submissionID, entry.ExecuteAfter) beeSyncRescheduled.Add(mappedPriorityId, 1)