Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 111 additions & 25 deletions src/ml_flashpoint/replication/transfer_service/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ScopedConnection::~ScopedConnection() { Release(); }

void ScopedConnection::Release() {
if (pool_ != nullptr && sockfd_ >= 0) {
pool_->ReleaseConnection(sockfd_, true);
pool_->ReleaseConnection(sockfd_, reuse_);
} else if (sockfd_ >= 0) {
close(sockfd_);
}
Expand Down Expand Up @@ -81,7 +81,7 @@ ConnectionPool::~ConnectionPool() {
cv_.notify_all();
std::unique_lock<std::mutex> lock(mtx_);
while (!available_connections_.empty()) {
close(available_connections_.front());
close(available_connections_.top());
available_connections_.pop();
}
}
Expand All @@ -96,7 +96,7 @@ bool ConnectionPool::Initialize() {
int fd = CreateConnection();
if (fd < 0) {
while (!available_connections_.empty()) {
close(available_connections_.front());
close(available_connections_.top());
available_connections_.pop();
}
return false;
Expand Down Expand Up @@ -154,50 +154,122 @@ int ConnectionPool::CreateConnection() {
return -1;
}

// Checks whether an idle pooled connection is still usable.
//
// Performs a one-byte, non-destructive peek (MSG_PEEK | MSG_DONTWAIT) so the
// kernel reports the TCP connection state without consuming any data from the
// socket's receive buffer and without blocking the caller.
//
// Returns true if the connection appears healthy, false if the peer has
// closed it or a hard socket error is pending.
bool ConnectionPool::IsConnectionAlive(int sockfd) {
  if (sockfd < 0) return false;

  char buf;
  ssize_t r = recv(sockfd, &buf, 1, MSG_PEEK | MSG_DONTWAIT);

  if (r == 0) {
    // recv() returning 0 means the remote peer performed an orderly shutdown
    // (sent a FIN). The connection cannot carry new requests.
    return false;
  }
  if (r < 0) {
    // Distinguish "no data available" (and benign interruptions) from real
    // network errors.
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // No data pending on an otherwise healthy socket — the expected state
      // for an idle pooled connection.
      return true;
    }
    if (errno == EINTR) {
      // The peek was interrupted by a signal before the kernel could report
      // anything. This says nothing about the connection's health, so do NOT
      // discard a likely-healthy connection over it.
      return true;
    }
    // Any other error (e.g. ECONNRESET, ETIMEDOUT) means the connection has
    // been broken while sitting in the pool.
    return false;
  }
  // r > 0: unsolicited data is waiting. Unusual for an idle pool connection,
  // but it proves the connection is alive.
  return true;
}

// Acquires a healthy connection from the pool, waiting up to `timeout_ms`.
//
// Dead connections found in the pool are closed, replaced, and skipped; the
// loop keeps searching until a healthy connection is found, the pool starts
// shutting down, or the absolute deadline expires.
//
// Returns std::nullopt on timeout or shutdown.
std::optional<ScopedConnection> ConnectionPool::GetConnection(int timeout_ms) {
  CHECK_GT(timeout_ms, 0) << "timeout_ms must be positive";

  // Absolute deadline so the caller-supplied timeout is honored even if we
  // have to loop through several dead connections.
  const auto end_time =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);

  while (true) {
    int fd = -1;
    {
      std::unique_lock<std::mutex> lock(mtx_);

      // Recompute the remaining wait budget on every iteration.
      auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
          end_time - std::chrono::steady_clock::now());

      if (remaining.count() <= 0 || !cv_.wait_for(lock, remaining, [this] {
            return !available_connections_.empty() || stopping_;
          })) {
        LOG(WARNING) << "ConnectionPool::GetConnection: timeout reached while "
                        "searching for a healthy connection";
        return std::nullopt;
      }

      if (stopping_) {
        LOG(WARNING) << "ConnectionPool::GetConnection: pool is shutting down";
        return std::nullopt;
      }

      // Pop the most recently used connection from the LIFO stack. LIFO keeps
      // reusing "hot" connections whose TCP state (e.g. congestion window)
      // and kernel/CPU caches are still warm.
      fd = available_connections_.top();
      available_connections_.pop();
    }  // Drop the lock: everything below operates only on our private fd.

    // Verify health before handing the connection to the caller. This guards
    // against stale connections closed by the peer or a firewall while idle.
    // The peek is per-fd and needs no pool state, so it runs unlocked.
    if (IsConnectionAlive(fd)) {
      return ScopedConnection(fd, this);
    }

    // The connection is dead: close it and try the next one.
    LOG(INFO) << "ConnectionPool::GetConnection: discarded dead connection; "
                 "retrying with next available connection";
    close(fd);

    // Replenish the pool so it doesn't slowly drain when connections go
    // stale. CreateConnection() is a blocking network operation, so it MUST
    // run without holding mtx_ — otherwise every other thread touching the
    // pool stalls behind this connect attempt.
    int new_fd = CreateConnection();
    if (new_fd >= 0) {
      std::unique_lock<std::mutex> lock(mtx_);
      // Re-check shutdown after re-acquiring the lock: the pool may have
      // started stopping while we were connecting.
      if (stopping_) {
        close(new_fd);
        LOG(WARNING) << "ConnectionPool::GetConnection: pool is shutting down";
        return std::nullopt;
      }
      available_connections_.push(new_fd);
      cv_.notify_one();
      // Loop around and pick up this or another connection.
    }
  }
}

// Returns a connection to the pool, allowing it to be reused.
//
// If `reuse` is true and the pool is not full, the connection is added back to
// the queue of available connections. Otherwise, the connection is closed.
// the stack of available connections. Otherwise, the connection is closed.
void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) {
if (sockfd < 0) {
LOG(WARNING) << "ConnectionPool::ReleaseConnection: invalid sockfd";
return;
}

std::unique_lock<std::mutex> lock(mtx_);
if (stopping_) {
LOG(WARNING)
<< "ConnectionPool::ReleaseConnection: stopping, close connection";
close(sockfd);
return;
}

if (reuse) {
if (available_connections_.size() < max_size_) {
LOG(INFO) << "ConnectionPool::ReleaseConnection: reuse connection";
// TODO: Check if we need cleanup for the connection before return it to
// the pool
// We push to the stack to ensure this connection is the first to be
// reused by the next caller (LIFO).
available_connections_.push(sockfd);
cv_.notify_one();
} else {
Expand All @@ -207,8 +279,22 @@ void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) {
}
} else {
LOG(INFO)
<< "ConnectionPool::ReleaseConnection: do not reuse, close connection";
<< "ConnectionPool::ReleaseConnection: connection marked as unusable, "
"closing and replenishing pool";
close(sockfd);

// Since we are discarding a connection that was previously part of the
// pool's "active" set, we create a new one to maintain the fixed pool size.
// This prevents the pool from permanently shrinking when network errors
// occur.
int new_fd = CreateConnection();
if (new_fd >= 0) {
available_connections_.push(new_fd);
cv_.notify_one();
} else {
LOG(ERROR) << "ConnectionPool::ReleaseConnection: failed to replenish "
"pool after discarding unusable connection";
}
Comment on lines +290 to +297
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

As in GetConnection, CreateConnection() is called here while holding the pool's mutex. This blocking call can harm concurrency. The lock should be released before creating the connection and re-acquired after. Don't forget to handle the stopping_ case after re-acquiring the lock.

}
}
} // namespace ml_flashpoint::replication::transfer_service
13 changes: 11 additions & 2 deletions src/ml_flashpoint/replication/transfer_service/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <stack>
#include <string>

namespace ml_flashpoint::replication::transfer_service {
Expand All @@ -57,11 +57,17 @@ class ScopedConnection {

int fd() const { return sockfd_; }
bool IsValid() const { return sockfd_ >= 0; }

// Marks the connection as unusable (e.g., after a socket error).
// This prevents it from being returned to the pool for reuse.
void SetUnusable() { reuse_ = false; }

void Release();

private:
int sockfd_;
ConnectionPool* pool_;
bool reuse_ = true;
};

// Manages a thread-safe pool of TCP connections to a single peer.
Expand Down Expand Up @@ -108,6 +114,9 @@ class ConnectionPool {
// Releases a connection back to the pool or closes it.
void ReleaseConnection(int sockfd, bool reuse = true);

// Checks if a connection is still alive by performing a non-blocking peek.
bool IsConnectionAlive(int sockfd);

// Creates a new connection to the peer.
// Returns the socket file descriptor on a successful connection, or -1 on
// failure.
Expand All @@ -116,7 +125,7 @@ class ConnectionPool {
std::string peer_host_;
int peer_port_;
size_t max_size_;
std::queue<int> available_connections_; // Guarded by mtx_.
std::stack<int> available_connections_; // Guarded by mtx_.
std::mutex mtx_; // Protects available_connections_ and stopping_.
std::condition_variable
cv_; // Signaled when a connection is released or the pool is stopping.
Expand Down
Loading
Loading