diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs index 21a4f726ed1..d5a6b0f3a27 100755 --- a/java/lance-jni/src/delta.rs +++ b/java/lance-jni/src/delta.rs @@ -109,7 +109,7 @@ fn inner_native_build<'local>( } #[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_delta_DatasetDelta_listTransactions<'local>( +pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeListTransactions<'local>( mut env: JNIEnv<'local>, j_delta: JObject<'local>, ) -> JObject<'local> { @@ -140,7 +140,7 @@ fn inner_list_transactions<'local>( } #[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_delta_DatasetDelta_getInsertedRows<'local>( +pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeGetInsertedRows<'local>( mut env: JNIEnv<'local>, j_delta: JObject<'local>, stream_addr: jlong, @@ -164,7 +164,7 @@ fn inner_get_inserted_rows<'local>( } #[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_delta_DatasetDelta_getUpdatedRows<'local>( +pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeGetUpdatedRows<'local>( mut env: JNIEnv<'local>, j_delta: JObject<'local>, stream_addr: jlong, diff --git a/java/src/test/java/org/lance/DeltaTest.java b/java/src/test/java/org/lance/DeltaTest.java index 72537207524..ac7056840e4 100755 --- a/java/src/test/java/org/lance/DeltaTest.java +++ b/java/src/test/java/org/lance/DeltaTest.java @@ -28,11 +28,12 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -41,9 +42,9 @@ public class DeltaTest { @Test - public void testInsertedRowsComparedAgainst() throws IOException { + public void testInsertedRowsComparedAgainst(@TempDir Path tempDir) throws IOException { try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { - String uri = "memory://delta_demo"; + String uri = tempDir.resolve("delta_demo").toString(); // Build initial batch (2 rows) Schema schema = new Schema( @@ -79,7 +80,11 @@ public void testInsertedRowsComparedAgainst() throws IOException { org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) { Data.exportArrayStream(allocator, reader1, stream1); Dataset ds = - Dataset.write().stream(stream1).uri(uri).mode(WriteParams.WriteMode.CREATE).execute(); + Dataset.write().stream(stream1) + .uri(uri) + .mode(WriteParams.WriteMode.CREATE) + .enableStableRowIds(true) + .execute(); // Append one row (v2) VectorSchemaRoot root2 = VectorSchemaRoot.create(schema, allocator); @@ -107,40 +112,35 @@ public void testInsertedRowsComparedAgainst() throws IOException { Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute(); DatasetDelta delta = ds2.delta(1L); - try { - try (ArrowReader inserted = delta.getInsertedRows()) { - int total = 0; - boolean foundRow = false; - - while (inserted.loadNextBatch()) { - VectorSchemaRoot outRoot = inserted.getVectorSchemaRoot(); - Schema outSchema = outRoot.getSchema(); - List names = - outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList()); - Assertions.assertTrue(names.contains("_row_created_at_version")); - Assertions.assertTrue(names.contains("_row_last_updated_at_version")); - - IntVector outId = (IntVector) outRoot.getVector("id"); - VarCharVector outVal = (VarCharVector) outRoot.getVector("val"); - - for (int i = 0; i < outRoot.getRowCount(); i++) { - int id = outId.get(i); - byte[] bytes = outVal.get(i); - String val = new String(bytes, java.nio.charset.StandardCharsets.UTF_8); - if (id == 3 && "c".equals(val)) { - foundRow = true; - } + try (ArrowReader inserted = delta.getInsertedRows()) { + int total = 0; + boolean foundRow = false; + + while (inserted.loadNextBatch()) { + VectorSchemaRoot outRoot = inserted.getVectorSchemaRoot(); + Schema outSchema = outRoot.getSchema(); + List names = + outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList()); + Assertions.assertTrue(names.contains("_row_created_at_version")); + Assertions.assertTrue(names.contains("_row_last_updated_at_version")); + + IntVector outId = (IntVector) outRoot.getVector("id"); + VarCharVector outVal = (VarCharVector) outRoot.getVector("val"); + + for (int i = 0; i < outRoot.getRowCount(); i++) { + int id = outId.get(i); + byte[] bytes = outVal.get(i); + String val = new String(bytes, java.nio.charset.StandardCharsets.UTF_8); + if (id == 3 && "c".equals(val)) { + foundRow = true; } - - total += outRoot.getRowCount(); } - Assertions.assertEquals(1, total); - Assertions.assertTrue(foundRow, "Inserted row (id=3, val=c) not found in delta"); + total += outRoot.getRowCount(); } - } catch (UnsatisfiedLinkError e) { - Assumptions.assumeTrue( - false, "JNI for DatasetDelta.getInsertedRows not available: " + e.getMessage()); + + Assertions.assertEquals(1, total); + Assertions.assertTrue(foundRow, "Inserted row (id=3, val=c) not found in delta"); } } } @@ -148,10 +148,9 @@ public void testInsertedRowsComparedAgainst() throws IOException { } @Test - public void testListTransactionsExplicitRange() { + public void testListTransactionsExplicitRange(@TempDir Path tempDir) throws IOException { try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { - String uri = "memory://delta_demo_tx"; - // v1 + String uri = tempDir.resolve("delta_demo_tx").toString(); Schema schema = new Schema( Arrays.asList( @@ -159,21 +158,59 @@ public void testListTransactionsExplicitRange() { "id", new org.apache.arrow.vector.types.pojo.ArrowType.Int(32, true)), Field.nullable( "val", org.apache.arrow.vector.types.pojo.ArrowType.Utf8.INSTANCE))); - try (Dataset ds = Dataset.create(allocator, uri, schema, new WriteParams.Builder().build())) { - // v2 - WriteParams params = - new WriteParams.Builder().withMode(WriteParams.WriteMode.APPEND).build(); - try (Dataset ds2 = Dataset.create(allocator, uri, schema, params); ) { + + // v1: create with two rows. + byte[] batch1 = writeBatch(allocator, schema, new int[] {1, 2}, new String[] {"a", "b"}); + try (ArrowStreamReader reader1 = + new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch1), allocator); + ArrowArrayStream stream1 = ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader1, stream1); + Dataset.write().stream(stream1) + .uri(uri) + .mode(WriteParams.WriteMode.CREATE) + .execute() + .close(); + } + + // v2: append one row. + byte[] batch2 = writeBatch(allocator, schema, new int[] {3}, new String[] {"c"}); + try (ArrowStreamReader reader2 = + new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch2), allocator); + ArrowArrayStream stream2 = ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader2, stream2); + try (Dataset ds2 = + Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute()) { DatasetDelta delta = ds2.delta(1L, 2L); - try { - List txs = delta.listTransactions(); - Assertions.assertTrue(txs.size() == 1); - } catch (UnsatisfiedLinkError e) { - Assumptions.assumeTrue( - false, "JNI for DatasetDelta.listTransactions not available: " + e.getMessage()); - } + List txs = delta.listTransactions(); + Assertions.assertEquals(1, txs.size(), "delta v1..v2 should contain exactly one txn"); } } } } + + /** Helper: serialize a single Arrow batch with the given schema and (id, val) pairs. */ + private static byte[] writeBatch(RootAllocator allocator, Schema schema, int[] ids, String[] vals) + throws IOException { + Assertions.assertEquals(ids.length, vals.length, "ids and vals must align"); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + try { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("id"); + VarCharVector valVec = (VarCharVector) root.getVector("val"); + for (int i = 0; i < ids.length; i++) { + idVec.setSafe(i, ids[i]); + valVec.setSafe(i, vals[i].getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } + root.setRowCount(ids.length); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + return out.toByteArray(); + } finally { + root.close(); + } + } }