59 changes: 44 additions & 15 deletions flink-filesystems/flink-s3-fs-native/README.md
@@ -60,24 +60,45 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),

| Key | Default | Description |
|-----|---------|-------------|
-| s3.access-key | (none) | AWS access key |
-| s3.secret-key | (none) | AWS secret key |
+| s3.access-key | (none) | AWS access key (fallback key: `s3.access.key`) |
+| s3.secret-key | (none) | AWS secret key (fallback key: `s3.secret.key`) |
| s3.region | (auto-detect) | AWS region (auto-detected via AWS_REGION, ~/.aws/config, EC2 metadata) |
| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) |
-| s3.path-style-access | false | Use path-style access (auto-enabled for custom endpoints) |
-| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB) |
-| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent uploads per stream |
+| s3.path-style-access | false | Use path-style access for S3 (required by most S3-compatible servers such as MinIO; fallback key: `s3.path.style.access`) |
+| s3.chunked-encoding.enabled | true | Enable chunked encoding for S3 requests. Disable for S3-compatible servers that do not support it |
+| s3.checksum-validation.enabled | true | Enable checksum validation for S3 requests. Disable for S3-compatible servers that do not support it |
+| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB to 5GB) |
+| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent part uploads per stream |
| s3.entropy.key | (none) | Key for entropy injection in paths |
| s3.entropy.length | 4 | Length of entropy string |
-| s3.bulk-copy.enabled | true | Enable bulk copy operations |
+| s3.bulk-copy.enabled | true | Enable bulk copy operations using S3TransferManager |
+| s3.bulk-copy.max-concurrent | 16 | Maximum number of concurrent copy operations |
+| s3.connection.max | 50 | Maximum HTTP connections in the S3 client connection pool. Applies to both sync (Apache HTTP) and async (Netty) clients. Must be ≥ `s3.bulk-copy.max-concurrent` |
| s3.async.enabled | true | Enable async read/write with TransferManager |
| s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) |
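
As a rough illustration of how the pool and concurrency settings in the table above relate, the fragment below keeps `s3.connection.max` at or above `s3.bulk-copy.max-concurrent`, as the table requires. The specific numbers are illustrative assumptions, not recommendations.

```yaml
# Illustrative values only; tune for your workload.
s3.bulk-copy.enabled: true
s3.bulk-copy.max-concurrent: 32    # raised from the default of 16
s3.connection.max: 64              # must be >= s3.bulk-copy.max-concurrent
s3.upload.min.part.size: 16777216  # 16 MB, within the documented 5MB to 5GB range
s3.read.buffer.size: 1048576       # 1 MB, within the documented 64KB - 4MB range
```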

+### Credentials Provider
+
+| Key | Default | Description |
+|-----|---------|-------------|
+| fs.s3.aws.credentials.provider | (none) | Comma-separated list of AWS credentials provider class names. Providers are tried in order; the first one that returns credentials is used. Supports fully-qualified AWS SDK v2 class names (e.g. `software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider`) or simple names from the SDK auth package (e.g. `AnonymousCredentialsProvider`, `DefaultCredentialsProvider`). When not set, the default chain is used: delegation tokens → static credentials (if configured) → `DefaultCredentialsProvider` |
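
A minimal sketch of how the provider list described above might be set. It uses only the two class names mentioned in the table; the public-bucket use case and the choice of ordering are assumptions for illustration.

```yaml
# Anonymous access, e.g. for reading a public bucket (assumed use case).
fs.s3.aws.credentials.provider: software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider

# Alternative: an ordered list using simple names, tried left to right.
# fs.s3.aws.credentials.provider: DefaultCredentialsProvider,AnonymousCredentialsProvider
```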
+
+### Retries and Timeouts
+
+| Key | Default | Description |
+|-----|---------|-------------|
+| s3.retry.max-num-retries | 3 | Maximum retry attempts for failed S3 requests. Uses the AWS SDK's default retry strategy (exponential backoff with jitter). Set to `0` to disable retries |
+| s3.connection.timeout | 60s | HTTP connection timeout for the S3 client (time to establish a connection) |
+| s3.socket.timeout | 60s | HTTP socket timeout for the S3 client (time to wait for data after connection is established) |
+| s3.connection.max-idle-time | 60s | Maximum idle time for HTTP connections in the connection pool |
+| s3.close.timeout | 60s | Timeout for closing the S3 filesystem (waiting for pending operations to complete during shutdown) |
+| s3.client.close.timeout | 30s | Timeout for closing the S3 client and releasing resources |
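
For orientation, a hedged example of overriding a few of the retry and timeout keys from the table above. The values are assumptions chosen for illustration (a faster connect failure, a more patient read) and follow the same duration notation as the listed defaults.

```yaml
# Illustrative values only; the defaults are 3 retries and 60s timeouts.
s3.retry.max-num-retries: 5        # set to 0 to disable retries
s3.connection.timeout: 10s         # fail fast when a connection cannot be established
s3.socket.timeout: 120s            # allow slow responses once connected
s3.connection.max-idle-time: 30s   # recycle idle pooled connections sooner
```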

### Server-Side Encryption (SSE)

| Key | Default | Description |
|-----|---------|-------------|
-| s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms` (AWS KMS) |
+| s3.sse.type | none | Encryption type. Accepted values: `none`, `sse-s3` or `aes256` (S3-managed keys), `sse-kms` or `aws:kms` (KMS-managed keys). The `aes256` and `aws:kms` aliases match Hadoop S3A's `fs.s3a.server-side-encryption-algorithm` values to ease migration |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default aws/s3 key if not specified) |
| s3.sse.kms.encryption-context | (none) | Encryption context key-value pairs for SSE-KMS. Format: `key1:value1,key2:value2`. Keys/values containing `:` must be quoted. |
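
A short sketch of an SSE-KMS configuration built from the three keys in the table above. The key alias and the context pairs are hypothetical placeholders; the context follows the documented `key1:value1,key2:value2` format with values that need no quoting.

```yaml
s3.sse.type: sse-kms
# Hypothetical key alias; omit this line to use the default aws/s3 key.
s3.sse.kms.key-id: alias/my-flink-key
# Hypothetical context pairs in key:value,key:value form.
s3.sse.kms.encryption-context: department:analytics,project:flink-output
```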

@@ -108,13 +129,15 @@ Bucket names containing dots (e.g., `my.company.data`) are fully supported throu

### Supported Properties

-All global S3 configuration properties can be overridden at the bucket level:
+Only the following properties can be overridden at the bucket level. Any other `s3.bucket.<bucket>.<prop>` key is ignored (a warning is logged):

- **Connection:** `region`, `endpoint`, `path-style-access`
- **Credentials:** `access-key`, `secret-key`, `aws.credentials.provider`
- **Encryption:** `sse.type`, `sse.kms.key-id`
- **IAM Assume Role:** `assume-role.arn`, `assume-role.external-id`, `assume-role.session-name`, `assume-role.session-duration`
+
+Timeouts, retries, encoding/checksum flags, entropy, upload/copy settings, and the credentials provider chain are configured globally only.
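
To make the override rule concrete, here is a hedged sketch using the `s3.bucket.<bucket>.<prop>` key pattern named above. The bucket name `my-bucket`, the regions, and the key alias are hypothetical.

```yaml
# Global defaults
s3.region: us-east-1
s3.sse.type: none

# Overrides for the hypothetical bucket "my-bucket" (all in the supported list)
s3.bucket.my-bucket.region: eu-west-1
s3.bucket.my-bucket.sse.type: sse-kms
s3.bucket.my-bucket.sse.kms.key-id: alias/my-flink-key

# Not in the supported list, so it would be ignored with a warning:
# s3.bucket.my-bucket.connection.timeout: 10s
```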

## Server-Side Encryption (SSE)

The filesystem supports server-side encryption for data at rest:
@@ -250,19 +273,23 @@ s3.assume-role.session-duration: 3600 # 1 hour

## MinIO and S3-Compatible Storage

-The filesystem auto-detects custom endpoints and configures appropriate settings:
+For S3-compatible servers (MinIO, LocalStack, Ceph RGW, etc.), set the endpoint plus any compatibility flags the server requires. These flags are not auto-detected from the endpoint value — the defaults target AWS S3 and must be overridden explicitly:

```yaml
s3.access-key: minioadmin
-s3.secret-key: minioadmin
+s3.secret-key: minioadmin
s3.endpoint: http://localhost:9000
-s3.path-style-access: true # Auto-enabled for custom endpoints
+
+# Required: MinIO does not support virtual-hosted-style addressing.
+s3.path-style-access: true
+
+# Required: MinIO does not support AWS chunked encoding or AWS-style
+# checksum trailers used by the SDK by default.
+s3.chunked-encoding.enabled: false
+s3.checksum-validation.enabled: false
```

-MinIO-specific optimizations are applied automatically:
-- Path-style access enabled
-- Chunked encoding disabled (compatibility)
-- Checksum validation disabled (compatibility)
+The exact subset of compatibility flags needed depends on the server and version — consult its documentation. The defaults in this filesystem (`path-style-access=false`, `chunked-encoding.enabled=true`, `checksum-validation.enabled=true`) are tuned for AWS S3.

## Memory Optimization for Large Files

@@ -395,6 +422,8 @@ s3.endpoint: http://localhost:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
+s3.chunked-encoding.enabled: false
+s3.checksum-validation.enabled: false
EOF

$FLINK_HOME/bin/flink run YourJob.jar