Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions include/comm/ClientServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ namespace FMI::Comm {
//! Uploads objects and keeps track of them.
virtual void upload(channel_data buf, std::string name);

//! List all the currently existing objects, needs to be implemented by channels. Needed by some collectives that check for the existence of files, but do not care about their content.
virtual std::vector<std::string> get_object_names() = 0;

//! Delete the object with the given name, needs to be implemented by channels.
virtual void delete_object(std::string name) = 0;

Comment on lines 44 to 49

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that an object_exists() backend hook would be cleaner and could map to Redis EXISTS / S3 HeadObject. However, I wanted to keep this PR small by reusing the existing API.

Expand Down
2 changes: 0 additions & 2 deletions include/comm/Redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace FMI::Comm {

void delete_object(std::string name) override;

std::vector<std::string> get_object_names() override;

double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;

double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;
Expand Down
2 changes: 0 additions & 2 deletions include/comm/S3.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace FMI::Comm {

void delete_object(std::string name) override;

std::vector<std::string> get_object_names() override;

double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;

double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;
Expand Down
15 changes: 10 additions & 5 deletions src/comm/ClientServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ void FMI::Comm::ClientServer::barrier() {
upload({&b, sizeof(b)}, file_name);
unsigned int elapsed_time = 0;
while (elapsed_time < max_timeout) {
auto objects = get_object_names();
auto has_barrier_suffix = [barrier_suffix] (const std::string& s){return s.size() > barrier_suffix.size() &&
s.compare(s.size() - barrier_suffix.size(), barrier_suffix.size(), barrier_suffix) == 0 ;};
auto num_arrived = std::count_if(objects.begin(), objects.end(), has_barrier_suffix);
if (num_arrived >= num_peers) {
bool all_arrived = true;
for (int i = 0; i < num_peers; i++) {
char arrived = 0;
std::string expected_name = comm_name + std::to_string(i) + barrier_suffix;
if (!download_object({&arrived, sizeof(arrived)}, expected_name)) {
all_arrived = false;
break;
}
}
Comment on lines +53 to +61

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably the previous solution had worse scaling performance.

if (all_arrived) {
return;
} else {
elapsed_time += timeout;
Comment on lines 50 to 65

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot, elapsed_time was not part of the PR 💯

Expand Down
10 changes: 0 additions & 10 deletions src/comm/Redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ void FMI::Comm::Redis::delete_object(std::string name) {
freeReplyObject(reply);
}

std::vector<std::string> FMI::Comm::Redis::get_object_names() {
std::vector<std::string> keys;
std::string command = "KEYS *";
auto* reply = (redisReply*) redisCommand(context, command.c_str());
for (int i = 0; i < reply->elements; i++) {
keys.emplace_back(reply->element[i]->str);
}
return keys;
}

double FMI::Comm::Redis::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) {
double agg_bandwidth = std::min(producer * consumer * bandwidth_single, bandwidth_multiple);
double trans_time = producer * consumer * ((double) size_in_bytes / 1000000.) / agg_bandwidth;
Expand Down
17 changes: 0 additions & 17 deletions src/comm/S3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <cmath>

char TAG[] = "S3Client";
Expand Down Expand Up @@ -70,22 +69,6 @@ void FMI::Comm::S3::delete_object(std::string name) {
}
}

std::vector<std::string> FMI::Comm::S3::get_object_names() {
std::vector<std::string> object_names;
Aws::S3::Model::ListObjectsRequest request;
request.WithBucket(bucket_name);
auto outcome = client->ListObjects(request);
if (outcome.IsSuccess()) {
auto objects = outcome.GetResult().GetContents();
for (auto& object : objects) {
object_names.push_back(object.GetKey());
}
} else {
BOOST_LOG_TRIVIAL(error) << "Error when listing objects from S3: " << outcome.GetError();
}
return object_names;
}

double FMI::Comm::S3::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) {
double fixed_overhead = overhead;
double waiting_time = (double) timeout / 2.;
Expand Down