diff --git a/plugin/trino-ip-obfuscation/README.md b/plugin/trino-ip-obfuscation/README.md new file mode 100644 index 000000000000..e7a45ac34fcd --- /dev/null +++ b/plugin/trino-ip-obfuscation/README.md @@ -0,0 +1,95 @@ +# Trino IP Retention UDF + +Prototype Java Trino plugin for the Unilogs IP retention design. + +The function exposed to Trino is: + +```sql +singular_ip_decrypt(ip_payload varchar) returns varchar +``` + +It expects an encrypted IP envelope: + +```text +<||IV=<12_byte_iv_hex>||VALUE=<64_byte_ciphertext_hex>>> +``` + +MySQL `EncryptedVariable` remains the source of truth for daily keys. The daily key task publishes a small active-key snapshot to S3, and the UDF reads that snapshot into private Trino-worker memory. The row path does not call MySQL, Django, S3, or KMS. + +Each key is a 64-character SHA256 hex string in `aes_key_sha256` that decodes to the 32-byte daily key. + +Deleting the event-day key from MySQL and refreshing the key snapshot simulates 30-day retention expiry. In that case `singular_ip_decrypt(...)` returns `NULL`. + +For performance and simplicity, the UDF lazily loads the snapshot on the first decrypt call in each Trino worker and keeps it in private plugin memory with a TTL. Each snapshot entry is the daily key used by the legacy SHA512/XOR envelope. Each UDF row does an event-day map lookup and decrypts the fixed 64-byte ciphertext payload. The original value is treated as an opaque UTF-8 string; the UDF does not parse or normalize IP addresses. + +## Key Snapshot Configuration + +Production Trino workers should use a local Trino-side properties file. By default the plugin reads: + +```text +/etc/trino/singular-ip-udf.properties +``` + +Example: + +```properties +singular.ip.key-snapshot.uri=s3://ip-retention-key-snapshot-prod-us-west-2/active_snapshot.json +singular.ip.key-snapshot.require-s3=true +singular.ip.key-snapshot.s3-region=us-west-2 +singular.ip.key-snapshot.refresh-interval-seconds=43200 +singular.ip.key-snapshot.request-timeout-seconds=5 +``` + +The config file path can be overridden with JVM property `singular.ip.config.path` or env var `SINGULAR_IP_CONFIG_PATH`. + +JVM system properties and environment variables are still supported as overrides, mostly for local testing: + +| Environment variable | JVM property | Purpose | +|---|---|---| +| `SINGULAR_IP_KEY_SNAPSHOT_URI` | `singular.ip.key-snapshot.uri` | Key snapshot URI. Production should use `s3://bucket/key`; local tests may use `file://...`. Required on Trino workers. | +| `SINGULAR_IP_KEY_SNAPSHOT_REQUIRE_S3` | `singular.ip.key-snapshot.require-s3` | Reject non-S3 snapshot URIs. Set to `true` in production so stale local files cannot be used accidentally. Default: `false`. | +| `SINGULAR_IP_KEY_SNAPSHOT_S3_REGION` | `singular.ip.key-snapshot.s3-region` | Optional S3 region override. If omitted, AWS SDK default region resolution is used. | +| `SINGULAR_IP_KEY_SNAPSHOT_REFRESH_INTERVAL_SECONDS` | `singular.ip.key-snapshot.refresh-interval-seconds` | Warm-cache refresh interval. Default: `43200` / 12 hours. | +| `SINGULAR_IP_KEY_SNAPSHOT_REQUEST_TIMEOUT_SECONDS` | `singular.ip.key-snapshot.request-timeout-seconds` | Snapshot read timeout. Default: `5`. | + +The scheduled refresh is intentionally long because daily keys are generated once per day. If a row references an event day that is not present in the cached snapshot, the UDF returns `NULL`; missing keys do not trigger an extra snapshot refresh. + +## Build and test + +The host does not need Java/Maven. Use Docker: + +```bash +cd /Users/bar_zas/workspace/singular/tools/trino-ip-udf +docker run --rm \ + -v "$PWD":/workspace \ + -v "$HOME/.m2":/root/.m2 \ + -w /workspace \ + maven:3.9-eclipse-temurin-24 \ + mvn test package +``` + +The package step creates: + +```text +target/trino-ip-udf-plugin/ +``` + +That directory is the one to mount into Trino under `/usr/lib/trino/plugin/singular-ip-udf`. + +The plugin runtime is decrypt-only. Encryption fixtures live under `src/test` for unit tests; production encryption belongs in the enrichment writer. + +## Runtime logs + +The UDF logs decrypt timing aggregates without logging IP values, encrypted payloads, or key material. + +- `IpDecryptFunction` logs the first decrypt call and then every `100000` calls by default: total calls, success/missing-key/null/failure counters, total duration, average duration, and last-call duration. +- `KeySnapshotStore` logs `INFO` when a key snapshot has to be fetched/refreshed and how long the fetch took. +- `KeySnapshotStore` logs `DEBUG` when the existing in-memory snapshot is reused and when an event-day key lookup hits or misses the cached snapshot. + +The decrypt timing interval can be changed with JVM property: + +```text +-Dsingular.ip.decrypt-stats-log-interval-rows=100000 +``` + +Set it to `0` to disable decrypt timing counters in production if we do not want the measurement overhead. diff --git a/plugin/trino-ip-obfuscation/pom.xml b/plugin/trino-ip-obfuscation/pom.xml new file mode 100644 index 000000000000..be290192f66a --- /dev/null +++ b/plugin/trino-ip-obfuscation/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + + io.trino + trino-root + 482-SNAPSHOT + ../../pom.xml + + + trino-ip-obfuscation + trino-plugin + ${project.artifactId} + Trino - Singular IP obfuscation functions + + + + + software.amazon.awssdk + s3 + + + io.airlift + slice + provided + + + + io.trino + trino-spi + provided + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-engine + test + + + diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/DailyKeyMaterial.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/DailyKeyMaterial.java new file mode 100644 index 000000000000..d27a6eeef3aa --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/DailyKeyMaterial.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +final class DailyKeyMaterial +{ + private final byte[] keyBytes; + + DailyKeyMaterial(byte[] keyBytes) + { + this.keyBytes = keyBytes.clone(); + } + + byte[] keyBytes() + { + return keyBytes.clone(); + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/FileSnapshotLoader.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/FileSnapshotLoader.java new file mode 100644 index 000000000000..770538fca1a3 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/FileSnapshotLoader.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; + +final class FileSnapshotLoader + implements SnapshotLoader +{ + private final Path path; + + private FileSnapshotLoader(Path path) + { + this.path = path; + } + + static FileSnapshotLoader fromUri(URI uri) + { + return new FileSnapshotLoader(Path.of(uri)); + } + + @Override + public SnapshotBody load(Duration requestTimeout) + { + try { + return new SnapshotBody( + Files.readString(path), + "last_modified=" + Files.getLastModifiedTime(path).toInstant()); + } + catch (IOException e) { + throw new IllegalStateException("Failed to read IP retention key snapshot file", e); + } + } + + @Override + public String description() + { + return path.toUri().toString(); + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpCrypto.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpCrypto.java new file mode 100644 index 000000000000..2ca8639da59f --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpCrypto.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HexFormat; + +public final class IpCrypto +{ + private static final int KEY_BYTES = 32; + private static final ThreadLocal SHA512_DIGEST = ThreadLocal.withInitial(IpCrypto::newSha512Digest); + + private IpCrypto() {} + + static Slice decryptToSlice(IpEnvelope envelope, DailyKeyMaterial keyMaterial) + { + return Slices.wrappedBuffer(decryptBytes(envelope, keyMaterial)); + } + + private static byte[] decryptBytes(IpEnvelope envelope, DailyKeyMaterial keyMaterial) + { + if (keyMaterial == null) { + throw new IllegalArgumentException("keyMaterial is null"); + } + + byte[] iv = decodeHex(envelope.ivHex()); + byte[] encryptedValue = decodeHex(envelope.valueHex()); + byte[] xorKey = sha512(iv, keyMaterial.keyBytes()); + byte[] decrypted = xor(encryptedValue, xorKey); + + int firstNonPaddingByte = 0; + while (firstNonPaddingByte < decrypted.length && decrypted[firstNonPaddingByte] == ' ') { + firstNonPaddingByte++; + } + byte[] unpadded = new byte[decrypted.length - firstNonPaddingByte]; + System.arraycopy(decrypted, firstNonPaddingByte, unpadded, 0, unpadded.length); + return unpadded; + } + + static byte[] decodeSha256Key(String encodedKey) + { + byte[] key = HexFormat.of().parseHex(encodedKey); + validateKey(key); + return key; + } + + static DailyKeyMaterial keyMaterial(byte[] key) + { + validateKey(key); + return new DailyKeyMaterial(key); + } + + private static byte[] decodeHex(byte[] value) + { + return HexFormat.of().parseHex(new String(value, StandardCharsets.US_ASCII)); + } + + private static void validateKey(byte[] key) + { + if (key == null || key.length != KEY_BYTES) { + throw new IllegalArgumentException("IP retention key must be 32 bytes"); + } + } + + private static byte[] sha512(byte[] iv, byte[] key) + { + MessageDigest digest = SHA512_DIGEST.get(); + digest.reset(); + digest.update(iv); + digest.update(key); + return digest.digest(); + } + + private static byte[] xor(byte[] value, byte[] xorKey) + { + byte[] result = new byte[value.length]; + for (int i = 0; i < value.length; i++) { + result[i] = (byte) (value[i] ^ xorKey[i]); + } + return result; + } + + private static MessageDigest newSha512Digest() + { + try { + return MessageDigest.getInstance("SHA-512"); + } + catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-512 is not available", e); + } + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpDecryptFunction.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpDecryptFunction.java new file mode 100644 index 000000000000..91f9dd7a0615 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpDecryptFunction.java @@ -0,0 +1,179 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import io.airlift.slice.Slice; +import io.trino.spi.function.Description; +import io.trino.spi.function.ScalarFunction; +import io.trino.spi.function.SqlNullable; +import io.trino.spi.function.SqlType; +import io.trino.spi.type.StandardTypes; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +public final class IpDecryptFunction +{ + private static final System.Logger LOGGER = System.getLogger(IpDecryptFunction.class.getName()); + private static final long DECRYPT_STATS_LOG_INTERVAL = Long.getLong("singular.ip.decrypt-stats-log-interval-rows", 100_000L); + private static final LongAdder decryptCalls = new LongAdder(); + private static final LongAdder decryptSuccesses = new LongAdder(); + private static final LongAdder decryptMissingKeys = new LongAdder(); + private static final LongAdder decryptNullInputs = new LongAdder(); + private static final LongAdder decryptPassthrough = new LongAdder(); + private static final LongAdder decryptFailures = new LongAdder(); + private static final LongAdder decryptDurationNanos = new LongAdder(); + private static final AtomicLong nextDecryptStatsLogAt = new AtomicLong(1); + + private static volatile KeySnapshotStore keySnapshotStore; + + private IpDecryptFunction() {} + + @ScalarFunction(value = "singular_ip_decrypt", deterministic = false) + @Description("Decrypts a Singular Unilogs IP retention payload if the event-day key is available") + @SqlNullable + @SqlType(StandardTypes.VARCHAR) + public static Slice decrypt(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice payload) + { + long startNanos = shouldRecordDecryptStats() ? System.nanoTime() : 0; + DecryptOutcome outcome = DecryptOutcome.FAILURE; + + try { + if (payload == null) { + outcome = DecryptOutcome.NULL_INPUT; + return null; + } + if (payload.length() == 0) { + outcome = DecryptOutcome.PASSTHROUGH; + return payload; + } + + IpEnvelope envelope = IpEnvelope.parse(payload); + DailyKeyMaterial keyMaterial = keySnapshotStore().getKeyMaterial(envelope.eventDay()); + if (keyMaterial == null) { + outcome = DecryptOutcome.MISSING_KEY; + return null; + } + Slice decrypted = IpCrypto.decryptToSlice(envelope, keyMaterial); + outcome = DecryptOutcome.SUCCESS; + return decrypted; + } + catch (NotEncryptedIpException e) { + outcome = DecryptOutcome.PASSTHROUGH; + return payload; + } + catch (RuntimeException e) { + outcome = DecryptOutcome.FAILURE; + return null; + } + finally { + recordDecryptStats(outcome, startNanos); + } + } + + private static KeySnapshotStore keySnapshotStore() + { + KeySnapshotStore current = keySnapshotStore; + if (current != null) { + return current; + } + + synchronized (IpDecryptFunction.class) { + current = keySnapshotStore; + if (current == null) { + current = KeySnapshotStore.fromEnvironment(); + keySnapshotStore = current; + } + return current; + } + } + + static void resetKeySnapshotStoreForTests() + { + keySnapshotStore = null; + decryptCalls.reset(); + decryptSuccesses.reset(); + decryptMissingKeys.reset(); + decryptNullInputs.reset(); + decryptPassthrough.reset(); + decryptFailures.reset(); + decryptDurationNanos.reset(); + nextDecryptStatsLogAt.set(1); + } + + private static boolean shouldRecordDecryptStats() + { + return DECRYPT_STATS_LOG_INTERVAL > 0; + } + + private static void recordDecryptStats(DecryptOutcome outcome, long startNanos) + { + if (!shouldRecordDecryptStats()) { + return; + } + + long durationNanos = System.nanoTime() - startNanos; + decryptCalls.increment(); + decryptDurationNanos.add(durationNanos); + switch (outcome) { + case SUCCESS -> decryptSuccesses.increment(); + case MISSING_KEY -> decryptMissingKeys.increment(); + case NULL_INPUT -> decryptNullInputs.increment(); + case PASSTHROUGH -> decryptPassthrough.increment(); + case FAILURE -> decryptFailures.increment(); + } + + long calls = decryptCalls.sum(); + if (!shouldLogDecryptStats(calls)) { + return; + } + + long totalNanos = decryptDurationNanos.sum(); + LOGGER.log( + System.Logger.Level.INFO, + "IP retention decrypt stats: calls=" + calls + + ", success=" + decryptSuccesses.sum() + + ", missing_key=" + decryptMissingKeys.sum() + + ", null_input=" + decryptNullInputs.sum() + + ", passthrough=" + decryptPassthrough.sum() + + ", failure=" + decryptFailures.sum() + + ", total_ms=" + totalNanos / 1_000_000 + + ", avg_us=" + totalNanos / Math.max(calls, 1) / 1_000 + + ", last_us=" + durationNanos / 1_000); + } + + private static boolean shouldLogDecryptStats(long calls) + { + while (true) { + long nextLogAt = nextDecryptStatsLogAt.get(); + if (calls < nextLogAt) { + return false; + } + + long next = nextLogAt == 1 ? DECRYPT_STATS_LOG_INTERVAL : nextLogAt + DECRYPT_STATS_LOG_INTERVAL; + if (nextDecryptStatsLogAt.compareAndSet(nextLogAt, next)) { + return true; + } + } + } + + private enum DecryptOutcome + { + SUCCESS, + MISSING_KEY, + NULL_INPUT, + PASSTHROUGH, + FAILURE, + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpEnvelope.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpEnvelope.java new file mode 100644 index 000000000000..973e2b18c916 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/IpEnvelope.java @@ -0,0 +1,141 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import io.airlift.slice.Slice; + +import java.nio.charset.StandardCharsets; + +public final class IpEnvelope +{ + private static final String PREFIX = "<> getFunctions() + { + return Set.of(IpDecryptFunction.class); + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/KeySnapshotStore.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/KeySnapshotStore.java new file mode 100644 index 000000000000..37617ea7a500 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/KeySnapshotStore.java @@ -0,0 +1,393 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +final class KeySnapshotStore +{ + static final String CONFIG_PATH_PROPERTY = "singular.ip.config.path"; + static final String CONFIG_PATH_ENV = "SINGULAR_IP_CONFIG_PATH"; + static final String DEFAULT_CONFIG_PATH = "/etc/trino/singular-ip-udf.properties"; + static final String SNAPSHOT_URI_PROPERTY = "singular.ip.key-snapshot.uri"; + static final String SNAPSHOT_URI_ENV = "SINGULAR_IP_KEY_SNAPSHOT_URI"; + static final String REQUIRE_S3_SNAPSHOT_PROPERTY = "singular.ip.key-snapshot.require-s3"; + static final String REQUIRE_S3_SNAPSHOT_ENV = "SINGULAR_IP_KEY_SNAPSHOT_REQUIRE_S3"; + static final String SNAPSHOT_S3_REGION_PROPERTY = "singular.ip.key-snapshot.s3-region"; + static final String SNAPSHOT_S3_REGION_ENV = "SINGULAR_IP_KEY_SNAPSHOT_S3_REGION"; + static final String REFRESH_INTERVAL_SECONDS_PROPERTY = "singular.ip.key-snapshot.refresh-interval-seconds"; + static final String REFRESH_INTERVAL_SECONDS_ENV = "SINGULAR_IP_KEY_SNAPSHOT_REFRESH_INTERVAL_SECONDS"; + static final String REQUEST_TIMEOUT_SECONDS_PROPERTY = "singular.ip.key-snapshot.request-timeout-seconds"; + static final String REQUEST_TIMEOUT_SECONDS_ENV = "SINGULAR_IP_KEY_SNAPSHOT_REQUEST_TIMEOUT_SECONDS"; + + private static final System.Logger LOGGER = System.getLogger(KeySnapshotStore.class.getName()); + private static final Duration DEFAULT_REFRESH_INTERVAL = Duration.ofHours(12); + private static final Duration DEFAULT_REFRESH_FAILURE_COOLDOWN = Duration.ofSeconds(30); + private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5); + private static final Pattern STRING_FIELD_PATTERN = Pattern.compile("\"%s\"\\s*:\\s*\"([^\"]*)\""); + private static final Pattern KEY_OBJECT_PATTERN = Pattern.compile("\\{[^{}]*\"event_day\"[^{}]*}"); + + private final SnapshotLoader snapshotLoader; + private final Duration refreshInterval; + private final Duration requestTimeout; + private final Clock clock; + private volatile Snapshot snapshot = Snapshot.empty(); + private volatile Instant nextRefresh = Instant.EPOCH; + private final AtomicBoolean loggedCachedSnapshotUse = new AtomicBoolean(); + private final ReentrantLock refreshLock = new ReentrantLock(); + + KeySnapshotStore( + SnapshotLoader snapshotLoader, + Duration refreshInterval, + Duration requestTimeout, + Clock clock) + { + this.snapshotLoader = snapshotLoader; + this.refreshInterval = refreshInterval; + this.requestTimeout = requestTimeout; + this.clock = clock; + } + + static KeySnapshotStore fromEnvironment() + { + Config config = Config.load(); + String snapshotUri = config.read(SNAPSHOT_URI_PROPERTY, SNAPSHOT_URI_ENV); + if (snapshotUri.isBlank()) { + throw new IllegalStateException("Missing key snapshot URI"); + } + validateSnapshotUri(snapshotUri, configuredBoolean(config, REQUIRE_S3_SNAPSHOT_PROPERTY, REQUIRE_S3_SNAPSHOT_ENV, false)); + + return new KeySnapshotStore( + SnapshotLoader.fromUri(snapshotUri, config.read(SNAPSHOT_S3_REGION_PROPERTY, SNAPSHOT_S3_REGION_ENV)), + configuredDuration(config, REFRESH_INTERVAL_SECONDS_PROPERTY, REFRESH_INTERVAL_SECONDS_ENV, DEFAULT_REFRESH_INTERVAL), + configuredDuration(config, REQUEST_TIMEOUT_SECONDS_PROPERTY, REQUEST_TIMEOUT_SECONDS_ENV, DEFAULT_REQUEST_TIMEOUT), + Clock.systemUTC()); + } + + private static void validateSnapshotUri(String snapshotUri, boolean requireS3Snapshot) + { + if (requireS3Snapshot && !snapshotUri.regionMatches(true, 0, "s3://", 0, "s3://".length())) { + throw new IllegalStateException("Production key snapshot URI must use s3://"); + } + } + + public DailyKeyMaterial getKeyMaterial(String eventDay) + { + boolean refreshed = refreshIfNeeded(); + LocalDate today = today(); + Snapshot currentSnapshot = snapshot; + DailyKeyMaterial keyMaterial = currentSnapshot.keyMaterial(eventDay, today); + if (keyMaterial != null) { + logKeyLookup(refreshed ? "hit_after_refresh" : "hit_cached_snapshot", eventDay, currentSnapshot); + return keyMaterial; + } + + logKeyLookup(refreshed ? "miss_after_refresh" : "miss_cached_snapshot", eventDay, currentSnapshot); + return keyMaterial; + } + + private boolean refreshIfNeeded() + { + Instant now = clock.instant(); + if (now.isBefore(nextRefresh)) { + logFirstCachedSnapshotUse(nextRefresh); + logCachedSnapshotUse("warm_cache", now, nextRefresh); + return false; + } + + boolean blockForRefresh = shouldBlockForRefresh(); + boolean locked = false; + if (blockForRefresh) { + refreshLock.lock(); + locked = true; + } + else if (refreshLock.tryLock()) { + locked = true; + } + else { + logCachedSnapshotUse("refresh_already_running", now, nextRefresh); + return false; + } + + try { + now = clock.instant(); + if (now.isBefore(nextRefresh)) { + logFirstCachedSnapshotUse(nextRefresh); + logCachedSnapshotUse("warm_cache_after_lock", now, nextRefresh); + return false; + } + + Snapshot previousSnapshot = snapshot; + LOGGER.log( + System.Logger.Level.INFO, + "IP retention key snapshot cache miss: reason=scheduled_or_cold_refresh, keys_before=" + + previousSnapshot.keys().size()); + RefreshResult refreshResult = fetchSnapshot(); + if (refreshResult.snapshot().isPresent()) { + snapshot = refreshResult.snapshot().get(); + nextRefresh = now.plus(refreshInterval); + loggedCachedSnapshotUse.set(false); + LOGGER.log( + System.Logger.Level.INFO, + "Loaded IP retention key snapshot: keys=" + snapshot.keys().size() + + ", duration_ms=" + refreshResult.duration().toMillis() + + ", source=" + refreshResult.sourceVersion().orElse(snapshotLoader.description())); + } + else { + nextRefresh = now.plus(DEFAULT_REFRESH_FAILURE_COOLDOWN); + LOGGER.log( + System.Logger.Level.WARNING, + "Failed to load IP retention key snapshot: duration_ms=" + + refreshResult.duration().toMillis() + + ", source=" + snapshotLoader.description()); + } + + return true; + } + finally { + if (locked) { + refreshLock.unlock(); + } + } + } + + private boolean shouldBlockForRefresh() + { + return nextRefresh.equals(Instant.EPOCH); + } + + private void logFirstCachedSnapshotUse(Instant nextAllowedRefresh) + { + if (!loggedCachedSnapshotUse.compareAndSet(false, true)) { + return; + } + LOGGER.log( + System.Logger.Level.INFO, + "Using cached IP retention key snapshot: keys=" + snapshot.keys().size() + + ", next_allowed_refresh=" + nextAllowedRefresh); + } + + private void logCachedSnapshotUse(String reason, Instant now, Instant nextAllowedRefresh) + { + if (!LOGGER.isLoggable(System.Logger.Level.DEBUG)) { + return; + } + LOGGER.log( + System.Logger.Level.DEBUG, + "Using cached IP retention key snapshot: reason=" + reason + + ", keys=" + snapshot.keys().size() + + ", now=" + now + + ", next_allowed_refresh=" + nextAllowedRefresh); + } + + private static void logKeyLookup(String state, String eventDay, Snapshot snapshot) + { + if (!LOGGER.isLoggable(System.Logger.Level.DEBUG)) { + return; + } + LOGGER.log( + System.Logger.Level.DEBUG, + "IP retention key lookup: state=" + state + + ", event_day=" + eventDay + + ", cached_keys=" + snapshot.keys().size()); + } + + private RefreshResult fetchSnapshot() + { + Instant start = clock.instant(); + try { + SnapshotLoader.SnapshotBody snapshotBody = snapshotLoader.load(requestTimeout); + return RefreshResult.success(parseSnapshot(snapshotBody.body()), durationSince(start), snapshotBody.version()); + } + catch (RuntimeException e) { + return RefreshResult.failure(durationSince(start)); + } + } + + private Duration durationSince(Instant start) + { + return Duration.between(start, clock.instant()); + } + + private LocalDate today() + { + return LocalDate.now(clock.withZone(ZoneOffset.UTC)); + } + + static Snapshot parseSnapshot(String body) + { + if (body == null || body.isBlank()) { + return Snapshot.empty(); + } + + String keyEncoding = stringField(body, "key_encoding").orElse("sha256"); + if (!"sha256".equals(keyEncoding)) { + return Snapshot.empty(); + } + + Map keys = new HashMap<>(); + + Matcher matcher = KEY_OBJECT_PATTERN.matcher(body); + while (matcher.find()) { + String keyObject = matcher.group(); + Optional eventDay = stringField(keyObject, "event_day"); + Optional aesKeySha256 = stringField(keyObject, "aes_key_sha256"); + if (eventDay.isEmpty() || aesKeySha256.isEmpty()) { + continue; + } + + try { + LocalDate notAfter = stringField(keyObject, "not_after") + .map(LocalDate::parse) + .orElse(null); + keys.put( + eventDay.get(), + new CachedKey(IpCrypto.keyMaterial(IpCrypto.decodeSha256Key(aesKeySha256.get())), notAfter)); + } + catch (RuntimeException ignored) { + // Invalid snapshot entries behave like missing keys. + } + } + + return new Snapshot(Map.copyOf(keys)); + } + + private static Optional stringField(String value, String field) + { + Matcher matcher = Pattern.compile(String.format(STRING_FIELD_PATTERN.pattern(), Pattern.quote(field))).matcher(value); + if (!matcher.find()) { + return Optional.empty(); + } + return Optional.of(matcher.group(1)); + } + + private static Duration configuredDuration(Config config, String property, String env, Duration defaultDuration) + { + String value = config.read(property, env); + if (value.isBlank()) { + return defaultDuration; + } + return Duration.ofSeconds(Long.parseLong(value)); + } + + private static boolean configuredBoolean(Config config, String property, String env, boolean defaultValue) + { + String value = config.read(property, env); + if (value.isBlank()) { + return defaultValue; + } + return Boolean.parseBoolean(value); + } + + private record Config(Properties fileProperties) + { + static Config load() + { + return new Config(loadProperties(configPath())); + } + + String read(String property, String env) + { + String value = System.getProperty(property); + if (value == null || value.isBlank()) { + value = System.getenv(env); + } + if (value == null || value.isBlank()) { + value = fileProperties.getProperty(property); + } + return value == null ? "" : value.trim(); + } + + private static Path configPath() + { + String configuredPath = System.getProperty(CONFIG_PATH_PROPERTY); + if (configuredPath == null || configuredPath.isBlank()) { + configuredPath = System.getenv(CONFIG_PATH_ENV); + } + if (configuredPath == null || configuredPath.isBlank()) { + configuredPath = DEFAULT_CONFIG_PATH; + } + return Path.of(configuredPath.trim()); + } + + private static Properties loadProperties(Path path) + { + Properties properties = new Properties(); + if (!Files.exists(path)) { + return properties; + } + + try (var inputStream = Files.newInputStream(path)) { + properties.load(inputStream); + } + catch (IOException e) { + LOGGER.log(System.Logger.Level.WARNING, "Failed to load IP retention UDF config file: " + path); + } + return properties; + } + } + + record Snapshot(Map keys) + { + static Snapshot empty() + { + return new Snapshot(Map.of()); + } + + DailyKeyMaterial keyMaterial(String eventDay, LocalDate today) + { + CachedKey cachedKey = keys.get(eventDay); + if (cachedKey == null || cachedKey.isExpired(today)) { + return null; + } + return cachedKey.keyMaterial(); + } + } + + record CachedKey(DailyKeyMaterial keyMaterial, LocalDate notAfter) + { + boolean isExpired(LocalDate today) + { + return notAfter != null && today.isAfter(notAfter); + } + } + + private record RefreshResult(Optional snapshot, Duration duration, Optional sourceVersion) + { + static RefreshResult success(Snapshot snapshot, Duration duration, String sourceVersion) + { + return new RefreshResult(Optional.of(snapshot), duration, Optional.ofNullable(sourceVersion)); + } + + static RefreshResult failure(Duration duration) + { + return new RefreshResult(Optional.empty(), duration, Optional.empty()); + } + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/NotEncryptedIpException.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/NotEncryptedIpException.java new file mode 100644 index 000000000000..5c0b6f075ee1 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/NotEncryptedIpException.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +final class NotEncryptedIpException + extends RuntimeException +{ + NotEncryptedIpException(String message) + { + super(message); + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/S3SnapshotLoader.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/S3SnapshotLoader.java new file mode 100644 index 000000000000..011a43359fa4 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/S3SnapshotLoader.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +final class S3SnapshotLoader + implements SnapshotLoader +{ + private final S3Client s3Client; + private final String bucket; + private final String key; + + private S3SnapshotLoader(S3Client s3Client, String bucket, String key) + { + this.s3Client = s3Client; + this.bucket = bucket; + this.key = key; + } + + static S3SnapshotLoader fromUri(URI uri, String s3Region) + { + String bucket = uri.getHost(); + String key = uri.getPath() == null ? "" : uri.getPath().replaceFirst("^/", ""); + if (bucket == null || bucket.isBlank() || key.isBlank()) { + throw new IllegalArgumentException("S3 key snapshot URI must include bucket and key"); + } + + S3ClientBuilder builder = S3Client.builder(); + if (s3Region != null && !s3Region.isBlank()) { + builder.region(Region.of(s3Region)); + } + return new S3SnapshotLoader(builder.build(), bucket, key); + } + + @Override + public SnapshotBody load(Duration requestTimeout) + { + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .overrideConfiguration(configuration -> configuration.apiCallTimeout(requestTimeout)) + .build(); + ResponseBytes response = s3Client.getObjectAsBytes(request); + return new SnapshotBody( + new String(response.asByteArray(), StandardCharsets.UTF_8), + "etag=" + response.response().eTag() + ", last_modified=" + response.response().lastModified()); + } + + @Override + public String description() + { + return "s3://" + bucket + "/" + key; + } +} diff --git a/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/SnapshotLoader.java b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/SnapshotLoader.java new file mode 100644 index 000000000000..2fcf65973e14 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/main/java/com/singular/unilogs/trino/ip/SnapshotLoader.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import java.net.URI; +import java.time.Duration; + +interface SnapshotLoader +{ + SnapshotBody load(Duration requestTimeout); + + String description(); + + static SnapshotLoader fromUri(String snapshotUri, String s3Region) + { + URI uri = URI.create(snapshotUri); + String scheme = uri.getScheme(); + if ("s3".equalsIgnoreCase(scheme)) { + return S3SnapshotLoader.fromUri(uri, s3Region); + } + if ("file".equalsIgnoreCase(scheme)) { + return FileSnapshotLoader.fromUri(uri); + } + throw new IllegalArgumentException("Unsupported key snapshot URI scheme: " + scheme); + } + + record SnapshotBody(String body, String version) {} +} diff --git a/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoFixtures.java b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoFixtures.java new file mode 100644 index 000000000000..88e5e710defe --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoFixtures.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.HexFormat; + +final class IpCryptoFixtures +{ + private static final int KEY_BYTES = 32; + private static final int IV_BYTES = 12; + private static final int CIPHER_BYTES = 64; + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + + private IpCryptoFixtures() {} + + static byte[] generateKey() + { + byte[] key = new byte[KEY_BYTES]; + SECURE_RANDOM.nextBytes(key); + return key; + } + + static byte[] generateAesKey() + { + return generateKey(); + } + + static String encrypt(String originalValue, String eventDay, byte[] key) + { + if (originalValue == null || originalValue.isEmpty()) { + throw new IllegalArgumentException("originalValue is blank"); + } + + byte[] iv = new byte[IV_BYTES]; + SECURE_RANDOM.nextBytes(iv); + byte[] xorKey = sha512(iv, key); + byte[] padded = leftPad(originalValue.getBytes(StandardCharsets.UTF_8)); + byte[] encryptedValue = xor(padded, xorKey); + return new IpEnvelope(eventDay, hex(iv), hex(encryptedValue)).serialize(); + } + + static String sha256Key(byte[] key) + { + return HexFormat.of().formatHex(key); + } + + private static byte[] leftPad(byte[] value) + { + byte[] padded = new byte[CIPHER_BYTES]; + int paddingBytes = Math.max(0, CIPHER_BYTES - value.length); + for (int i = 0; i < paddingBytes; i++) { + padded[i] = ' '; + } + System.arraycopy(value, 0, padded, paddingBytes, Math.min(value.length, CIPHER_BYTES)); + return padded; + } + + private static byte[] sha512(byte[] iv, byte[] key) + { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-512"); + digest.update(iv); + digest.update(key); + return digest.digest(); + } + catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-512 is not available", e); + } + } + + private static byte[] xor(byte[] value, byte[] xorKey) + { + byte[] result = new byte[value.length]; + for (int i = 0; i < value.length; i++) { + result[i] = (byte) (value[i] ^ xorKey[i]); + } + return result; + } + + private static String hex(byte[] value) + { + return HexFormat.of().formatHex(value); + } +} diff --git a/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoTest.java b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoTest.java new file mode 100644 index 000000000000..e66db40b52e9 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/IpCryptoTest.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class IpCryptoTest +{ + @Test + void roundTripsIpv4Exactly() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + assertThat(decrypt(payload, key)).isEqualTo("1.2.3.4"); + } + + @Test + void roundTripsIpv6Exactly() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String originalIp = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"; + String payload = IpCryptoFixtures.encrypt(originalIp, "2026-04-26", key); + + assertThat(decrypt(payload, key)).isEqualTo(originalIp); + } + + @Test + void decryptsArbitraryStringWithLegacyLeftPadding() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String originalValue = " raw value with spaces "; + String payload = IpCryptoFixtures.encrypt(originalValue, "2026-04-26", key); + + assertThat(decrypt(payload, key)).isEqualTo(originalValue.stripLeading()); + } + + @Test + void encryptionUsesRandomNonce() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String first = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + String second = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + assertThat(first).isNotEqualTo(second); + } + + @Test + void wrongKeyDecryptsToDifferentGarbageValue() + { + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", IpCryptoFixtures.generateAesKey()); + + assertThat(IpCrypto.decryptToSlice( + IpEnvelope.parse(payload), + IpCrypto.keyMaterial(IpCryptoFixtures.generateAesKey())).toStringUtf8()) + .isNotEqualTo("1.2.3.4"); + } + + @Test + void eventDayIsOnlyUsedForKeyLookup() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + String tamperedPayload = payload.replace("DAY=2026-04-26", "DAY=2026-04-27"); + + assertThat(IpCrypto.decryptToSlice( + IpEnvelope.parse(tamperedPayload), + IpCrypto.keyMaterial(key)).toStringUtf8()) + .isEqualTo("1.2.3.4"); + } + + @Test + void decryptsWithCachedKeyMaterial() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + assertThat(IpCrypto.decryptToSlice( + IpEnvelope.parse(payload), + IpCrypto.keyMaterial(key)).toStringUtf8()) + .isEqualTo("1.2.3.4"); + } + + @Test + void decodesSha256KeysFromDjangoKeyService() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + assertThat(IpCrypto.decryptToSlice( + IpEnvelope.parse(payload), + IpCrypto.keyMaterial(IpCrypto.decodeSha256Key(IpCryptoFixtures.sha256Key(key)))).toStringUtf8()) + .isEqualTo("1.2.3.4"); + } + + @Test + void payloadDoesNotContainRawIp() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + assertThat(payload).startsWith("< + assertThat(IpDecryptFunction.decrypt(Slices.utf8Slice(payload)).toStringUtf8()).isEqualTo("1.2.3.4")); + } + + @Test + void returnsNullWhenKeyIsMissing() + throws Exception + { + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", IpCryptoFixtures.generateAesKey()); + + withSnapshot(emptySnapshotBody(), () -> + assertThat(IpDecryptFunction.decrypt(Slices.utf8Slice(payload))).isNull()); + } + + @Test + void returnsNullForMalformedPayload() + { + IpDecryptFunction.resetKeySnapshotStoreForTests(); + + assertThat(IpDecryptFunction.decrypt(Slices.utf8Slice("not-an-envelope"))).isNull(); + } + + private static void withSnapshot(String body, ThrowingRunnable assertion) + throws Exception + { + Path snapshotPath = Files.createTempFile("ip-retention-key-snapshot", ".json"); + String previousSnapshotUri = System.getProperty(KeySnapshotStore.SNAPSHOT_URI_PROPERTY); + try { + Files.writeString(snapshotPath, body); + System.setProperty( + KeySnapshotStore.SNAPSHOT_URI_PROPERTY, + snapshotPath.toUri().toString()); + IpDecryptFunction.resetKeySnapshotStoreForTests(); + assertion.run(); + } + finally { + restoreProperty(KeySnapshotStore.SNAPSHOT_URI_PROPERTY, previousSnapshotUri); + IpDecryptFunction.resetKeySnapshotStoreForTests(); + Files.deleteIfExists(snapshotPath); + } + } + + private static void restoreProperty(String key, String value) + { + if (value == null) { + System.clearProperty(key); + } + else { + System.setProperty(key, value); + } + } + + private static String emptySnapshotBody() + { + return "{" + + "\"generated_at\":\"2026-05-01T00:00:00+00:00\"," + + "\"retention_days\":30," + + "\"forward_prefetch_days\":10," + + "\"key_encoding\":\"sha256\"," + + "\"keys\":[]" + + "}"; + } + + private static String snapshotBody(String eventDay, String keySha256, String notAfter) + { + return "{" + + "\"generated_at\":\"2026-05-01T00:00:00+00:00\"," + + "\"retention_days\":30," + + "\"forward_prefetch_days\":10," + + "\"key_encoding\":\"sha256\"," + + "\"keys\":[{" + + "\"event_day\":\"" + eventDay + "\"," + + "\"key_name\":\"IP_OBFUSCATION_DAY_" + eventDay + "\"," + + "\"aes_key_sha256\":\"" + keySha256 + "\"," + + "\"not_after\":\"" + notAfter + "\"" + + "}]" + + "}"; + } + + @FunctionalInterface + private interface ThrowingRunnable + { + void run() + throws Exception; + } +} diff --git a/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/KeySnapshotStoreTest.java b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/KeySnapshotStoreTest.java new file mode 100644 index 000000000000..0118f9981075 --- /dev/null +++ b/plugin/trino-ip-obfuscation/src/test/java/com/singular/unilogs/trino/ip/KeySnapshotStoreTest.java @@ -0,0 +1,311 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.singular.unilogs.trino.ip; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class KeySnapshotStoreTest +{ + @TempDir + Path tempDir; + + @Test + void parsesDjangoKeySnapshotWithSha256Keys() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + String payload = IpCryptoFixtures.encrypt("1.2.3.4", "2026-04-26", key); + + KeySnapshotStore.Snapshot snapshot = KeySnapshotStore.parseSnapshot(snapshotBody( + "2026-04-26", + IpCryptoFixtures.sha256Key(key), + "2026-05-26")); + + DailyKeyMaterial keyMaterial = snapshot.keyMaterial("2026-04-26", LocalDate.parse("2026-05-01")); + assertThat(keyMaterial).isNotNull(); + assertThat(IpCrypto.decryptToSlice(IpEnvelope.parse(payload), keyMaterial).toStringUtf8()).isEqualTo("1.2.3.4"); + } + + @Test + void notAfterExpiresCachedKey() + { + KeySnapshotStore.Snapshot snapshot = KeySnapshotStore.parseSnapshot(snapshotBody( + "2026-04-26", + IpCryptoFixtures.sha256Key(IpCryptoFixtures.generateAesKey()), + "2026-05-26")); + + assertThat(snapshot.keyMaterial("2026-04-26", LocalDate.parse("2026-05-26"))).isNotNull(); + assertThat(snapshot.keyMaterial("2026-04-26", LocalDate.parse("2026-05-27"))).isNull(); + } + + @Test + void unsupportedEncodingBehavesLikeMissingKeys() + { + String body = snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(IpCryptoFixtures.generateAesKey()), "2026-05-26") + .replace("\"key_encoding\":\"sha256\"", "\"key_encoding\":\"base64\""); + + KeySnapshotStore.Snapshot snapshot = KeySnapshotStore.parseSnapshot(body); + + assertThat(snapshot.keyMaterial("2026-04-26", LocalDate.parse("2026-05-01"))).isNull(); + } + + @Test + void loadsSnapshotThroughConfiguredLoader() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + KeySnapshotStore store = new KeySnapshotStore( + new StaticSnapshotLoader(snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(key), "2026-05-26")), + Duration.ofMinutes(5), + Duration.ofSeconds(5), + Clock.fixed(Instant.parse("2026-05-01T00:00:00Z"), ZoneOffset.UTC)); + + assertThat(store.getKeyMaterial("2026-04-26")).isNotNull(); + } + + @Test + void loadsConfigurationFromTrinoPropertiesFile() + throws Exception + { + byte[] key = IpCryptoFixtures.generateAesKey(); + Path snapshotPath = tempDir.resolve("ip-retention-key-snapshot.json"); + Path configPath = tempDir.resolve("singular-ip-udf.properties"); + String previousConfigPath = System.getProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY); + try { + Files.writeString(snapshotPath, snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(key), "2026-05-26")); + Files.writeString( + configPath, + KeySnapshotStore.SNAPSHOT_URI_PROPERTY + "=" + snapshotPath.toUri() + "\n" + + KeySnapshotStore.REFRESH_INTERVAL_SECONDS_PROPERTY + "=43200\n"); + System.setProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY, configPath.toString()); + + KeySnapshotStore store = KeySnapshotStore.fromEnvironment(); + + assertThat(store.getKeyMaterial("2026-04-26")).isNotNull(); + } + finally { + restoreProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY, previousConfigPath); + } + } + + @Test + void rejectsFileSnapshotWhenS3IsRequired() + throws Exception + { + Path snapshotPath = tempDir.resolve("ip-retention-key-snapshot.json"); + Path configPath = tempDir.resolve("singular-ip-udf.properties"); + String previousConfigPath = System.getProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY); + try { + Files.writeString(snapshotPath, snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(IpCryptoFixtures.generateAesKey()), "2026-05-26")); + Files.writeString( + configPath, + KeySnapshotStore.SNAPSHOT_URI_PROPERTY + "=" + snapshotPath.toUri() + "\n" + + KeySnapshotStore.REQUIRE_S3_SNAPSHOT_PROPERTY + "=true\n"); + System.setProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY, configPath.toString()); + + assertThatThrownBy(KeySnapshotStore::fromEnvironment) + .isInstanceOf(IllegalStateException.class); + } + finally { + restoreProperty(KeySnapshotStore.CONFIG_PATH_PROPERTY, previousConfigPath); + } + } + + @Test + void scheduledRefreshKeepsServingCachedKeysWhenRefreshIsAlreadyRunning() + throws Exception + { + byte[] key = IpCryptoFixtures.generateAesKey(); + BlockingRefreshSnapshotLoader loader = new BlockingRefreshSnapshotLoader( + snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(key), "2026-05-26")); + KeySnapshotStore store = new KeySnapshotStore( + loader, + Duration.ZERO, + Duration.ofSeconds(5), + Clock.fixed(Instant.parse("2026-05-01T00:00:00Z"), ZoneOffset.UTC)); + + assertThat(store.getKeyMaterial("2026-04-26")).isNotNull(); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future refresh = executor.submit(() -> assertThat(store.getKeyMaterial("2026-04-26")).isNotNull()); + assertThat(loader.awaitBlockingRefresh()).isTrue(); + + long startNanos = System.nanoTime(); + assertThat(store.getKeyMaterial("2026-04-26")).isNotNull(); + long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + + assertThat(durationMillis) + .as("cached key lookup should not wait for scheduled refresh") + .isLessThan(500); + + loader.releaseRefresh(); + refresh.get(5, TimeUnit.SECONDS); + } + finally { + executor.shutdownNow(); + } + } + + @Test + void missingDayDoesNotForceRefresh() + { + byte[] key = IpCryptoFixtures.generateAesKey(); + CountingSnapshotLoader loader = new CountingSnapshotLoader( + snapshotBody("2026-04-26", IpCryptoFixtures.sha256Key(key), "2026-05-26")); + KeySnapshotStore store = new KeySnapshotStore( + loader, + Duration.ofHours(12), + Duration.ofSeconds(5), + Clock.fixed(Instant.parse("2026-05-01T00:00:00Z"), ZoneOffset.UTC)); + + assertThat(store.getKeyMaterial("2026-04-26")).isNotNull(); + assertThat(store.getKeyMaterial("2026-04-27")).isNull(); + assertThat(loader.loads()).isEqualTo(1); + } + + private record StaticSnapshotLoader(String body) + implements SnapshotLoader + { + @Override + public SnapshotBody load(Duration requestTimeout) + { + return new SnapshotBody(body, "test"); + } + + @Override + public String description() + { + return "test"; + } + } + + private static final class CountingSnapshotLoader + implements SnapshotLoader + { + private final String body; + private int loads; + + private CountingSnapshotLoader(String body) + { + this.body = body; + } + + @Override + public SnapshotBody load(Duration requestTimeout) + { + loads++; + return new SnapshotBody(body, "test"); + } + + int loads() + { + return loads; + } + + @Override + public String description() + { + return "test"; + } + } + + private static final class BlockingRefreshSnapshotLoader + implements SnapshotLoader + { + private final String body; + private final CountDownLatch refreshStarted = new CountDownLatch(1); + private final CountDownLatch releaseRefresh = new CountDownLatch(1); + private int loads; + + private BlockingRefreshSnapshotLoader(String body) + { + this.body = body; + } + + @Override + public synchronized SnapshotBody load(Duration requestTimeout) + { + loads++; + if (loads > 1) { + refreshStarted.countDown(); + try { + releaseRefresh.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return new SnapshotBody(body, "test"); + } + + boolean awaitBlockingRefresh() + throws InterruptedException + { + return refreshStarted.await(5, TimeUnit.SECONDS); + } + + void releaseRefresh() + { + releaseRefresh.countDown(); + } + + @Override + public String description() + { + return "test"; + } + } + + private static void restoreProperty(String key, String value) + { + if (value == null) { + System.clearProperty(key); + } + else { + System.setProperty(key, value); + } + } + + private static String snapshotBody(String eventDay, String keySha256, String notAfter) + { + return "{" + + "\"generated_at\":\"2026-05-01T00:00:00+00:00\"," + + "\"retention_days\":30," + + "\"forward_prefetch_days\":10," + + "\"key_encoding\":\"sha256\"," + + "\"keys\":[{" + + "\"event_day\":\"" + eventDay + "\"," + + "\"key_name\":\"IP_OBFUSCATION_DAY_" + eventDay + "\"," + + "\"aes_key_sha256\":\"" + keySha256 + "\"," + + "\"not_after\":\"" + notAfter + "\"" + + "}]" + + "}"; + } +} diff --git a/pom.xml b/pom.xml index 375a2dcd33c1..8c3c5394d196 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ plugin/trino-hudi plugin/trino-iceberg plugin/trino-ignite + plugin/trino-ip-obfuscation plugin/trino-jmx plugin/trino-kafka plugin/trino-kafka-event-listener