Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions java/lance-jni/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn inner_native_build<'local>(
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_delta_DatasetDelta_listTransactions<'local>(
pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeListTransactions<'local>(
mut env: JNIEnv<'local>,
j_delta: JObject<'local>,
) -> JObject<'local> {
Expand Down Expand Up @@ -140,7 +140,7 @@ fn inner_list_transactions<'local>(
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_delta_DatasetDelta_getInsertedRows<'local>(
pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeGetInsertedRows<'local>(
mut env: JNIEnv<'local>,
j_delta: JObject<'local>,
stream_addr: jlong,
Expand All @@ -164,7 +164,7 @@ fn inner_get_inserted_rows<'local>(
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_delta_DatasetDelta_getUpdatedRows<'local>(
pub extern "system" fn Java_org_lance_delta_DatasetDelta_nativeGetUpdatedRows<'local>(
mut env: JNIEnv<'local>,
j_delta: JObject<'local>,
stream_addr: jlong,
Expand Down
135 changes: 86 additions & 49 deletions java/src/test/java/org/lance/DeltaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -41,9 +42,9 @@
public class DeltaTest {

@Test
public void testInsertedRowsComparedAgainst() throws IOException {
public void testInsertedRowsComparedAgainst(@TempDir Path tempDir) throws IOException {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
String uri = "memory://delta_demo";
String uri = tempDir.resolve("delta_demo").toString();
// Build initial batch (2 rows)
Schema schema =
new Schema(
Expand Down Expand Up @@ -79,7 +80,11 @@ public void testInsertedRowsComparedAgainst() throws IOException {
org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader1, stream1);
Dataset ds =
Dataset.write().stream(stream1).uri(uri).mode(WriteParams.WriteMode.CREATE).execute();
Dataset.write().stream(stream1)
.uri(uri)
.mode(WriteParams.WriteMode.CREATE)
.enableStableRowIds(true)
.execute();

// Append one row (v2)
VectorSchemaRoot root2 = VectorSchemaRoot.create(schema, allocator);
Expand Down Expand Up @@ -107,73 +112,105 @@ public void testInsertedRowsComparedAgainst() throws IOException {
Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute();

DatasetDelta delta = ds2.delta(1L);
try {
try (ArrowReader inserted = delta.getInsertedRows()) {
int total = 0;
boolean foundRow = false;

while (inserted.loadNextBatch()) {
VectorSchemaRoot outRoot = inserted.getVectorSchemaRoot();
Schema outSchema = outRoot.getSchema();
List<String> names =
outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList());
Assertions.assertTrue(names.contains("_row_created_at_version"));
Assertions.assertTrue(names.contains("_row_last_updated_at_version"));

IntVector outId = (IntVector) outRoot.getVector("id");
VarCharVector outVal = (VarCharVector) outRoot.getVector("val");

for (int i = 0; i < outRoot.getRowCount(); i++) {
int id = outId.get(i);
byte[] bytes = outVal.get(i);
String val = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
if (id == 3 && "c".equals(val)) {
foundRow = true;
}
try (ArrowReader inserted = delta.getInsertedRows()) {
int total = 0;
boolean foundRow = false;

while (inserted.loadNextBatch()) {
VectorSchemaRoot outRoot = inserted.getVectorSchemaRoot();
Schema outSchema = outRoot.getSchema();
List<String> names =
outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList());
Assertions.assertTrue(names.contains("_row_created_at_version"));
Assertions.assertTrue(names.contains("_row_last_updated_at_version"));

IntVector outId = (IntVector) outRoot.getVector("id");
VarCharVector outVal = (VarCharVector) outRoot.getVector("val");

for (int i = 0; i < outRoot.getRowCount(); i++) {
int id = outId.get(i);
byte[] bytes = outVal.get(i);
String val = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
if (id == 3 && "c".equals(val)) {
foundRow = true;
}

total += outRoot.getRowCount();
}

Assertions.assertEquals(1, total);
Assertions.assertTrue(foundRow, "Inserted row (id=3, val=c) not found in delta");
total += outRoot.getRowCount();
}
} catch (UnsatisfiedLinkError e) {
Assumptions.assumeTrue(
false, "JNI for DatasetDelta.getInsertedRows not available: " + e.getMessage());

Assertions.assertEquals(1, total);
Assertions.assertTrue(foundRow, "Inserted row (id=3, val=c) not found in delta");
}
}
}
}
}

@Test
public void testListTransactionsExplicitRange() {
public void testListTransactionsExplicitRange(@TempDir Path tempDir) throws IOException {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
String uri = "memory://delta_demo_tx";
// v1
String uri = tempDir.resolve("delta_demo_tx").toString();
Schema schema =
new Schema(
Arrays.asList(
Field.notNullable(
"id", new org.apache.arrow.vector.types.pojo.ArrowType.Int(32, true)),
Field.nullable(
"val", org.apache.arrow.vector.types.pojo.ArrowType.Utf8.INSTANCE)));
try (Dataset ds = Dataset.create(allocator, uri, schema, new WriteParams.Builder().build())) {
// v2
WriteParams params =
new WriteParams.Builder().withMode(WriteParams.WriteMode.APPEND).build();
try (Dataset ds2 = Dataset.create(allocator, uri, schema, params); ) {

// v1: create with two rows.
byte[] batch1 = writeBatch(allocator, schema, new int[] {1, 2}, new String[] {"a", "b"});
try (ArrowStreamReader reader1 =
new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch1), allocator);
ArrowArrayStream stream1 = ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader1, stream1);
Dataset.write().stream(stream1)
.uri(uri)
.mode(WriteParams.WriteMode.CREATE)
.execute()
.close();
}

// v2: append one row.
byte[] batch2 = writeBatch(allocator, schema, new int[] {3}, new String[] {"c"});
try (ArrowStreamReader reader2 =
new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch2), allocator);
ArrowArrayStream stream2 = ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader2, stream2);
try (Dataset ds2 =
Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute()) {
DatasetDelta delta = ds2.delta(1L, 2L);
try {
List<Transaction> txs = delta.listTransactions();
Assertions.assertTrue(txs.size() == 1);
} catch (UnsatisfiedLinkError e) {
Assumptions.assumeTrue(
false, "JNI for DatasetDelta.listTransactions not available: " + e.getMessage());
}
List<Transaction> txs = delta.listTransactions();
Assertions.assertEquals(1, txs.size(), "delta v1..v2 should contain exactly one txn");
}
}
}
}

/**
 * Serializes a single Arrow record batch, built from parallel {@code ids} / {@code vals} arrays,
 * using an {@code ArrowStreamWriter}.
 *
 * <p>Row {@code i} of the batch holds {@code ids[i]} in the {@code id} column and the UTF-8 bytes
 * of {@code vals[i]} in the {@code val} column.
 *
 * @param allocator allocator used to create the temporary {@code VectorSchemaRoot}; the root is
 *     closed before this method returns
 * @param schema schema of the batch; it must contain an {@code id} field backed by an
 *     {@code IntVector} and a {@code val} field backed by a {@code VarCharVector}
 * @param ids values for the {@code id} column
 * @param vals values for the {@code val} column; must have the same length as {@code ids} and
 *     contain no {@code null} elements (each element is dereferenced to obtain its bytes)
 * @return the bytes the stream writer produced for this one batch
 * @throws IOException if the stream writer fails while writing
 */
private static byte[] writeBatch(RootAllocator allocator, Schema schema, int[] ids, String[] vals)
    throws IOException {
  Assertions.assertEquals(ids.length, vals.length, "ids and vals must align");
  // try-with-resources (instead of try/finally) matches the rest of this file and keeps a failure
  // raised while building the batch as the primary exception: an error thrown by close() is
  // attached as suppressed rather than replacing it.
  try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
    root.allocateNew();
    IntVector idVec = (IntVector) root.getVector("id");
    VarCharVector valVec = (VarCharVector) root.getVector("val");
    for (int i = 0; i < ids.length; i++) {
      // setSafe grows the underlying buffers when the index exceeds the current capacity.
      idVec.setSafe(i, ids[i]);
      valVec.setSafe(i, vals[i].getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    root.setRowCount(ids.length);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // No dictionary provider is passed (null): the columns written here are plain values.
    try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
      writer.start();
      writer.writeBatch();
      writer.end();
    }
    return out.toByteArray();
  }
}
}
Loading