Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ build/sbt-config/**
**/benchmarks/**
**/node_modules/**
cpp/cmake/FindSodium.cmake
cpp/build/**
rust/Cargo.lock
27 changes: 25 additions & 2 deletions build/make-distribution.sh
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ function build_service {
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am $@)
BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli,lifecycle-manager -am $@)

# Actually build the jar
echo -e "\nBuilding with..."
Expand All @@ -158,6 +158,7 @@ function build_service {
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
mkdir -p "$DIST_DIR/cli-jars"
mkdir -p "$DIST_DIR/lifecycle-manager-jars"

## Copy master jars
cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/"
Expand All @@ -177,6 +178,21 @@ function build_service {
for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
done
## Copy lifecycle-manager jars
# lifecycle-manager depends on celeborn-client which is not a dependency of master/worker,
# so we copy its project-internal dependency jars that are missing from jars/.
for module_jar in \
"$PROJECT_DIR/lifecycle-manager/target/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar" \
"$PROJECT_DIR/client/target/celeborn-client_$SCALA_VERSION-$VERSION.jar"; do
jarname=$(basename "$module_jar")
if [ ! -f "$DIST_DIR/jars/$jarname" ]; then
cp "$module_jar" "$DIST_DIR/jars/"
fi
done
cp "$PROJECT_DIR"/lifecycle-manager/target/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/lifecycle-manager-jars/"
for jar in $(ls "$DIST_DIR/jars"); do
(cd $DIST_DIR/lifecycle-manager-jars; ln -snf "../jars/$jar" .)
done
}

function build_spark_client {
Expand Down Expand Up @@ -304,12 +320,13 @@ function sbt_build_service {

"${BUILD_COMMAND[@]}"

$SBT "celeborn-master/copyJars;celeborn-worker/copyJars;celeborn-cli/copyJars"
$SBT "celeborn-master/copyJars;celeborn-worker/copyJars;celeborn-cli/copyJars;celeborn-lifecycle-manager/copyJars"

mkdir -p "$DIST_DIR/jars"
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
mkdir -p "$DIST_DIR/cli-jars"
mkdir -p "$DIST_DIR/lifecycle-manager-jars"

## Copy master jars
cp "$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/"
Expand All @@ -329,6 +346,12 @@ function sbt_build_service {
for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
done
## Copy lifecycle-manager jars
cp "$PROJECT_DIR"/lifecycle-manager/target/scala-$SCALA_VERSION/celeborn-lifecycle-manager_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/lifecycle-manager-jars/"
cp "$PROJECT_DIR"/lifecycle-manager/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/"
for jar in $(ls "$PROJECT_DIR/lifecycle-manager/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/lifecycle-manager-jars; ln -snf "../jars/$jar" .)
done
}

function sbt_build_client {
Expand Down
64 changes: 35 additions & 29 deletions cpp/celeborn/client/ShuffleClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ namespace client {
ShuffleClientEndpoint::ShuffleClientEndpoint(
const std::shared_ptr<const conf::CelebornConf>& conf)
: conf_(conf),
pushDataRetryPool_(std::make_shared<folly::IOThreadPoolExecutor>(
conf_->clientPushRetryThreads(),
std::make_shared<folly::NamedThreadFactory>(
"client-pushdata-retrier"))),
pushDataRetryPool_(
std::make_shared<folly::IOThreadPoolExecutor>(
conf_->clientPushRetryThreads(),
std::make_shared<folly::NamedThreadFactory>(
"client-pushdata-retrier"))),
clientFactory_(std::make_shared<network::TransportClientFactory>(conf_)) {
}

Expand Down Expand Up @@ -153,10 +154,11 @@ int ShuffleClientImpl::pushData(
-1,
nullptr,
protocol::StatusCode::PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) {
CELEBORN_FAIL(fmt::format(
"Revive for shuffleId {} partitionId {} failed.",
shuffleId,
partitionId));
CELEBORN_FAIL(
fmt::format(
"Revive for shuffleId {} partitionId {} failed.",
shuffleId,
partitionId));
}
partitionLocationOptional = partitionLocationMap->get(partitionId);
}
Expand Down Expand Up @@ -298,10 +300,11 @@ int ShuffleClientImpl::mergeData(
-1,
nullptr,
protocol::StatusCode::PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) {
CELEBORN_FAIL(fmt::format(
"Revive for shuffleId {} partitionId {} failed.",
shuffleId,
partitionId));
CELEBORN_FAIL(
fmt::format(
"Revive for shuffleId {} partitionId {} failed.",
shuffleId,
partitionId));
}
partitionLocationOptional = partitionLocationMap->get(partitionId);
}
Expand Down Expand Up @@ -780,8 +783,9 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
if (!reducerFileGroupInfo->fileGroups.empty() &&
reducerFileGroupInfo->fileGroups.count(partitionId)) {
locations = std::move(utils::toVector(
reducerFileGroupInfo->fileGroups.find(partitionId)->second));
locations = std::move(
utils::toVector(
reducerFileGroupInfo->fileGroups.find(partitionId)->second));
}
return std::make_unique<CelebornInputStream>(
shuffleKey,
Expand Down Expand Up @@ -926,26 +930,28 @@ void ShuffleClientImpl::registerShuffle(
}
}
} catch (std::exception& e) {
CELEBORN_FAIL(fmt::format(
"registerShuffle encounters error after {} tries, "
"shuffleId {} numMappers {} numPartitions {}, errorMsg: {}",
numRetries,
shuffleId,
numMappers,
numPartitions,
e.what()));
CELEBORN_FAIL(
fmt::format(
"registerShuffle encounters error after {} tries, "
"shuffleId {} numMappers {} numPartitions {}, errorMsg: {}",
numRetries,
shuffleId,
numMappers,
numPartitions,
e.what()));
break;
}
std::this_thread::sleep_for(conf_->clientRegisterShuffleRetryWait());
}
partitionLocationMaps_.set(shuffleId, nullptr);
CELEBORN_FAIL(fmt::format(
"registerShuffle failed after {} tries, "
"shuffleId {} numMappers {} numPartitions {}",
maxRetries,
shuffleId,
numMappers,
numPartitions));
CELEBORN_FAIL(
fmt::format(
"registerShuffle failed after {} tries, "
"shuffleId {} numMappers {} numPartitions {}",
maxRetries,
shuffleId,
numMappers,
numPartitions));
}

void ShuffleClientImpl::submitRetryPushData(
Expand Down
5 changes: 3 additions & 2 deletions cpp/celeborn/client/ShuffleClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ class ShuffleClientImpl

void setupLifecycleManagerRef(std::string& host, int port) override;

void setupLifecycleManagerRef(std::shared_ptr<network::NettyRpcEndpointRef>&
lifecycleManagerRef) override;
void setupLifecycleManagerRef(
std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef)
override;

std::shared_ptr<utils::ConcurrentHashMap<
int,
Expand Down
35 changes: 20 additions & 15 deletions cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,10 @@ class SequencedMockClientFactory : public TransportClientFactory {

// Verifies that createReaderWithRetry exhausts all retries and throws.
TEST(CelebornInputStreamRetryTest, allRetriesExhaustedThrows) {
auto client = std::make_shared<FailingTransportClient>(
std::make_exception_ptr(std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto client =
std::make_shared<FailingTransportClient>(std::make_exception_ptr(
std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto factory = std::make_shared<TrackingTransportClientFactory>(client);
auto conf = makeTestConf();
auto excludedWorkers =
Expand Down Expand Up @@ -333,9 +334,10 @@ TEST(CelebornInputStreamRetryTest, allRetriesExhaustedThrows) {

// Verifies that on failure, the retry logic switches from primary to replica.
TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) {
auto client = std::make_shared<FailingTransportClient>(
std::make_exception_ptr(std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto client =
std::make_shared<FailingTransportClient>(std::make_exception_ptr(
std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto factory = std::make_shared<TrackingTransportClientFactory>(client);
auto conf = makeTestConf();
auto excludedWorkers =
Expand Down Expand Up @@ -374,9 +376,10 @@ TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) {
// Verifies that critical failures cause workers to be added to the
// exclusion list.
TEST(CelebornInputStreamRetryTest, excludesCriticalFailures) {
auto client = std::make_shared<FailingTransportClient>(
std::make_exception_ptr(std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto client =
std::make_shared<FailingTransportClient>(std::make_exception_ptr(
std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto factory = std::make_shared<TrackingTransportClientFactory>(client);
auto conf = makeTestConf(true);
auto excludedWorkers =
Expand Down Expand Up @@ -448,9 +451,10 @@ TEST(CelebornInputStreamRetryTest, doesNotExcludeNonCriticalFailures) {

// Verifies that without a peer, all retries target the same location.
TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) {
auto client = std::make_shared<FailingTransportClient>(
std::make_exception_ptr(std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto client =
std::make_shared<FailingTransportClient>(std::make_exception_ptr(
std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto factory = std::make_shared<TrackingTransportClientFactory>(client);
// Replication disabled: maxRetry = 2
auto conf = makeTestConf(false);
Expand Down Expand Up @@ -489,9 +493,10 @@ TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) {

// Verifies that with replication enabled, maxRetry is doubled.
TEST(CelebornInputStreamRetryTest, replicationDoublesMaxRetries) {
auto client = std::make_shared<FailingTransportClient>(
std::make_exception_ptr(std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto client =
std::make_shared<FailingTransportClient>(std::make_exception_ptr(
std::system_error(
std::make_error_code(std::errc::connection_refused))));
auto factory = std::make_shared<TrackingTransportClientFactory>(client);
// Replication enabled: maxRetry = 2 * 2 = 4
auto conf = makeTestConf(true);
Expand Down
4 changes: 2 additions & 2 deletions cpp/celeborn/client/tests/ReviveManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ TEST_F(ReviveManagerTest, successOnReviveSuccess) {
const std::unordered_set<int>& mapIds,
const std::unordered_map<int, ShuffleClientImpl::PtrReviveRequest>&
requestsToSend) mutable
-> std::optional<std::unordered_map<int, int>> {
-> std::optional<std::unordered_map<int, int>> {
EXPECT_EQ(testShuffleId, shuffleId);
EXPECT_GT(mapIds.count(testMapId), 0);
EXPECT_GT(requestsToSend.count(testPartitionId), 0);
Expand Down Expand Up @@ -220,7 +220,7 @@ TEST_F(ReviveManagerTest, failureOnReviveFailure) {
const std::unordered_set<int>& mapIds,
const std::unordered_map<int, ShuffleClientImpl::PtrReviveRequest>&
requestsToSend) mutable
-> std::optional<std::unordered_map<int, int>> {
-> std::optional<std::unordered_map<int, int>> {
EXPECT_EQ(testShuffleId, shuffleId);
EXPECT_GT(mapIds.count(testMapId), 0);
EXPECT_GT(requestsToSend.count(testPartitionId), 0);
Expand Down
5 changes: 3 additions & 2 deletions cpp/celeborn/client/writer/PushMergedDataCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ void PushMergedDataCallback::onSuccess(
response->readToReadOnlyBuffer(response->remainingSize()));
PbPushMergedDataSplitPartitionInfo partitionInfo;
if (!partitionInfo.ParseFromString(transportMsg->payload())) {
pushState_->setException(std::make_unique<std::runtime_error>(
"Failed to parse PbPushMergedDataSplitPartitionInfo"));
pushState_->setException(
std::make_unique<std::runtime_error>(
"Failed to parse PbPushMergedDataSplitPartitionInfo"));
return;
}

Expand Down
11 changes: 4 additions & 7 deletions cpp/celeborn/conf/CelebornConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@ std::string bool2String(bool value) {
return value ? "true" : "false";
}

#define STR_PROP(_key_, _val_) \
{ std::string(_key_), std::string(_val_) }
#define STR_PROP(_key_, _val_) {std::string(_key_), std::string(_val_)}
#define NUM_PROP(_key_, _val_) \
{ std::string(_key_), folly::to<std::string>(_val_) }
#define BOOL_PROP(_key_, _val_) \
{ std::string(_key_), bool2String(_val_) }
#define NONE_PROP(_key_) \
{ std::string(_key_), folly::none }
{std::string(_key_), folly::to<std::string>(_val_)}
#define BOOL_PROP(_key_, _val_) {std::string(_key_), bool2String(_val_)}
#define NONE_PROP(_key_) {std::string(_key_), folly::none}

enum class CapacityUnit {
BYTE,
Expand Down
10 changes: 6 additions & 4 deletions cpp/celeborn/network/TransportClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ void TransportClient::pushDataAsync(
reinterpret_cast<RpcResponse*>(responseMsg.get());
_callback->onSuccess(rpcResponse->body());
} else {
_callback->onFailure(std::make_unique<std::runtime_error>(
"pushData return value type is not rpcResponse"));
_callback->onFailure(
std::make_unique<std::runtime_error>(
"pushData return value type is not rpcResponse"));
}
})
.thenError([_callback = callback](const folly::exception_wrapper& e) {
Expand Down Expand Up @@ -132,8 +133,9 @@ void TransportClient::pushMergedDataAsync(
reinterpret_cast<RpcResponse*>(responseMsg.get());
_callback->onSuccess(rpcResponse->body());
} else {
_callback->onFailure(std::make_unique<std::runtime_error>(
"pushMergedData return value type is not rpcResponse"));
_callback->onFailure(
std::make_unique<std::runtime_error>(
"pushMergedData return value type is not rpcResponse"));
}
})
.thenError([_callback = callback](const folly::exception_wrapper& e) {
Expand Down
24 changes: 13 additions & 11 deletions cpp/celeborn/utils/tests/ExceptionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ void testExceptionTraceCollectionControl(bool userException, bool enabled) {
false);
}
} catch (CelebornException& e) {
SCOPED_TRACE(fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
FLAGS_celeborn_exception_user_stacktrace_enabled,
FLAGS_celeborn_exception_system_stacktrace_enabled));
SCOPED_TRACE(
fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
FLAGS_celeborn_exception_user_stacktrace_enabled,
FLAGS_celeborn_exception_system_stacktrace_enabled));
ASSERT_EQ(
userException, e.exceptionType() == CelebornException::Type::kUser);
ASSERT_EQ(enabled, e.stackTrace() != nullptr);
Expand Down Expand Up @@ -174,12 +175,13 @@ void testExceptionTraceCollectionRateControl(
false);
}
} catch (CelebornException& e) {
SCOPED_TRACE(fmt::format(
"userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms",
userException,
hasRateLimit,
FLAGS_celeborn_exception_user_stacktrace_rate_limit_ms,
FLAGS_celeborn_exception_system_stacktrace_rate_limit_ms));
SCOPED_TRACE(
fmt::format(
"userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms",
userException,
hasRateLimit,
FLAGS_celeborn_exception_user_stacktrace_rate_limit_ms,
FLAGS_celeborn_exception_system_stacktrace_rate_limit_ms));
ASSERT_EQ(
userException, e.exceptionType() == CelebornException::Type::kUser);
ASSERT_EQ(!hasRateLimit || ((iter % 2) == 0), e.stackTrace() != nullptr);
Expand Down
Loading
Loading