64 changes: 47 additions & 17 deletions flink/README.md
@@ -4,8 +4,7 @@

The **Delta Flink Connector** enables Apache Flink streaming jobs to write data into **Delta Lake tables**.
```aiignore
Note: this is a private build right now and there will be no product support provided.
Suggestions and feedback are welcome.
Note: this is a private build right now. Suggestions and feedback are welcome.
```

### Supported Flink Versions
@@ -256,18 +255,19 @@ There are two categories of per-table config:
- Affect runtime behavior and are **not stored** in Delta table metadata.


| Key | Type | Default | Description |
|-----------------------|---------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| checkpoint.frequency | Double | 0.0 | Probability [0.0–1.0] to create Delta checkpoint on commit. `0.0` disables checkpoints, `1.0` checkpoints every commit |
| checksum.enable | Boolean | true | Generate checksum files on commit |
| file_rolling.strategy | String | size | size / count |
| file_rolling.size | Integer | 104857600 | Number of bytes per file |
| file_rolling.count | Integer | | Max records per file |
| schema_evolution.mode | String | no | no → strict, newcolumn → allow adding new columns |
| credentials.source | String | uc | Storage credential source: `uc` (fetch temporary credentials from Unity Catalog) or `ambient` (fetch nothing; rely on the runtime environment — workload identity, instance profile, etc) |
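
The `checkpoint.frequency` row describes a per-commit probability rather than an interval. A minimal sketch of that semantics follows, assuming the decision is a single uniform random draw per commit; the class and method names are hypothetical and this is not taken from the connector's source.

```java
import java.util.Random;

// Hypothetical illustration only: the connector's actual checkpoint logic is not shown in this README.
final class CheckpointFrequencySketch {

    /** Returns true when a commit should also write a Delta checkpoint. */
    static boolean shouldCheckpoint(double frequency, Random rng) {
        if (frequency < 0.0 || frequency > 1.0) {
            throw new IllegalArgumentException(
                "checkpoint.frequency must be in [0.0, 1.0]: " + frequency);
        }
        // nextDouble() is uniform in [0.0, 1.0), so 0.0 never fires and 1.0 always fires.
        return rng.nextDouble() < frequency;
    }

    public static void main(String[] args) {
        Random rng = new Random(42L);
        int checkpoints = 0;
        for (int commit = 0; commit < 1000; commit++) {
            if (shouldCheckpoint(0.1, rng)) {
                checkpoints++;
            }
        }
        // With frequency 0.1, roughly one commit in ten is expected to checkpoint.
        System.out.println("checkpoints in 1000 commits: " + checkpoints);
    }
}
```

Under this reading, a value such as `0.1` spreads checkpoints across commits on average, without guaranteeing a fixed gap between them.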

---

### 5.3 Default Delta Properties Provided by the Sink
### Default Delta Properties Provided by the Sink

The sink sets defaults for certain Delta properties, which can be overridden by user configs:

@@ -298,20 +298,33 @@ If an unsupported schema change is detected, the sink will fail the job.

## 7. Security & Credentials

The sink resolves storage credentials based on the per-table `credentials.source` option:
- `uc` (default): fetch temporary credentials from Unity Catalog (vended and rotated automatically).
- `ambient`: fetch nothing; rely on the runtime environment (workload identity, instance profile,
ADC, or `core-site.xml`) to supply them.

### Unity Catalog (UC)

When using Unity Catalog:
- Clients must provide a **Personal Access Token (PAT)**
This is the default (`credentials.source = uc`). When using Unity Catalog:
- Clients must provide a **Personal Access Token (PAT)** or **OAuth2 Configuration**
- UC handles **credential vending**
- Temporary credentials are managed/rotated automatically by UC

Typical config keys:
- `unitycatalog.endpoint`
- `unitycatalog.token`
Provide exactly one authentication method; both share `unitycatalog.endpoint`:

- **Personal Access Token (PAT)**
  - `unitycatalog.token`
- **OAuth2 (client credentials)**
  - `unitycatalog.oauth.uri`
  - `unitycatalog.oauth.client_id`
  - `unitycatalog.oauth.client_secret`

> DataStream API equivalents: `withToken(...)` for PAT, or `withOauthUri(...)` /
> `withOauthClientId(...)` / `withOauthClientSecret(...)` for OAuth2.
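
The "exactly one authentication method" rule above can be checked before a job starts. The sketch below is one possible validation over a plain option map. It uses the config keys listed above, but the class, enum, and error messages are hypothetical, and whether the connector validates this way (or treats `unitycatalog.endpoint` as mandatory) is an assumption.

```java
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; not the connector's own validation code.
final class UcAuthConfigSketch {

    enum AuthMethod { PAT, OAUTH2 }

    static final String ENDPOINT = "unitycatalog.endpoint";
    static final String TOKEN = "unitycatalog.token";
    static final List<String> OAUTH_KEYS = List.of(
        "unitycatalog.oauth.uri",
        "unitycatalog.oauth.client_id",
        "unitycatalog.oauth.client_secret");

    /** Returns the single configured method, or throws if zero or both are configured. */
    static AuthMethod resolve(Map<String, String> conf) {
        if (!conf.containsKey(ENDPOINT)) {
            throw new IllegalArgumentException(ENDPOINT + " is required");
        }
        boolean hasToken = conf.containsKey(TOKEN);
        long oauthCount = OAUTH_KEYS.stream().filter(conf::containsKey).count();

        if (hasToken && oauthCount > 0) {
            throw new IllegalArgumentException("Configure either a PAT or OAuth2, not both");
        }
        if (hasToken) {
            return AuthMethod.PAT;
        }
        if (oauthCount == OAUTH_KEYS.size()) {
            return AuthMethod.OAUTH2;
        }
        if (oauthCount > 0) {
            throw new IllegalArgumentException("OAuth2 requires all of " + OAUTH_KEYS);
        }
        throw new IllegalArgumentException("No authentication method configured");
    }

    public static void main(String[] args) {
        // The endpoint and token values here are placeholders.
        Map<String, String> conf = Map.of(
            ENDPOINT, "https://uc.example.invalid",
            TOKEN, "placeholder-token");
        System.out.println(resolve(conf));
    }
}
```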

### Path-based Access (Without UC)

When using path-based access without UC support:
When using path-based access without UC support, set `credentials.source = ambient`. Then:
- Users can configure static S3 credentials in:

```
@@ -340,6 +353,23 @@ Example:
</configuration>
```

Alternatively, the same `fs.*` settings can be supplied directly as **per-table options**, without
editing `core-site.xml`. They are forwarded to the engine's Hadoop `Configuration` and override the
connector's built-in file-system defaults (see [Per-table Configuration](#per-table-configuration)).

```sql
CREATE TEMPORARY TABLE sink (
  id BIGINT,
  dt STRING
) WITH (
  'connector' = 'delta',
  'table_path' = '<path>',
  'fs.s3a.access.key' = 'YOUR_ACCESS_KEY',
  'fs.s3a.secret.key' = 'YOUR_SECRET_KEY',
  'fs.s3a.endpoint' = 'https://s3.amazonaws.com'
);
```
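
The precedence between the three configuration layers can be sketched as a layered map merge. The ordering used here (built-in defaults, then per-table `fs.*` options, then credentials from the credential manager) follows what `testEngineConfigurationPrecedence` in this change asserts. The class and method names are hypothetical, and the real merge inside `createEngine()` may be structured differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of the override order; not the connector's implementation.
final class EngineConfPrecedenceSketch {

    /** Keeps only file-system options, mirroring the "fs." prefix filter in engineConf(). */
    static Map<String, String> fsOptions(Map<String, String> tableOptions) {
        return tableOptions.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith("fs."))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** Later layers win: defaults < per-table fs.* options < vended credentials. */
    static Map<String, String> merge(
            Map<String, String> builtInDefaults,
            Map<String, String> tableOptions,
            Map<String, String> credentials) {
        Map<String, String> merged = new LinkedHashMap<>(builtInDefaults);
        merged.putAll(fsOptions(tableOptions));
        merged.putAll(credentials);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> merged = merge(
            Map.of("fs.s3a.path.style.access", "false"),
            Map.of(
                "fs.s3a.path.style.access", "true",
                "fs.s3a.access.key", "customer-key",
                "checkpoint.frequency", "0.5"),
            Map.of("fs.s3a.access.key", "credential-key"));
        merged.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

One consequence of this ordering is that a per-table `fs.s3a.access.key` only takes effect when the credential manager does not vend the same key, which is the case the `ambient` source is meant for.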

---

## 8. Recommended Tuning for High-Partition Tables
9 changes: 6 additions & 3 deletions flink/src/main/java/io/delta/flink/table/TableConf.java
@@ -125,13 +125,16 @@ public Map<String, String> catalogConf() {
  /**
   * Configuration to be forwarded to the Kernel engine.
   *
   * <p>This returns the subset of configuration entries that are relevant to engine-side behavior.
   * If your engine uses different option names, translate them here.
   * <p>This carries custom options that will be passed to the engine. Currently it includes all
   * customer-provided file-system options (keys starting with {@code "fs."}, e.g. {@code
   * fs.s3a.access.key}), which are added to the Hadoop {@code Configuration} used by the engine.
   *
   * @return a map of engine configuration entries
   */
  public Map<String, String> engineConf() {
    return Map.of();
    return raw.entrySet().stream()
        .filter(entry -> entry.getKey().startsWith("fs."))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

/** @return whether checksum file creation is enabled for this table */
@@ -26,6 +26,7 @@
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.defaults.engine.fileio.FileIO;
import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Literal;
@@ -39,6 +40,7 @@
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
import java.io.File;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.*;
@@ -161,6 +163,63 @@ void testTableStoredConfIntoDeltaLogs() {
        });
  }

  /**
   * Reads a single merged Hadoop configuration value from the engine produced by {@link
   * AbstractKernelTable#createEngine()}. {@code createEngine} builds the configuration internally
   * and only exposes it through the engine's {@link FileIO}, so this reaches it via {@code
   * DefaultEngine}'s private {@code fileIO} field rather than adding a production-only accessor.
   */
  private static String engineConfValue(Engine engine, String key) throws Exception {
    Field fileIOField = DefaultEngine.class.getDeclaredField("fileIO");
    fileIOField.setAccessible(true);
    FileIO fileIO = (FileIO) fileIOField.get(engine);
    return fileIO.getConf(key).orElse(null);
  }

  @Test
  void testEngineConfigurationPrecedence() {
    StructType schema = new StructType().add("id", IntegerType.INTEGER);

    // Customer-provided fs.* options: one overrides a built-in default, one collides with a
    // credential vended by the credential manager, one is unique.
    Map<String, String> tableConfig = new HashMap<>();
    tableConfig.put("fs.s3a.path.style.access", "true"); // built-in default is "false"
    tableConfig.put("fs.s3a.access.key", "customer-key"); // also vended by the credential manager
    tableConfig.put("fs.s3a.endpoint", "customer-endpoint"); // no default, no credential

    withTempDir(
        dir -> {
          LocalFileSystemTable table =
              new LocalFileSystemTable(dir.toURI(), tableConfig, schema, Collections.emptyList());
          // Inject a credential manager that vends a value colliding with a customer fs.* key.
          // The field is protected/transient and only initialized by open() when null, so setting
          // it here needs no change to AbstractKernelTable.
          table.credentialManager =
              new CredentialManager.AmbientCredentialManager() {
                @Override
                Map<String, String> getCredentials() {
                  return Map.of("fs.s3a.access.key", "credential-key");
                }
              };

          // engineConf carries the customer-provided fs.* options verbatim.
          assertEquals(
              Map.of(
                  "fs.s3a.path.style.access", "true",
                  "fs.s3a.access.key", "customer-key",
                  "fs.s3a.endpoint", "customer-endpoint"),
              table.conf.engineConf());

          Engine engine = table.createEngine();
          // Customer value overrides the built-in default.
          assertEquals("true", engineConfValue(engine, "fs.s3a.path.style.access"));
          // Credential manager value overrides the customer value.
          assertEquals("credential-key", engineConfValue(engine, "fs.s3a.access.key"));
          // Customer value with no default/credential is passed through.
          assertEquals("customer-endpoint", engineConfValue(engine, "fs.s3a.endpoint"));
        });
  }

  @Test
  void testCredentialManagerFromConf() {
    StructType schema = new StructType().add("id", IntegerType.INTEGER);
70 changes: 70 additions & 0 deletions flink/src/test/java/io/delta/flink/table/TableConfTest.java
@@ -0,0 +1,70 @@
/*
 * Copyright (2026) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.flink.table;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

/** JUnit test suite for {@link TableConf}. */
class TableConfTest {

  @Test
  void testEngineConfIncludesAllCustomerProvidedFsKeys() {
    Map<String, String> raw = new HashMap<>();
    raw.put("fs.s3a.access.key", "AKIA");
    raw.put("fs.s3a.secret.key", "secret");
    raw.put("fs.azure.account.key.acct.dfs.core.windows.net", "azkey");
    // Non-fs keys must not leak into engineConf.
    raw.put("delta.feature.v2Checkpoint", "supported");
    raw.put("io.unitycatalog.tableId", "uuid");
    raw.put("checkpoint.frequency", "0.5");

    Map<String, String> engineConf = new TableConf(raw).engineConf();

    assertEquals(3, engineConf.size());
    assertEquals("AKIA", engineConf.get("fs.s3a.access.key"));
    assertEquals("secret", engineConf.get("fs.s3a.secret.key"));
    assertEquals("azkey", engineConf.get("fs.azure.account.key.acct.dfs.core.windows.net"));
    assertFalse(engineConf.containsKey("delta.feature.v2Checkpoint"));
    assertFalse(engineConf.containsKey("io.unitycatalog.tableId"));
    assertFalse(engineConf.containsKey("checkpoint.frequency"));
  }

  @Test
  void testEngineConfEmptyWhenNoFsKeys() {
    Map<String, String> raw = new HashMap<>();
    raw.put("delta.appendOnly", "true");
    raw.put("checkpoint.frequency", "1.0");

    assertTrue(new TableConf(raw).engineConf().isEmpty());
  }

  @Test
  void testEngineConfPicksUpKeysAddedViaUpdate() {
    TableConf conf = new TableConf(new HashMap<>());
    assertTrue(conf.engineConf().isEmpty());

    conf.update(Map.of("fs.gs.project.id", "my-project"));

    assertEquals(Map.of("fs.gs.project.id", "my-project"), conf.engineConf());
  }
}