diff --git a/MODULE.bazel b/MODULE.bazel index 2e4df22af..3f50fd5d2 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -49,6 +49,7 @@ use_repo( "com_github_buildbarn_go_sha256tree", "com_github_fxtlabs_primes", "com_github_go_jose_go_jose_v3", + "com_github_gocql_gocql", "com_github_google_uuid", "com_github_gorilla_mux", "com_github_grpc_ecosystem_go_grpc_middleware", diff --git a/go.mod b/go.mod index 75e5e6876..5df4f8bd4 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 github.com/bazelbuild/buildtools v0.0.0-20250204160707-ad48c76ab9b5 github.com/bazelbuild/remote-apis v0.0.0-20250211041012-7f922028fcfa + github.com/bazelbuild/rules_go v0.53.0 github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b github.com/fxtlabs/primes v0.0.0-20150821004651-dad82d10a449 github.com/go-jose/go-jose/v3 v3.0.3 @@ -54,14 +55,20 @@ require ( mvdan.cc/gofumpt v0.7.0 ) +require ( + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/mock v1.7.0-rc.1 // indirect + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect +) + require ( cel.dev/expr v0.19.0 // indirect - cloud.google.com/go v0.116.0 // indirect + cloud.google.com/go v0.118.0 // indirect cloud.google.com/go/auth v0.14.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect cloud.google.com/go/compute/metadata v0.6.0 // indirect - cloud.google.com/go/iam v1.2.2 // indirect - cloud.google.com/go/monitoring v1.21.2 // indirect + cloud.google.com/go/iam v1.3.1 // indirect + cloud.google.com/go/monitoring v1.22.1 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect @@ -88,31 +95,33 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/gocql/gocql v1.7.0 github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.32.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect golang.org/x/crypto v0.33.0 // indirect - golang.org/x/mod v0.17.0 // indirect + golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/text v0.22.0 // indirect golang.org/x/time v0.10.0 // indirect - golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d // indirect + golang.org/x/tools v0.30.0 // indirect + google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.1.0 // indirect diff --git a/go.sum b/go.sum index 53e6ba1b7..80cb44bce 100644 --- a/go.sum +++ b/go.sum @@ -1,26 +1,26 @@ cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0= cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= -cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go v0.118.0 h1:tvZe1mgqRxpiVa3XlIGMiPcEUbP1gNXELgD4y/IXmeQ= +cloud.google.com/go v0.118.0/go.mod h1:zIt2pkedt/mo+DQjcT4/L3NDxzHPR29j5HcclNH+9PM= cloud.google.com/go/auth v0.14.1 h1:AwoJbzUdxA/whv1qj3TLKwh3XX5sikny2fc40wUl+h0= cloud.google.com/go/auth v0.14.1/go.mod h1:4JHUxlGXisL0AW8kXPtUF6ztuOksyfUQNFjfsOCXkPM= cloud.google.com/go/auth/oauth2adapt v0.2.7 h1:/Lc7xODdqcEw8IrZ9SvwnlLX6j9FHQM74z6cBk9Rw6M= cloud.google.com/go/auth/oauth2adapt v0.2.7/go.mod h1:NTbTTzfvPl1Y3V1nPpOgl2w6d/FjO7NNUQaWSox6ZMc= cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I= cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= -cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= -cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= -cloud.google.com/go/logging v1.12.0 h1:ex1igYcGFd4S/RZWOCU51StlIEuey5bjqwH9ZYjHibk= -cloud.google.com/go/logging v1.12.0/go.mod h1:wwYBt5HlYP1InnrtYI0wtwttpVU1rifnMT7RejksUAM= +cloud.google.com/go/iam v1.3.1 h1:KFf8SaT71yYq+sQtRISn90Gyhyf4X8RGgeAVC8XGf3E= +cloud.google.com/go/iam v1.3.1/go.mod h1:3wMtuyT4NcbnYNPLMBzYRFiEfjKfJlLVLrisE7bwm34= +cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc= +cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA= cloud.google.com/go/longrunning v0.6.4 h1:3tyw9rO3E2XVXzSApn1gyEEnH2K9SynNQjMlBi3uHLg= cloud.google.com/go/longrunning v0.6.4/go.mod h1:ttZpLCe6e7EXvn9OxpBRx7kZEB0efv8yBO6YnVMfhJs= -cloud.google.com/go/monitoring v1.21.2 h1:FChwVtClH19E7pJ+e0xUhJPGksctZNVOk2UhMmblmdU= -cloud.google.com/go/monitoring v1.21.2/go.mod h1:hS3pXvaG8KgWTSz+dAdyzPrGUYmi2Q+WFX8g2hqVEZU= +cloud.google.com/go/monitoring v1.22.1 h1:KQbnAC4IAH+5x3iWuPZT5iN9VXqKMzzOgqcYB6fqPDE= +cloud.google.com/go/monitoring v1.22.1/go.mod h1:AuZZXAoN0WWWfsSvET1Cpc4/1D8LXq8KRDU87fMS6XY= cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6QJs= cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY= -cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI= -cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io= +cloud.google.com/go/trace v1.11.3 h1:c+I4YFjxRQjvAhRmSsmjpASUKq88chOX854ied0K/pE= +cloud.google.com/go/trace v1.11.3/go.mod h1:pt7zCYiDSQjC9Y2oqCsh9jF4GStB/hmjrYLsxRR27q8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 h1:3c8yed4lgqTt+oTQ+JNMDo+F4xprBf+O/il4ZC0nRLw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0/go.mod h1:obipzmGjfSjam60XLwGfqUkJsfiheAl+TUjG+4yzyPM= @@ -72,9 +72,17 @@ github.com/bazelbuild/buildtools v0.0.0-20250204160707-ad48c76ab9b5 h1:ls2GvHSkr github.com/bazelbuild/buildtools v0.0.0-20250204160707-ad48c76ab9b5/go.mod h1:PLNUetjLa77TCCziPsz0EI8a6CUxgC+1jgmWv0H25tg= github.com/bazelbuild/remote-apis v0.0.0-20250211041012-7f922028fcfa h1:lM/fuBngye4r1brpzne7xNrGRxRtX3wuJpvGpeQ5Gts= github.com/bazelbuild/remote-apis v0.0.0-20250211041012-7f922028fcfa/go.mod h1:/xo1pn3QkEL2JXrLeK30jvjVR/zXM9H8EqcWb/l5/A0= +github.com/bazelbuild/rules_go v0.53.0 h1:u160DT+RRb+Xb2aSO4piN8xhs4aZvWz2UDXCq48F4ao= +github.com/bazelbuild/rules_go v0.53.0/go.mod h1:xB1jfsYHWlnZyPPxzlOSst4q2ZAwS251Mp9Iw6TPuBc= +github.com/bazelbuild/rules_go v0.54.0 h1:D9aCU7j5rdRxg2rXOZX5zHZ395XC0KbgC4rnyaQ3ofM= +github.com/bazelbuild/rules_go v0.54.0/go.mod h1:T90Gpyq4HDFlsrvtQa2CBdHNJ2P4rAu/uUTmQbanzf0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b h1:IKUxixGBm9UxobU7c248z0BF0ojG19uoSLz8MFZM/KA= github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b/go.mod h1:e7g3/yWApcg+PpDqd4eQEEV8pexQmfCgK3frP+1Wuvk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -117,30 +125,25 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= +github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.7.0-rc.1 h1:YojYx61/OLFsiv6Rw1Z96LpldJIy31o+UHmwAUMJ6/U= +github.com/golang/mock v1.7.0-rc.1/go.mod h1:s42URUywIqd+OcERslBJvOjepvNymP31m3q8d/GkuRs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -150,7 +153,6 @@ github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9 github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= @@ -165,6 +167,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -211,21 +215,17 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= @@ -280,10 +280,11 @@ golang.org/x/lint v0.0.0-20241112194109-818c5a804067/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= -golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= +golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -292,8 +293,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= @@ -307,6 +308,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= @@ -317,7 +319,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -334,6 +338,7 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -352,10 +357,11 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -367,11 +373,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= -google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= -google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d h1:H8tOf8XM88HvKqLTxe755haY6r1fqqzLbEnfrmLXlSA= -google.golang.org/genproto/googleapis/api v0.0.0-20250102185135-69823020774d/go.mod h1:2v7Z7gP2ZUOGsaFyxATQSRoBnKygqVq2Cwnvom7QiqY= +google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f h1:387Y+JbxF52bmesc8kq1NyYIp33dnxCw6eiA7JMsTmw= +google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:0joYwWwLQh18AOj8zMYeZLjzuqcYTU3/nC5JdCvC3JI= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250212204824-5a70512c5d8b h1:nX2MXx+i/N0wYBX4vfm56Qf5xTrFMeTZYnRbSJaXdGg= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250212204824-5a70512c5d8b/go.mod h1:7VGktjvijnuhf2AobFqsoaBGnG8rImcxqoL+QPBPRq4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250212204824-5a70512c5d8b h1:FQtJ1MxbXoIIrZHZ33M+w5+dAP9o86rgpjoKr/ZmT7k= @@ -381,28 +386,22 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA= google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b h1:NuxyvVZoDfHZwYW9LD4GJiF5/nhiSyP4/InTrvw9Ibk= google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b/go.mod h1:IBqQ7wSUJ2Ep09a8rMWFsg4fmI2r38zwsq8a0GgxXpM= google.golang.org/grpc/security/advancedtls v1.0.0 h1:/KQ7VP/1bs53/aopk9QhuPyFAp9Dm9Ejix3lzYkCrDA= google.golang.org/grpc/security/advancedtls v1.0.0/go.mod h1:o+s4go+e1PJ2AjuQMY5hU82W7lDlefjJA6FqEHRVHWk= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/pkg/blobstore/cassandra/BUILD.bazel b/pkg/blobstore/cassandra/BUILD.bazel new file mode 100644 index 000000000..e968ab356 --- /dev/null +++ b/pkg/blobstore/cassandra/BUILD.bazel @@ -0,0 +1,38 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "cassandra", + srcs = [ + "cassandra_blob_access.go", + "format.go", + ], + importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/cassandra", + visibility = ["//visibility:public"], + deps = [ + "//pkg/blobstore", + "//pkg/blobstore/buffer", + "//pkg/blobstore/slicing", + "//pkg/capabilities", + "//pkg/digest", + "//pkg/util", + "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", + "@com_github_gocql_gocql//:gocql", + "@com_github_prometheus_client_golang//prometheus", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_x_sync//errgroup", + ], +) + +go_test( + name = "cassandra_test", + srcs = [ + "cassandra_blob_access_test.go", + "format_test.go", + ], + embed = [":cassandra"], + deps = [ + "@com_github_gocql_gocql//:gocql", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/blobstore/cassandra/README.md b/pkg/blobstore/cassandra/README.md new file mode 100644 index 000000000..28f8e66c7 --- /dev/null +++ b/pkg/blobstore/cassandra/README.md @@ -0,0 +1,104 @@ +# Cassandra + +## Required setup + +`bb-storage` should not be configured to use the admin user by default in +production. Instead, you should be using a regular user which cannot create +tables or keyspaces. + +The tables that are required can be set up using command similar to the +below in a `cqlsh` session: + +```sql +CREATE KEYSPACE IF NOT EXISTS buildbarn_storage WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': 3 }; + +USE buildbarn_storage; + +CREATE TABLE IF NOT EXISTS prod_metadata ( + digest_function TEXT, + digest_hash BLOB, + digest_size_bytes BIGINT, + digest_instance_name TEXT, + blob_id ASCII, + last_access TIMESTAMP, + segment_count INT, + segment_size INT, + PRIMARY KEY ((digest_function, digest_hash, digest_size_bytes, digest_instance_name))) + WITH gc_grace_seconds = 86400; + +CREATE TABLE IF NOT EXISTS prod_content ( + blob_id ASCII, + segment INT, + content BLOB, + PRIMARY KEY ((blob_id, segment))) + WITH gc_grace_seconds = 86400; + +CREATE TABLE IF NOT EXISTS prod_orphaned_content ( + blob_id ASCII, + digest_instance_name TEXT, + digest_function TEXT, + digest_hash BLOB, + digest_size_bytes BIGINT, + segment_count INT, + timestamp DATE, +PRIMARY KEY (blob_id, digest_instance_name)) +WITH WITH gc_grace_seconds = 86400; +``` + +Note that you will need a table prefix (`prod` in this case), which allow +multiple different environments to run in the same keyspace. + +The `prod_orphaned_content` table is used to track items that may have been +orphaned. Essentially, as blobs begin to be streamed into cassandra, a note +is taken of the blob and the estimated number of segments it will require. +Users are expected to have their own processes in place for using this data +to "reap" orphan content from the table. + +The `prod_orphaned_content` table will contain many Cassandra tombstones +(most rows are deleted soon after they are inserted). For that reason, you +should reduce the `gc_grace_seconds` to 1 day. This reduces the minimum +amount of time tombstones will be kept around. This can cause some rows to +resurrect if a Cassandra node is down for more than a day, but it is not a +problem at the application level: they will just need to be reaped again. + +In the long term, this might still create many tombstones, causing non-critical +errors when reaping old orphan rows. Consider setting a scheduled compaction +(e.g. every 2h) to help ensure that tombstones are regularly reviewed for +garbage collection. + +The `prod_metadata` and `prod_content` tables also contain many tombstones +so the `gc_grace_seconds` attribute is also set to 1 day (86400s). + +### Configuring `bb-storage` + +An example snippet of jsonnet config for setting up Cassandra is: + +```jsonnet +{ + cassandra: { + hosts: ["cassandra.mycorp.com:1234"], + context: "example/bazel", + keyspace: "buildbarn_storage", + tablePrefix: "prod", + segmentSize: 524288, + username: "cassandra_user", + password: "hunter2", + } +} +``` + +The segment size is the size in bytes before a new segment should be created. +It is a balancing act between being able to store as many items as possible +in a single segment and the maximum row size allowed in Cassandra (1MB). In +this example, 512kb was chosen, which should allow the majority of writes +seen by Buildbarn to be written in a single row. + +### Using the nearest DC + +To minimise latency, you can set a `preferredDC` in the `cassandra` +configuration. This works on the assumption that physical proximity will +lead to lower latencies, so the preferred DC is the one that is closest to the +particular cluster the Buildbarn is installed in. To find the nearest data +centre, talk to someone familiar with your network topology, and select from +the datacenters listed in your `NetworkTopologyStrategy` (if you're using +one) when creating the Cassandra keyspace. diff --git a/pkg/blobstore/cassandra/cassandra_blob_access.go b/pkg/blobstore/cassandra/cassandra_blob_access.go new file mode 100644 index 000000000..726e43151 --- /dev/null +++ b/pkg/blobstore/cassandra/cassandra_blob_access.go @@ -0,0 +1,664 @@ +package cassandra + +import ( + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "log" + "math" + "math/rand" + "strconv" + "strings" + "sync" + "time" + + "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" + "github.com/buildbarn/bb-storage/pkg/capabilities" + bbdigest "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/util" + "github.com/gocql/gocql" + "github.com/prometheus/client_golang/prometheus" + + "golang.org/x/sync/errgroup" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + writeConsistency = gocql.LocalQuorum + readConsistency = gocql.LocalQuorum + lastAccessUpdateConsistency = gocql.LocalOne + + simultaneousWorkers = 100 + + // The number of workers to use for updating the `last_access` field + lastAccessUpdateWorkerCount = 50 +) + +var ( + cassandraBlobStorePrometheusMetrics sync.Once + + segmentWriteHist = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "storage", + Name: "cassandra_segment_write_duration_seconds", + Help: "Amount of time in seconds it takes to write each segment to Cassandra, measured in seconds", + Buckets: util.DecimalExponentialBuckets(-3, 6, 2), + }) + + segmentReadHist = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "storage", + Name: "cassandra_segment_read_duration_seconds", + Help: "Amount of time in seconds it takes to read each segment from Cassandra", + Buckets: util.DecimalExponentialBuckets(-3, 6, 2), + }) + + lastAccessUpdatesFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "buildbarn", + Subsystem: "storage", + Name: "cassandra_last_access_failure_count", + Help: "Count of the number of times the `last_access` field could not be updated", + }) +) + +type cassandraBlobAccess struct { + capabilities.Provider + readBufferFactory blobstore.ReadBufferFactory + segmentSize int32 + session *gocql.Session + lastAccessUpdateInterval time.Duration + lastAccessUpdate chan<- func() + tables *tables +} + +func NewCassandraSession(clusterHosts []string, keyspace string, preferredDc string, port, protoVersion int32, username, password string, tlsConfig *tls.Config) (*gocql.Session, error) { + cluster := gocql.NewCluster(clusterHosts...) + cluster.Keyspace = keyspace + cluster.Consistency = writeConsistency + + if preferredDc != "" { + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(preferredDc)) + } + + if port != 0 { + cluster.Port = int(port) + } + + if protoVersion != 0 { + cluster.ProtoVersion = int(protoVersion) + } + + if username != "" { + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: username, + Password: password, + } + } + + if tlsConfig != nil { + cluster.SslOpts = &gocql.SslOptions{ + Config: tlsConfig, + } + } + + return cluster.CreateSession() +} + +// NewCassandraBlobAccess provides an implementation of storage backed by Cassandra. +func NewCassandraBlobAccess(capabilitiesProvider capabilities.Provider, readBufferFactory blobstore.ReadBufferFactory, session *gocql.Session, segmentSizeInBytes int32, lastAccessUpdateInterval time.Duration, tablePrefix string) blobstore.BlobAccess { + cassandraBlobStorePrometheusMetrics.Do(func() { + prometheus.MustRegister(segmentWriteHist) + prometheus.MustRegister(segmentReadHist) + prometheus.MustRegister(lastAccessUpdatesFailures) + }) + + if tablePrefix == "" { + log.Printf("No table prefix set. Cowardly refusing to proceed.") + return blobstore.NewErrorBlobAccess(errors.New("table prefix must be set")) + } + log.Printf("Table prefix: %s", tablePrefix) + + // Create the worker pool for updating the last access times + jobs := make(chan func(), lastAccessUpdateWorkerCount) + for i := 0; i < lastAccessUpdateWorkerCount; i++ { + go lastUpdateWorker(jobs) + } + + log.Printf("Cassandra storage ready.") + + return &cassandraBlobAccess{ + Provider: capabilitiesProvider, + session: session, + readBufferFactory: readBufferFactory, + segmentSize: segmentSizeInBytes, + lastAccessUpdateInterval: lastAccessUpdateInterval, + lastAccessUpdate: jobs, + tables: newTables(tablePrefix, session), + } +} + +func logErrorIfNotCancelledContext(err error, msg string, args ...interface{}) { + if err != nil && !errors.Is(err, context.Canceled) { + initialMessage := fmt.Sprintf(msg, args...) + log.Printf("%s: %v", initialMessage, err) + } +} + +func (a *cassandraBlobAccess) Get(ctx context.Context, digest bbdigest.Digest) buffer.Buffer { + metadata, err := a.tables.metadata.read(ctx, digest) + if err != nil { + if errors.Is(err, gocql.ErrNotFound) { + return buffer.NewBufferFromError(status.Errorf(codes.NotFound, "No metadata found for digest %s", digest.String())) + } + return buffer.NewBufferFromError(err) + } + + a.tables.metadata.updateLastAccessTime(context.Background(), digest, metadata.instanceName, metadata.lastAccess, a.lastAccessUpdateInterval, a.lastAccessUpdate) + + reader := sequentialReader{ + ctx: ctx, + session: a.session, + tableName: a.tables.content.tableName, + blobID: metadata.blobID, + segmentCount: metadata.segmentCount, + segmentNumber: 0, + currentReader: nil, + digest: digest, + } + + return a.readBufferFactory.NewBufferFromReader(digest, io.NopCloser(&reader), buffer.Irreparable(digest)) +} + +func (a *cassandraBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest bbdigest.Digest, slicer slicing.BlobSlicer) buffer.Buffer { + b, _ := slicer.Slice(a.Get(ctx, parentDigest), childDigest) + return b +} + +func (a *cassandraBlobAccess) Put(ctx context.Context, digest bbdigest.Digest, b buffer.Buffer) error { + // We derive the blob id from the digest. The metadata table will control access to + // this. It's okay for two streams to write the same data simultaneously because of + // the way that Cassandra acts: because the primary key will be the same, the inserts + // will effectively be updates, and since the data is identical, this will be safe. + blobID := strings.Join([]string{ + digest.GetDigestFunction().GetEnumValue().String(), + digest.GetHashString(), + strconv.Itoa(int(digest.GetSizeBytes())), + digest.GetInstanceName().String(), + }, "-") + + estimatedSegmentCount := getSegmentCount(a.segmentSize, digest.GetSizeBytes()) + + now := time.Now() + var segmentCount int + + if a.tables.content.isSegmentPresent(ctx, blobID, estimatedSegmentCount) { + segmentCount = estimatedSegmentCount + } else { + instanceName := digest.GetInstanceName().String() + if err := a.tables.orphanedContent.insert(ctx, digest, blobID, instanceName, estimatedSegmentCount, now); err != nil { + b.Discard() + return err + } + defer a.tables.orphanedContent.delete(ctx, blobID, instanceName) + + actualCount, err := a.writeContentData(ctx, digest, b, blobID) + if err != nil { + return err + } + segmentCount = actualCount + } + + return a.tables.metadata.update(ctx, digest, blobID, digest.GetInstanceName().String(), segmentCount, now, a.segmentSize) +} + +func getSegmentCount(segmentSize int32, sizeInBytes int64) int { + segmentSize64 := int64(segmentSize) + + return int((sizeInBytes + segmentSize64 - 1) / segmentSize64) +} + +func (a *cassandraBlobAccess) writeContentData(ctx context.Context, digest bbdigest.Digest, b buffer.Buffer, blobID string) (int, error) { + segmentCount := 0 + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(simultaneousWorkers + 1) + + g.Go(func() error { + reader := b.ToReader() + defer reader.Close() + + for { + // Read either up to EOF or until we have `segmentSize` bytes + buf := new(bytes.Buffer) + _, err := io.CopyN(buf, reader, int64(a.segmentSize)) + isEOF := err == io.EOF + + if err != nil && err != io.EOF { + logErrorIfNotCancelledContext(err, "Put: Failed to read next segment to write from client. %d of %s (digest function: %s, size: %d)", segmentCount, digest.GetHashString(), digest.GetDigestFunction().GetEnumValue().String(), digest.GetSizeBytes()) + return err + } + + count := segmentCount + g.Go(func() error { + return a.tables.content.insertSegment(gCtx, blobID, buf.Bytes(), count, digest.GetHashString()) + }) + + segmentCount++ + + if isEOF { + break + } + } + + // TODO: Ensure hashes match at this point + // TODO: Delete from the blobs table if hashes do not match + + return nil + }) + + err := g.Wait() + if err != nil { + logErrorIfNotCancelledContext(err, "Put: Unable to store %s", digest.String()) + return 0, err + } + return segmentCount, nil +} + +func (a *cassandraBlobAccess) FindMissing(ctx context.Context, digests bbdigest.Set) (bbdigest.Set, error) { + // The obvious thing to do here is to create a `SELECT ... IN (?, ?)` for each digest, but doing so + // prevents token-aware routing of queries. Instead, we issue a single `SELECT` per digest and + // aggregate the values that come back. + + builder := bbdigest.NewSetBuilder() + var mu sync.Mutex + + cancellableCtx, cancel := context.WithCancel(ctx) + defer cancel() + + g, errCtx := errgroup.WithContext(cancellableCtx) + + sem := make(chan struct{}, simultaneousWorkers) + + // Try not to be a terrible, terrible citizen + for _, digest := range digests.Items() { + sem <- struct{}{} + + g.Go(func() error { + defer func() { + <-sem + }() + + err := a.isMissing(errCtx, digest, &builder, &mu) + if err != nil { + cancel() + return err + } + return nil + }) + } + err := g.Wait() + if err != nil { + logErrorIfNotCancelledContext(err, "FindMissing: returning early because of error reading data") + return bbdigest.EmptySet, err + } + + toReturn := builder.Build() + + return toReturn, nil +} + +func (a *cassandraBlobAccess) isMissing(ctx context.Context, digest bbdigest.Digest, builder *bbdigest.SetBuilder, mu *sync.Mutex) error { + metadata, err := a.tables.metadata.read(ctx, digest) + if err != nil { + if errors.Is(err, gocql.ErrNotFound) { + mu.Lock() + defer mu.Unlock() + builder.Add(digest) + return nil + } + + return err + } + + a.tables.metadata.updateLastAccessTime(context.Background(), digest, metadata.instanceName, metadata.lastAccess, a.lastAccessUpdateInterval, a.lastAccessUpdate) + + return nil +} + +func lastUpdateWorker(jobs chan func()) { + for j := range jobs { + j() + } +} + +type tables struct { + content *contentTable + metadata *metadataTable + orphanedContent *orphanedContentTable +} + +func newTables(prefix string, session *gocql.Session) *tables { + return &tables{ + content: &contentTable{ + tableName: prefix + "_content", + session: session, + }, + metadata: &metadataTable{ + tableName: prefix + "_metadata", + session: session, + }, + orphanedContent: &orphanedContentTable{ + tableName: prefix + "_orphaned_content", + session: session, + }, + } +} + +type contentTable struct { + tableName string + session *gocql.Session +} + +func (t *contentTable) insertSegment(ctx context.Context, blobID string, data []byte, segment int, digestHash string) error { + defer func(start time.Time) { + segmentWriteHist.Observe(float64(time.Since(start).Seconds())) + }(time.Now()) + + if err := retryCassandraWrite( + ctx, + t.session, + writeConsistency, + fmt.Sprintf("INSERT INTO %s (blob_id, segment, content) VALUES(?, ?, ?)", t.tableName), + blobID, + segment, + data, + ); err != nil { + logErrorIfNotCancelledContext(err, "Put: unable to write to cassandra for blobID %s (%s) and segment %d", blobID, digestHash, segment) + return err + } + + return nil +} + +func (t *contentTable) isSegmentPresent(ctx context.Context, blobID string, segment int) bool { + // Quick check to see if the data is already present in the `contents` table. This + // might be the case if we wrote with one instance name, but now use another. We + // look for the last segment. + err := t.session.Query( + fmt.Sprintf("SELECT blob_id FROM %s WHERE blob_id = ? AND segment = ?", t.tableName), + blobID, + segment, + ).WithContext(ctx).Consistency(readConsistency).Idempotent(true).Exec() + return errors.Is(err, gocql.ErrNotFound) +} + +type metadataTable struct { + tableName string + session *gocql.Session +} + +// read queries the metadata table to check if a blob exists. +// It returns: +// - gocql.ErrNotFound when no row, or a row with empty blobID is found +// - a wrapped gocql error when the Cassandra query failed +func (t *metadataTable) read(ctx context.Context, digest bbdigest.Digest) (*metadataTableRow, error) { + var row metadataTableRow + var err error + + // We check the universal pool first + for _, name := range []string{"", digest.GetInstanceName().String()} { + err = t.session.Query( + fmt.Sprintf("SELECT digest_instance_name, last_access, segment_size, segment_count, blob_id FROM %s WHERE digest_function = ? AND digest_hash = ? AND digest_size_bytes = ? AND digest_instance_name = ? LIMIT 1", t.tableName), + digest.GetDigestFunction().GetEnumValue().String(), + digest.GetHashString(), + digest.GetSizeBytes(), + name, + ).WithContext(ctx).Consistency(readConsistency).Idempotent(true).Scan(&row.instanceName, &row.lastAccess, &row.segmentSize, &row.segmentCount, &row.blobID) + + if err == nil && row.blobID != "" { + break + } + } + + if err != nil && !errors.Is(err, gocql.ErrNotFound) { + logErrorIfNotCancelledContext(err, "metadataTable.read: Cassandra error reading metadata for digest %s", digest.String()) + return nil, fmt.Errorf("cassandra error reading metadata for digest %s: %w", digest.String(), err) + } + + if errors.Is(err, gocql.ErrNotFound) || row.blobID == "" { + return nil, gocql.ErrNotFound + } + + return &row, nil +} + +func (t *metadataTable) update(ctx context.Context, digest bbdigest.Digest, blobID, digestInstanceName string, segmentCount int, now time.Time, segmentSize int32) error { + // Finally update metadata table. Note that we use a broader `writeConsistency` here, so that + // if we fail over the metadata is correct in all datacenters. This may require a cross-DC read + // later on, but in most cases this won't be necessary as we don't expect to be constantly + // flipping between DCs. We only do this on a `put` (and not when updating `last_access`) + // because the `last_access` time is far less important than whether the data is present. + if err := retryCassandraWrite( + ctx, + t.session, + gocql.EachQuorum, + fmt.Sprintf("INSERT INTO %s (digest_function, digest_hash, digest_size_bytes, digest_instance_name, blob_id, segment_count, segment_size, last_access) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", t.tableName), + digest.GetDigestFunction().GetEnumValue().String(), + digest.GetHashString(), + digest.GetSizeBytes(), + digestInstanceName, + blobID, + segmentCount, + segmentSize, + now, + ); err != nil { + logErrorIfNotCancelledContext(err, "Put: Unable to update info table for hash %s", digest.String()) + return err + } + + return nil +} + +func (t *metadataTable) updateLastAccessTime(ctx context.Context, digest bbdigest.Digest, instanceName string, lastAccessed time.Time, lastAccessUpdateInterval time.Duration, lastAccessUpdate chan<- func()) { + // Note, we are "adding" a negative duration. `Sub` takes a `time.Time` and returns a `time.Duration`, which + // is the opposite of what we want. + window := time.Now().Add(-lastAccessUpdateInterval) + if lastAccessed.After(window) { + return + } + + updaterFunc := func() { + // This query might insert a row with (blob_id, segment_count, segment_size) == (null, null, null), which is fine. + // Such a row should be considered missing from the POV of the service. Eventually, it will either be updated again + // with a blobID or (more likely) be reaped. + // We do not use `IF EXISTS` because it would make the query `SERIAL`, meaning it needs to be coordinated between + // all the replicas, which is very expensive (think of it as having to take a distributed lock for that row). + if err := retryCassandraWrite( + ctx, + t.session, + lastAccessUpdateConsistency, + fmt.Sprintf("UPDATE %s SET last_access = ? WHERE digest_function = ? AND digest_hash = ? AND digest_size_bytes = ? AND digest_instance_name = ?", t.tableName), + time.Now(), + digest.GetDigestFunction().GetEnumValue().String(), + digest.GetHashString(), + digest.GetSizeBytes(), + instanceName, + ); err != nil { + // There's nothing sensible we can do to recover here, but at least log the error + logErrorIfNotCancelledContext(err, "Unable to update last access time for %s", digest.String()) + lastAccessUpdatesFailures.Inc() + } + } + + select { + case lastAccessUpdate <- updaterFunc: + // Fantastic. We're able to update the last access time. + default: + // This isn't ideal, but one of two things is going to happen: + // 1. The digest won't be accessed again, and will age out sooner + // 2. The digest will be accessed again, and maybe next time we'll + // be able to update it. + // Either way, we're making the trade-off of a possible inability to + // update _everything_ properly for not blocking other operations in + // this blobstore. + } +} + +type metadataTableRow struct { + instanceName string + blobID string + segmentSize int + segmentCount int + lastAccess time.Time +} + +type orphanedContentTable struct { + tableName string + session *gocql.Session +} + +func (t *orphanedContentTable) delete(ctx context.Context, blobID, digestInstanceName string) { + _ = retryCassandraWrite( + ctx, + t.session, + writeConsistency, + fmt.Sprintf("DELETE FROM %s WHERE blob_id = ? AND digest_instance_name = ?", t.tableName), + blobID, + digestInstanceName, + ) +} + +func (t *orphanedContentTable) insert(ctx context.Context, digest bbdigest.Digest, blobID, digestInstanceName string, estimatedSegmentCount int, now time.Time) error { + if err := retryCassandraWrite( + ctx, + t.session, + writeConsistency, + fmt.Sprintf("INSERT INTO %s (digest_function, digest_hash, digest_size_bytes, digest_instance_name, blob_id, segment_count, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?)", t.tableName), + digest.GetDigestFunction().GetEnumValue().String(), + digest.GetHashString(), + digest.GetSizeBytes(), + digestInstanceName, + blobID, + estimatedSegmentCount, + now, + ); err != nil { + logErrorIfNotCancelledContext(err, "Unable to insert into orphan table for %s", digest.String()) + return err + } + + return nil +} + +type sequentialReader struct { + ctx context.Context + session *gocql.Session + tableName string + segmentCount int + blobID string + segmentNumber int + currentReader io.Reader + digest bbdigest.Digest +} + +func (s *sequentialReader) Read(p []byte) (n int, err error) { + if s.currentReader == nil { + err := s.fetchNext() + if err != nil { + logErrorIfNotCancelledContext(err, "Read for %s (blob id %s) failed", s.digest.String(), s.blobID) + return 0, err + } + } + + read, err := s.currentReader.Read(p) + if err != nil { + if err != io.EOF { + // If something went wrong, and it wasn't the end of the reader, bail + logErrorIfNotCancelledContext(err, "Get: Unable to fully read blob %s of %s", s.blobID, s.digest.String()) + return read, err + } + + // If we've read the end of the final segment, we're done! + if s.segmentNumber == s.segmentCount { + // Making it explicit that `err` is already `io.EOF` + return read, io.EOF + } + + // Otherwise, prep for the next read + err := s.fetchNext() + if err != nil { + return read, err + } + } + + return read, nil +} + +func (s *sequentialReader) fetchNext() error { + defer func(start time.Time) { + segmentReadHist.Observe(float64(time.Since(start).Seconds())) + }(time.Now()) + + var data []byte + err := s.session.Query( + fmt.Sprintf("SELECT content FROM %s WHERE blob_id = ? AND segment = ? LIMIT 1", s.tableName), + s.blobID, + s.segmentNumber, + ).WithContext(s.ctx).Consistency(readConsistency).Idempotent(true).Scan(&data) + s.segmentNumber++ + if err != nil { + logErrorIfNotCancelledContext(err, "Unable to read segment %d of %s", s.segmentNumber, s.digest.String()) + return err + } + s.currentReader = bytes.NewReader(data) + return nil +} + +func retryCassandraWrite(ctx context.Context, session *gocql.Session, consistency gocql.Consistency, query string, values ...interface{}) error { + // Attempt to write the segment up to three times. + var err error + maxTries := 3 + cqlQuery := session.Query(query, values...).WithContext(ctx).Consistency(consistency).Idempotent(true) + for i := 0; i < maxTries; i++ { + if i > 0 { + // Sleep for ~500ms, then ~5s. + duration := time.Duration(rand.Intn(int(math.Pow10(i + 2)))) + err := sleep(ctx, duration*time.Millisecond) + if errors.Is(err, context.Canceled) { + return err + } + } + + err = cqlQuery.Exec() + if err == nil { + return nil + } + + // If the context is cancelled, then there's not much else we can do. + if errors.Is(err, context.Canceled) { + return err + } + + logErrorIfNotCancelledContext(err, "Failed to write to backend (try %d/%d): %s", i+1, maxTries, formatQuery(cqlQuery)) + } + + return err +} + +func sleep(ctx context.Context, duration time.Duration) error { + timer := time.NewTimer(duration) + defer timer.Stop() + + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/blobstore/cassandra/cassandra_blob_access_test.go b/pkg/blobstore/cassandra/cassandra_blob_access_test.go new file mode 100644 index 000000000..1d4b981f0 --- /dev/null +++ b/pkg/blobstore/cassandra/cassandra_blob_access_test.go @@ -0,0 +1,26 @@ +package cassandra + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_getSegmentCount(t *testing.T) { + tests := []struct { + segmentSize int32 + sizeInBytes int64 + want int + }{ + {64, 1, 1}, + {64, (12 * 64) - 1, 12}, + {64, (12 * 64), 12}, + {64, (12 * 64) + 1, 13}, + } + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + require.Equal(t, tt.want, getSegmentCount(tt.segmentSize, tt.sizeInBytes)) + }) + } +} diff --git a/pkg/blobstore/cassandra/format.go b/pkg/blobstore/cassandra/format.go new file mode 100644 index 000000000..5f8d23b4d --- /dev/null +++ b/pkg/blobstore/cassandra/format.go @@ -0,0 +1,24 @@ +package cassandra + +import ( + "fmt" + "strings" + + "github.com/gocql/gocql" +) + +func formatQuery(query *gocql.Query) string { + valuesBuilder := strings.Builder{} + for i, v := range query.Values() { + if i != 0 { + valuesBuilder.WriteString(" ") + } + s := fmt.Sprintf("%v", v) + maxFieldStrlen := 100 + valuesBuilder.WriteString(s[:min(len(s), maxFieldStrlen)]) + if len(s) >= maxFieldStrlen { + valuesBuilder.WriteString("...") + } + } + return fmt.Sprintf("[query %s \"%s\" [%s]]", query.GetConsistency(), query.Statement(), valuesBuilder.String()) +} diff --git a/pkg/blobstore/cassandra/format_test.go b/pkg/blobstore/cassandra/format_test.go new file mode 100644 index 000000000..7292734c4 --- /dev/null +++ b/pkg/blobstore/cassandra/format_test.go @@ -0,0 +1,18 @@ +package cassandra + +import ( + "bytes" + "testing" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/require" +) + +func Test_formatQuery(t *testing.T) { + session := gocql.Session{} + longBlob := bytes.Repeat([]byte("This is a very long string!"), 100) + query := session.Query("INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)", 1234, "Jon Snow", longBlob) + expectedOutput := `[query ANY "INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)" ` + + `[1234 Jon Snow [84 104 105 115 32 105 115 32 97 32 118 101 114 121 32 108 111 110 103 32 115 116 114 105 110 103 33...]]` + require.Equal(t, expectedOutput, formatQuery(query)) +} diff --git a/pkg/blobstore/configuration/BUILD.bazel b/pkg/blobstore/configuration/BUILD.bazel index a3236d0da..7732d37ff 100644 --- a/pkg/blobstore/configuration/BUILD.bazel +++ b/pkg/blobstore/configuration/BUILD.bazel @@ -21,6 +21,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/blobstore", + "//pkg/blobstore/cassandra", "//pkg/blobstore/completenesschecking", "//pkg/blobstore/grpcclients", "//pkg/blobstore/local", diff --git a/pkg/blobstore/configuration/new_blob_access.go b/pkg/blobstore/configuration/new_blob_access.go index 6a966cb97..564305f0b 100644 --- a/pkg/blobstore/configuration/new_blob_access.go +++ b/pkg/blobstore/configuration/new_blob_access.go @@ -3,11 +3,13 @@ package configuration import ( "archive/zip" "context" + "crypto/tls" "os" "sync" "time" "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/cassandra" "github.com/buildbarn/bb-storage/pkg/blobstore/local" "github.com/buildbarn/bb-storage/pkg/blobstore/mirrored" "github.com/buildbarn/bb-storage/pkg/blobstore/readcaching" @@ -569,6 +571,46 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration * BlobAccess: blobstore.NewDeadlineEnforcingBlobAccess(base.BlobAccess, timeout.AsDuration()), DigestKeyFormat: base.DigestKeyFormat, }, "deadline_enforcing", nil + + case *pb.BlobAccessConfiguration_Cassandra: + config := backend.Cassandra + + var cassandraTLSConfig *tls.Config + if config.Tls != nil { + var err error + cassandraTLSConfig, err = util.NewTLSConfigFromClientConfiguration(config.Tls) + if err != nil { + return BlobAccessInfo{}, "", util.StatusWrap(err, "Failed to create tls config") + } + } else { + cassandraTLSConfig = nil + } + + session, err := cassandra.NewCassandraSession( + config.Hosts, + config.Keyspace, + config.PreferredDc, + config.Port, + config.ProtocolVersion, + config.Username, + config.Password, + cassandraTLSConfig) + if err != nil { + return BlobAccessInfo{}, "", util.StatusWrap(err, "Unable to create cassandra session") + } + + blobAccess := cassandra.NewCassandraBlobAccess( + creator.GetDefaultCapabilitiesProvider(), + readBufferFactory, + session, + config.SegmentSizeBytes, + config.LastAccessUpdateInterval.AsDuration(), + config.TablePrefix, + ) + return BlobAccessInfo{ + BlobAccess: blobAccess, + DigestKeyFormat: creator.GetBaseDigestKeyFormat(), + }, "cassandra", nil } return creator.NewCustomBlobAccess(configuration, nc) } diff --git a/pkg/proto/configuration/blobstore/BUILD.bazel b/pkg/proto/configuration/blobstore/BUILD.bazel index f92085c37..77fa2419e 100644 --- a/pkg/proto/configuration/blobstore/BUILD.bazel +++ b/pkg/proto/configuration/blobstore/BUILD.bazel @@ -13,6 +13,7 @@ proto_library( "//pkg/proto/configuration/digest:digest_proto", "//pkg/proto/configuration/grpc:grpc_proto", "//pkg/proto/configuration/http:http_proto", + "//pkg/proto/configuration/tls:tls_proto", "@googleapis//google/rpc:status_proto", "@protobuf//:duration_proto", "@protobuf//:empty_proto", @@ -32,6 +33,7 @@ go_proto_library( "//pkg/proto/configuration/digest", "//pkg/proto/configuration/grpc", "//pkg/proto/configuration/http", + "//pkg/proto/configuration/tls", "@org_golang_google_genproto_googleapis_rpc//status", ], ) diff --git a/pkg/proto/configuration/blobstore/blobstore.proto b/pkg/proto/configuration/blobstore/blobstore.proto index 8f3409886..23ac39b0d 100644 --- a/pkg/proto/configuration/blobstore/blobstore.proto +++ b/pkg/proto/configuration/blobstore/blobstore.proto @@ -12,6 +12,7 @@ import "pkg/proto/configuration/cloud/gcp/gcp.proto"; import "pkg/proto/configuration/digest/digest.proto"; import "pkg/proto/configuration/grpc/grpc.proto"; import "pkg/proto/configuration/http/http.proto"; +import "pkg/proto/configuration/tls/tls.proto"; option go_package = "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"; @@ -199,6 +200,11 @@ message BlobAccessConfiguration { // value. When gRPC calls are timed out a `DEADLINE_EXCEEDED` error // code will be returned. DeadlineEnforcingBlobAccess deadline_enforcing = 28; + + // Use Cassandra for storing blobs. It is recommended to use this + // backend in conjunction with a `local` store as part of a + // `read_caching` backend. + CassandraBlobAccessConfiguration cassandra = 29; } // Was 'redis'. Instead of using Redis, one may run a separate @@ -935,3 +941,52 @@ message DeadlineEnforcingBlobAccess { // The backend to which all operations are delegated. BlobAccessConfiguration backend = 2; } + +message CassandraBlobAccessConfiguration { + // The hosts to use, assuming no discovery is being used. + repeated string hosts = 1; + // Override the port to connect on from the default. + int32 port = 2; + + buildbarn.configuration.tls.ClientConfiguration tls = 3; + + // The Cassandra protocol version. This can normally be omitted from + // your config. + int32 protocol_version = 4; + + // If using a `NetworkTopologyStrategy` and a Cassandra instance with hosts + // in multiple locations, setting this enables host selection policies + // which will prioritize and return hosts which are in the specified + // datacentre before returning hosts in all other datercentres. + string preferred_dc = 5; + + // User name and password for connecting to the database. + string username = 6; + string password = 7; + + // Keyspace to use + string keyspace = 8; + + // The maximum size of a segment before a new one is created. Cassandra + // has a maximum row size of 1MB, so blobs larger than the value of this + // option will be broken up into a series of "segments", each of which is + // `segment_size_bytes` except for the last, which may be smaller. It is + // desirable to minimise the number of segments, so this value may need + // some tuning based on your usage, however a sensible starting point is to + // use a value of 512KB (524288 bytes) + int32 segment_size_bytes = 9; + + // Only update the last access timestamp for a node after this + // duration is exceeded. + google.protobuf.Duration last_access_update_interval = 10; + + // The table prefix to use when accessing data. This is required, and + // using it allows you to host multiple different environments in the same + // keyspace. Suggested values are `prod` and `uat`. + string table_prefix = 11; + + // Anything that comes with the `Digest.InstanceName` set to this value will + // be stored in cassandra with the `digest_instance_name` value set to the + // empty string. + string universal_instance_name = 12; +}