Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
484 changes: 355 additions & 129 deletions common/rst/builder.go

Large diffs are not rendered by default.

50 changes: 33 additions & 17 deletions common/rst/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ type MockClient struct {

var _ Provider = &MockClient{}

func (r *MockClient) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest {
func (m *MockClient) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest {
return nil
}

func (rst *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beeremote.Job, job *beeremote.Job, availableWorkers int) (requests []*flex.WorkRequest, err error) {
func (m *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beeremote.Job, job *beeremote.Job, availableWorkers int) (requests []*flex.WorkRequest, err error) {

if job.Request.GetMock() != nil {
if job.Request.GetMock().ShouldFail {
Expand All @@ -64,14 +64,14 @@ func (rst *MockClient) GenerateWorkRequests(ctx context.Context, lastJob *beerem
return workRequests, nil
}

args := rst.Called(job, availableWorkers)
args := m.Called(job, availableWorkers)
if args.Error(2) != nil {
return nil, args.Error(2)
}
return args.Get(0).([]*flex.WorkRequest), nil
}

func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error {
func (m *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex.WorkRequest, part *flex.Work_Part) error {

if request.GetMock() != nil {
if request.GetMock().ShouldFail {
Expand All @@ -81,7 +81,7 @@ func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex
return nil
}

args := rst.Called(ctx, request, part)
args := m.Called(ctx, request, part)
err := args.Error(0)
if err == nil {
part.Completed = true
Expand All @@ -90,11 +90,27 @@ func (rst *MockClient) ExecuteWorkRequestPart(ctx context.Context, request *flex
}

// ExecuteJobBuilderRequest is not implemented and should never be called.
func (r *MockClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, error) {
return false, ErrUnsupportedOpForRST
func (m *MockClient) ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (bool, time.Duration, error) {
return false, 0, ErrUnsupportedOpForRST
}

func (rst *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote.Job, workResults []*flex.Work, abort bool) error {
// IncludeInBulkRequest always returns include=false and an empty operation, so
// the mock never asks for a request to be grouped into a bulk request. It does
// not record the call through m.Called.
func (m *MockClient) IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string) {
return false, ""
}

// ExecuteBulkRequest is not implemented by the mock. It always returns
// reschedule=false, a zero delay, and ErrUnsupportedOpForRST, ignoring all of
// its arguments.
func (m *MockClient) ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error) {
return false, 0, ErrUnsupportedOpForRST
}

// CompleteBulkRequest is a no-op for the mock: it ignores its arguments and
// always returns nil.
func (m *MockClient) CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error {
return nil
}

// CancelBulkRequest is a no-op for the mock: it ignores its arguments
// (including reason) and always returns nil.
func (m *MockClient) CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error {
return nil
}

func (m *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote.Job, workResults []*flex.Work, abort bool) error {

if job.Request.GetMock() != nil {
if job.Request.GetMock().ShouldFail {
Expand All @@ -103,32 +119,32 @@ func (rst *MockClient) CompleteWorkRequests(ctx context.Context, job *beeremote.
return nil
}

args := rst.Called(job, workResults, abort)
args := m.Called(job, workResults, abort)
return args.Error(0)
}

func (rst *MockClient) GetConfig() *flex.RemoteStorageTarget {
args := rst.Called()
func (m *MockClient) GetConfig() *flex.RemoteStorageTarget {
args := m.Called()
return args.Get(0).(*flex.RemoteStorageTarget)
}

func (r *MockClient) GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error) {
func (m *MockClient) GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error) {
return nil, ErrUnsupportedOpForRST
}

func (r *MockClient) SanitizeRemotePath(remotePath string) string {
func (m *MockClient) SanitizeRemotePath(remotePath string) string {
return remotePath
}

func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) {
func (m *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) {
return 0, time.Time{}, false, false, ErrUnsupportedOpForRST
}

func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) {
func (m *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) {
return "", ErrUnsupportedOpForRST
}

func (r *MockClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) {
args := r.Called(request)
func (m *MockClient) IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (bool, time.Duration, error) {
args := m.Called(request)
return args.Bool(0), args.Get(1).(time.Duration), args.Error(2)
}
65 changes: 62 additions & 3 deletions common/rst/rst.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand All @@ -59,11 +60,19 @@ const (
// initialized but empty struct of the correct type.
var SupportedRSTTypes = map[string]func() (any, any){
"s3": func() (any, any) { t := new(flex.RemoteStorageTarget_S3_); return t, &t.S3 },
// XtreemStore is S3-compatible and uses the existing S3 implementation.
"xtreemstore": func() (any, any) {
t := &flex.RemoteStorageTarget_Xtreemstore{Xtreemstore: &flex.RemoteStorageTarget_XtreemStore{}}
return t, &t.Xtreemstore.S3
},
Comment thread
iamjoemccormick marked this conversation as resolved.
// Azure is not currently supported, but this is how an Azure type could be added:
// "azure": func() (any, any) { t := new(flex.RemoteStorageTarget_Azure_); return t, &t.Azure },
// Mock could be included here if it ever made sense to allow configuration using a file.
}

// SubmitBulkRequestFn is a callback that takes only a context and returns
// nothing.
type SubmitBulkRequestFn func(ctx context.Context)
// EmitBulkRequestFn is a callback that receives a context and a single job
// request and returns nothing.
type EmitBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest)
// AppendBulkRequestFn is a callback that receives a context and a single job
// request and returns nothing.
//
// NOTE(review): review feedback indicates this callback is invoked while a lock
// is held, so implementations should be fast and avoid I/O. Confirm against the
// caller before relying on this.
type AppendBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): we should mention that AppendBulkRequestFn is called under a lock, so it should be fast. For example, if a provider did I/O in append, it could really slow everything down.

Assisted-by: Claude:claude-opus-4-6

type Provider interface {
// GetJobRequest builds a provider-specific job request.
GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest
Expand All @@ -78,7 +87,7 @@ type Provider interface {
// any new requests into jobSubmissionChan. If building jobs is long running, return
// reschedule==true to reschedule the remaining work for later, which allows other work time to
// complete.
ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, err error)
ExecuteJobBuilderRequest(ctx context.Context, workRequest *flex.WorkRequest, jobSubmissionChan chan<- *beeremote.JobRequest) (reschedule bool, delay time.Duration, err error)
// ExecuteWorkRequestPart accepts a request and which part of the request it should carry out.
// It blocks until the request is complete, but the caller can cancel the provided context to
// return early. It determines and executes the requested operation (if supported) then directly
Expand Down Expand Up @@ -121,6 +130,40 @@ type Provider interface {
// start work requests that have been placed into a wait queue. This is useful for providers
// that need the ability to wait for resources to be made available before continuing.
IsWorkRequestReady(ctx context.Context, request *flex.WorkRequest) (ready bool, delay time.Duration, err error)

// IncludeInBulkRequest indicates whether the request should be included in a provider-defined
// bulk operation. operation is an arbitrary provider-defined identifier that groups compatible
// requests within a provider bulk request.
IncludeInBulkRequest(ctx context.Context, request *beeremote.JobRequest) (include bool, operation string)
// ExecuteBulkRequest manages the lifecycle of a provider-defined bulk operation for the specified
// stateMountPath and operation.
//
// requests contains the requests selected for this bulk operation in the current builder pass.
// These requests will be submitted immediately after ExecuteBulkRequest returns. On later builder
// passes after a reschedule, requests may be empty if the walk has already completed or there
// are no newly discovered requests to add to the bulk operation.
//
// Providers are responsible for persisting and recovering any provider-side state needed to
// resume ExecuteBulkRequest or later complete/cancel the bulk operation after the builder job is
// rescheduled. stateMountPath, operation, and the provider itself identify the bulk operation.
//
// stateMountPath is builder-maintained workflow metadata. It is a provider-specific in-mount
// path that identifies where provider-side bulk state should be stored and later recovered from
// during rescheduled builder passes.
//
// Use reschedule and delay to reschedule the builder job while bulk work continues. If there are
// multiple bulk operations associated with a single builder job then the shortest delay will be
// chosen.
//
// Errors should only be returned to abort the builder job. Report all non-fatal errors through
// the included job requests.
ExecuteBulkRequest(ctx context.Context, stateMountPath string, operation string, requests []*beeremote.JobRequest) (reschedule bool, delay time.Duration, err error)
// CompleteBulkRequest completes any final provider-side actions needed to finish the bulk
// operation identified by stateMountPath and operation.
CompleteBulkRequest(ctx context.Context, stateMountPath string, operation string) error
// CancelBulkRequest cancels the remaining provider-side work for the bulk operation identified by
// stateMountPath and operation and signals the included job requests to abort if needed.
CancelBulkRequest(ctx context.Context, stateMountPath string, operation string, reason error) error
}

// New initializes a provider client based on the provided config. It accepts a context that can be
Expand All @@ -135,6 +178,8 @@ func New(ctx context.Context, config *flex.RemoteStorageTarget, mountPoint files
switch config.Type.(type) {
case *flex.RemoteStorageTarget_S3_:
return newS3(ctx, config, mountPoint)
case *flex.RemoteStorageTarget_Xtreemstore:
return newXtreemstore(ctx, config, mountPoint)
case *flex.RemoteStorageTarget_Mock:
// This handles setting up a Mock RST for testing from external packages like WorkerMgr. See
// the documentation on `MockClient` in mock.go for how to set up expectations.
Expand Down Expand Up @@ -167,6 +212,7 @@ func New(ctx context.Context, config *flex.RemoteStorageTarget, mountPoint files
// request IDs.
func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segment) (requests []*flex.WorkRequest) {
request := job.GetRequest()
delayExecution := job.Request.GetDelayExecution()

// Ensure when adding new fields that all reference types are cloned to ensure WRs are
// initialized properly and don't share references with anything else. Otherwise this can lead
Expand All @@ -188,6 +234,10 @@ func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segme
Type: &flex.WorkRequest_Builder{Builder: proto.Clone(request.GetBuilder()).(*flex.BuilderJob)},
Priority: new(request.GetPriority()),
}
if delayExecution != nil {
jobBuilderWorkRequest.SetDelayExecution(proto.Clone(delayExecution).(*durationpb.Duration))
}

return []*flex.WorkRequest{jobBuilderWorkRequest}
}

Expand All @@ -206,6 +256,9 @@ func RecreateWorkRequests(job *beeremote.Job, segments []*flex.WorkRequest_Segme
StubLocal: request.GetStubLocal(),
Priority: new(request.GetPriority()),
}
if delayExecution != nil {
wr.SetDelayExecution(proto.Clone(delayExecution).(*durationpb.Duration))
}

switch request.WhichType() {
case beeremote.JobRequest_Sync_case:
Expand Down Expand Up @@ -258,7 +311,14 @@ func generateSegments(fileSize int64, segCount int64, partsPerSegment int32) []*
//
// A returned error indicates that one or more job requests were not able to be built. However, if
// a request was able to be built, the error will be specified in the request's GenerationStatus.
func BuildJobRequests(ctx context.Context, rstMap map[uint32]Provider, mountPoint filesystem.Provider, inMountPath string, remotePath string, cfg *flex.JobRequestCfg) ([]*beeremote.JobRequest, error) {
func BuildJobRequests(
ctx context.Context,
rstMap map[uint32]Provider,
mountPoint filesystem.Provider,
inMountPath string,
remotePath string,
cfg *flex.JobRequestCfg,
) ([]*beeremote.JobRequest, error) {
keepLock := false
lockedInfo, writeLockSet, rstIds, entryInfoMsg, ownerNode, err := GetLockedInfo(ctx, mountPoint, cfg, inMountPath, false)

Expand Down Expand Up @@ -339,7 +399,6 @@ func BuildJobRequests(ctx context.Context, rstMap map[uint32]Provider, mountPoin
keepLock = true
}
} // If we couldn't build a runnable job request, there would be no active job to drive the normal unlock path so don't keep the lock.

requests = append(requests, request)
}

Expand Down
Loading
Loading