Skip to content

Commit 8098239

Browse files
committed
ARTEMIS-5987: Properly handling exceptions with single connection.
If there is an issue while creating a session in single connection mode then the connection factory can be in a bad state and the loop for creating sessions still continues over this. Issue: https://issues.apache.org/jira/browse/ARTEMIS-5987 Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent 066c8ce commit 8098239

File tree

2 files changed

+201
-4
lines changed

2 files changed

+201
-4
lines changed

artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ protected synchronized void setup() throws Exception {
244244
ClientSessionFactory cf = null;
245245

246246
for (int i = 0; i < spec.getMaxSession(); i++) {
247-
//if we are sharing the ceonnection only create 1
247+
//if we are sharing the connection only create 1
248248
if (!spec.isSingleConnection()) {
249249
cf = null;
250250
}
@@ -259,17 +259,22 @@ protected synchronized void setup() throws Exception {
259259
handler.setup();
260260
handlers.add(handler);
261261
} catch (Exception e) {
262+
logger.trace("Failed to setup session {} for activation {}", i, spec, e);
262263
if (cf != null) {
263-
if (!spec.isSingleConnection()) {
264-
cf.close();
265-
}
264+
cf.close();
266265
}
267266
if (session != null) {
268267
session.close();
269268
}
270269
if (firstException == null) {
271270
firstException = e;
272271
}
272+
if (spec.isSingleConnection()) {
273+
// The shared ClientSessionFactory is in a broken state; stop the loop
274+
// all remaining sessions would fail with "ClientSession closed while
275+
// creating session", masking the real error.
276+
break;
277+
}
273278
}
274279
}
275280
//if we have any exceptions close all the handlers and throw the first exception.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.tests.integration.ra;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.junit.jupiter.api.Assertions.fail;
24+
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import org.apache.activemq.artemis.api.core.ActiveMQException;
30+
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
31+
import org.apache.activemq.artemis.api.core.Interceptor;
32+
import org.apache.activemq.artemis.api.core.client.ClientMessage;
33+
import org.apache.activemq.artemis.api.core.client.ClientProducer;
34+
import org.apache.activemq.artemis.api.core.client.ClientSession;
35+
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
36+
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
37+
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
38+
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
39+
import org.junit.jupiter.api.Test;
40+
41+
/**
42+
* Tests for single connection mode exception handling in ActiveMQActivation.
43+
* Related to ARTEMIS-5987: Properly handling exceptions with single connection.
44+
*/
45+
public class ActiveMQMessageHandlerSingleConnectionTest extends ActiveMQRATestBase {
46+
47+
@Override
48+
public boolean useSecurity() {
49+
return false;
50+
}
51+
52+
/**
53+
* Test that when using single connection mode and session creation fails (server is down),
54+
* the activation setup fails immediately with the original exception and does not continue
55+
* creating more sessions that would mask the real error with a "ClientSessionFactory is closed"
56+
* error.
57+
*
58+
* Without the fix, the loop would continue after closing the shared ClientSessionFactory,
59+
* and subsequent sessions would fail with a different exception, hiding the root cause.
60+
*/
61+
@Test
62+
public void testSingleConnectionSessionCreationFailurePropagatesOriginalException() throws Exception {
63+
server.stop();
64+
65+
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
66+
MyBootstrapContext ctx = new MyBootstrapContext();
67+
qResourceAdapter.start(ctx);
68+
69+
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
70+
spec.setResourceAdapter(qResourceAdapter);
71+
spec.setUseJNDI(false);
72+
spec.setDestinationType("javax.jms.Queue");
73+
spec.setDestination(MDBQUEUE);
74+
spec.setMaxSession(5); // Multiple sessions to exercise the loop
75+
spec.setSingleConnection(true);
76+
spec.setSetupAttempts(1); // Only try once, no reconnection
77+
78+
CountDownLatch latch = new CountDownLatch(1);
79+
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
80+
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
81+
82+
try {
83+
qResourceAdapter.endpointActivation(endpointFactory, spec);
84+
fail("Expected activation to fail because server is down");
85+
} catch (Exception e) {
86+
// The thrown exception must be the original connection failure, not a secondary
87+
// "ClientSessionFactory is closed" / "IllegalStateException" that would appear
88+
// if the loop continued past the first failure in single connection mode.
89+
assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage());
90+
assertEquals(ActiveMQExceptionType.NOT_CONNECTED, ((ActiveMQException) e).getType(),
91+
"Expected NOT_CONNECTED exception type but got: " + ((ActiveMQException) e).getType());
92+
} finally {
93+
qResourceAdapter.stop();
94+
// Restart the server so that tearDown() can cleanly stop jmsServer
95+
server.start();
96+
}
97+
}
98+
99+
/**
100+
* Test that when using single connection mode and session creation fails mid-loop (after some
101+
* sessions have already been set up successfully), the original exception is still propagated
102+
* and does not get masked by a secondary "ClientSessionFactory is closed" error.
103+
*
104+
* This exercises the handler teardown path: the already-created handlers must be torn down
105+
* and the loop must break immediately on the first failure.
106+
*/
107+
@Test
108+
public void testSingleConnectionMidLoopFailurePropagatesOriginalException() throws Exception {
109+
// Allow the first 2 CreateSession packets through, fail on the 3rd.
110+
// This means sessions 0 and 1 succeed, session 2 fails mid-loop.
111+
AtomicInteger sessionCreateCount = new AtomicInteger(0);
112+
Interceptor interceptor = (packet, connection) -> {
113+
if (packet instanceof CreateSessionMessage && sessionCreateCount.incrementAndGet() > 2) {
114+
throw new ActiveMQException(ActiveMQExceptionType.NOT_CONNECTED, "injected mid-loop failure");
115+
}
116+
return true;
117+
};
118+
server.getRemotingService().addIncomingInterceptor(interceptor);
119+
120+
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
121+
MyBootstrapContext ctx = new MyBootstrapContext();
122+
qResourceAdapter.start(ctx);
123+
124+
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
125+
spec.setResourceAdapter(qResourceAdapter);
126+
spec.setUseJNDI(false);
127+
spec.setDestinationType("javax.jms.Queue");
128+
spec.setDestination(MDBQUEUE);
129+
spec.setMaxSession(5); // 2 succeed, then fail mid-loop
130+
spec.setSingleConnection(true);
131+
spec.setSetupAttempts(1);
132+
133+
CountDownLatch latch = new CountDownLatch(1);
134+
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
135+
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
136+
137+
try {
138+
qResourceAdapter.endpointActivation(endpointFactory, spec);
139+
fail("Expected activation to fail due to injected mid-loop failure");
140+
} catch (Exception e) {
141+
assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage());
142+
// Without the fix, the loop would continue past the failure and subsequent sessions
143+
// would fail with "ClientSessionFactory is closed", masking the original error.
144+
assertFalse(e.getMessage().contains("closed"), "Got a masked 'factory closed' exception instead of the original error: " + e.getMessage());
145+
} finally {
146+
server.getRemotingService().removeIncomingInterceptor(interceptor);
147+
qResourceAdapter.stop();
148+
}
149+
}
150+
151+
/**
152+
* Test that single connection mode works correctly under normal operation — all sessions
153+
* share one underlying connection and messages are delivered.
154+
*/
155+
@Test
156+
public void testSingleConnectionNormalOperation() throws Exception {
157+
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
158+
MyBootstrapContext ctx = new MyBootstrapContext();
159+
qResourceAdapter.start(ctx);
160+
161+
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
162+
spec.setResourceAdapter(qResourceAdapter);
163+
spec.setUseJNDI(false);
164+
spec.setDestinationType("javax.jms.Queue");
165+
spec.setDestination(MDBQUEUE);
166+
spec.setMaxSession(3);
167+
spec.setSingleConnection(true);
168+
169+
CountDownLatch latch = new CountDownLatch(3);
170+
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
171+
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
172+
173+
qResourceAdapter.endpointActivation(endpointFactory, spec);
174+
175+
try (ClientSessionFactory sf = locator.createSessionFactory();
176+
ClientSession session = sf.createSession()) {
177+
ClientProducer producer = session.createProducer(MDBQUEUEPREFIXED);
178+
for (int i = 0; i < 3; i++) {
179+
ClientMessage message = session.createMessage(true);
180+
message.getBodyBuffer().writeString("test-message-" + i);
181+
producer.send(message);
182+
}
183+
}
184+
185+
assertTrue(latch.await(5, TimeUnit.SECONDS), "All 3 messages should be received within 5s");
186+
assertNotNull(endpoint.lastMessage, "At least one message should have been received");
187+
188+
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
189+
qResourceAdapter.stop();
190+
}
191+
192+
}

0 commit comments

Comments
 (0)