Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::DataType;
use arrow_schema::{DataType, Schema as ArrowSchema};
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::{
JNIEnv,
Expand All @@ -19,7 +19,7 @@ use lance_io::utils::CachedFileSize;
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::iter::once;

use lance::dataset::fragment::FileFragment;
use lance::dataset::fragment::write::FragmentCreateBuilder;
use lance::io::ObjectStoreParams;
use lance_datafusion::utils::StreamingWriteSource;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
Expand Down Expand Up @@ -108,6 +108,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
schema_addr: jlong,
) -> JObject<'local> {
ok_or_throw_with_return!(
env,
Expand All @@ -130,6 +131,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
schema_addr,
),
JObject::default()
)
Expand All @@ -155,6 +157,7 @@ fn inner_create_with_ffi_array<'local>(
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
schema_addr: jlong,
) -> Result<JObject<'local>> {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
Expand Down Expand Up @@ -186,6 +189,7 @@ fn inner_create_with_ffi_array<'local>(
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
schema_addr,
reader,
)
}
Expand All @@ -210,6 +214,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
schema_addr: jlong,
) -> JObject<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -231,6 +236,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
schema_addr,
),
JObject::null()
)
Expand All @@ -255,6 +261,7 @@ fn inner_create_with_ffi_stream<'local>(
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
schema_addr: jlong,
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -276,6 +283,7 @@ fn inner_create_with_ffi_stream<'local>(
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
schema_addr,
reader,
)
}
Expand All @@ -298,6 +306,7 @@ fn create_fragment<'a>(
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
schema_addr: jlong,
source: impl StreamingWriteSource,
) -> Result<JObject<'a>> {
let path_str = dataset_uri.extract(env)?;
Expand Down Expand Up @@ -345,11 +354,19 @@ fn create_fragment<'a>(
});
}

let fragments = RT.block_on(FileFragment::create_fragments(
&path_str,
source,
Some(write_params),
))?;
let mut builder = FragmentCreateBuilder::new(&path_str).write_params(&write_params);
let schema;
if schema_addr != 0 {
let c_schema_ptr = schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
let arrow_schema = ArrowSchema::try_from(&c_schema)?;
// Schema::try_from restores Lance field IDs from the LANCE_FIELD_ID_KEY
// metadata inserted by LanceSchema.asArrowSchemaWithFieldIds().
schema = Schema::try_from(&arrow_schema)?;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to restore the field ids here (i.e. move them from metadata with LANCE_FIELD_ID_KEY to Lance field ids)? Or is that already done in helpers somewhere?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, yes, IDs inserted/restored by LanceSchema.asArrowSchemaWithFieldIds(), a lso add a test to check this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@westonpace can we merge? thanks!

builder = builder.schema(&schema);
}

let fragments = RT.block_on(builder.write_fragments(source))?;
export_vec(env, &fragments)
}

Expand Down
85 changes: 81 additions & 4 deletions java/src/main/java/org/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
import org.lance.namespace.LanceNamespace;
import org.lance.schema.LanceSchema;

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowArrayStream;
Expand Down Expand Up @@ -260,13 +261,49 @@ static List<FragmentMetadata> create(
WriteParams params,
LanceNamespace namespaceClient,
List<String> tableId) {
return create(datasetUri, allocator, root, params, namespaceClient, tableId, null);
}

/** Create a fragment from the given arrow array and schema. */
static List<FragmentMetadata> create(
String datasetUri,
BufferAllocator allocator,
VectorSchemaRoot root,
WriteParams params,
LanceNamespace namespaceClient,
List<String> tableId,
LanceSchema schema) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(params);
try (ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
ArrowArray arrowArray = ArrowArray.allocateNew(allocator)) {
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
if (schema != null) {
try (ArrowSchema lanceSchema = ArrowSchema.allocateNew(allocator)) {
Data.exportSchema(allocator, schema.asArrowSchemaWithFieldIds(), null, lanceSchema);
return createWithFfiArray(
datasetUri,
arrowArray.memoryAddress(),
arrowSchema.memoryAddress(),
params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(),
params.getMode(),
params.getEnableStableRowIds(),
params.getDataStorageVersion(),
params.getStorageOptions(),
params.getBaseStoreParams(),
params.getInitialBases(),
params.getTargetBases(),
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold(),
lanceSchema.memoryAddress());
}
}
return createWithFfiArray(
datasetUri,
arrowArray.memoryAddress(),
Expand All @@ -284,7 +321,8 @@ static List<FragmentMetadata> create(
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold());
params.getBlobPackFileSizeThreshold(),
0L);
}
}

Expand All @@ -295,9 +333,45 @@ static List<FragmentMetadata> create(
WriteParams params,
LanceNamespace namespaceClient,
List<String> tableId) {
return create(datasetUri, null, stream, params, namespaceClient, tableId, null);
}

/** Create a fragment from the given arrow stream. */
static List<FragmentMetadata> create(
String datasetUri,
BufferAllocator allocator,
ArrowArrayStream stream,
WriteParams params,
LanceNamespace namespaceClient,
List<String> tableId,
LanceSchema schema) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(params);
if (schema != null) {
Preconditions.checkNotNull(allocator, "allocator is required with schema");
try (ArrowSchema lanceSchema = ArrowSchema.allocateNew(allocator)) {
Data.exportSchema(allocator, schema.asArrowSchemaWithFieldIds(), null, lanceSchema);
return createWithFfiStream(
datasetUri,
stream.memoryAddress(),
params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(),
params.getMode(),
params.getEnableStableRowIds(),
params.getDataStorageVersion(),
params.getStorageOptions(),
params.getBaseStoreParams(),
params.getInitialBases(),
params.getTargetBases(),
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold(),
lanceSchema.memoryAddress());
}
}
return createWithFfiStream(
datasetUri,
stream.memoryAddress(),
Expand All @@ -314,7 +388,8 @@ static List<FragmentMetadata> create(
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold());
params.getBlobPackFileSizeThreshold(),
0L);
}

/** Create a fragment from the given arrow array and schema. */
Expand All @@ -335,7 +410,8 @@ private static native List<FragmentMetadata> createWithFfiArray(
LanceNamespace namespaceClient,
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold);
Optional<Long> blobPackFileSizeThreshold,
long schemaMemoryAddress);

/** Create a fragment from the given arrow stream. */
private static native List<FragmentMetadata> createWithFfiStream(
Expand All @@ -354,5 +430,6 @@ private static native List<FragmentMetadata> createWithFfiStream(
LanceNamespace namespaceClient,
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold);
Optional<Long> blobPackFileSizeThreshold,
long schemaMemoryAddress);
}
36 changes: 34 additions & 2 deletions java/src/main/java/org/lance/WriteFragmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.lance;

import org.lance.namespace.LanceNamespace;
import org.lance.schema.LanceSchema;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class WriteFragmentBuilder {
private BufferAllocator allocator;
private VectorSchemaRoot vectorSchemaRoot;
private ArrowArrayStream arrowArrayStream;
private LanceSchema schema;
private WriteParams writeParams;
private WriteParams.Builder writeParamsBuilder;
private LanceNamespace namespaceClient;
Expand Down Expand Up @@ -100,6 +102,22 @@ public WriteFragmentBuilder data(ArrowArrayStream stream) {
return this;
}

/**
* Set the Lance dataset schema to use when writing fragments.
*
* <p>This is useful for distributed writes where workers create uncommitted fragments and a
* coordinator commits them later. When this schema is supplied, lance-core does not need to open
* the existing dataset to infer the schema in APPEND mode. The schema should come from the target
* dataset so Lance field IDs are preserved.
*
* @param schema the target Lance dataset schema
* @return this builder
*/
public WriteFragmentBuilder schema(LanceSchema schema) {
this.schema = schema;
return this;
}

/**
* Set the write parameters.
*
Expand Down Expand Up @@ -278,10 +296,22 @@ public List<FragmentMetadata> execute() {
// storage options provider when these are non-null for credential refresh
if (vectorSchemaRoot != null) {
return Fragment.create(
datasetUri, allocator, vectorSchemaRoot, finalWriteParams, namespaceClient, tableId);
datasetUri,
allocator,
vectorSchemaRoot,
finalWriteParams,
namespaceClient,
tableId,
schema);
} else {
return Fragment.create(
datasetUri, arrowArrayStream, finalWriteParams, namespaceClient, tableId);
datasetUri,
allocator,
arrowArrayStream,
finalWriteParams,
namespaceClient,
tableId,
schema);
}
}

Expand Down Expand Up @@ -312,6 +342,8 @@ private void validate() {
Preconditions.checkState(
vectorSchemaRoot == null || allocator != null,
"allocator is required when using VectorSchemaRoot");
Preconditions.checkState(
schema == null || allocator != null, "allocator is required with schema");
Preconditions.checkState(
writeParams == null || writeParamsBuilder == null,
"Cannot use both writeParams() and individual parameter methods");
Expand Down
20 changes: 20 additions & 0 deletions java/src/main/java/org/lance/schema/LanceField.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -156,6 +157,25 @@ public Field asArrowField() {
name, new FieldType(nullable, type, dictionaryEncoding, metadata), arrowChildren);
}

Field asArrowFieldWithFieldIds() {
List<Field> arrowChildren =
children.stream().map(LanceField::asArrowFieldWithFieldIds).collect(Collectors.toList());

if (type instanceof ArrowType.FixedSizeList) {
arrowChildren.addAll(childrenForFixedSizeList());
}

if (id < 0) {
throw new IllegalStateException("Lance field id is required for schema override: " + name);
}
Map<String, String> metadataWithFieldId = new HashMap<>(metadata);
metadataWithFieldId.put(LanceSchema.LANCE_FIELD_ID_KEY, Integer.toString(id));
return new Field(
name,
new FieldType(nullable, type, dictionaryEncoding, metadataWithFieldId),
arrowChildren);
}

private List<Field> childrenForFixedSizeList() {
if (logicalType == null || logicalType.isEmpty()) {
return Collections.emptyList();
Expand Down
7 changes: 7 additions & 0 deletions java/src/main/java/org/lance/schema/LanceSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;

public class LanceSchema {
static final String LANCE_FIELD_ID_KEY = "lance:field_id";
private final List<LanceField> fields;
private final Map<String, String> metadata;

Expand Down Expand Up @@ -68,6 +69,12 @@ public Schema asArrowSchema() {
fields.stream().map(LanceField::asArrowField).collect(Collectors.toList()), metadata);
}

public Schema asArrowSchemaWithFieldIds() {
return new Schema(
fields.stream().map(LanceField::asArrowFieldWithFieldIds).collect(Collectors.toList()),
metadata);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Loading
Loading