diff --git a/include/comm/ClientServer.h b/include/comm/ClientServer.h index 04bd9eb..ebcd735 100644 --- a/include/comm/ClientServer.h +++ b/include/comm/ClientServer.h @@ -44,9 +44,6 @@ namespace FMI::Comm { //! Uploads objects and keeps track of them. virtual void upload(channel_data buf, std::string name); - //! List all the currently existing objects, needs to be implemented by channels. Needed by some collectives that check for the existence of files, but do not care about their content. - virtual std::vector get_object_names() = 0; - //! Delete the object with the given name, needs to be implemented by channels. virtual void delete_object(std::string name) = 0; diff --git a/include/comm/Redis.h b/include/comm/Redis.h index 16edc56..f3bbf3c 100644 --- a/include/comm/Redis.h +++ b/include/comm/Redis.h @@ -20,8 +20,6 @@ namespace FMI::Comm { void delete_object(std::string name) override; - std::vector get_object_names() override; - double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; diff --git a/include/comm/S3.h b/include/comm/S3.h index cb694ab..dbd34c4 100644 --- a/include/comm/S3.h +++ b/include/comm/S3.h @@ -22,8 +22,6 @@ namespace FMI::Comm { void delete_object(std::string name) override; - std::vector get_object_names() override; - double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; diff --git a/src/comm/ClientServer.cpp b/src/comm/ClientServer.cpp index ef9f5c8..8776944 100644 --- a/src/comm/ClientServer.cpp +++ b/src/comm/ClientServer.cpp @@ -50,11 +50,16 @@ void FMI::Comm::ClientServer::barrier() { upload({&b, sizeof(b)}, file_name); unsigned int elapsed_time = 0; while (elapsed_time < max_timeout) { - auto objects = get_object_names(); - auto has_barrier_suffix = [barrier_suffix] (const std::string& s){return s.size() > barrier_suffix.size() && - s.compare(s.size() - barrier_suffix.size(), barrier_suffix.size(), barrier_suffix) == 0 ;}; - auto num_arrived = std::count_if(objects.begin(), objects.end(), has_barrier_suffix); - if (num_arrived >= num_peers) { + bool all_arrived = true; + for (int i = 0; i < num_peers; i++) { + char arrived = 0; + std::string expected_name = comm_name + std::to_string(i) + barrier_suffix; + if (!download_object({&arrived, sizeof(arrived)}, expected_name)) { + all_arrived = false; + break; + } + } + if (all_arrived) { return; } else { elapsed_time += timeout; diff --git a/src/comm/Redis.cpp b/src/comm/Redis.cpp index 74e36db..d1ef1b9 100644 --- a/src/comm/Redis.cpp +++ b/src/comm/Redis.cpp @@ -59,16 +59,6 @@ void FMI::Comm::Redis::delete_object(std::string name) { freeReplyObject(reply); } -std::vector FMI::Comm::Redis::get_object_names() { - std::vector keys; - std::string command = "KEYS *"; - auto* reply = (redisReply*) redisCommand(context, command.c_str()); - for (int i = 0; i < reply->elements; i++) { - keys.emplace_back(reply->element[i]->str); - } - return keys; -} - double FMI::Comm::Redis::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { double agg_bandwidth = std::min(producer * consumer * bandwidth_single, bandwidth_multiple); double trans_time = producer * consumer * ((double) size_in_bytes / 1000000.) / agg_bandwidth; diff --git a/src/comm/S3.cpp b/src/comm/S3.cpp index 7dde3a5..e7cb7a3 100644 --- a/src/comm/S3.cpp +++ b/src/comm/S3.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include char TAG[] = "S3Client"; @@ -70,22 +69,6 @@ void FMI::Comm::S3::delete_object(std::string name) { } } -std::vector FMI::Comm::S3::get_object_names() { - std::vector object_names; - Aws::S3::Model::ListObjectsRequest request; - request.WithBucket(bucket_name); - auto outcome = client->ListObjects(request); - if (outcome.IsSuccess()) { - auto objects = outcome.GetResult().GetContents(); - for (auto& object : objects) { - object_names.push_back(object.GetKey()); - } - } else { - BOOST_LOG_TRIVIAL(error) << "Error when listing objects from S3: " << outcome.GetError(); - } - return object_names; -} - double FMI::Comm::S3::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { double fixed_overhead = overhead; double waiting_time = (double) timeout / 2.;