[CELEBORN-2319] Standalone LifecycleManager && rust sdk#3677
gavin9402 wants to merge 10 commits into
This is amazing. What compute engine do you use?
@FMX We plan to integrate it into the Daft engine.
This is really good @gavin9402, thanks for contributing. I will drop some review comments soon if I have any.
afterincomparableyum left a comment
Added some small comments; overall, though, this looks good to me.
I am fine with the service --> client dependency leaking into master/worker, but others may think the Daemon doesn't need to live in service. They might propose putting it in a new module that depends on both service and client, which would avoid these dependency leaks.
    shutdownLatch.await()
    }

    private[lifecyclemanager] def applyArgsToConf(
The --host argument is effectively ignored. LifecycleManagerDaemonArguments parses --host into host: Option[String], but applyArgsToConf only writes MASTER_ENDPOINTS and CLIENT_SHUFFLE_MANAGER_PORT, so the host is never propagated. LifecycleManager binds to lifecycleHost = Utils.localHostName(conf) (LifecycleManager.scala:81), and Utils.localHostName only looks at the CELEBORN_LOCAL_HOSTNAME environment variable or the auto-resolved hostname. There is no conf key for it.
So someone running start-lifecycle-manager.sh --host 10.0.0.5 gets the default hostname with no warning. Either drop the option, set CELEBORN_LOCAL_HOSTNAME from the script, or call Utils.setCustomHostname(host) before constructing LifecycleManager.
    /// already be torn down by `IOThreadPoolExecutor::join()`.
    pub fn shutdown(mut self) -> Result<()> {
        if let Some(pinned) = self.inner.as_mut() {
            ffi::shutdown(pinned)?;
If ffi::shutdown returns Err, the ? propagates and self is dropped normally. Drop::drop then calls ffi::shutdown(pinned) a second time.
The comment on shutdown says the handle is intentionally leaked to avoid a folly EventBase use-after-free during destruction, so calling shutdown twice could re-trigger that teardown path.
I propose either ensuring the handle is leaked even when ffi::shutdown returns an error (so Drop cannot call ffi::shutdown a second time), or adding a “shutdown attempted” flag so Drop skips the shutdown call and only leaks the handle.
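One way to read the second option is a guard that takes the handle out of the client before calling into native code, so that Drop has nothing left to shut down. The sketch below is a minimal, self-contained illustration and not the PR's code: RawHandle, ffi_shutdown, and the call counter are hypothetical stand-ins for the cxx UniquePtr and ffi::shutdown, and String stands in for the crate's error type.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for the native client handle
/// (the real code holds a cxx `UniquePtr`, not this struct).
pub struct RawHandle {
    shutdown_calls: Rc<Cell<u32>>, // lets a caller observe how often shutdown ran
    fail: bool,                    // simulates an error returned by the native side
}

/// Hypothetical stand-in for `ffi::shutdown`.
fn ffi_shutdown(handle: &mut RawHandle) -> Result<(), String> {
    handle.shutdown_calls.set(handle.shutdown_calls.get() + 1);
    if handle.fail {
        Err("native shutdown failed".to_string())
    } else {
        Ok(())
    }
}

pub struct ShuffleClient {
    inner: Option<Box<RawHandle>>,
}

impl ShuffleClient {
    pub fn new(shutdown_calls: Rc<Cell<u32>>, fail: bool) -> Self {
        ShuffleClient {
            inner: Some(Box::new(RawHandle { shutdown_calls, fail })),
        }
    }

    pub fn shutdown(mut self) -> Result<(), String> {
        // Take the handle out first, so `Drop` finds `None` on every path.
        match self.inner.take() {
            Some(mut handle) => {
                let result = ffi_shutdown(&mut handle);
                // Leak on both the Ok and the Err path: the destructor never runs.
                std::mem::forget(handle);
                result
            }
            None => Ok(()),
        }
    }
}

impl Drop for ShuffleClient {
    fn drop(&mut self) {
        // `inner` is only `Some` here when `shutdown` was never called.
        if let Some(mut handle) = self.inner.take() {
            let _ = ffi_shutdown(&mut handle);
            std::mem::forget(handle);
        }
    }
}
```

With this shape, a failed shutdown() still leaks the handle and Drop becomes a no-op, while a client dropped without an explicit shutdown() gets exactly one shutdown attempt.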
    rust::Vec<uint8_t> out;
    out.reserve(64 * 1024);
    std::vector<uint8_t> buf(64 * 1024);

    while (true) {
      int n = stream->read(buf.data(), 0, buf.size());
      if (n == -1) {
        break;
      }
      if (n <= 0) {
        throw std::runtime_error(
            "celeborn-ffi: CelebornInputStream::read returned unexpected non-positive " +
            std::to_string(n));
      }
      for (int i = 0; i < n; ++i) {
        out.push_back(buf[i]);
Per-byte push_back into rust::Vec<uint8_t> can introduce noticeable overhead for large partition reads, and the initial reserve(64 * 1024) only avoids reallocations for the first buffer chunk.
Consider accumulating into a std::vector<uint8_t> (or appending in larger chunks) and copying once at the end, or resizing the destination per chunk and using memcpy instead of pushing one byte at a time.
This doesn’t need to be addressed in the current PR, but it may be worth leaving a TODO here to revisit for performance improvements.
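For illustration, the chunk-at-a-time idea looks like this on the Rust side. This is only an analogue of the C++ loop in wrapper.cc, not a replacement for it: it assumes a generic std::io::Read source (where end of stream is Ok(0) rather than -1), and read_all_chunked is a hypothetical helper name.

```rust
use std::io::{self, Read};

const CHUNK: usize = 64 * 1024;

/// Hypothetical helper: reads `stream` to the end, appending each chunk
/// with one bulk copy instead of pushing byte by byte.
pub fn read_all_chunked<R: Read>(stream: &mut R) -> io::Result<Vec<u8>> {
    let mut out = Vec::with_capacity(CHUNK);
    let mut buf = vec![0u8; CHUNK];
    loop {
        let n = match stream.read(&mut buf) {
            Ok(n) => n,
            // A read interrupted by a signal is retried, not treated as an error.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        if n == 0 {
            break; // end of stream for `std::io::Read`
        }
        // One slice copy per chunk; `Vec` grows geometrically as needed.
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}
```

The per-chunk copy keeps the number of capacity checks proportional to the number of chunks rather than the number of bytes, which is the same effect the memcpy suggestion aims for in the C++ shim.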
    return
    }

    if (daemonArgs.port < 1024) {
LifecycleManagerDaemonArguments already calls sys.exit(1) on this condition, so this check is dead code.
@afterincomparableyum Thank you for your thorough review. I have made the changes you suggested above.
391b486 to c08f576
What changes were proposed in this pull request?
This PR introduces two major features to support non-JVM (C++/Rust) clients using Apache Celeborn for shuffle:
1. Standalone LifecycleManager Daemon (Scala/JVM)
- LifecycleManagerDaemon: a standalone JVM process that hosts a LifecycleManager independently from any compute engine (Spark/Flink) Driver.
- LifecycleManagerDaemonArguments for CLI argument parsing (--app-id, --master-endpoints, --port, --host, --properties-file).
- sbin/start-lifecycle-manager.sh launch script with classpath assembly, environment loading, and required-argument validation.
- CelebornBuild.scala and service/pom.xml changes to add celeborn-client as a dependency of the service module (needed because the Daemon instantiates LifecycleManager from the client module).
2. Rust SDK via C++ FFI (rust/ directory)
- celeborn-client-sys: Low-level FFI crate using cxx to bridge Rust ↔ C++ Celeborn client. Includes:
  - wrapper.h / wrapper.cc: C++ shim exposing 7 functions (create_client, setup_lifecycle_manager, shutdown, push_data, mapper_end, update_reducer_file_group, read_partition_full).
  - build.rs: Build script linking Celeborn C++ static libs and system dependencies (folly, protobuf, abseil, boost, etc.) for macOS and Linux.
- celeborn-client: Safe, ergonomic Rust wrapper providing ShuffleClient with:
  - Drop-safe shutdown (prevents double ffi::shutdown via UniquePtr::null() swap).
  - read_partition_all.
- Example programs (data_sum_writer.rs, data_sum_reader.rs) mirroring the existing C++ DataSumWithWriterClient / DataSumWithReaderClient test programs.
Why are the changes needed?
Currently, LifecycleManager can only run embedded inside a JVM-based compute engine Driver (e.g., Spark Driver). This makes it impossible for non-JVM applications (Daft engine, etc.) to use Celeborn as their shuffle service, because:
- Clients need a LifecycleManager to coordinate shuffle metadata (register shuffles, allocate slots, manage partition locations) with Celeborn Masters and Workers.
- Without a standalone LifecycleManager, non-JVM applications have no way to bootstrap this coordination layer.
By decoupling the LifecycleManager into a standalone daemon process, any client, regardless of language runtime, can connect to it via RPC. The Rust SDK then leverages this architecture to provide first-class Rust support by bridging to the existing, battle-tested C++ client implementation via FFI.
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
Yes.
- Users can start a standalone LifecycleManager daemon via sbin/start-lifecycle-manager.sh --app-id <id> --master-endpoints <eps> --port <port>.
- Rust applications can use the celeborn-client crate to perform shuffle read/write operations against a Celeborn cluster.
- The standalone LifecycleManager does not support auth (celeborn.auth.enabled must be false), as the C++/Rust clients lack SASL support.
How was this patch tested?
- Tested with the data_sum_writer and data_sum_reader example programs, which are Rust ports of the existing C++ integration tests (DataSumWithWriterClient.cpp / DataSumWithReaderClient.cpp). These write random numeric data across partitions and verify correctness by comparing partition sums between writer and reader.
- LifecycleManagerDaemon was tested by starting it against a local Celeborn cluster (Master + Workers) and verifying that the Rust examples can successfully connect, push data, and read data through the daemon.
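The partition-sum check described above can be sketched without a cluster. The following is a rough illustration of the idea only, not the example programs themselves: InMemoryShuffle is a hypothetical in-memory stand-in for the shuffle service, write_data and read_sums are hypothetical names, and a small linear congruential generator replaces a real random source to keep the sketch dependency-free.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the shuffle service: partition id -> bytes.
#[derive(Default)]
pub struct InMemoryShuffle {
    partitions: HashMap<u32, Vec<u8>>,
}

impl InMemoryShuffle {
    /// Appends one value to a partition as 8 little-endian bytes.
    pub fn push(&mut self, partition: u32, value: u64) {
        self.partitions
            .entry(partition)
            .or_default()
            .extend_from_slice(&value.to_le_bytes());
    }

    /// Returns the bytes of a partition, or an empty slice if nothing was written.
    pub fn read_partition(&self, partition: u32) -> &[u8] {
        self.partitions
            .get(&partition)
            .map(|bytes| bytes.as_slice())
            .unwrap_or_default()
    }
}

/// Writer side: pushes pseudo-random values and returns the expected sum per partition.
pub fn write_data(
    shuffle: &mut InMemoryShuffle,
    num_partitions: u32,
    records: u32,
    seed: u64,
) -> Vec<u64> {
    let mut sums = vec![0u64; num_partitions as usize];
    let mut state = seed;
    for _ in 0..records {
        // Linear congruential step (assumption: any deterministic generator would do).
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let value = state >> 40; // 24-bit values, so the sums cannot overflow
        let partition = ((state >> 33) % num_partitions as u64) as u32;
        shuffle.push(partition, value);
        sums[partition as usize] += value;
    }
    sums
}

/// Reader side: decodes every partition and recomputes its sum.
pub fn read_sums(shuffle: &InMemoryShuffle, num_partitions: u32) -> Vec<u64> {
    (0..num_partitions)
        .map(|p| {
            shuffle
                .read_partition(p)
                .chunks_exact(8)
                .map(|chunk| {
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(chunk);
                    u64::from_le_bytes(bytes)
                })
                .sum::<u64>()
        })
        .collect()
}
```

A run passes when the vector returned by write_data equals the one returned by read_sums for the same partition count; in the real examples the writer and reader are separate programs talking to the cluster through the daemon.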