diff --git a/src/stream_engine_base.cpp b/src/stream_engine_base.cpp index 132c85d70d..d3b7665197 100644 --- a/src/stream_engine_base.cpp +++ b/src/stream_engine_base.cpp @@ -399,7 +399,10 @@ void zmq::stream_engine_base_t::restart_output () bool zmq::stream_engine_base_t::restart_input () { - zmq_assert (_input_stopped); + // Engine can be replaced while a write_activated notification is pending. + if (!_input_stopped) + return true; + zmq_assert (_session != NULL); zmq_assert (_decoder != NULL); diff --git a/tests/test_heartbeats.cpp b/tests/test_heartbeats.cpp index b68e60c217..5ce19abe4c 100644 --- a/tests/test_heartbeats.cpp +++ b/tests/test_heartbeats.cpp @@ -384,6 +384,108 @@ void test_setsockopt_heartbeat_ttl_near_zero () test_setsockopt_heartbeat_success (deciseconds_per_millisecond - 1); } +// Test for issue #3937, #4767, #4229, #3596, #4364 +// Reproduces assertion failure: _input_stopped (src/stream_engine_base.cpp) +// +// Root cause: After heartbeat-triggered engine replacement, write_activated() +// calls restart_input() on the new engine which has _input_stopped=false, +// triggering the assertion. +// +// Trigger sequence: +// 1. SUB with ZMQ_RCVHWM=1 and heartbeat connects to PUB +// 2. Application stops receiving - pipe fills immediately +// 3. Engine sets _input_stopped=true and disables POLLIN (backpressure) +// 4. Engine can't observe incoming heartbeats (they're in kernel buffer) +// 5. Heartbeat timeout fires -> engine_error() -> engine destroyed/reconnected +// 6. New engine created with _input_stopped=false +// 7. Application resumes recv(), draining the pipe +// 8. Pipe becomes writable -> write_activated() -> restart_input() +// 9. Assertion fails: restart_input() expects _input_stopped=true +void test_heartbeat_timeout_slow_subscriber () +{ + char pub_endpoint[MAX_SOCKET_STRING]; + + // Create PUB socket + void *pub = test_context_socket (ZMQ_PUB); + int linger = 0; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_LINGER, &linger, sizeof (linger))); + bind_loopback_ipv4 (pub, pub_endpoint, sizeof (pub_endpoint)); + + // Create SUB socket with small HWM and short heartbeat + void *sub = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_LINGER, &linger, sizeof (linger))); + + int rcvhwm = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm))); + + int heartbeat_ivl = 100; // 100ms heartbeat interval + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + sub, ZMQ_HEARTBEAT_IVL, &heartbeat_ivl, sizeof (heartbeat_ivl))); + + int heartbeat_timeout = 200; // 200ms timeout + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_HEARTBEAT_TIMEOUT, + &heartbeat_timeout, + sizeof (heartbeat_timeout))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, pub_endpoint)); + + // Give time for connection to establish + msleep (SETTLE_TIME); + + // Send messages continuously from PUB to fill the pipe + // PUB will drop messages when SUB's pipe is full, which is fine + char msg[64]; + for (int i = 0; i < 100; i++) { + snprintf (msg, sizeof (msg), "Message %d", i); + zmq_send (pub, msg, strlen (msg), ZMQ_DONTWAIT); + msleep (10); + } + + // Receive a few messages to establish flow + char buf[256]; + int rcvtimeo = 1000; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (rcvtimeo))); + + for (int i = 0; i < 5; i++) { + int rc = zmq_recv (sub, buf, sizeof (buf), 0); + if (rc == -1) + break; + } + + // Keep sending while we pause receiving + // This fills the pipe and triggers backpressure (_input_stopped=true) + for (int i = 0; i < 50; i++) { + snprintf (msg, sizeof (msg), "Fill %d", i); + zmq_send (pub, msg, strlen (msg), ZMQ_DONTWAIT); + msleep (10); + } + + // Now pause receiving for longer than heartbeat timeout + // This causes: + // 1. Engine stops reading (backpressure) so can't see heartbeats + // 2. Heartbeat timeout fires + // 3. Engine is replaced with new one (_input_stopped=false) + msleep (1000); // 1 second pause - well over heartbeat timeout + + // Resume receiving - this triggers the bug + // write_activated() calls restart_input() on engine with _input_stopped=false + // --> Assertion failed: _input_stopped + for (int i = 0; i < 20; i++) { + int rc = zmq_recv (sub, buf, sizeof (buf), 0); + if (rc == -1 && zmq_errno () == EAGAIN) + break; + } + + // If we get here, the bug is fixed + test_context_socket_close (sub); + test_context_socket_close (pub); +} + int main (void) { // The test cases are very long-running. The default timeout of 60 seconds @@ -428,5 +530,8 @@ int main (void) RUN_TEST (test_heartbeat_notimeout_gather_scatter_with_curve); #endif + // Test for issue #3937 - heartbeat timeout with slow subscriber + RUN_TEST (test_heartbeat_timeout_slow_subscriber); + return UNITY_END (); }