-
Notifications
You must be signed in to change notification settings - Fork 7
feat(replication): implement connection health checks and task retry logic #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
g-husam
wants to merge
5
commits into
main
Choose a base branch
from
feature/replication-robustness
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,7 +51,7 @@ ScopedConnection::~ScopedConnection() { Release(); } | |
|
|
||
| void ScopedConnection::Release() { | ||
| if (pool_ != nullptr && sockfd_ >= 0) { | ||
| pool_->ReleaseConnection(sockfd_, true); | ||
| pool_->ReleaseConnection(sockfd_, reuse_); | ||
| } else if (sockfd_ >= 0) { | ||
| close(sockfd_); | ||
| } | ||
|
|
@@ -154,27 +154,93 @@ int ConnectionPool::CreateConnection() { | |
| return -1; | ||
| } | ||
|
|
||
| bool ConnectionPool::IsConnectionAlive(int sockfd) { | ||
| if (sockfd < 0) return false; | ||
|
|
||
| char buf; | ||
| // We use MSG_PEEK | MSG_DONTWAIT to check the status of the TCP connection | ||
| // without actually consuming any data from the socket's receive buffer. | ||
| // This is a fast, zero-copy way to ask the kernel if the connection has | ||
| // been closed or has encountered an error while it was idle in the pool. | ||
| ssize_t r = recv(sockfd, &buf, 1, MSG_PEEK | MSG_DONTWAIT); | ||
|
|
||
| if (r == 0) { | ||
| // If recv returns 0, it means the remote peer has performed an orderly | ||
| // shutdown (sent a FIN packet). The connection is no longer usable for | ||
| // sending new requests. | ||
| return false; | ||
| } else if (r < 0) { | ||
| // If recv returns -1, we check errno to distinguish between "no data" | ||
| // and a real network error. | ||
| if (errno == EAGAIN || errno == EWOULDBLOCK) { | ||
| // EAGAIN/EWOULDBLOCK means the connection is still alive and healthy, | ||
| // but there is currently no data waiting to be read. This is the | ||
| // expected state for an idle connection in the pool. | ||
| return true; | ||
| } | ||
| // Any other error (like ECONNRESET or EPIPE) indicates that the | ||
| // connection has been broken or timed out. | ||
| return false; | ||
| } | ||
| // If r > 0, there is actual data waiting in the buffer. While unusual for | ||
| // an idle pool connection, it indicates the connection is definitely alive. | ||
| return true; | ||
| } | ||
|
|
||
| std::optional<ScopedConnection> ConnectionPool::GetConnection(int timeout_ms) { | ||
| CHECK_GT(timeout_ms, 0) << "timeout_ms must be positive"; | ||
| std::unique_lock<std::mutex> lock(mtx_); | ||
| if (!cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { | ||
| return !available_connections_.empty() || stopping_; | ||
| })) { | ||
| LOG(WARNING) << "ConnectionPool::GetConnection: timeout"; | ||
| return std::nullopt; | ||
| } | ||
| if (stopping_) { | ||
| LOG(WARNING) << "ConnectionPool::GetConnection: stopping"; | ||
| return std::nullopt; | ||
| } | ||
| if (available_connections_.empty()) { | ||
| // TODO: Handle the case when we run out of connections | ||
| LOG(WARNING) << "ConnectionPool::GetConnection: no available connections"; | ||
| return std::nullopt; | ||
|
|
||
| // Calculate the absolute deadline to ensure we respect the user-provided | ||
| // timeout even if we have to loop through several dead connections. | ||
| auto start_time = std::chrono::steady_clock::now(); | ||
| auto end_time = start_time + std::chrono::milliseconds(timeout_ms); | ||
|
|
||
| while (true) { | ||
| std::unique_lock<std::mutex> lock(mtx_); | ||
|
|
||
| // Re-calculate the remaining wait time for each iteration of the loop. | ||
| auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>( | ||
| end_time - std::chrono::steady_clock::now()); | ||
|
|
||
| if (remaining.count() <= 0 || !cv_.wait_for(lock, remaining, [this] { | ||
| return !available_connections_.empty() || stopping_; | ||
| })) { | ||
| LOG(WARNING) << "ConnectionPool::GetConnection: timeout reached while " | ||
| "searching for a healthy connection"; | ||
| return std::nullopt; | ||
| } | ||
|
|
||
| if (stopping_) { | ||
| LOG(WARNING) << "ConnectionPool::GetConnection: pool is shutting down"; | ||
| return std::nullopt; | ||
| } | ||
|
|
||
| // Pop the oldest connection from the FIFO queue. | ||
| int fd = available_connections_.front(); | ||
| available_connections_.pop(); | ||
|
|
||
| // Verify the connection's health before handing it to the caller. | ||
| // This protects against "stale" connections that were closed by the | ||
| // peer or a firewall while sitting idle in the pool. | ||
| if (IsConnectionAlive(fd)) { | ||
| return ScopedConnection(fd, this); | ||
| } | ||
|
|
||
| // The connection is dead. We close it and attempt to retrieve another | ||
| // one from the queue. | ||
| LOG(INFO) << "ConnectionPool::GetConnection: discarded dead connection; " | ||
| "retrying with next available connection"; | ||
| close(fd); | ||
|
|
||
| // To maintain the desired pool size, we immediately attempt to open a | ||
| // replacement connection. This ensures the pool doesn't slowly drain | ||
| // if many connections go stale at once. | ||
| int new_fd = CreateConnection(); | ||
| if (new_fd >= 0) { | ||
| available_connections_.push(new_fd); | ||
| // The loop will continue and pick up this or another connection. | ||
| } | ||
| } | ||
| int fd = available_connections_.front(); | ||
| available_connections_.pop(); | ||
| return ScopedConnection(fd, this); | ||
| } | ||
|
|
||
| // Returns a connection to the pool, allowing it to be reused. | ||
|
|
@@ -186,18 +252,18 @@ void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) { | |
| LOG(WARNING) << "ConnectionPool::ReleaseConnection: invalid sockfd"; | ||
| return; | ||
| } | ||
|
|
||
| std::unique_lock<std::mutex> lock(mtx_); | ||
| if (stopping_) { | ||
| LOG(WARNING) | ||
| << "ConnectionPool::ReleaseConnection: stopping, close connection"; | ||
| close(sockfd); | ||
| return; | ||
| } | ||
|
|
||
| if (reuse) { | ||
| if (available_connections_.size() < max_size_) { | ||
| LOG(INFO) << "ConnectionPool::ReleaseConnection: reuse connection"; | ||
| // TODO: Check if we need cleanup for the connection before return it to | ||
| // the pool | ||
| available_connections_.push(sockfd); | ||
| cv_.notify_one(); | ||
| } else { | ||
|
|
@@ -207,8 +273,22 @@ void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) { | |
| } | ||
| } else { | ||
| LOG(INFO) | ||
| << "ConnectionPool::ReleaseConnection: do not reuse, close connection"; | ||
| << "ConnectionPool::ReleaseConnection: connection marked as unusable, " | ||
| "closing and replenishing pool"; | ||
| close(sockfd); | ||
|
|
||
| // Since we are discarding a connection that was previously part of the | ||
| // pool's "active" set, we create a new one to maintain the fixed pool size. | ||
| // This prevents the pool from permanently shrinking when network errors | ||
| // occur. | ||
| int new_fd = CreateConnection(); | ||
| if (new_fd >= 0) { | ||
| available_connections_.push(new_fd); | ||
| cv_.notify_one(); | ||
| } else { | ||
| LOG(ERROR) << "ConnectionPool::ReleaseConnection: failed to replenish " | ||
| "pool after discarding unusable connection"; | ||
| } | ||
|
Comment on lines
+290
to
+297
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
| } | ||
| } // namespace ml_flashpoint::replication::transfer_service | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling `CreateConnection()` while holding the pool's mutex (`mtx_`) can cause significant contention, as it's a blocking network operation. This will block any other thread trying to get or release a connection. Consider releasing the lock before this call and re-acquiring it after to push the new connection to the pool. Remember to re-check `stopping_` after re-acquiring the lock.