Skip to content
Open
35 changes: 35 additions & 0 deletions docs/configuration/storage-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Quickwit currently supports four types of storage providers:

Storage URIs refer to different storage providers identified by a URI "protocol" or "scheme". Quickwit supports the following storage URI protocols:
- `s3://` for Amazon S3 and S3-compatible
- `s3+<name>://` for additional S3-compatible backends configured under `storage.s3.named.<name>` (see [Named S3 backends](#named-s3-backends))
- `azure://` for Azure Blob Storage
- `file://` for local file systems
- `gs://` for Google Cloud Storage
Expand Down Expand Up @@ -104,6 +105,40 @@ storage:
endpoint: https://storage.googleapis.com
```

#### Named S3 backends

In addition to the primary `s3:` block, you can declare any number of additional S3-compatible backends under `storage.s3.named.<name>`. Each entry is an independent endpoint with its own credentials, region, and flags. Indexes route to a named backend via the URI scheme `s3+<name>://bucket/path` (plain `s3://` continues to use the primary endpoint).

Each named entry accepts the same fields as the primary `s3:` block, *except* `named` itself (no recursion). If `access_key_id` / `secret_access_key` are omitted on a named entry, the global AWS SDK credential chain is used (env vars, instance metadata, etc.).

```yaml
storage:
s3:
# Primary backend — addressed by plain `s3://...` URIs.
endpoint: https://s3.us-east-1.amazonaws.com
region: us-east-1
named:
# Addressed by `s3+secondary://bucket/path` URIs.
secondary:
endpoint: https://s3.eu-west-3.amazonaws.com
region: eu-west-3
access_key_id: ${SECONDARY_S3_ACCESS_KEY_ID}
secret_access_key: ${SECONDARY_S3_SECRET_ACCESS_KEY}
# Addressed by `s3+seaweed://bucket/path` URIs. Falls back to the
# global AWS SDK credentials when keys are omitted.
seaweed:
endpoint: http://seaweedfs-s3:8333
region: us-east-1
force_path_style_access: true
```

An index pointed at a named backend declares its URI accordingly:

```yaml
index_id: logs-eu
index_uri: s3+secondary://logs-bucket/logs-eu
```

### Azure storage configuration

| Property | Description | Default value |
Expand Down
41 changes: 37 additions & 4 deletions quickwit/quickwit-common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ impl FromStr for Protocol {
"ram" => Ok(Protocol::Ram),
"s3" => Ok(Protocol::S3),
"gs" => Ok(Protocol::Google),
// `s3+<name>://...` for a named S3-compatible backend configured under
// `storage.s3.named.<name>`. Routes to the same factory as `s3://`.
s if s.starts_with("s3+") && s.len() > 3 => Ok(Protocol::S3),
Comment thread
papaharry marked this conversation as resolved.
_ => bail!("unknown URI protocol `{protocol}`"),
}
}
Expand Down Expand Up @@ -262,9 +265,13 @@ impl Uri {
if uri_str.is_empty() {
bail!("failed to parse empty URI");
}
let (protocol, mut path) = match uri_str.split_once(PROTOCOL_SEPARATOR) {
None => (Protocol::File, uri_str.to_string()),
Some((protocol, path)) => (Protocol::from_str(protocol)?, path.to_string()),
let (scheme_opt, protocol, mut path) = match uri_str.split_once(PROTOCOL_SEPARATOR) {
None => (None, Protocol::File, uri_str.to_string()),
Some((scheme, path)) => (
Some(scheme.to_string()),
Protocol::from_str(scheme)?,
path.to_string(),
),
};
if protocol == Protocol::File {
if path.starts_with('~') {
Expand Down Expand Up @@ -292,8 +299,14 @@ impl Uri {
.to_string_lossy()
.to_string();
}
// Preserve `s3+<name>` qualifier so the storage resolver can route to
// the named backend; other schemes normalize to canonical form.
let display_scheme: &str = match scheme_opt.as_deref() {
Some(s) if s.starts_with("s3+") => s,
_ => protocol.as_str(),
};
Ok(Self {
uri: format!("{protocol}{PROTOCOL_SEPARATOR}{path}"),
uri: format!("{display_scheme}{PROTOCOL_SEPARATOR}{path}"),
protocol,
})
}
Expand Down Expand Up @@ -812,4 +825,24 @@ mod tests {
serde_json::Value::String("s3://bucket/key".to_string())
);
}

#[test]
fn test_uri_s3_named_preserved() {
// The `s3+<name>` qualifier is the routing token for named S3-compatible
// backends (`storage.s3.named.<name>`). It must survive parse + serialize
// so the storage resolver can recover the backend name on deserialization;
// before this guarantee, the qualifier was stripped by URI normalization
// and every `s3+<name>://` URI silently resolved to the primary endpoint.
let uri = Uri::from_str("s3+alt://bucket/key").unwrap();
assert_eq!(uri.protocol(), Protocol::S3);
assert_eq!(uri.as_str(), "s3+alt://bucket/key");
let json = serde_json::to_value(&uri).unwrap();
assert_eq!(
json,
serde_json::Value::String("s3+alt://bucket/key".to_string())
);
let round_trip: Uri = serde_json::from_value(json).unwrap();
assert_eq!(round_trip.as_str(), "s3+alt://bucket/key");
assert_eq!(round_trip.protocol(), Protocol::S3);
}
}
133 changes: 133 additions & 0 deletions quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,87 @@ pub struct S3StorageConfig {
pub disable_stalled_stream_protection_upload: bool,
#[serde(default)]
pub disable_stalled_stream_protection_download: bool,
/// Additional named S3-compatible backends, addressed via `s3+<name>://bucket/path`
/// URIs. Each entry is an independent endpoint with its own credentials, region,
/// etc. The map key (`<name>`) is the routing token used in the URI scheme.
#[serde(default)]
#[serde(skip_serializing_if = "std::collections::BTreeMap::is_empty")]
pub named: std::collections::BTreeMap<String, NamedS3StorageConfig>,
}

/// Configuration for a named S3-compatible backend nested under
/// `storage.s3.named.<name>`. Mirrors `S3StorageConfig` but cannot itself
/// have a `named` field (no recursion).
#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NamedS3StorageConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub flavor: Option<StorageBackendFlavor>,
#[serde(default)]
pub access_key_id: Option<String>,
#[serde(default)]
pub secret_access_key: Option<String>,
#[serde(default)]
pub region: Option<String>,
#[serde(default)]
pub endpoint: Option<String>,
#[serde(default)]
pub force_path_style_access: bool,
#[serde(default)]
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
pub checksum_algorithm: ChecksumAlgorithm,
}

impl NamedS3StorageConfig {
/// Project this named config back into a full `S3StorageConfig`
/// (with an empty `named` map) so it can flow through the existing
/// S3 client construction code unchanged.
pub fn as_s3_config(&self) -> S3StorageConfig {
S3StorageConfig {
flavor: self.flavor,
access_key_id: self.access_key_id.clone(),
secret_access_key: self.secret_access_key.clone(),
region: self.region.clone(),
endpoint: self.endpoint.clone(),
force_path_style_access: self.force_path_style_access,
disable_multi_object_delete: self.disable_multi_object_delete,
disable_multipart_upload: self.disable_multipart_upload,
checksum_algorithm: self.checksum_algorithm,
disable_checksums: false,
disable_stalled_stream_protection_upload: false,
disable_stalled_stream_protection_download: false,
named: Default::default(),
}
}

pub fn redact(&mut self) {
if let Some(secret_access_key) = self.secret_access_key.as_mut() {
*secret_access_key = "***redacted***".to_string();
}
}
}

impl fmt::Debug for NamedS3StorageConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NamedS3StorageConfig")
.field("flavor", &self.flavor)
.field("access_key_id", &self.access_key_id)
.field(
"secret_access_key",
&self.secret_access_key.as_ref().map(|_| "***redacted***"),
)
.field("region", &self.region)
.field("endpoint", &self.endpoint)
.field("force_path_style_access", &self.force_path_style_access)
.field("disable_multi_object_delete", &self.disable_multi_object_delete)
.field("disable_multipart_upload", &self.disable_multipart_upload)
.field("checksum_algorithm", &self.checksum_algorithm)
.finish()
}
}

impl S3StorageConfig {
Expand Down Expand Up @@ -397,6 +478,9 @@ impl S3StorageConfig {
if let Some(secret_access_key) = self.secret_access_key.as_mut() {
*secret_access_key = "***redacted***".to_string();
}
for named_config in self.named.values_mut() {
named_config.redact();
}
}

pub fn endpoint(&self) -> Option<String> {
Expand Down Expand Up @@ -442,6 +526,7 @@ impl fmt::Debug for S3StorageConfig {
"disable_stalled_stream_protection_download",
&self.disable_stalled_stream_protection_download,
)
.field("named", &self.named)
.finish()
}
}
Expand Down Expand Up @@ -740,4 +825,52 @@ mod tests {
assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO));
}
}

#[test]
fn test_storage_s3_named_backends_serde() {
let s3_storage_config_yaml = r#"
endpoint: https://primary.example.com
region: us-east-1
named:
alt:
endpoint: https://alt.example.com
region: eu-west-3
force_path_style_access: true
access_key_id: alt-key
secret_access_key: alt-secret
secondary:
endpoint: http://secondary.example.com:8333
region: us-east-1
force_path_style_access: true
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
assert_eq!(s3_storage_config.named.len(), 2);

let alt = s3_storage_config.named.get("alt").unwrap();
assert_eq!(alt.region.as_deref(), Some("eu-west-3"));
assert_eq!(alt.access_key_id.as_deref(), Some("alt-key"));
assert!(alt.force_path_style_access);

// `as_s3_config` projects a named entry back into a full S3StorageConfig
// (with an empty `named` map) so it can drive the S3 client builder
// unchanged.
let projected = alt.as_s3_config();
assert_eq!(projected.region.as_deref(), Some("eu-west-3"));
assert_eq!(projected.access_key_id.as_deref(), Some("alt-key"));
assert!(projected.force_path_style_access);
assert!(projected.named.is_empty());
}

#[test]
fn test_storage_s3_named_backends_redact() {
let mut named = NamedS3StorageConfig {
access_key_id: Some("public-key".to_string()),
secret_access_key: Some("super-secret".to_string()),
..Default::default()
};
named.redact();
assert_eq!(named.access_key_id.as_deref(), Some("public-key"));
assert_eq!(named.secret_access_key.as_deref(), Some("***redacted***"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};
use tokio::sync::OnceCell;
use tokio::sync::{Mutex, OnceCell};

use super::s3_compatible_storage::create_s3_client;
use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// Extracts the named-backend key out of an `s3+<name>://...` URI, if any.
/// Returns `None` for plain `s3://...`.
fn parse_named_key(uri: &Uri) -> Option<&str> {
let scheme_end = uri.as_str().find("://")?;
let scheme = &uri.as_str()[..scheme_end];
scheme.strip_prefix("s3+")
}

/// S3 compatible object storage resolver.
pub struct S3CompatibleObjectStorageFactory {
storage_config: S3StorageConfig,
Expand All @@ -34,6 +43,8 @@ pub struct S3CompatibleObjectStorageFactory {
// end up being used, or if something like azure, gcs, or even local files, will be used
// instead.
s3_client: OnceCell<S3Client>,
// One cached S3Client per named backend. Built lazily on first use.
named_s3_clients: Mutex<HashMap<String, S3Client>>,
}

impl S3CompatibleObjectStorageFactory {
Expand All @@ -42,6 +53,7 @@ impl S3CompatibleObjectStorageFactory {
Self {
storage_config,
s3_client: OnceCell::new(),
named_s3_clients: Mutex::new(HashMap::new()),
}
}
}
Expand All @@ -53,6 +65,30 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
if let Some(name) = parse_named_key(uri) {
let named_config = self
.storage_config
.named
.get(name)
.ok_or_else(|| {
StorageResolverError::InvalidUri(format!(
"no `storage.s3.named.{name}` entry configured for URI `{uri}`"
))
})?
.as_s3_config();
let mut clients = self.named_s3_clients.lock().await;
let client = if let Some(client) = clients.get(name) {
client.clone()
} else {
let client = create_s3_client(&named_config).await;
Comment thread
papaharry marked this conversation as resolved.
Outdated
clients.insert(name.to_string(), client.clone());
client
};
drop(clients);
let storage =
S3CompatibleObjectStorage::from_uri_and_client(&named_config, uri, client).await?;
return Ok(Arc::new(DebouncedStorage::new(storage)));
}
let s3_client = self
.s3_client
.get_or_init(|| create_s3_client(&self.storage_config))
Expand All @@ -64,3 +100,23 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_named_key() {
// Plain s3:// URIs route through the primary backend.
assert_eq!(parse_named_key(&Uri::for_test("s3://bucket/key")), None);
// `s3+<name>` URIs return the named-backend key.
assert_eq!(
parse_named_key(&Uri::for_test("s3+alt://bucket/key")),
Some("alt")
);
assert_eq!(
parse_named_key(&Uri::for_test("s3+with-dash://bucket/key")),
Some("with-dash")
);
}
}