Skip to content

Swartzn/feat/add xtreemstore rst#312

Open
swartzn wants to merge 14 commits into
mainfrom
swartzn/feat/add-xtreem-store-rst
Open

Swartzn/feat/add xtreemstore rst#312
swartzn wants to merge 14 commits into
mainfrom
swartzn/feat/add-xtreem-store-rst

Conversation

@swartzn
Copy link
Copy Markdown
Contributor

@swartzn swartzn commented Mar 31, 2026

ThinkParQ/protobuf#71 is required.

Here's a few flowcharts to show the bulk request workflow BULK_XTREEMSTORE_FLOW.md

What does this PR do / why do we need it?

Required for all PRs.

  • Implements s3Provider so the S3Client functionality can be extended in other providers.
  • Adds XtreemStore provider.
  • Providers gain the ability to handle bulk requests.

Related Issue(s)

Required when applicable.

Where should the reviewer(s) start reviewing this?

Only required for larger PRs when this may not be immediately obvious.

Are there any specific topics we should discuss before merging?

Not required.

What are the next steps after this PR?

Not required.

Checklist before merging:

Required for all PRs.

When creating a PR these are items to keep in mind that cannot be checked by GitHub actions:

  • Documentation:
    • Does developer documentation (code comments, readme, etc.) need to be added or updated?
    • Does the user documentation need to be expanded or updated for this change?
  • Testing:
    • Does this functionality require changing or adding new unit tests?
    • Does this functionality require changing or adding new integration tests?
  • Git Hygiene:

For more details refer to the Go coding standards and the pull request process.

@swartzn swartzn requested a review from iamjoemccormick March 31, 2026 23:48
@swartzn swartzn self-assigned this Mar 31, 2026
@swartzn swartzn requested a review from a team as a code owner March 31, 2026 23:48
@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch from 407202d to fa556f3 Compare April 1, 2026 10:05
@swartzn swartzn changed the title Swartzn/feat/add xtreem store rst Swartzn/feat/add xtreemstore rst Apr 1, 2026
Copy link
Copy Markdown
Member

@iamjoemccormick iamjoemccormick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping a note I finished with my initial pass through this.

Comment thread ctl/internal/cmd/rst/list.go
Comment thread common/rst/s3.go
Comment thread common/rst/builder.go Outdated
Comment thread common/rst/builder.go Outdated
Comment thread common/rst/s3.go Outdated
Comment thread common/rst/builder.go Outdated
Comment thread rst/remote/internal/job/manager.go Outdated
Comment thread common/rst/rst.go Outdated
Comment thread common/rst/rst.go Outdated
Comment thread common/rst/rst.go Outdated
@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch 3 times, most recently from a2a3f5e to 4e1d63a Compare April 8, 2026 15:06
@swartzn swartzn requested a review from iamjoemccormick April 8, 2026 15:36
@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch from 4e1d63a to 634f0aa Compare April 9, 2026 15:46
Introduce s3ApiClient and s3Provider abstractions along with newS3WithOptions for extending
s3Client for new s3-compatible providers without changing the base S3 flow.
@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch from 634f0aa to cce8e6e Compare April 13, 2026 16:20
Copy link
Copy Markdown
Member

@iamjoemccormick iamjoemccormick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitting review feedback on the first two commits (up to "add xtreemstore provider").

Comment thread common/rst/s3.go
Comment thread common/rst/s3.go Outdated
Comment thread common/rst/s3.go Outdated
Comment thread common/rst/rst.go
Comment thread rst/remote/internal/job/job.go Outdated
Comment on lines +108 to +113
executeAfter := rst.GetJobExecuteAfter(j.Get())
j.Segments = make([]*Segment, 0, len(workRequests))
for _, wr := range workRequests {
if executeAfter != nil {
wr.SetExecuteAfter(executeAfter)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: unless I'm missing something doesn't this still do what we were trying to avoid? Generating an timestamp on one node (Remote) that is then passed to the Sync nodes which who's times may not be in sync?

I would prefer instead we make this a DelayExecution field on the job request that uses the protobuf duration type that is propagated to a DelayExecution field on the work requests.

I would also propose this is set by the GenerateWorkRequests method, if appropriate for that RST client type. It doesn't feel right to set this one field in GenerateSubmission().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this but was distracted by the other comments and forgot to come back to it. Anyway, this required adding a DelayExecution field to both beeremote.JobRequest and flex.WorkRequest which allows the sync node to convert the delay to ExecuteAfter in SubmitWorkRequest. GenerateWorkRequests is still called by remote.

See e85cf75

Comment thread rst/remote/internal/job/manager.go
Comment thread rst/sync/internal/workmgr/manager.go Outdated
Comment thread rst/sync/internal/workmgr/manager.go
zap.Bool("hasWorkResult", entry.WorkResult != nil),
zap.Bool("hasStatus", status != nil),
)
m.scheduler.AddRescheduleWorkToken(submissionId, time.Time{})
Copy link
Copy Markdown
Member

@iamjoemccormick iamjoemccormick Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: so when this happens we still add a token because when the journal is replayed later this WR will still get picked up and presumably the invalid result sent back to Remote?

Edit: I see this was added with the "update job state to running..." commit. Was this a bug? If so it'd be worth a mention in the commit message.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the work needs to be processed in order to complete/update remote.

It was a bug. I missed adding m.scheduler.AddRescheduleWorkToken(submissionId, time.Time{}) the first time. Showed up in my testing.

Copy link
Copy Markdown
Member

@iamjoemccormick iamjoemccormick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posting review comments for the remaining commits.

Comment thread common/rst/s3.go Outdated
Comment thread common/rst/s3.go Outdated
Comment thread common/rst/rst.go Outdated
Comment thread common/rst/s3.go Outdated
Comment thread common/rst/builder.go Outdated
Comment thread common/rst/builder.go Outdated
Comment on lines +311 to +313
if _, err := w.beeRemoteClient.UpdateWorkRequest(work.ctx, result.Work); err != nil {
log.Warn("unable to update remote job status to running; continuing work request without retrying", zap.Error(err))
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(blocking): we discussed this on Slack, but adding a note here so we don't loose track.

Sync intentionally never informed Remote when a work request shifted to "running" to reduce load on Remote and the jobs DB. I would prefer we keep it that way, as for the most part Remote never needs to know about this unless a user checks the job status.

What we could do is implement this issue https://github.com/ThinkParQ/bee-remote/issues/14 so if the user runs remote status or remote job list Remote will reach out and refresh the job and work request statuses. The GetJobsRequest message already has a UpdateWorkResults field that was intended to control if only the latest results known to Remote are returned, or if it also probes the Sync nodes.

We could either set UpdateWorkResults by default for CTL commands, or add a new --refresh-results flag for this. My thinking is for the most part users only care to know once jobs reach a terminal state, the intermediate states aren't that interesting unless you're trying to debug specific job issues.

@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch from cce8e6e to b01a2c0 Compare April 15, 2026 18:37
@swartzn swartzn requested a review from iamjoemccormick April 16, 2026 14:16
swartzn added 2 commits April 17, 2026 07:43
…After time on executing sync node to avoid time synchronization issues
…d emit flow

Replace the bulk submit API with an emit callback passed to BuildBulkRequest instead of returning
job requests from submit. This lets BuildBulkRequests, append and submit run under different
contexts while providers continue emitting through the builder.

Also fixes submission errors from being reported when jobs were not actually submitted because the
context was canceled.
@swartzn swartzn force-pushed the swartzn/feat/add-xtreem-store-rst branch from 2ba6ab2 to 9d97563 Compare April 17, 2026 12:57
Copy link
Copy Markdown
Member

@iamjoemccormick iamjoemccormick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a handful of remaining observations, my comment on GenerateSubmission() is the main blocker.

}

} else {
workRequests = rst.RecreateWorkRequests(j.Get(), j.GetSegments())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: the RecreateWorkRequests() path was updated to handle SetDelayExecution(), but I think we also need to set this in the GenerateWorkRequests() path?

I'm actually not sure if the RecreateWorkRequests() path in GenerateSubmission() is ever executed anymore, elsewhere just calls RecreateWorkRequests() directly.

Comment thread common/rst/builder.go Outdated
if client.IncludeInBulkRequest(walkCtx, jobRequest) {
rstId := jobRequest.GetRemoteStorageTarget()

bulkRequestStatesMu.Lock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): to avoid lock contention in a hot path consider making this a RWMutex as after the initial population of bulkRequestStates the map is only read.

Assisted-by: Claude:claude-opus-4-6

Comment thread common/rst/rst.go

type SubmitBulkRequestFn func(ctx context.Context)
type EmitBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest)
type AppendBulkRequestFn func(ctx context.Context, request *beeremote.JobRequest)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): we should mention AppendBulkRequestsFn is called under a lock so it should be fast. For example of a provider did I/O in append it could really slow everything down.

Assisted-by: Claude:claude-opus-4-6

@iamjoemccormick iamjoemccormick mentioned this pull request Apr 28, 2026
9 tasks
Comment thread common/rst/xtreemstore.go Fixed
Comment thread common/rst/xtreemstore.go Fixed
Comment thread common/rst/xtreemstore.go Fixed
Comment thread common/rst/xtreemstore.go Fixed
Comment thread common/rst/xtreemstore.go Fixed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants