diff --git a/src/gssapi_mechanism_base.cpp b/src/gssapi_mechanism_base.cpp index 89b778031d..4677ee1331 100644 --- a/src/gssapi_mechanism_base.cpp +++ b/src/gssapi_mechanism_base.cpp @@ -52,6 +52,10 @@ int zmq::gssapi_mechanism_base_t::encode_message (msg_t *msg_) flags |= 0x01; if (msg_->flags () & msg_t::command) flags |= 0x02; + if (msg_->is_subscribe ()) + flags |= 0x04; + else if (msg_->is_cancel ()) + flags |= 0x08; uint8_t *plaintext_buffer = static_cast (malloc (msg_->size () + 1)); @@ -174,6 +178,10 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_) msg_->set_flags (msg_t::more); if (flags & 0x02) msg_->set_flags (msg_t::command); + if (flags & 0x04) + msg_->set_flags (msg_t::subscribe); + else if (flags & 0x08) + msg_->set_flags (msg_t::cancel); memcpy (msg_->data (), static_cast (plaintext.value) + 1, plaintext.length - 1); diff --git a/tests/test_security_gssapi.cpp b/tests/test_security_gssapi.cpp index 479ae87c7e..3c9aa2854a 100644 --- a/tests/test_security_gssapi.cpp +++ b/tests/test_security_gssapi.cpp @@ -141,6 +141,131 @@ void tearDown () zmq_threadclose (zap_thread); } +static void setup_gssapi_client_socket (void *client_) +{ + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + client_, ZMQ_GSSAPI_SERVICE_PRINCIPAL, name, strlen (name) + 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + client_, ZMQ_GSSAPI_PRINCIPAL, name, strlen (name) + 1)); + int name_type = ZMQ_GSSAPI_NT_HOSTBASED; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + client_, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE, &name_type, sizeof (name_type))); +} + +// Test that PUB/SUB subscriptions work correctly with GSSAPI encryption. +// This is a regression test for the bug where subscribe/cancel flags were +// lost during GSSAPI encode_message(), causing subscriptions to be silently +// dropped or corrupted after reconnection. +void test_pubsub_subscription () +{ + // Create a PUB socket acting as server + void *pub = test_context_socket (ZMQ_PUB); + int as_server = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_SERVER, &as_server, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_PRINCIPAL, name, strlen (name) + 1)); + int name_type = ZMQ_GSSAPI_NT_HOSTBASED; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pub, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE, &name_type, sizeof (name_type))); + + char pub_endpoint[MAX_SOCKET_STRING]; + bind_loopback_ipv4 (pub, pub_endpoint, sizeof pub_endpoint); + + // Create a SUB socket acting as client + void *sub = test_context_socket (ZMQ_SUB); + setup_gssapi_client_socket (sub); + + // Subscribe to topic "test" + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "test", 4)); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, pub_endpoint)); + + // Give time for connection and subscription to be established + msleep (500); + + // Send a matching message + send_string_expect_success (pub, "test message", 0); + + // Should receive the message + int timeout = 1000; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_RCVTIMEO, &timeout, sizeof (timeout))); + recv_string_expect_success (sub, "test message", 0); + + test_context_socket_close (sub); + test_context_socket_close (pub); +} + +// Test that PUB/SUB subscriptions survive reconnection with GSSAPI encryption. +// Specifically tests the bug where xhiccuped() re-sends subscriptions through +// GSSAPI encode_message() and the subscribe flag was lost, causing the +// subscription to be silently dropped on the PUB side after reconnect. +void test_pubsub_subscription_after_reconnect () +{ + // Create a PUB socket acting as server + void *pub = test_context_socket (ZMQ_PUB); + int as_server = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_SERVER, &as_server, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_PRINCIPAL, name, strlen (name) + 1)); + int name_type = ZMQ_GSSAPI_NT_HOSTBASED; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pub, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE, &name_type, sizeof (name_type))); + + char pub_endpoint[MAX_SOCKET_STRING]; + bind_loopback_ipv4 (pub, pub_endpoint, sizeof pub_endpoint); + + // Create a SUB socket, subscribe, and connect + void *sub = test_context_socket (ZMQ_SUB); + setup_gssapi_client_socket (sub); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "test", 4)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, pub_endpoint)); + + // Give time for connection and subscription to be established + msleep (500); + + // Verify initial subscription works + send_string_expect_success (pub, "test hello", 0); + int timeout = 1000; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_RCVTIMEO, &timeout, sizeof (timeout))); + recv_string_expect_success (sub, "test hello", 0); + + // Simulate server restart: close and rebind PUB on same endpoint + test_context_socket_close (pub); + msleep (100); + + pub = test_context_socket (ZMQ_PUB); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_SERVER, &as_server, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pub, ZMQ_GSSAPI_PRINCIPAL, name, strlen (name) + 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pub, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE, &name_type, sizeof (name_type))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, pub_endpoint)); + + // Give time for the SUB to reconnect and re-send subscriptions + msleep (1000); + + // Send a matching message - subscription should still be active + send_string_expect_success (pub, "test world", 0); + recv_string_expect_success (sub, "test world", 0); + + // Also verify that a non-matching message is filtered + send_string_expect_success (pub, "other message", 0); + char buf[64]; + int rc = zmq_recv (sub, buf, sizeof buf, 0); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + test_context_socket_close (sub); + test_context_socket_close (pub); +} + void test_valid_creds () { void *client = test_context_socket (ZMQ_DEALER); @@ -243,5 +368,7 @@ int main (void) RUN_TEST (test_plain_creds); RUN_TEST (test_vanilla_socket); RUN_TEST (test_unauth_creds); + RUN_TEST (test_pubsub_subscription); + RUN_TEST (test_pubsub_subscription_after_reconnect); return UNITY_END (); }