Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions project/CelebornBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ object CelebornClient {
object CelebornService {
lazy val service = Project("celeborn-service", file("service"))
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
.dependsOn(CelebornClient.client)
.dependsOn(CelebornOpenApi.openApiClient)
.settings (
commonSettings,
Expand Down
2 changes: 2 additions & 0 deletions rust/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target/
Cargo.lock
6 changes: 6 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[workspace]
members = [
"celeborn-client-sys",
"celeborn-client",
]
resolver = "2"
12 changes: 12 additions & 0 deletions rust/celeborn-client-sys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "celeborn-client-sys"
version = "0.1.0"
edition = "2021"
description = "Low-level FFI bindings to Apache Celeborn C++ client via cxx"
publish = false

[dependencies]
cxx = "1.0"

[build-dependencies]
cxx-build = "1.0"
139 changes: 139 additions & 0 deletions rust/celeborn-client-sys/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
fn main() {
let prefix = std::env::var("CELEBORN_CPP_PREFIX")
.expect("set CELEBORN_CPP_PREFIX to cpp/build/installed first");

let target_arch = std::env::var("CARGO_CFG_TARGET_ARCH").unwrap();
let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap();

let mut builder = cxx_build::bridge("src/lib.rs");
builder
.file("src/wrapper.cc")
.include(format!("{prefix}/include"))
.include("include")
.flag_if_supported("-std=c++17");

// Include paths for system dependencies (folly, boost, etc.)
if target_os == "macos" {
builder.include("/opt/homebrew/include");
builder.include("/opt/homebrew/opt/openssl@3/include");
}

if target_arch == "x86_64" {
builder.flag_if_supported("-msse4.2");
}

builder.compile("celeborn_ffi_wrapper");

// Link cpp/ static libraries (order matters: dependents first)
println!("cargo:rustc-link-search=native={prefix}/lib");
for lib in &["client", "protocol", "network", "proto", "memory", "conf", "utils"] {
println!("cargo:rustc-link-lib=static={lib}");
}

// System dependencies
let base_dylibs = [
"folly", "glog", "gflags", "protobuf", "fizz", "wangle",
"ssl", "crypto", "sodium", "lz4", "zstd", "z",
"fmt", "xxhash", "re2",
"double-conversion", "event",
];
for lib in &base_dylibs {
println!("cargo:rustc-link-lib=dylib={lib}");
}

// abseil libraries required by protobuf v28+
let abseil_libs = [
"absl_log_internal_check_op",
"absl_log_internal_conditions",
"absl_log_internal_message",
"absl_log_internal_nullguard",
"absl_log_internal_format",
"absl_log_internal_globals",
"absl_log_internal_log_sink_set",
"absl_log_internal_proto",
"absl_log_internal_fnmatch",
"absl_log_entry",
"absl_log_globals",
"absl_log_initialize",
"absl_log_severity",
"absl_log_sink",
"absl_raw_logging_internal",
"absl_hash",
"absl_low_level_hash",
"absl_city",
"absl_raw_hash_set",
"absl_hashtablez_sampler",
"absl_status",
"absl_statusor",
"absl_strings",
"absl_strings_internal",
"absl_string_view",
"absl_str_format_internal",
"absl_base",
"absl_spinlock_wait",
"absl_int128",
"absl_throw_delegate",
"absl_time",
"absl_time_zone",
"absl_civil_time",
"absl_synchronization",
"absl_stacktrace",
"absl_symbolize",
"absl_debugging_internal",
"absl_demangle_internal",
"absl_malloc_internal",
"absl_exponential_biased",
"absl_strerror",
"absl_cord",
"absl_cord_internal",
"absl_cordz_functions",
"absl_cordz_handle",
"absl_cordz_info",
"absl_cordz_sample_token",
"absl_crc32c",
"absl_crc_cord_state",
"absl_crc_cpu_detect",
"absl_crc_internal",
"absl_die_if_null",
"absl_examine_stack",
"absl_vlog_config_internal",
"absl_kernel_timeout_internal",
];
for lib in &abseil_libs {
println!("cargo:rustc-link-lib=dylib={lib}");
}

// Boost libraries: macOS Homebrew uses -mt suffix for some components
let boost_components = [
"context", "system", "filesystem", "thread",
"regex", "atomic", "date_time", "program_options",
];
if target_os == "macos" {
for comp in &boost_components {
// Try -mt variant first (macOS Homebrew convention)
println!("cargo:rustc-link-lib=dylib=boost_{comp}-mt");
}
} else {
for comp in &boost_components {
println!("cargo:rustc-link-lib=dylib=boost_{comp}");
}
}

if target_os == "linux" {
println!("cargo:rustc-link-lib=dylib=unwind");
println!("cargo:rustc-link-lib=dylib=stdc++");
} else if target_os == "macos" {
println!("cargo:rustc-link-search=native=/opt/homebrew/lib");
println!("cargo:rustc-link-search=native=/opt/homebrew/opt/openssl@3/lib");
println!("cargo:rustc-link-lib=dylib=c++");
} else {
panic!(
"unsupported target_os: {target_os} \
(first version only supports linux/macos)"
);
}

println!("cargo:rerun-if-changed=src/wrapper.cc");
println!("cargo:rerun-if-changed=include/wrapper.h");
println!("cargo:rerun-if-env-changed=CELEBORN_CPP_PREFIX");
}
49 changes: 49 additions & 0 deletions rust/celeborn-client-sys/include/wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "rust/cxx.h"
#include "celeborn/client/ShuffleClient.h"
#include "celeborn/conf/CelebornConf.h"

namespace celeborn_ffi {

struct ShuffleClientHandle {
std::shared_ptr<celeborn::conf::CelebornConf> conf;
std::shared_ptr<celeborn::client::ShuffleClientEndpoint> endpoint;
std::shared_ptr<celeborn::client::ShuffleClientImpl> client;
std::string app_id;
std::string lifecycle_manager_host;
};

std::unique_ptr<ShuffleClientHandle> create_client(
const std::string& app_id,
int32_t push_buffer_max_size,
const std::string& shuffle_compression_codec);

void setup_lifecycle_manager(
ShuffleClientHandle& handle, const std::string& host, int32_t port);

void shutdown(ShuffleClientHandle& handle);

void push_data(ShuffleClientHandle& handle,
int32_t shuffle_id, int32_t map_id, int32_t attempt_id,
int32_t partition_id,
rust::Slice<const uint8_t> data,
int32_t num_mappers, int32_t num_partitions);

void mapper_end(ShuffleClientHandle& handle,
int32_t shuffle_id, int32_t map_id,
int32_t attempt_id, int32_t num_mappers);

void update_reducer_file_group(ShuffleClientHandle& handle, int32_t shuffle_id);

rust::Vec<uint8_t> read_partition_full(
ShuffleClientHandle& handle,
int32_t shuffle_id,
int32_t partition_id,
int32_t attempt_number,
int32_t start_map_index,
int32_t end_map_index);

} // namespace celeborn_ffi
55 changes: 55 additions & 0 deletions rust/celeborn-client-sys/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#[cxx::bridge(namespace = "celeborn_ffi")]
pub mod ffi {
unsafe extern "C++" {
include!("wrapper.h");

type ShuffleClientHandle;

fn create_client(
app_id: &CxxString,
push_buffer_max_size: i32,
shuffle_compression_codec: &CxxString,
) -> Result<UniquePtr<ShuffleClientHandle>>;

fn setup_lifecycle_manager(
handle: Pin<&mut ShuffleClientHandle>,
host: &CxxString,
port: i32,
) -> Result<()>;

fn shutdown(handle: Pin<&mut ShuffleClientHandle>) -> Result<()>;

fn push_data(
handle: Pin<&mut ShuffleClientHandle>,
shuffle_id: i32,
map_id: i32,
attempt_id: i32,
partition_id: i32,
data: &[u8],
num_mappers: i32,
num_partitions: i32,
) -> Result<()>;

fn mapper_end(
handle: Pin<&mut ShuffleClientHandle>,
shuffle_id: i32,
map_id: i32,
attempt_id: i32,
num_mappers: i32,
) -> Result<()>;

fn update_reducer_file_group(
handle: Pin<&mut ShuffleClientHandle>,
shuffle_id: i32,
) -> Result<()>;

fn read_partition_full(
handle: Pin<&mut ShuffleClientHandle>,
shuffle_id: i32,
partition_id: i32,
attempt_number: i32,
start_map_index: i32,
end_map_index: i32,
) -> Result<Vec<u8>>;
}
}
Loading
Loading