diff --git a/.rat-excludes b/.rat-excludes index 22c97119e81..4816f032e3b 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -38,3 +38,5 @@ build/sbt-config/** **/benchmarks/** **/node_modules/** cpp/cmake/FindSodium.cmake +cpp/build/** +rust/Cargo.lock diff --git a/build/make-distribution.sh b/build/make-distribution.sh index e160b758dda..67e97017e65 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -146,7 +146,7 @@ function build_service { # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. # See: http://mywiki.wooledge.org/BashFAQ/050 - BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am $@) + BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli,lifecycle-manager -am $@) # Actually build the jar echo -e "\nBuilding with..." @@ -158,6 +158,7 @@ function build_service { mkdir -p "$DIST_DIR/master-jars" mkdir -p "$DIST_DIR/worker-jars" mkdir -p "$DIST_DIR/cli-jars" + mkdir -p "$DIST_DIR/lifecycle-manager-jars" ## Copy master jars cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/" @@ -177,6 +178,21 @@ function build_service { for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .) done + ## Copy lifecycle-manager jars + # lifecycle-manager depends on celeborn-client which is not a dependency of master/worker, + # so we copy its project-internal dependency jars that are missing from jars/. + for module_jar in \ + "$PROJECT_DIR/lifecycle-manager/target/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar" \ + "$PROJECT_DIR/client/target/celeborn-client_$SCALA_VERSION-$VERSION.jar"; do + jarname=$(basename "$module_jar") + if [ ! 
-f "$DIST_DIR/jars/$jarname" ]; then + cp "$module_jar" "$DIST_DIR/jars/" + fi + done + cp "$PROJECT_DIR"/lifecycle-manager/target/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/lifecycle-manager-jars/" + for jar in $(ls "$DIST_DIR/jars"); do + (cd $DIST_DIR/lifecycle-manager-jars; ln -snf "../jars/$jar" .) + done } function build_spark_client { @@ -304,12 +320,13 @@ function sbt_build_service { "${BUILD_COMMAND[@]}" - $SBT "celeborn-master/copyJars;celeborn-worker/copyJars;celeborn-cli/copyJars" + $SBT "celeborn-master/copyJars;celeborn-worker/copyJars;celeborn-cli/copyJars;celeborn-lifecycle-manager/copyJars" mkdir -p "$DIST_DIR/jars" mkdir -p "$DIST_DIR/master-jars" mkdir -p "$DIST_DIR/worker-jars" mkdir -p "$DIST_DIR/cli-jars" + mkdir -p "$DIST_DIR/lifecycle-manager-jars" ## Copy master jars cp "$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/" @@ -329,6 +346,12 @@ function sbt_build_service { for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .) done + ## Copy lifecycle-manager jars + cp "$PROJECT_DIR"/lifecycle-manager/target/scala-$SCALA_VERSION/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/lifecycle-manager-jars/" + cp "$PROJECT_DIR"/lifecycle-manager/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/" + for jar in $(ls "$PROJECT_DIR/lifecycle-manager/target/scala-$SCALA_VERSION/jars"); do + (cd $DIST_DIR/lifecycle-manager-jars; ln -snf "../jars/$jar" .) 
+ done } function sbt_build_client { diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp index 29ea0accfc1..8d9538dabfd 100644 --- a/cpp/celeborn/client/ShuffleClient.cpp +++ b/cpp/celeborn/client/ShuffleClient.cpp @@ -27,10 +27,11 @@ namespace client { ShuffleClientEndpoint::ShuffleClientEndpoint( const std::shared_ptr& conf) : conf_(conf), - pushDataRetryPool_(std::make_shared( - conf_->clientPushRetryThreads(), - std::make_shared( - "client-pushdata-retrier"))), + pushDataRetryPool_( + std::make_shared( + conf_->clientPushRetryThreads(), + std::make_shared( + "client-pushdata-retrier"))), clientFactory_(std::make_shared(conf_)) { } @@ -153,10 +154,11 @@ int ShuffleClientImpl::pushData( -1, nullptr, protocol::StatusCode::PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) { - CELEBORN_FAIL(fmt::format( - "Revive for shuffleId {} partitionId {} failed.", - shuffleId, - partitionId)); + CELEBORN_FAIL( + fmt::format( + "Revive for shuffleId {} partitionId {} failed.", + shuffleId, + partitionId)); } partitionLocationOptional = partitionLocationMap->get(partitionId); } @@ -298,10 +300,11 @@ int ShuffleClientImpl::mergeData( -1, nullptr, protocol::StatusCode::PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) { - CELEBORN_FAIL(fmt::format( - "Revive for shuffleId {} partitionId {} failed.", - shuffleId, - partitionId)); + CELEBORN_FAIL( + fmt::format( + "Revive for shuffleId {} partitionId {} failed.", + shuffleId, + partitionId)); } partitionLocationOptional = partitionLocationMap->get(partitionId); } @@ -780,8 +783,9 @@ std::unique_ptr ShuffleClientImpl::readPartition( std::vector> locations; if (!reducerFileGroupInfo->fileGroups.empty() && reducerFileGroupInfo->fileGroups.count(partitionId)) { - locations = std::move(utils::toVector( - reducerFileGroupInfo->fileGroups.find(partitionId)->second)); + locations = std::move( + utils::toVector( + reducerFileGroupInfo->fileGroups.find(partitionId)->second)); } return std::make_unique( shuffleKey, @@ -926,26 
+930,28 @@ void ShuffleClientImpl::registerShuffle( } } } catch (std::exception& e) { - CELEBORN_FAIL(fmt::format( - "registerShuffle encounters error after {} tries, " - "shuffleId {} numMappers {} numPartitions {}, errorMsg: {}", - numRetries, - shuffleId, - numMappers, - numPartitions, - e.what())); + CELEBORN_FAIL( + fmt::format( + "registerShuffle encounters error after {} tries, " + "shuffleId {} numMappers {} numPartitions {}, errorMsg: {}", + numRetries, + shuffleId, + numMappers, + numPartitions, + e.what())); break; } std::this_thread::sleep_for(conf_->clientRegisterShuffleRetryWait()); } partitionLocationMaps_.set(shuffleId, nullptr); - CELEBORN_FAIL(fmt::format( - "registerShuffle failed after {} tries, " - "shuffleId {} numMappers {} numPartitions {}", - maxRetries, - shuffleId, - numMappers, - numPartitions)); + CELEBORN_FAIL( + fmt::format( + "registerShuffle failed after {} tries, " + "shuffleId {} numMappers {} numPartitions {}", + maxRetries, + shuffleId, + numMappers, + numPartitions)); } void ShuffleClientImpl::submitRetryPushData( diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h index 5881ce9cef5..2a4c688016f 100644 --- a/cpp/celeborn/client/ShuffleClient.h +++ b/cpp/celeborn/client/ShuffleClient.h @@ -137,8 +137,9 @@ class ShuffleClientImpl void setupLifecycleManagerRef(std::string& host, int port) override; - void setupLifecycleManagerRef(std::shared_ptr& - lifecycleManagerRef) override; + void setupLifecycleManagerRef( + std::shared_ptr& lifecycleManagerRef) + override; std::shared_ptr( - std::make_exception_ptr(std::system_error( - std::make_error_code(std::errc::connection_refused)))); + auto client = + std::make_shared(std::make_exception_ptr( + std::system_error( + std::make_error_code(std::errc::connection_refused)))); auto factory = std::make_shared(client); auto conf = makeTestConf(); auto excludedWorkers = @@ -333,9 +334,10 @@ TEST(CelebornInputStreamRetryTest, allRetriesExhaustedThrows) { // 
Verifies that on failure, the retry logic switches from primary to replica. TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) { - auto client = std::make_shared( - std::make_exception_ptr(std::system_error( - std::make_error_code(std::errc::connection_refused)))); + auto client = + std::make_shared(std::make_exception_ptr( + std::system_error( + std::make_error_code(std::errc::connection_refused)))); auto factory = std::make_shared(client); auto conf = makeTestConf(); auto excludedWorkers = @@ -374,9 +376,10 @@ TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) { // Verifies that critical failures cause workers to be added to the // exclusion list. TEST(CelebornInputStreamRetryTest, excludesCriticalFailures) { - auto client = std::make_shared( - std::make_exception_ptr(std::system_error( - std::make_error_code(std::errc::connection_refused)))); + auto client = + std::make_shared(std::make_exception_ptr( + std::system_error( + std::make_error_code(std::errc::connection_refused)))); auto factory = std::make_shared(client); auto conf = makeTestConf(true); auto excludedWorkers = @@ -448,9 +451,10 @@ TEST(CelebornInputStreamRetryTest, doesNotExcludeNonCriticalFailures) { // Verifies that without a peer, all retries target the same location. TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) { - auto client = std::make_shared( - std::make_exception_ptr(std::system_error( - std::make_error_code(std::errc::connection_refused)))); + auto client = + std::make_shared(std::make_exception_ptr( + std::system_error( + std::make_error_code(std::errc::connection_refused)))); auto factory = std::make_shared(client); // Replication disabled: maxRetry = 2 auto conf = makeTestConf(false); @@ -489,9 +493,10 @@ TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) { // Verifies that with replication enabled, maxRetry is doubled. 
TEST(CelebornInputStreamRetryTest, replicationDoublesMaxRetries) { - auto client = std::make_shared( - std::make_exception_ptr(std::system_error( - std::make_error_code(std::errc::connection_refused)))); + auto client = + std::make_shared(std::make_exception_ptr( + std::system_error( + std::make_error_code(std::errc::connection_refused)))); auto factory = std::make_shared(client); // Replication enabled: maxRetry = 2 * 2 = 4 auto conf = makeTestConf(true); diff --git a/cpp/celeborn/client/tests/ReviveManagerTest.cpp b/cpp/celeborn/client/tests/ReviveManagerTest.cpp index 7bae1ab2a9a..bfda6e6fd54 100644 --- a/cpp/celeborn/client/tests/ReviveManagerTest.cpp +++ b/cpp/celeborn/client/tests/ReviveManagerTest.cpp @@ -172,7 +172,7 @@ TEST_F(ReviveManagerTest, successOnReviveSuccess) { const std::unordered_set& mapIds, const std::unordered_map& requestsToSend) mutable - -> std::optional> { + -> std::optional> { EXPECT_EQ(testShuffleId, shuffleId); EXPECT_GT(mapIds.count(testMapId), 0); EXPECT_GT(requestsToSend.count(testPartitionId), 0); @@ -220,7 +220,7 @@ TEST_F(ReviveManagerTest, failureOnReviveFailure) { const std::unordered_set& mapIds, const std::unordered_map& requestsToSend) mutable - -> std::optional> { + -> std::optional> { EXPECT_EQ(testShuffleId, shuffleId); EXPECT_GT(mapIds.count(testMapId), 0); EXPECT_GT(requestsToSend.count(testPartitionId), 0); diff --git a/cpp/celeborn/client/writer/PushMergedDataCallback.cpp b/cpp/celeborn/client/writer/PushMergedDataCallback.cpp index 08a9fb5d446..be71c56c93e 100644 --- a/cpp/celeborn/client/writer/PushMergedDataCallback.cpp +++ b/cpp/celeborn/client/writer/PushMergedDataCallback.cpp @@ -123,8 +123,9 @@ void PushMergedDataCallback::onSuccess( response->readToReadOnlyBuffer(response->remainingSize())); PbPushMergedDataSplitPartitionInfo partitionInfo; if (!partitionInfo.ParseFromString(transportMsg->payload())) { - pushState_->setException(std::make_unique( - "Failed to parse PbPushMergedDataSplitPartitionInfo")); + 
pushState_->setException( + std::make_unique( + "Failed to parse PbPushMergedDataSplitPartitionInfo")); return; } diff --git a/cpp/celeborn/conf/CelebornConf.cpp b/cpp/celeborn/conf/CelebornConf.cpp index ed00cd87154..e7f3e7da05e 100644 --- a/cpp/celeborn/conf/CelebornConf.cpp +++ b/cpp/celeborn/conf/CelebornConf.cpp @@ -29,14 +29,11 @@ std::string bool2String(bool value) { return value ? "true" : "false"; } -#define STR_PROP(_key_, _val_) \ - { std::string(_key_), std::string(_val_) } +#define STR_PROP(_key_, _val_) {std::string(_key_), std::string(_val_)} #define NUM_PROP(_key_, _val_) \ - { std::string(_key_), folly::to(_val_) } -#define BOOL_PROP(_key_, _val_) \ - { std::string(_key_), bool2String(_val_) } -#define NONE_PROP(_key_) \ - { std::string(_key_), folly::none } + {std::string(_key_), folly::to(_val_)} +#define BOOL_PROP(_key_, _val_) {std::string(_key_), bool2String(_val_)} +#define NONE_PROP(_key_) {std::string(_key_), folly::none} enum class CapacityUnit { BYTE, diff --git a/cpp/celeborn/network/TransportClient.cpp b/cpp/celeborn/network/TransportClient.cpp index c754eecd037..d26fd01b279 100644 --- a/cpp/celeborn/network/TransportClient.cpp +++ b/cpp/celeborn/network/TransportClient.cpp @@ -95,8 +95,9 @@ void TransportClient::pushDataAsync( reinterpret_cast(responseMsg.get()); _callback->onSuccess(rpcResponse->body()); } else { - _callback->onFailure(std::make_unique( - "pushData return value type is not rpcResponse")); + _callback->onFailure( + std::make_unique( + "pushData return value type is not rpcResponse")); } }) .thenError([_callback = callback](const folly::exception_wrapper& e) { @@ -132,8 +133,9 @@ void TransportClient::pushMergedDataAsync( reinterpret_cast(responseMsg.get()); _callback->onSuccess(rpcResponse->body()); } else { - _callback->onFailure(std::make_unique( - "pushMergedData return value type is not rpcResponse")); + _callback->onFailure( + std::make_unique( + "pushMergedData return value type is not rpcResponse")); } }) 
.thenError([_callback = callback](const folly::exception_wrapper& e) { diff --git a/cpp/celeborn/utils/tests/ExceptionTest.cpp b/cpp/celeborn/utils/tests/ExceptionTest.cpp index 85862ee54c0..a8cc942fdc7 100644 --- a/cpp/celeborn/utils/tests/ExceptionTest.cpp +++ b/cpp/celeborn/utils/tests/ExceptionTest.cpp @@ -109,11 +109,12 @@ void testExceptionTraceCollectionControl(bool userException, bool enabled) { false); } } catch (CelebornException& e) { - SCOPED_TRACE(fmt::format( - "enabled: {}, user flag: {}, sys flag: {}", - enabled, - FLAGS_celeborn_exception_user_stacktrace_enabled, - FLAGS_celeborn_exception_system_stacktrace_enabled)); + SCOPED_TRACE( + fmt::format( + "enabled: {}, user flag: {}, sys flag: {}", + enabled, + FLAGS_celeborn_exception_user_stacktrace_enabled, + FLAGS_celeborn_exception_system_stacktrace_enabled)); ASSERT_EQ( userException, e.exceptionType() == CelebornException::Type::kUser); ASSERT_EQ(enabled, e.stackTrace() != nullptr); @@ -174,12 +175,13 @@ void testExceptionTraceCollectionRateControl( false); } } catch (CelebornException& e) { - SCOPED_TRACE(fmt::format( - "userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms", - userException, - hasRateLimit, - FLAGS_celeborn_exception_user_stacktrace_rate_limit_ms, - FLAGS_celeborn_exception_system_stacktrace_rate_limit_ms)); + SCOPED_TRACE( + fmt::format( + "userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms", + userException, + hasRateLimit, + FLAGS_celeborn_exception_user_stacktrace_rate_limit_ms, + FLAGS_celeborn_exception_system_stacktrace_rate_limit_ms)); ASSERT_EQ( userException, e.exceptionType() == CelebornException::Type::kUser); ASSERT_EQ(!hasRateLimit || ((iter % 2) == 0), e.stackTrace() != nullptr); diff --git a/lifecycle-manager/pom.xml b/lifecycle-manager/pom.xml new file mode 100644 index 00000000000..68177c3e456 --- /dev/null +++ b/lifecycle-manager/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + + org.apache.celeborn + 
celeborn-parent_${scala.binary.version} + ${project.version} + ../pom.xml + + + celeborn-lifecycle-manager_${scala.binary.version} + jar + Celeborn Lifecycle Manager + + + + org.apache.celeborn + celeborn-service_${scala.binary.version} + ${project.version} + + + org.apache.celeborn + celeborn-client_${scala.binary.version} + ${project.version} + + + org.apache.celeborn + celeborn-common_${scala.binary.version} + ${project.version} + + + + + org.apache.celeborn + celeborn-common_${scala.binary.version} + ${project.version} + test-jar + test + + + org.mockito + mockito-core + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + diff --git a/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemon.scala b/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemon.scala new file mode 100644 index 00000000000..52469658166 --- /dev/null +++ b/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemon.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.celeborn.server.lifecyclemanager
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.celeborn.client.LifecycleManager
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.{SignalUtils, Utils}
+
+/**
+ * Command-line entry point that runs one [[LifecycleManager]] in its own JVM and keeps the
+ * process alive until the JVM shutdown hook has stopped it.
+ */
+object LifecycleManagerDaemon extends Logging {
+
+  /** Counted down by the shutdown hook; `main` and `runUntilStopped` wait on it. */
+  private[lifecyclemanager] val shutdownLatch: CountDownLatch = new CountDownLatch(1)
+
+  /** The manager created by `main`, published so the shutdown hook can stop it. */
+  private[lifecyclemanager] val currentInstance: AtomicReference[LifecycleManager] =
+    new AtomicReference[LifecycleManager]()
+
+  /** How the process exits; a reassignable `var` (presumably so tests can stub it). */
+  private[lifecyclemanager] var exitFn: Int => Unit =
+    (code: Int) => System.exit(code)
+
+  /**
+   * Parses the CLI arguments, builds the configuration and, unless client auth is enabled,
+   * starts the manager and waits for shutdown.
+   *
+   * Exits through `exitFn`: 0 once the shutdown latch is released, 1 when auth is enabled or
+   * when start-up throws.
+   */
+  def main(args: Array[String]): Unit = {
+    SignalUtils.registerLogger(log)
+
+    val cliArgs = LifecycleManagerDaemonArguments.parse(args)
+    val conf = new CelebornConf()
+
+    // The properties file is loaded first so that the CLI values applied next win.
+    Utils.loadDefaultCelebornProperties(conf, cliArgs.propertiesFile.orNull)
+    applyArgsToConf(cliArgs, conf)
+
+    if (conf.authEnabledOnClient) {
+      // Standalone mode refuses to run with auth: the cpp/Rust client lacks SASL.
+      logError(
+        "Standalone LifecycleManager does not support auth " +
+          "(cpp/Rust client lacks SASL); set celeborn.auth.enabled=false")
+      exitFn(1)
+    } else {
+      startAndAwaitShutdown(cliArgs, conf)
+    }
+  }
+
+  /** Creates the manager, installs the shutdown hook and blocks until the latch opens. */
+  private def startAndAwaitShutdown(
+      cliArgs: LifecycleManagerDaemonArguments,
+      conf: CelebornConf): Unit = {
+    // The custom hostname has to be set before the LifecycleManager is constructed.
+    cliArgs.host.foreach { requestedHost =>
+      logInfo(s"Setting custom hostname from --host: $requestedHost")
+      Utils.setCustomHostname(requestedHost)
+    }
+
+    logInfo(s"Parsed args: appId=${cliArgs.appId}, port=${cliArgs.port}, " +
+      s"masterEndpoints=${cliArgs.masterEndpoints}")
+
+    try {
+      val manager = new LifecycleManager(cliArgs.appId, conf)
+      currentInstance.set(manager)
+
+      installShutdownHook(conf)
+
+      // scalastyle:off println
+      println(s"LifecycleManager bound at ${manager.getHost}:${manager.getPort}")
+      // scalastyle:on println
+
+      logInfo("shutdown hook installed; press Ctrl-C to stop.")
+
+      shutdownLatch.await()
+      exitFn(0)
+    } catch {
+      case e: Exception =>
+        logError("Failed to start LifecycleManager", e)
+        exitFn(1)
+    }
+  }
+
+  /** Publishes `lm` as the current instance and blocks until the shutdown latch opens. */
+  private[lifecyclemanager] def runUntilStopped(lm: LifecycleManager): Unit = {
+    currentInstance.set(lm)
+    shutdownLatch.await()
+  }
+
+  /** Copies the master endpoints and the listen port from the parsed arguments into `conf`. */
+  private[lifecyclemanager] def applyArgsToConf(
+      args: LifecycleManagerDaemonArguments,
+      conf: CelebornConf): Unit = {
+    Seq(
+      CelebornConf.MASTER_ENDPOINTS.key -> args.masterEndpoints,
+      CelebornConf.CLIENT_SHUFFLE_MANAGER_PORT.key -> args.port.toString)
+      .foreach { case (key, value) => conf.set(key, value) }
+  }
+
+  /**
+   * Registers a JVM shutdown hook that stops the current manager and then releases the latch.
+   * The hook first starts a daemon watchdog that halts the JVM with status 2 when half of the
+   * application heartbeat timeout elapses.
+   */
+  private def installShutdownHook(conf: CelebornConf): Unit = {
+    val haltAfterMs = conf.appHeartbeatTimeoutMs / 2
+
+    // Watchdog: force halt if shutdown takes too long
+    val watchdog = new Thread("celeborn-lm-shutdown-watchdog") {
+      override def run(): Unit = {
+        try {
+          Thread.sleep(haltAfterMs)
+          logError(s"Shutdown exceeded ${haltAfterMs}ms, forcing halt")
+          Runtime.getRuntime.halt(2)
+        } catch {
+          case _: InterruptedException => // normal exit, watchdog no longer needed
+        }
+      }
+    }
+    watchdog.setDaemon(true)
+
+    val stopHook = new Thread("celeborn-lm-shutdown") {
+      override def run(): Unit = {
+        watchdog.start()
+        Option(currentInstance.get()).foreach { manager =>
+          try manager.stop()
+          catch {
+            case t: Throwable => logError("lm.stop() failed", t)
+          }
+        }
+        shutdownLatch.countDown()
+      }
+    }
+    Runtime.getRuntime.addShutdownHook(stopHook)
+  }
+}
diff --git a/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArguments.scala b/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArguments.scala
new file mode 100644
index 00000000000..e6a02a2a425
--- /dev/null
+++ b/lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArguments.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.lifecyclemanager
+
+import scala.annotation.tailrec
+
+import org.apache.celeborn.common.util.IntParam
+
+/**
+ * Validated command-line arguments of the LifecycleManager daemon.
+ *
+ * @param appId application identifier given with `--app-id`
+ * @param masterEndpoints value of `--master-endpoints`, kept as the raw string
+ * @param port listen port given with `--port` / `-p`
+ * @param host optional hostname given with `--host` / `-h`
+ * @param propertiesFile optional path given with `--properties-file`
+ */
+private[lifecyclemanager] case class LifecycleManagerDaemonArguments(
+    appId: String,
+    masterEndpoints: String,
+    port: Int,
+    host: Option[String],
+    propertiesFile: Option[String])
+
+private[lifecyclemanager] object LifecycleManagerDaemonArguments {
+
+  private val MIN_USER_PORT = 1024
+  // Largest valid TCP port number; anything above it can never be bound.
+  private val MAX_PORT = 65535
+
+  /**
+   * Parses the daemon's command line.
+   *
+   * On any invalid input this prints the problem and the usage text to stderr and terminates
+   * the JVM with status 1; `--help` prints the usage text and terminates with status 0.
+   *
+   * @param args raw command-line arguments
+   * @return the parsed arguments; only returned when all required options are present and
+   *         the port lies within [MIN_USER_PORT, MAX_PORT]
+   */
+  def parse(args: Array[String]): LifecycleManagerDaemonArguments = {
+    var appId: Option[String] = None
+    var masterEndpoints: Option[String] = None
+    var port: Option[Int] = None
+    var host: Option[String] = None
+    var propertiesFile: Option[String] = None
+
+    @tailrec
+    def doParse(remaining: List[String]): Unit = remaining match {
+      case "--app-id" :: value :: tail =>
+        appId = Some(value)
+        doParse(tail)
+
+      case "--master-endpoints" :: value :: tail =>
+        masterEndpoints = Some(value)
+        doParse(tail)
+
+      case ("--port" | "-p") :: IntParam(value) :: tail =>
+        port = Some(value)
+        doParse(tail)
+
+      // Only reached when the IntParam extractor above rejected the value; without this
+      // case the flag itself would be reported as an unknown argument.
+      case ("--port" | "-p") :: value :: _ =>
+        exitWithError(s"Error: --port expects an integer, got '$value'.")
+
+      case ("--host" | "-h") :: value :: tail =>
+        host = Some(value)
+        doParse(tail)
+
+      case "--properties-file" :: value :: tail =>
+        propertiesFile = Some(value)
+        doParse(tail)
+
+      case "--help" :: _ =>
+        // scalastyle:off println
+        System.err.println(usage)
+        // scalastyle:on println
+        sys.exit(0)
+
+      case Nil => // done
+
+      case unknown :: _ =>
+        exitWithError(s"Unknown argument: $unknown")
+    }
+
+    doParse(args.toList)
+
+    // Required options are checked in the same order as they are listed in the usage text.
+    val requiredAppId = appId.getOrElse(exitWithError("Error: --app-id is required."))
+    val requiredMasterEndpoints =
+      masterEndpoints.getOrElse(exitWithError("Error: --master-endpoints is required."))
+    val requiredPort = port.getOrElse(exitWithError("Error: --port is required."))
+    if (requiredPort < MIN_USER_PORT || requiredPort > MAX_PORT) {
+      exitWithError(
+        s"Error: --port must be between $MIN_USER_PORT and $MAX_PORT, got $requiredPort.")
+    }
+
+    LifecycleManagerDaemonArguments(
+      appId = requiredAppId,
+      masterEndpoints = requiredMasterEndpoints,
+      port = requiredPort,
+      host = host,
+      propertiesFile = propertiesFile)
+  }
+
+  /** Prints `message` followed by the usage text to stderr and exits with status 1. */
+  private def exitWithError(message: String): Nothing = {
+    // scalastyle:off println
+    System.err.println(message)
+    System.err.println(usage)
+    // scalastyle:on println
+    sys.exit(1)
+  }
+
+  val usage: String =
+    """Usage: LifecycleManagerDaemon [options]
+      |
+      |Options:
+      |  --app-id ID                   Application unique identifier (required)
+      |  --master-endpoints ENDPOINTS  Comma-separated master host:port list (required)
+      |  -p PORT, --port PORT          Port for LifecycleManager to listen on (required, 1024-65535)
+      |  -h HOST, --host HOST          Hostname to bind (optional, default: auto-detect)
+      |  --properties-file FILE        Path to a custom Celeborn properties file,
+      |                                default is conf/celeborn-defaults.conf
+      |  --help                        Show this usage message and exit
+      |""".stripMargin
+}
diff --git a/lifecycle-manager/src/test/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArgumentsSuite.scala
b/lifecycle-manager/src/test/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArgumentsSuite.scala
new file mode 100644
index 00000000000..d9d35274834
--- /dev/null
+++ b/lifecycle-manager/src/test/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArgumentsSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.lifecyclemanager
+
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+
+/**
+ * Tests for `LifecycleManagerDaemonArguments.parse` and its usage text. Every case here feeds
+ * a valid command line; the rejection paths of `parse` end in `sys.exit` and are not exercised.
+ */
+class LifecycleManagerDaemonArgumentsSuite extends AnyFunSuite with Logging {
+
+  test("parse all required arguments") {
+    val args = Array(
+      "--app-id",
+      "test-app-1",
+      "--master-endpoints",
+      "host1:9097,host2:9097",
+      "--port",
+      "39099")
+    val parsed = LifecycleManagerDaemonArguments.parse(args)
+    assert(parsed.appId === "test-app-1")
+    assert(parsed.masterEndpoints === "host1:9097,host2:9097")
+    assert(parsed.port === 39099)
+    assert(parsed.host.isEmpty)
+    assert(parsed.propertiesFile.isEmpty)
+  }
+
+  test("parse all arguments including optional ones") {
+    val args = Array(
+      "--app-id",
+      "my-app",
+      "--master-endpoints",
+      "localhost:9097",
+      "--port",
+      "40000",
+      "--host",
+      "my-host",
+      "--properties-file",
+      "/tmp/celeborn.conf")
+    val parsed = LifecycleManagerDaemonArguments.parse(args)
+    assert(parsed.appId === "my-app")
+    assert(parsed.masterEndpoints === "localhost:9097")
+    assert(parsed.port === 40000)
+    assert(parsed.host === Some("my-host"))
+    assert(parsed.propertiesFile === Some("/tmp/celeborn.conf"))
+  }
+
+  test("parse with short flags") {
+    val args = Array(
+      "--app-id",
+      "short-app",
+      "--master-endpoints",
+      "host:9097",
+      "-p",
+      "2048",
+      "-h",
+      "short-host")
+    val parsed = LifecycleManagerDaemonArguments.parse(args)
+    assert(parsed.appId === "short-app")
+    assert(parsed.port === 2048)
+    assert(parsed.host === Some("short-host"))
+  }
+
+  test("parse minimum valid port 1024") {
+    val args = Array(
+      "--app-id",
+      "app",
+      "--master-endpoints",
+      "host:9097",
+      "--port",
+      "1024")
+    val parsed = LifecycleManagerDaemonArguments.parse(args)
+    assert(parsed.port === 1024)
+  }
+
+  test("usage string contains all options") {
+    val usageText = LifecycleManagerDaemonArguments.usage
+    assert(usageText.contains("--app-id"))
+    assert(usageText.contains("--master-endpoints"))
+    assert(usageText.contains("--port"))
+    assert(usageText.contains("--host"))
+    assert(usageText.contains("--properties-file"))
+  }
+}
diff --git a/pom.xml b/pom.xml
index e78061c36dd..1709e9bb21a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
 service
 master
 worker
+ lifecycle-manager
 cli
diff --git a/rust/.gitignore b/rust/.gitignore
new file mode 100644
index 00000000000..2c96eb1b651
--- /dev/null
+++ b/rust/.gitignore
@@ -0,0 +1,2 @@
+target/
+Cargo.lock
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
new file mode 100644
index 00000000000..8a966b4df95
--- /dev/null
+++ b/rust/Cargo.toml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[workspace]
+members = [
+ "celeborn-client-sys",
+ "celeborn-client",
+]
+resolver = "2"
diff --git a/rust/celeborn-client-sys/Cargo.toml b/rust/celeborn-client-sys/Cargo.toml
new file mode 100644
index 00000000000..9ef93e3495d
--- /dev/null
+++ b/rust/celeborn-client-sys/Cargo.toml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "celeborn-client-sys" +version = "0.1.0" +edition = "2021" +description = "Low-level FFI bindings to Apache Celeborn C++ client via cxx" +publish = false + +[dependencies] +cxx = "1.0" + +[build-dependencies] +cxx-build = "1.0" diff --git a/rust/celeborn-client-sys/build.rs b/rust/celeborn-client-sys/build.rs new file mode 100644 index 00000000000..247770d7911 --- /dev/null +++ b/rust/celeborn-client-sys/build.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::{Path, PathBuf}; +use std::process::Command; + +/// Resolve the Celeborn C++ install prefix. 
+///
+/// - If the environment variable `CELEBORN_CPP_PREFIX` is set, use that path
+///   directly (pre-built library workflow).
+/// - Otherwise, locate the in-repo `cpp/` source tree relative to this crate
+///   and drive a full cmake configure → build → install cycle, installing into
+///   `$OUT_DIR/celeborn-cpp-install`. This mirrors the approach used by
+///   `openssl-sys` / `libz-sys` and friends.
+///
+/// # Panics
+///
+/// Panics when `CELEBORN_CPP_PREFIX` points at a path that does not exist, or
+/// when it is unset and `../../cpp` cannot be resolved from the crate directory.
+fn resolve_cpp_prefix() -> PathBuf {
+    if let Ok(prefix) = std::env::var("CELEBORN_CPP_PREFIX") {
+        let path = PathBuf::from(&prefix);
+        if !path.exists() {
+            panic!(
+                "CELEBORN_CPP_PREFIX is set to '{prefix}' but the directory does not exist. \
+                 Please check the path."
+            );
+        }
+        // Cargo only interprets build-script directives written to stdout, so
+        // the warning has to go through `println!`; on stderr it is never shown.
+        println!("cargo:warning=Using pre-built Celeborn C++ libraries from {prefix}");
+        return path;
+    }
+
+    // Fall back to building from the in-repo cpp/ source tree.
+    let manifest_dir =
+        PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap());
+    let cpp_source_dir = manifest_dir.join("../../cpp").canonicalize().unwrap_or_else(|_| {
+        panic!(
+            "CELEBORN_CPP_PREFIX is not set and the in-repo cpp/ directory \
+             could not be found relative to {manifest_dir:?}. \
+             Either set CELEBORN_CPP_PREFIX or run from within the Celeborn \
+             source tree."
+        );
+    });
+
+    // The `\` continuation keeps the directive on one line, as Cargo requires.
+    println!(
+        "cargo:warning=CELEBORN_CPP_PREFIX not set – building Celeborn C++ \
+         from source at {}",
+        cpp_source_dir.display()
+    );
+
+    cmake_build_cpp(&cpp_source_dir)
+}
+
+/// Run cmake configure + build + install for the Celeborn C++ project.
+///
+/// Returns the install prefix (i.e. the directory that contains `include/` and
+/// `lib/`).
+fn cmake_build_cpp(source_dir: &Path) -> PathBuf { + let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap()); + let build_dir = out_dir.join("celeborn-cpp-build"); + let install_dir = out_dir.join("celeborn-cpp-install"); + + std::fs::create_dir_all(&build_dir).expect("failed to create cmake build directory"); + std::fs::create_dir_all(&install_dir).expect("failed to create cmake install directory"); + + let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap(); + + // ── cmake configure ──────────────────────────────────────────────── + let mut configure_cmd = Command::new("cmake"); + configure_cmd + .current_dir(&build_dir) + .arg(source_dir) + .arg(format!("-DCMAKE_INSTALL_PREFIX={}", install_dir.display())) + .arg("-DCMAKE_BUILD_TYPE=Release") + .arg("-DCELEBORN_BUILD_TESTS=OFF"); + + // On macOS, help cmake find Homebrew-installed dependencies. + // We pass CMAKE_PREFIX_PATH as a single path to avoid a known issue in the + // upstream CMakeLists.txt where `if(EXISTS ${CMAKE_PREFIX_PATH}/lib64)` + // breaks when CMAKE_PREFIX_PATH contains multiple semicolon-separated + // entries. Homebrew's default prefix is already on cmake's search path, + // so we only need to add the keg-only OpenSSL prefix explicitly. + if target_os == "macos" { + let homebrew_prefix = std::env::var("HOMEBREW_PREFIX") + .unwrap_or_else(|_| "/opt/homebrew".to_string()); + configure_cmd.arg(format!( + "-DCMAKE_PREFIX_PATH={homebrew_prefix}" + )); + // OpenSSL is keg-only on Homebrew. The upstream CMakeLists.txt + // overwrites the cmake-level OPENSSL_ROOT_DIR variable with a + // `set()` call, so passing `-DOPENSSL_ROOT_DIR` is not enough. + // Instead we set the *environment* variable which FindOpenSSL + // also inspects, and which cannot be shadowed by a cmake `set()`. 
+ configure_cmd.env( + "OPENSSL_ROOT_DIR", + format!("{homebrew_prefix}/opt/openssl@3"), + ); + } + + let configure_status = configure_cmd + .status() + .expect("failed to execute `cmake` – is cmake installed?"); + if !configure_status.success() { + panic!("cmake configure step failed (exit code: {configure_status})"); + } + + // ── cmake build ──────────────────────────────────────────────────── + let num_jobs = std::env::var("NUM_JOBS").unwrap_or_else(|_| num_cpus().to_string()); + + let build_status = Command::new("cmake") + .current_dir(&build_dir) + .args(["--build", "."]) + .args(["--config", "Release"]) + .args(["--parallel", &num_jobs]) + .status() + .expect("failed to execute cmake --build"); + if !build_status.success() { + panic!("cmake build step failed (exit code: {build_status})"); + } + + // ── cmake install ────────────────────────────────────────────────── + let install_status = Command::new("cmake") + .current_dir(&build_dir) + .args(["--install", "."]) + .status() + .expect("failed to execute cmake --install"); + if !install_status.success() { + panic!("cmake install step failed (exit code: {install_status})"); + } + + install_dir +} + +/// Best-effort CPU count for parallel builds (mirrors `cargo`'s own heuristic). +fn num_cpus() -> usize { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4) +} + +fn main() { + let prefix = resolve_cpp_prefix(); + let prefix = prefix.display(); + + let target_arch = std::env::var("CARGO_CFG_TARGET_ARCH").unwrap(); + let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap(); + + let mut builder = cxx_build::bridge("src/lib.rs"); + builder + .file("src/wrapper.cc") + .include(format!("{prefix}/include")) + .include("include") + .flag_if_supported("-std=c++17"); + + // Include paths for system dependencies (folly, boost, etc.) 
+ if target_os == "macos" { + builder.include("/opt/homebrew/include"); + builder.include("/opt/homebrew/opt/openssl@3/include"); + } + + if target_arch == "x86_64" { + builder.flag_if_supported("-msse4.2"); + } + + builder.compile("celeborn_ffi_wrapper"); + + // Link cpp/ static libraries (order matters: dependents first) + println!("cargo:rustc-link-search=native={prefix}/lib"); + for lib in &["client", "protocol", "network", "proto", "memory", "conf", "utils"] { + println!("cargo:rustc-link-lib=static={lib}"); + } + + // System dependencies + let base_dylibs = [ + "folly", "glog", "gflags", "protobuf", "fizz", "wangle", + "ssl", "crypto", "sodium", "lz4", "zstd", "z", + "fmt", "xxhash", "re2", + "double-conversion", "event", + ]; + for lib in &base_dylibs { + println!("cargo:rustc-link-lib=dylib={lib}"); + } + + // abseil libraries required by protobuf v28+ + let abseil_libs = [ + "absl_log_internal_check_op", + "absl_log_internal_conditions", + "absl_log_internal_message", + "absl_log_internal_nullguard", + "absl_log_internal_format", + "absl_log_internal_globals", + "absl_log_internal_log_sink_set", + "absl_log_internal_proto", + "absl_log_internal_fnmatch", + "absl_log_entry", + "absl_log_globals", + "absl_log_initialize", + "absl_log_severity", + "absl_log_sink", + "absl_raw_logging_internal", + "absl_hash", + "absl_low_level_hash", + "absl_city", + "absl_raw_hash_set", + "absl_hashtablez_sampler", + "absl_status", + "absl_statusor", + "absl_strings", + "absl_strings_internal", + "absl_string_view", + "absl_str_format_internal", + "absl_base", + "absl_spinlock_wait", + "absl_int128", + "absl_throw_delegate", + "absl_time", + "absl_time_zone", + "absl_civil_time", + "absl_synchronization", + "absl_stacktrace", + "absl_symbolize", + "absl_debugging_internal", + "absl_demangle_internal", + "absl_malloc_internal", + "absl_exponential_biased", + "absl_strerror", + "absl_cord", + "absl_cord_internal", + "absl_cordz_functions", + "absl_cordz_handle", + 
"absl_cordz_info", + "absl_cordz_sample_token", + "absl_crc32c", + "absl_crc_cord_state", + "absl_crc_cpu_detect", + "absl_crc_internal", + "absl_die_if_null", + "absl_examine_stack", + "absl_vlog_config_internal", + "absl_kernel_timeout_internal", + ]; + for lib in &abseil_libs { + println!("cargo:rustc-link-lib=dylib={lib}"); + } + + // Boost libraries: macOS Homebrew uses -mt suffix for some components + let boost_components = [ + "context", "system", "filesystem", "thread", + "regex", "atomic", "date_time", "program_options", + ]; + if target_os == "macos" { + for comp in &boost_components { + // Try -mt variant first (macOS Homebrew convention) + println!("cargo:rustc-link-lib=dylib=boost_{comp}-mt"); + } + } else { + for comp in &boost_components { + println!("cargo:rustc-link-lib=dylib=boost_{comp}"); + } + } + + if target_os == "linux" { + println!("cargo:rustc-link-lib=dylib=unwind"); + println!("cargo:rustc-link-lib=dylib=stdc++"); + } else if target_os == "macos" { + println!("cargo:rustc-link-search=native=/opt/homebrew/lib"); + println!("cargo:rustc-link-search=native=/opt/homebrew/opt/openssl@3/lib"); + println!("cargo:rustc-link-lib=dylib=c++"); + } else { + panic!( + "unsupported target_os: {target_os} \ + (first version only supports linux/macos)" + ); + } + + println!("cargo:rerun-if-changed=src/wrapper.cc"); + println!("cargo:rerun-if-changed=include/wrapper.h"); + println!("cargo:rerun-if-env-changed=CELEBORN_CPP_PREFIX"); +} diff --git a/rust/celeborn-client-sys/include/wrapper.h b/rust/celeborn-client-sys/include/wrapper.h new file mode 100644 index 00000000000..50acbf1d6bf --- /dev/null +++ b/rust/celeborn-client-sys/include/wrapper.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include "rust/cxx.h" +#include "celeborn/client/ShuffleClient.h" +#include "celeborn/conf/CelebornConf.h" + +namespace celeborn_ffi { + +struct ShuffleClientHandle { + std::shared_ptr conf; + std::shared_ptr endpoint; + std::shared_ptr client; + std::string app_id; + std::string lifecycle_manager_host; +}; + +std::unique_ptr create_client( + const std::string& app_id, + int32_t push_buffer_max_size, + const std::string& shuffle_compression_codec); + +void setup_lifecycle_manager( + ShuffleClientHandle& handle, const std::string& host, int32_t port); + +void shutdown(ShuffleClientHandle& handle); + +void push_data(ShuffleClientHandle& handle, + int32_t shuffle_id, int32_t map_id, int32_t attempt_id, + int32_t partition_id, + rust::Slice data, + int32_t num_mappers, int32_t num_partitions); + +void mapper_end(ShuffleClientHandle& handle, + int32_t shuffle_id, int32_t map_id, + int32_t attempt_id, int32_t num_mappers); + +void update_reducer_file_group(ShuffleClientHandle& handle, int32_t shuffle_id); + +rust::Vec read_partition_full( + ShuffleClientHandle& handle, + int32_t shuffle_id, + int32_t partition_id, + int32_t attempt_number, + int32_t start_map_index, + int32_t end_map_index); + +} // namespace celeborn_ffi diff --git a/rust/celeborn-client-sys/src/lib.rs b/rust/celeborn-client-sys/src/lib.rs new file mode 100644 index 
00000000000..3afe40a88f6 --- /dev/null +++ b/rust/celeborn-client-sys/src/lib.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cxx::bridge(namespace = "celeborn_ffi")] +pub mod ffi { + unsafe extern "C++" { + include!("wrapper.h"); + + type ShuffleClientHandle; + + fn create_client( + app_id: &CxxString, + push_buffer_max_size: i32, + shuffle_compression_codec: &CxxString, + ) -> Result>; + + fn setup_lifecycle_manager( + handle: Pin<&mut ShuffleClientHandle>, + host: &CxxString, + port: i32, + ) -> Result<()>; + + fn shutdown(handle: Pin<&mut ShuffleClientHandle>) -> Result<()>; + + fn push_data( + handle: Pin<&mut ShuffleClientHandle>, + shuffle_id: i32, + map_id: i32, + attempt_id: i32, + partition_id: i32, + data: &[u8], + num_mappers: i32, + num_partitions: i32, + ) -> Result<()>; + + fn mapper_end( + handle: Pin<&mut ShuffleClientHandle>, + shuffle_id: i32, + map_id: i32, + attempt_id: i32, + num_mappers: i32, + ) -> Result<()>; + + fn update_reducer_file_group( + handle: Pin<&mut ShuffleClientHandle>, + shuffle_id: i32, + ) -> Result<()>; + + fn read_partition_full( + handle: Pin<&mut ShuffleClientHandle>, + shuffle_id: i32, + partition_id: i32, + attempt_number: i32, + start_map_index: i32, + 
end_map_index: i32, + ) -> Result>; + } +} diff --git a/rust/celeborn-client-sys/src/wrapper.cc b/rust/celeborn-client-sys/src/wrapper.cc new file mode 100644 index 00000000000..2147b5eafb4 --- /dev/null +++ b/rust/celeborn-client-sys/src/wrapper.cc @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "wrapper.h" +#include +#include +#include +#include + +// Access rust::Vec::set_len which is private in cxx 1.x. +// Uses the well-known C++ explicit-instantiation access technique (ISO 14882 §14.7.2) +// to obtain a pointer-to-member for the private method, then invokes it. +// This avoids per-byte push_back overhead when bulk-copying data into rust::Vec. +namespace detail { +using SetLenFn = void (rust::Vec::*)(size_t); +template struct VecSetLenAccessor { + friend void vec_unsafe_set_len(rust::Vec& v, size_t len) { + (v.*Fn)(len); + } +}; +// The explicit instantiation is granted access to all members per the standard. 
+template struct VecSetLenAccessor<&rust::Vec::set_len>; +void vec_unsafe_set_len(rust::Vec& v, size_t len); +} // namespace detail + +namespace celeborn_ffi { + +std::unique_ptr create_client( + const std::string& app_id, + int32_t push_buffer_max_size, + const std::string& shuffle_compression_codec) { + try { + auto handle = std::make_unique(); + handle->app_id = app_id; + handle->conf = std::make_shared(); + + if (push_buffer_max_size > 0) { + handle->conf->registerProperty( + celeborn::conf::CelebornConf::kClientPushBufferMaxSize, + std::to_string(push_buffer_max_size) + "b"); + } + + if (!shuffle_compression_codec.empty()) { + handle->conf->registerProperty( + celeborn::conf::CelebornConf::kShuffleCompressionCodec, + shuffle_compression_codec); + } + + handle->endpoint = std::make_shared( + handle->conf); + handle->client = celeborn::client::ShuffleClientImpl::create( + app_id, handle->conf, *(handle->endpoint)); + + return handle; + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +void setup_lifecycle_manager( + ShuffleClientHandle& handle, const std::string& host, int32_t port) { + try { + handle.lifecycle_manager_host = host; + handle.client->setupLifecycleManagerRef(handle.lifecycle_manager_host, port); + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +void shutdown(ShuffleClientHandle& handle) { + try { + handle.client->shutdown(); + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) 
{ + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +void push_data(ShuffleClientHandle& handle, + int32_t shuffle_id, int32_t map_id, int32_t attempt_id, + int32_t partition_id, + rust::Slice data, + int32_t num_mappers, int32_t num_partitions) { + try { + handle.client->pushData( + shuffle_id, map_id, attempt_id, partition_id, + data.data(), 0, + static_cast(data.size()), + num_mappers, num_partitions); + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +void mapper_end(ShuffleClientHandle& handle, + int32_t shuffle_id, int32_t map_id, + int32_t attempt_id, int32_t num_mappers) { + try { + handle.client->mapperEnd(shuffle_id, map_id, attempt_id, num_mappers); + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +void update_reducer_file_group(ShuffleClientHandle& handle, int32_t shuffle_id) { + try { + handle.client->updateReducerFileGroup(shuffle_id); + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +rust::Vec read_partition_full( + ShuffleClientHandle& handle, + int32_t shuffle_id, + int32_t partition_id, + int32_t attempt_number, + int32_t start_map_index, + int32_t end_map_index) { + try { + auto stream = handle.client->readPartition( + shuffle_id, partition_id, attempt_number, start_map_index, end_map_index); + + // Accumulate into std::vector (memcpy-based insert) to avoid per-byte + // push_back overhead on rust::Vec during the read loop, then bulk-copy + // into rust::Vec via reserve_total + memcpy + set_len. 
+ constexpr size_t kReadBufSize = 64 * 1024; + std::vector accumulated; + accumulated.reserve(kReadBufSize); + std::vector buf(kReadBufSize); + + while (true) { + int n = stream->read(buf.data(), 0, buf.size()); + if (n == -1) { + break; + } + if (n <= 0) { + throw std::runtime_error( + "celeborn-ffi: CelebornInputStream::read returned unexpected non-positive " + + std::to_string(n)); + } + accumulated.insert(accumulated.end(), buf.data(), buf.data() + n); + } + + rust::Vec out; + out.reserve(accumulated.size()); + // SAFETY: reserve() allocated at least accumulated.size() bytes. + // memcpy writes into that region, then set_len makes them visible. + std::memcpy(out.data(), accumulated.data(), accumulated.size()); + detail::vec_unsafe_set_len(out, accumulated.size()); + return out; + } catch (const std::exception& e) { + throw std::runtime_error(std::string("celeborn-ffi: ") + e.what()); + } catch (...) { + throw std::runtime_error("celeborn-ffi: unknown C++ exception"); + } +} + +} // namespace celeborn_ffi diff --git a/rust/celeborn-client/Cargo.toml b/rust/celeborn-client/Cargo.toml new file mode 100644 index 00000000000..7f6481086cf --- /dev/null +++ b/rust/celeborn-client/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "celeborn-client" +version = "0.1.0" +edition = "2021" +description = "Rust-friendly wrapper around Apache Celeborn C++ client FFI" +publish = false + +[dependencies] +celeborn-client-sys = { path = "../celeborn-client-sys" } +cxx = "1.0" +thiserror = "1.0" +log = "0.4" + +[dev-dependencies] +env_logger = "0.11" + +[[example]] +name = "data_sum_writer" +path = "../examples/data_sum_writer.rs" + +[[example]] +name = "data_sum_reader" +path = "../examples/data_sum_reader.rs" diff --git a/rust/celeborn-client/src/lib.rs b/rust/celeborn-client/src/lib.rs new file mode 100644 index 00000000000..78bb0ff9e0b --- /dev/null +++ b/rust/celeborn-client/src/lib.rs @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust-friendly wrapper around `celeborn-client-sys`. 
+ +use celeborn_client_sys::ffi; +use cxx::UniquePtr; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("celeborn ffi error: {0}")] + Ffi(#[from] cxx::Exception), + #[error("invalid argument: {0}")] + InvalidArg(&'static str), +} + +pub type Result = std::result::Result; + +/// Configuration for connecting to a Celeborn LifecycleManager. +pub struct Config { + pub app_id: String, + /// Max push buffer size in bytes. 0 means use cpp default (64kB). + pub push_buffer_max_size: i32, + /// Compression codec: "NONE", "LZ4", or "ZSTD". + pub shuffle_compression_codec: String, +} + +impl Config { + pub fn new(app_id: String) -> Self { + Self { + app_id, + push_buffer_max_size: 0, + shuffle_compression_codec: "NONE".to_string(), + } + } +} + +/// A Rust-friendly Celeborn shuffle client backed by the C++ implementation. +pub struct ShuffleClient { + inner: UniquePtr, +} + +impl ShuffleClient { + /// Connect to a running LifecycleManager at `lm_host:lm_port`. + pub fn connect(config: Config, lm_host: &str, lm_port: i32) -> Result { + if config.app_id.is_empty() { + return Err(Error::InvalidArg("app_id is empty")); + } + if lm_port <= 0 { + return Err(Error::InvalidArg("lm_port must be > 0")); + } + let valid_codecs = ["NONE", "LZ4", "ZSTD"]; + if !valid_codecs.contains(&config.shuffle_compression_codec.as_str()) { + return Err(Error::InvalidArg( + "shuffle_compression_codec must be NONE, LZ4, or ZSTD", + )); + } + + cxx::let_cxx_string!(app_id_cxx = &config.app_id); + cxx::let_cxx_string!(codec_cxx = &config.shuffle_compression_codec); + let mut handle = + ffi::create_client(&app_id_cxx, config.push_buffer_max_size, &codec_cxx)?; + + cxx::let_cxx_string!(host_cxx = lm_host); + ffi::setup_lifecycle_manager(handle.pin_mut(), &host_cxx, lm_port)?; + + Ok(Self { inner: handle }) + } + + /// Push data for a specific partition. 
+ pub fn push_data( + &mut self, + shuffle_id: i32, + map_id: i32, + attempt_id: i32, + partition_id: i32, + data: &[u8], + num_mappers: i32, + num_partitions: i32, + ) -> Result<()> { + ffi::push_data( + self.inner.pin_mut(), + shuffle_id, + map_id, + attempt_id, + partition_id, + data, + num_mappers, + num_partitions, + )?; + Ok(()) + } + + /// Signal that a mapper has finished writing all its partitions. + pub fn mapper_end( + &mut self, + shuffle_id: i32, + map_id: i32, + attempt_id: i32, + num_mappers: i32, + ) -> Result<()> { + ffi::mapper_end(self.inner.pin_mut(), shuffle_id, map_id, attempt_id, num_mappers)?; + Ok(()) + } + + /// Update reducer file group metadata for a given shuffle. + pub fn update_reducer_file_group(&mut self, shuffle_id: i32) -> Result<()> { + ffi::update_reducer_file_group(self.inner.pin_mut(), shuffle_id)?; + Ok(()) + } + + /// Read all data for a partition with full control over parameters. + pub fn read_partition( + &mut self, + shuffle_id: i32, + partition_id: i32, + attempt_number: i32, + start_map_index: i32, + end_map_index: i32, + ) -> Result> { + let data = ffi::read_partition_full( + self.inner.pin_mut(), + shuffle_id, + partition_id, + attempt_number, + start_map_index, + end_map_index, + )?; + Ok(data) + } + + /// Convenience: read all map outputs for a partition. + #[inline] + pub fn read_partition_all( + &mut self, + shuffle_id: i32, + partition_id: i32, + num_mappers: i32, + ) -> Result> { + self.read_partition(shuffle_id, partition_id, 0, 0, num_mappers) + } + + /// Explicitly shut down the client. Preferred over relying on Drop. + /// + /// After calling `ffi::shutdown`, the underlying C++ handle is intentionally + /// leaked to avoid a SIGSEGV caused by folly's `EventBase` destruction race: + /// `TransportClient` destructor posts a callback to an `EventBase` that may + /// already be torn down by `IOThreadPoolExecutor::join()`. 
+ /// + /// The handle is leaked **before** propagating any error so that `Drop` + /// cannot call `ffi::shutdown` a second time. + pub fn shutdown(mut self) -> Result<()> { + let result = match self.inner.as_mut() { + Some(pinned) => ffi::shutdown(pinned).map_err(Error::from), + None => Ok(()), + }; + // Leak the C++ handle regardless of success/failure to avoid + // folly EventBase use-after-free on destruction and prevent + // Drop from calling ffi::shutdown a second time. + Self::leak_inner(&mut self.inner); + std::mem::forget(self); + result + } + + /// Leak the UniquePtr without running C++ destructors. + fn leak_inner(handle: &mut UniquePtr) { + let ptr = std::mem::replace(handle, UniquePtr::::null()); + // into_raw consumes the UniquePtr without calling the C++ destructor. + cxx::UniquePtr::into_raw(ptr); + } +} + +impl Drop for ShuffleClient { + fn drop(&mut self) { + if self.inner.is_null() { + return; + } + // Best-effort shutdown; then leak to avoid folly SIGSEGV. + if let Some(pinned) = self.inner.as_mut() { + if let Err(e) = ffi::shutdown(pinned) { + log::error!( + "ffi::shutdown failed during Drop: {e}; \ + caller should explicitly call ShuffleClient::shutdown() before drop" + ); + } + } + Self::leak_inner(&mut self.inner); + } +} diff --git a/rust/examples/data_sum_reader.rs b/rust/examples/data_sum_reader.rs new file mode 100644 index 00000000000..94aae636b11 --- /dev/null +++ b/rust/examples/data_sum_reader.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust equivalent of cpp/celeborn/tests/DataSumWithReaderClient.cpp +//! Usage: data_sum_reader +//! +//! +//! Set RUST_LOG=info for diagnostic output. + +use celeborn_client::{Config, ShuffleClient}; +use std::env; +use std::fs::File; +use std::io::Write; + +fn main() { + env_logger::init(); + + let args: Vec = env::args().collect(); + if args.len() != 10 { + eprintln!( + "Usage: {} \ + ", + args[0] + ); + std::process::exit(1); + } + + let lm_host = &args[1]; + let lm_port: i32 = args[2].parse().expect("lm_port must be an integer"); + let app_id = args[3].clone(); + let shuffle_id: i32 = args[4].parse().expect("shuffle_id must be an integer"); + let attempt_id: i32 = args[5].parse().expect("attempt_id must be an integer"); + let num_mappers: i32 = args[6].parse().expect("num_mappers must be an integer"); + let num_partitions: i32 = args[7].parse().expect("num_partitions must be an integer"); + let result_file = &args[8]; + let compress_codec = args[9].clone(); + + println!( + "lm_host={lm_host}, lm_port={lm_port}, app_id={app_id}, \ + shuffle_id={shuffle_id}, attempt_id={attempt_id}, \ + num_mappers={num_mappers}, num_partitions={num_partitions}, \ + result_file={result_file}, compress_codec={compress_codec}" + ); + + let mut config = Config::new(app_id); + config.shuffle_compression_codec = compress_codec; + + let mut client = + ShuffleClient::connect(config, lm_host, lm_port).expect("Failed to connect to LM"); + + client + .update_reducer_file_group(shuffle_id) + .expect("update_reducer_file_group failed"); + + let mut result = vec![0i64; 
num_partitions as usize]; + + for partition_id in 0..num_partitions { + let data = client + .read_partition(shuffle_id, partition_id, attempt_id, 0, num_mappers) + .expect("read_partition failed"); + + let mut current_number: i64 = 0; + let mut data_count: usize = 0; + + for &byte in &data { + let c = byte as char; + match c { + '-' => { + result[partition_id as usize] += current_number; + current_number = 0; + data_count += 1; + } + '+' => {} + '0'..='9' => { + current_number = current_number * 10 + (c as i64 - '0' as i64); + } + _ => { + panic!("Unexpected character in partition data: '{c}'"); + } + } + } + // Add the last number (data after last '-') + result[partition_id as usize] += current_number; + + println!( + "partition {partition_id} sum result = {}, dataCnt = {data_count}", + result[partition_id as usize] + ); + } + + let mut file = File::create(result_file).expect("Failed to create result file"); + for sum in &result { + writeln!(file, "{sum}").expect("Failed to write result"); + } + + client.shutdown().expect("shutdown failed"); + println!("Reader completed successfully."); +} diff --git a/rust/examples/data_sum_writer.rs b/rust/examples/data_sum_writer.rs new file mode 100644 index 00000000000..03897cae15d --- /dev/null +++ b/rust/examples/data_sum_writer.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust equivalent of cpp/celeborn/tests/DataSumWithWriterClient.cpp +//! Usage: data_sum_writer +//! +//! +//! Requires env_logger initialized for diagnostic output from celeborn-client Drop path. +//! Set RUST_LOG=info (or debug) for verbose output. + +use celeborn_client::{Config, ShuffleClient}; +use std::env; +use std::fs::File; +use std::io::Write; + +fn main() { + env_logger::init(); + + let args: Vec = env::args().collect(); + if args.len() != 10 { + eprintln!( + "Usage: {} \ + ", + args[0] + ); + std::process::exit(1); + } + + let lm_host = &args[1]; + let lm_port: i32 = args[2].parse().expect("lm_port must be an integer"); + let app_id = args[3].clone(); + let shuffle_id: i32 = args[4].parse().expect("shuffle_id must be an integer"); + let attempt_id: i32 = args[5].parse().expect("attempt_id must be an integer"); + let num_mappers: i32 = args[6].parse().expect("num_mappers must be an integer"); + let num_partitions: i32 = args[7].parse().expect("num_partitions must be an integer"); + let result_file = &args[8]; + let compress_codec = args[9].clone(); + + println!( + "lm_host={lm_host}, lm_port={lm_port}, app_id={app_id}, \ + shuffle_id={shuffle_id}, attempt_id={attempt_id}, \ + num_mappers={num_mappers}, num_partitions={num_partitions}, \ + result_file={result_file}, compress_codec={compress_codec}" + ); + + let mut config = Config::new(app_id); + config.shuffle_compression_codec = compress_codec; + + let mut client = + ShuffleClient::connect(config, lm_host, lm_port).expect("Failed to connect to LM"); + + let max_data: i64 = 1_000_000; + 
let num_data: usize = 1000; + let mut result = vec![0i64; num_partitions as usize]; + + for map_id in 0..num_mappers { + for partition_id in 0..num_partitions { + let mut partition_data = String::new(); + for _ in 0..num_data { + let data = rand_simple(max_data); + result[partition_id as usize] += data; + partition_data.push('-'); + partition_data.push_str(&data.to_string()); + } + + client + .push_data( + shuffle_id, + map_id, + attempt_id, + partition_id, + partition_data.as_bytes(), + num_mappers, + num_partitions, + ) + .expect("push_data failed"); + } + client + .mapper_end(shuffle_id, map_id, attempt_id, num_mappers) + .expect("mapper_end failed"); + } + + for (partition_id, sum) in result.iter().enumerate() { + println!("partition {partition_id} sum result = {sum}"); + } + + let mut file = File::create(result_file).expect("Failed to create result file"); + for sum in &result { + writeln!(file, "{sum}").expect("Failed to write result"); + } + + client.shutdown().expect("shutdown failed"); + println!("Writer completed successfully."); +} + +/// Simple pseudo-random number generator (deterministic per process, good enough for testing). +fn rand_simple(max_val: i64) -> i64 { + use std::cell::Cell; + thread_local! { + static STATE: Cell = Cell::new(12345); + } + STATE.with(|s| { + let mut x = s.get(); + x ^= x << 13; + x ^= x >> 7; + x ^= x << 17; + s.set(x); + ((x >> 1) as i64) % max_val + }) +} diff --git a/sbin/start-lifecycle-manager.sh b/sbin/start-lifecycle-manager.sh new file mode 100755 index 00000000000..5555d3f0b2e --- /dev/null +++ b/sbin/start-lifecycle-manager.sh @@ -0,0 +1,175 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Starts a standalone Celeborn LifecycleManager daemon in background.
# The RPC port is randomly selected from 30000~50000 if --port is not specified.
# After a successful start the port and PID are written to
# ${CELEBORN_HOME}/logs/lifecyclemanager-<pid>.env. The `export` statements below
# only reach child processes of this script, not the shell that executed it, so
# callers should source that env file to obtain CELEBORN_LM_PORT / CELEBORN_LM_PID.

set -euo pipefail

if [ -z "${CELEBORN_HOME:-}" ]; then
  export CELEBORN_HOME="$(cd "$(dirname "$0")/.."; pwd)"
fi

# Print the command-line help and exit with status 1.
# NOTE(review): the <placeholder> names were reconstructed; confirm the wording.
usage() {
  cat <<EOF
Usage: $0 --app-id <app-id> --master-endpoints <endpoints> [--port <port>] [--host <host>] [--properties-file <file>]

  --app-id <app-id>              REQUIRED  unique application id
  --master-endpoints <endpoints> REQUIRED  comma-separated host:port of Celeborn Masters
  --port <port>                  OPTIONAL  fixed RPC port to bind (default: random available port in 30000~50000)
  --host <host>                  OPTIONAL  bind host (default: hostname)
  --properties-file <file>       OPTIONAL  path to celeborn-defaults.conf
EOF
  exit 1
}

# Find a random available port in range [30000, 50000].
# Prints the port on stdout; returns 1 after 100 unsuccessful probes.
find_available_port() {
  local port
  local max_attempts=100
  local attempt=0
  while [ "$attempt" -lt "$max_attempts" ]; do
    port=$(( RANDOM % 20001 + 30000 ))
    # A failed connect to 127.0.0.1 means nothing is listening there, so the
    # port is treated as free. The daemon binds later, so this is best effort.
    if ! (echo >/dev/tcp/127.0.0.1/"$port") 2>/dev/null; then
      echo "$port"
      return 0
    fi
    attempt=$(( attempt + 1 ))
  done
  echo "Error: failed to find an available port in 30000~50000 after $max_attempts attempts." >&2
  return 1
}

# Pre-check: scan the arguments once without consuming them, because "$@" is
# forwarded verbatim to the daemon. The first --port occurrence wins.
have_app_id=0; have_master=0; have_port=0
CELEBORN_LM_PORT=""
for ((i=1; i<=$#; i++)); do
  case "${!i}" in
    --app-id) have_app_id=1 ;;
    --master-endpoints) have_master=1 ;;
    --port)
      if [ "$have_port" -eq 0 ]; then
        have_port=1
        j=$(( i + 1 ))
        # ":-" keeps nounset from aborting when --port is the last argument.
        CELEBORN_LM_PORT="${!j:-}"
      fi
      ;;
  esac
done

if [ "$have_app_id" -eq 0 ] || [ "$have_master" -eq 0 ]; then
  echo "Error: --app-id and --master-endpoints are required." >&2
  usage
fi

# If --port is not specified, find a random available port and append it to args
if [ "$have_port" -eq 0 ]; then
  CELEBORN_LM_PORT=$(find_available_port)
  echo "Auto-selected available port: ${CELEBORN_LM_PORT}"
  set -- "$@" --port "${CELEBORN_LM_PORT}"
elif ! [[ "${CELEBORN_LM_PORT}" =~ ^[0-9]+$ ]]; then
  # The port is embedded in log/PID file names below, so reject a missing or
  # non-numeric value up front.
  echo "Error: --port requires a numeric port number." >&2
  usage
fi

export CELEBORN_LM_PORT

# Load environment (JAVA_HOME, CELEBORN_CONF_DIR, etc.)
# Temporarily disable nounset because load-celeborn-env.sh checks unset vars
if [ -f "${CELEBORN_HOME}/sbin/load-celeborn-env.sh" ]; then
  set +u
  # shellcheck source=/dev/null
  . "${CELEBORN_HOME}/sbin/load-celeborn-env.sh"
  set -u
fi

CELEBORN_CONF_DIR="${CELEBORN_CONF_DIR:-${CELEBORN_HOME}/conf}"

# Determine Java
if [ -n "${JAVA_HOME:-}" ]; then
  JAVA="${JAVA_HOME}/bin/java"
else
  JAVA="java"
fi

# Build classpath: conf + service jars + client jars + common jars.
# Globs that match nothing stay literal and are filtered out by the -e test.
CLASSPATH="${CELEBORN_CONF_DIR}"
for jar in \
  "${CELEBORN_HOME}/lifecycle-manager/target/"*.jar \
  "${CELEBORN_HOME}/service/target/"*.jar \
  "${CELEBORN_HOME}/client/target/"*.jar \
  "${CELEBORN_HOME}/common/target/"*.jar \
  "${CELEBORN_HOME}/spi/target/"*.jar \
  "${CELEBORN_HOME}/jars/"*.jar \
  "${CELEBORN_HOME}/lifecycle-manager-jars/"*.jar \
  "${CELEBORN_HOME}/service-jars/"*.jar \
  "${CELEBORN_HOME}/client-jars/"*.jar; do
  if [ -e "$jar" ]; then
    CLASSPATH="${CLASSPATH}:${jar}"
  fi
done

# JVM options
CELEBORN_JAVA_OPTS="${CELEBORN_JAVA_OPTS:-}"
if [ -f "${CELEBORN_CONF_DIR}/log4j2.xml" ]; then
  CELEBORN_JAVA_OPTS="${CELEBORN_JAVA_OPTS} -Dlog4j2.configurationFile=file:${CELEBORN_CONF_DIR}/log4j2.xml"
fi

MAIN_CLASS="org.apache.celeborn.server.lifecyclemanager.LifecycleManagerDaemon"

# --- Background execution ---
LOG_DIR="${CELEBORN_HOME}/logs"
mkdir -p "${LOG_DIR}"
LOG_FILE="${LOG_DIR}/lifecyclemanager-${CELEBORN_LM_PORT}.out"
PID_FILE="${LOG_DIR}/lifecyclemanager-${CELEBORN_LM_PORT}.pid"

# CELEBORN_JAVA_OPTS is intentionally unquoted so it splits into separate JVM flags.
nohup "${JAVA}" -cp "${CLASSPATH}" ${CELEBORN_JAVA_OPTS} "${MAIN_CLASS}" "$@" \
  > "${LOG_FILE}" 2>&1 &

CELEBORN_LM_PID=$!
echo "${CELEBORN_LM_PID}" > "${PID_FILE}"

# Wait briefly and verify the process is still alive
sleep 2
if kill -0 "${CELEBORN_LM_PID}" 2>/dev/null; then
  export CELEBORN_LM_PORT
  export CELEBORN_LM_PID

  # Write port to a env file so other scripts can source it.
  # The file is keyed by PID; stop-lifecycle-manager.sh removes it by the same name.
  # NOTE(review): the heredoc body and the two status messages below were
  # reconstructed; confirm the exact wording against the upstream script.
  ENV_FILE="${LOG_DIR}/lifecyclemanager-${CELEBORN_LM_PID}.env"
  cat > "${ENV_FILE}" <<EOF
export CELEBORN_LM_PORT=${CELEBORN_LM_PORT}
export CELEBORN_LM_PID=${CELEBORN_LM_PID}
EOF
  echo "LifecycleManager started (PID: ${CELEBORN_LM_PID}, port: ${CELEBORN_LM_PORT})."
else
  echo "Error: LifecycleManager failed to start, log output follows (${LOG_FILE}):" >&2
  cat "${LOG_FILE}" >&2
  rm -f "${PID_FILE}"
  exit 1
fi
diff --git a/sbin/stop-lifecycle-manager.sh b/sbin/stop-lifecycle-manager.sh
new file mode 100755
index 00000000000..4f301d28d47
--- /dev/null
+++ b/sbin/stop-lifecycle-manager.sh
@@ -0,0 +1,142 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stops a Celeborn LifecycleManager daemon by PID file or port.
# Usage:
#   stop-lifecycle-manager.sh --port <port>    Stop the instance bound to <port>
#   stop-lifecycle-manager.sh --all            Stop all LifecycleManager instances

set -euo pipefail

if [ -z "${CELEBORN_HOME:-}" ]; then
  export CELEBORN_HOME="$(cd "$(dirname "$0")/.."; pwd)"
fi

LOG_DIR="${CELEBORN_HOME}/logs"
GRACEFUL_TIMEOUT=10

# Print the command-line help and exit with status 1.
usage() {
  cat <<EOF
Usage: $0 --port <port> | --all

  --port <port>   Stop the LifecycleManager instance running on the specified port
  --all           Stop all LifecycleManager instances managed by this script
EOF
  exit 1
}

# Remove the PID file ($1) and the env file keyed by the PID ($2).
remove_instance_files() {
  local pid_file="$1"
  local pid="$2"
  rm -f "${pid_file}"
  rm -f "${LOG_DIR}/lifecyclemanager-${pid}.env"
}

# Stop a single instance by its PID file.
#   $1 - path of the PID file
# Sends SIGTERM, waits up to GRACEFUL_TIMEOUT seconds, then sends SIGKILL.
# Returns 0 when the process was stopped or was already gone (stale files are
# removed), 1 when the PID file is missing, empty, or not a valid PID.
stop_by_pid_file() {
  local pid_file="$1"
  local pid=""

  if [ ! -f "${pid_file}" ]; then
    echo "PID file not found: ${pid_file}" >&2
    return 1
  fi

  # read trims surrounding whitespace; "|| true" tolerates a file without a
  # trailing newline (read then returns non-zero but still sets pid).
  read -r pid < "${pid_file}" || true
  if [ -z "${pid}" ]; then
    echo "PID file is empty: ${pid_file}, removing." >&2
    rm -f "${pid_file}"
    return 1
  fi

  # kill treats 0 and negative numbers as process-group targets, so anything
  # that is not a positive integer must never reach kill.
  if ! [[ "${pid}" =~ ^[1-9][0-9]*$ ]]; then
    echo "PID file does not contain a valid PID: ${pid_file}, removing." >&2
    rm -f "${pid_file}"
    return 1
  fi

  if ! kill -0 "${pid}" 2>/dev/null; then
    echo "LifecycleManager (PID: ${pid}) is not running. Cleaning up stale PID file."
    remove_instance_files "${pid_file}" "${pid}"
    return 0
  fi

  echo "Stopping LifecycleManager (PID: ${pid}) with SIGTERM ..."
  # The process may exit between the liveness check and this signal; that must
  # not abort the whole script under errexit.
  kill -TERM "${pid}" 2>/dev/null || true

  # Wait for graceful shutdown
  local waited=0
  while [ "${waited}" -lt "${GRACEFUL_TIMEOUT}" ]; do
    if ! kill -0 "${pid}" 2>/dev/null; then
      echo "LifecycleManager (PID: ${pid}) stopped."
      remove_instance_files "${pid_file}" "${pid}"
      return 0
    fi
    sleep 1
    waited=$(( waited + 1 ))
  done

  # Force kill if still alive
  echo "LifecycleManager (PID: ${pid}) did not stop after ${GRACEFUL_TIMEOUT}s, sending SIGKILL ..."
  kill -9 "${pid}" 2>/dev/null || true
  sleep 1
  remove_instance_files "${pid_file}" "${pid}"
  echo "LifecycleManager (PID: ${pid}) killed."
}

# Parse arguments
if [ $# -eq 0 ]; then
  usage
fi

MODE=""
TARGET_PORT=""

while [ $# -gt 0 ]; do
  case "$1" in
    --port)
      MODE="port"
      TARGET_PORT="${2:-}"
      if [ -z "${TARGET_PORT}" ]; then
        echo "Error: --port requires a port number." >&2
        usage
      fi
      # The value becomes part of a file path; accept digits only.
      if ! [[ "${TARGET_PORT}" =~ ^[0-9]+$ ]]; then
        echo "Error: --port requires a numeric port number, got: ${TARGET_PORT}" >&2
        usage
      fi
      shift 2
      ;;
    --all)
      MODE="all"
      shift
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      ;;
  esac
done

if [ -z "${MODE}" ]; then
  usage
fi

case "${MODE}" in
  port)
    PID_FILE="${LOG_DIR}/lifecyclemanager-${TARGET_PORT}.pid"
    stop_by_pid_file "${PID_FILE}"
    ;;
  all)
    found=0
    failed=0
    for pid_file in "${LOG_DIR}"/lifecyclemanager-*.pid; do
      [ -f "${pid_file}" ] || continue
      found=1
      # Keep going after a bad PID file so one broken entry cannot leave the
      # remaining instances running; the failure is reported via the exit status.
      stop_by_pid_file "${pid_file}" || failed=1
    done
    if [ "${found}" -eq 0 ]; then
      echo "No running LifecycleManager instances found."
    fi
    exit "${failed}"
    ;;
esac