Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/blobstore/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
deps = [
"//pkg/blobstore",
"//pkg/blobstore/completenesschecking",
"//pkg/blobstore/fallback",
"//pkg/blobstore/grpcclients",
"//pkg/blobstore/local",
"//pkg/blobstore/mirrored",
Expand Down
14 changes: 14 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/fallback"
"github.com/buildbarn/bb-storage/pkg/blobstore/local"
"github.com/buildbarn/bb-storage/pkg/blobstore/mirrored"
"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
Expand Down Expand Up @@ -379,6 +380,19 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
BlobAccess: readfallback.NewReadFallbackBlobAccess(primary.BlobAccess, secondary.BlobAccess, replicator),
DigestKeyFormat: primary.DigestKeyFormat.Combine(secondary.DigestKeyFormat),
}, "read_fallback", nil
case *pb.BlobAccessConfiguration_Fallback:
primary, err := nc.NewNestedBlobAccess(backend.Fallback.Primary, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
secondary, err := nc.NewNestedBlobAccess(backend.Fallback.Secondary, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
return BlobAccessInfo{
BlobAccess: fallback.NewFallbackBlobAccess(primary.BlobAccess, secondary.BlobAccess),
DigestKeyFormat: primary.DigestKeyFormat.Combine(secondary.DigestKeyFormat),
}, "fallback", nil
case *pb.BlobAccessConfiguration_Demultiplexing:
// Construct a trie for each of the backends specified
// in the configuration indexed by instance name prefix.
Expand Down
34 changes: 34 additions & 0 deletions pkg/blobstore/fallback/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "fallback",
srcs = ["fallback_blob_access.go"],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/fallback",
visibility = ["//visibility:public"],
deps = [
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/blobstore/slicing",
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)

go_test(
name = "fallback_test",
srcs = ["fallback_blob_access_test.go"],
deps = [
":fallback",
"//pkg/blobstore/buffer",
"//pkg/blobstore/slicing",
"//pkg/digest",
"//pkg/testutil",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
184 changes: 184 additions & 0 deletions pkg/blobstore/fallback/fallback_blob_access.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package fallback

import (
"context"
"time"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/util"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func isPrimaryUnavailable(err error) bool {
return status.Code(err) == codes.Unavailable
}

type fallbackBlobAccess struct {
primary blobstore.BlobAccess
secondary blobstore.BlobAccess
}

// NewFallbackBlobAccess creates a BlobAccess that tries to use a primary
// backend by default and falls back to a secondary backend if the primary is
// not available. Successful writes to the primary are async, best-effort
// replicated to the secondary. Writes to the secondary during fallback are not
// replicated back to the primary.
func NewFallbackBlobAccess(primary, secondary blobstore.BlobAccess) blobstore.BlobAccess {
return &fallbackBlobAccess{
primary: primary,
secondary: secondary,
}
}

func (ba *fallbackBlobAccess) Get(ctx context.Context, d digest.Digest) buffer.Buffer {
return buffer.WithErrorHandler(
ba.primary.Get(ctx, d),
&getErrorHandler{
context: ctx,
digest: d,
secondary: ba.secondary,
})
}

type getErrorHandler struct {
context context.Context
digest digest.Digest
secondary blobstore.BlobAccess
}

func (eh *getErrorHandler) OnError(err error) (buffer.Buffer, error) {
// If secondary is nil this is an error from a call to the secondary
// backend and there's nothing more to fallback on, return the error.
if eh.secondary == nil {
return nil, util.StatusWrap(err, "Secondary")
}
if !isPrimaryUnavailable(err) {
return nil, util.StatusWrap(err, "Primary")
}

// Primary is unvailable, try the secondary backend. Set secondary to nil to
// mark that we are on the fallback path, since WithErrorHandler will invoke
// this OnError() again if there is an error on the secondary backend.
secondary := eh.secondary
eh.secondary = nil
return secondary.Get(eh.context, eh.digest), nil
}

func (getErrorHandler) Done() {}

func (ba *fallbackBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
return buffer.WithErrorHandler(
ba.primary.GetFromComposite(ctx, parentDigest, childDigest, slicer),
&getFromCompositeErrorHandler{
context: ctx,
parentDigest: parentDigest,
childDigest: childDigest,
slicer: slicer,
secondary: ba.secondary,
})
}

type getFromCompositeErrorHandler struct {
context context.Context
parentDigest digest.Digest
childDigest digest.Digest
slicer slicing.BlobSlicer
secondary blobstore.BlobAccess
}

func (eh *getFromCompositeErrorHandler) OnError(err error) (buffer.Buffer, error) {
// If secondary is nil this is an error from a call to the secondary
// backend and there's nothing more to fallback on, return the error.
if eh.secondary == nil {
return nil, util.StatusWrap(err, "Secondary")
}
if !isPrimaryUnavailable(err) {
return nil, util.StatusWrap(err, "Primary")
}

// Primary is unvailable, try the secondary backend. Set secondary to nil to
// mark that we are on the fallback path, since WithErrorHandler will invoke
// this OnError() again if there is an error on the secondary backend.
secondary := eh.secondary
eh.secondary = nil
return secondary.GetFromComposite(eh.context, eh.parentDigest, eh.childDigest, eh.slicer), nil
}

func (getFromCompositeErrorHandler) Done() {}

func (ba *fallbackBlobAccess) Put(ctx context.Context, digest digest.Digest, buf buffer.Buffer) error {
sizeBytes, err := buf.GetSizeBytes()
if err != nil {
buf.Discard()
return err
}
primaryBuf, secondaryBuf := buf.CloneCopy(int(sizeBytes))

err = ba.primary.Put(ctx, digest, primaryBuf)

// Write to primary succeeded, so async, best-effort replicate to secondary.
if err == nil {
go func() {
replicateCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
defer cancel()
_ = ba.secondary.Put(replicateCtx, digest, secondaryBuf)
}()
return nil
}

// Primary is unavailable, try the secondary backend.
if isPrimaryUnavailable(err) {
primaryBuf.Discard()
if err := ba.secondary.Put(ctx, digest, secondaryBuf); err != nil {
return util.StatusWrap(err, "Secondary")
}
return nil
}

secondaryBuf.Discard()
return util.StatusWrap(err, "Primary")
}

func (ba *fallbackBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
missing, err := ba.primary.FindMissing(ctx, digests)

// Primary succeeded, return its results.
if err == nil {
return missing, nil
}

// Primary is unavailable, try the secondary backend.
if isPrimaryUnavailable(err) {
if missing, err = ba.secondary.FindMissing(ctx, digests); err != nil {
return digest.EmptySet, util.StatusWrap(err, "Secondary")
}
return missing, nil
}

return digest.EmptySet, util.StatusWrap(err, "Primary")
}

func (ba *fallbackBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
capabilities, err := ba.primary.GetCapabilities(ctx, instanceName)

// Primary succeeded, return its results.
if err == nil {
return capabilities, nil
}

// Primary is unavailable, try the secondary backend.
if isPrimaryUnavailable(err) {
if capabilities, err = ba.secondary.GetCapabilities(ctx, instanceName); err != nil {
return nil, util.StatusWrap(err, "Secondary")
}
return capabilities, nil
}

return nil, util.StatusWrap(err, "Primary")
}
Loading