Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions plugin/trino-ip-obfuscation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Trino IP Retention UDF

Prototype Java Trino plugin for the Unilogs IP retention design.

The function exposed to Trino is:

```sql
singular_ip_decrypt(ip_payload varchar) returns varchar
```

It expects an encrypted IP envelope:

```text
<<IP-ENCRYPTED||DAY=<event_day>||IV=<12_byte_iv_hex>||VALUE=<64_byte_ciphertext_hex>>>
```

MySQL `EncryptedVariable` remains the source of truth for daily keys. The daily key task publishes a small active-key snapshot to S3, and the UDF reads that snapshot into private Trino-worker memory. The row path does not call MySQL, Django, S3, or KMS.

Each key is a 64-character SHA256 hex string in `aes_key_sha256` that decodes to the 32-byte daily key.

Deleting the event-day key from MySQL and refreshing the key snapshot simulates 30-day retention expiry. In that case `singular_ip_decrypt(...)` returns `NULL`.

For performance and simplicity, the UDF lazily loads the snapshot on the first decrypt call in each Trino worker and keeps it in private plugin memory with a TTL. Each snapshot entry is the daily key used by the legacy SHA512/XOR envelope. Each UDF row does an event-day map lookup and decrypts the fixed 64-byte ciphertext payload. The original value is treated as an opaque UTF-8 string; the UDF does not parse or normalize IP addresses.

## Key Snapshot Configuration

Production Trino workers should use a local Trino-side properties file. By default the plugin reads:

```text
/etc/trino/singular-ip-udf.properties
```

Example:

```properties
singular.ip.key-snapshot.uri=s3://ip-retention-key-snapshot-prod-us-west-2/active_snapshot.json
singular.ip.key-snapshot.require-s3=true
singular.ip.key-snapshot.s3-region=us-west-2
singular.ip.key-snapshot.refresh-interval-seconds=43200
singular.ip.key-snapshot.request-timeout-seconds=5
```

The config file path can be overridden with JVM property `singular.ip.config.path` or env var `SINGULAR_IP_CONFIG_PATH`.

JVM system properties and environment variables are still supported as overrides, mostly for local testing:

| Environment variable | JVM property | Purpose |
|---|---|---|
| `SINGULAR_IP_KEY_SNAPSHOT_URI` | `singular.ip.key-snapshot.uri` | Key snapshot URI. Production should use `s3://bucket/key`; local tests may use `file://...`. Required on Trino workers. |
| `SINGULAR_IP_KEY_SNAPSHOT_REQUIRE_S3` | `singular.ip.key-snapshot.require-s3` | Reject non-S3 snapshot URIs. Set to `true` in production so stale local files cannot be used accidentally. Default: `false`. |
| `SINGULAR_IP_KEY_SNAPSHOT_S3_REGION` | `singular.ip.key-snapshot.s3-region` | Optional S3 region override. If omitted, AWS SDK default region resolution is used. |
| `SINGULAR_IP_KEY_SNAPSHOT_REFRESH_INTERVAL_SECONDS` | `singular.ip.key-snapshot.refresh-interval-seconds` | Warm-cache refresh interval. Default: `43200` / 12 hours. |
| `SINGULAR_IP_KEY_SNAPSHOT_REQUEST_TIMEOUT_SECONDS` | `singular.ip.key-snapshot.request-timeout-seconds` | Snapshot read timeout. Default: `5`. |

The scheduled refresh is intentionally long because daily keys are generated once per day. If a row references an event day that is not present in the cached snapshot, the UDF returns `NULL`; missing keys do not trigger an extra snapshot refresh.

## Build and test

The host does not need Java/Maven. Use Docker:

```bash
cd /Users/bar_zas/workspace/singular/tools/trino-ip-udf
docker run --rm \
-v "$PWD":/workspace \
-v "$HOME/.m2":/root/.m2 \
-w /workspace \
maven:3.9-eclipse-temurin-24 \
mvn test package
```

The package step creates:

```text
target/trino-ip-udf-plugin/
```

That directory is the one to mount into Trino under `/usr/lib/trino/plugin/singular-ip-udf`.

The plugin runtime is decrypt-only. Encryption fixtures live under `src/test` for unit tests; production encryption belongs in the enrichment writer.

## Runtime logs

The UDF logs decrypt timing aggregates without logging IP values, encrypted payloads, or key material.

- `IpDecryptFunction` logs the first decrypt call and then every `100000` calls by default: total calls, success/missing-key/null/failure counters, total duration, average duration, and last-call duration.
- `KeySnapshotStore` logs `INFO` when a key snapshot has to be fetched/refreshed and how long the fetch took.
- `KeySnapshotStore` logs `DEBUG` when the existing in-memory snapshot is reused and when an event-day key lookup hits or misses the cached snapshot.

The decrypt timing interval can be changed with JVM property:

```text
-Dsingular.ip.decrypt-stats-log-interval-rows=100000
```

Set it to `0` to disable decrypt timing counters in production if we do not want the measurement overhead.
53 changes: 53 additions & 0 deletions plugin/trino-ip-obfuscation/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>482-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>trino-ip-obfuscation</artifactId>
<packaging>trino-plugin</packaging>
<name>${project.artifactId}</name>
<description>Trino - Singular IP obfuscation functions</description>

<dependencies>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.singular.unilogs.trino.ip;

final class DailyKeyMaterial
{
private final byte[] keyBytes;

DailyKeyMaterial(byte[] keyBytes)
{
this.keyBytes = keyBytes.clone();
}

byte[] keyBytes()
{
return keyBytes.clone();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.singular.unilogs.trino.ip;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;

final class FileSnapshotLoader
implements SnapshotLoader
{
private final Path path;

private FileSnapshotLoader(Path path)
{
this.path = path;
}

static FileSnapshotLoader fromUri(URI uri)
{
return new FileSnapshotLoader(Path.of(uri));
}

@Override
public SnapshotBody load(Duration requestTimeout)
{
try {
return new SnapshotBody(
Files.readString(path),
"last_modified=" + Files.getLastModifiedTime(path).toInstant());
}
catch (IOException e) {
throw new IllegalStateException("Failed to read IP retention key snapshot file", e);
}
}

@Override
public String description()
{
return path.toUri().toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.singular.unilogs.trino.ip;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public final class IpCrypto
{
private static final int KEY_BYTES = 32;
private static final ThreadLocal<MessageDigest> SHA512_DIGEST = ThreadLocal.withInitial(IpCrypto::newSha512Digest);

private IpCrypto() {}

static Slice decryptToSlice(IpEnvelope envelope, DailyKeyMaterial keyMaterial)
{
return Slices.wrappedBuffer(decryptBytes(envelope, keyMaterial));
}

private static byte[] decryptBytes(IpEnvelope envelope, DailyKeyMaterial keyMaterial)
{
if (keyMaterial == null) {
throw new IllegalArgumentException("keyMaterial is null");
}

byte[] iv = decodeHex(envelope.ivHex());
byte[] encryptedValue = decodeHex(envelope.valueHex());
byte[] xorKey = sha512(iv, keyMaterial.keyBytes());
byte[] decrypted = xor(encryptedValue, xorKey);

int firstNonPaddingByte = 0;
while (firstNonPaddingByte < decrypted.length && decrypted[firstNonPaddingByte] == ' ') {
firstNonPaddingByte++;
}
byte[] unpadded = new byte[decrypted.length - firstNonPaddingByte];
System.arraycopy(decrypted, firstNonPaddingByte, unpadded, 0, unpadded.length);
return unpadded;
}

static byte[] decodeSha256Key(String encodedKey)
{
byte[] key = HexFormat.of().parseHex(encodedKey);
validateKey(key);
return key;
}

static DailyKeyMaterial keyMaterial(byte[] key)
{
validateKey(key);
return new DailyKeyMaterial(key);
}

private static byte[] decodeHex(byte[] value)
{
return HexFormat.of().parseHex(new String(value, StandardCharsets.US_ASCII));
}

private static void validateKey(byte[] key)
{
if (key == null || key.length != KEY_BYTES) {
throw new IllegalArgumentException("IP retention key must be 32 bytes");
}
}

private static byte[] sha512(byte[] iv, byte[] key)
{
MessageDigest digest = SHA512_DIGEST.get();
digest.reset();
digest.update(iv);
digest.update(key);
return digest.digest();
}

private static byte[] xor(byte[] value, byte[] xorKey)
{
byte[] result = new byte[value.length];
for (int i = 0; i < value.length; i++) {
result[i] = (byte) (value[i] ^ xorKey[i]);
}
return result;
}

private static MessageDigest newSha512Digest()
{
try {
return MessageDigest.getInstance("SHA-512");
}
catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("SHA-512 is not available", e);
}
}
}
Loading