Skip to content

Commit 8a9128a

Browse files
clebertsuconictabish121
authored andcommitted
ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going through mirroring
Mirroring should ignore send / create / delete / acks for temporary queues
1 parent b9b4dfd commit 8a9128a

11 files changed

Lines changed: 180 additions & 58 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
105105
}
106106

107107
@Override
108-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
108+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception {
109109
for (MirrorController partition : partitions) {
110-
partition.deleteQueue(addressName, queueName);
110+
partition.deleteQueue(addressName, queueName, queueConfiguration);
111111
}
112112
}
113113

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,14 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
238238
}
239239
logger.trace("{} deleteAddress {}", server, addressInfo);
240240

241-
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
241+
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() || addressInfo.isTemporary()) {
242+
if (logger.isTraceEnabled()) {
243+
logger.trace("ignoring deleteAddress for invalidTarget = {}, isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()), addressInfo.isInternal(), addressInfo.isTemporary());
244+
}
242245
return;
243246
}
244247
if (ignoreAddress(addressInfo.getName())) {
248+
logger.trace("ignoring deleteAddress {} for ignoreAddress condition", addressInfo.getName());
245249
return;
246250
}
247251
if (deleteQueues) {
@@ -282,7 +286,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
282286
}
283287

284288
@Override
285-
public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
289+
public void deleteQueue(SimpleString address, SimpleString queue, QueueConfiguration queueConfiguration) throws Exception {
286290
if (!brokerConnection.isEnabled()) {
287291
return;
288292
}
@@ -298,6 +302,15 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
298302
return;
299303
}
300304

305+
if (queueConfiguration != null) {
306+
if (queueConfiguration.isTemporary() || queueConfiguration.isInternal()) {
307+
if (logger.isTraceEnabled()) {
308+
logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or isInternal = {}", address, queue, queueConfiguration.isTemporary(), queueConfiguration.isInternal());
309+
}
310+
return;
311+
}
312+
}
313+
301314
if (deleteQueues) {
302315
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
303316
routeMirrorCommand(server, message);
@@ -355,8 +368,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
355368
}
356369
SimpleString address = context.getAddress(message);
357370

358-
if (context.isInternal()) {
359-
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
371+
if (context.isMirrorIgnore()) {
372+
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal or temporary queue", server);
360373
return;
361374
}
362375

@@ -587,9 +600,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
587600
return;
588601
}
589602

590-
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) {
591-
if (logger.isDebugEnabled()) {
592-
logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
603+
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) {
604+
if (logger.isTraceEnabled()) {
605+
logger.trace("ignoring preAcknowledge on ref {} for either internalQueue = {}, temporary = {}, isMirrorController = {}", ref, ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(), ref.getQueue().isMirrorController());
593606
}
594607
return;
595608
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
318318
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS);
319319
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);
320320

321-
deleteQueue(SimpleString.of(address), SimpleString.of(queueName));
321+
deleteQueue(SimpleString.of(address), SimpleString.of(queueName), null);
322322
} else if (eventType.equals(POST_ACK)) {
323323
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);
324324

@@ -440,7 +440,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
440440
}
441441

442442
@Override
443-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
443+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
444444
if (logger.isDebugEnabled()) {
445445
logger.debug("{} destroy queue {} on address = {} server {}", server, queueName, addressName, server.getIdentity());
446446
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
640640
}
641641

642642
@Override
643-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
643+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
644644

645645
}
646646

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ public interface RoutingContext {
5454
boolean isMirrorIndividualRoute();
5555

5656
/**
57-
* return true if every queue routed is internal
57+
* return true if every queue routed is internal or temporary
5858
*/
59-
boolean isInternal();
59+
boolean isMirrorIgnore();
6060

6161
MirrorController getMirrorSource();
6262

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2616,7 +2616,7 @@ public void destroyQueue(final SimpleString queueName,
26162616
}
26172617

26182618
if (mirrorControllerService != null) {
2619-
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName());
2619+
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName(), queue.getQueueConfiguration());
26202620
}
26212621

26222622
queue.deleteQueue(removeConsumers);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext {
5959

6060
Boolean reusable = null;
6161

62-
Boolean internalOnly = null;
62+
Boolean mirrorIgnore = null;
6363

6464
boolean divertDisabled = false;
6565

@@ -130,10 +130,11 @@ public boolean isReusable() {
130130
}
131131

132132
@Override
133-
public boolean isInternal() {
134-
return internalOnly != null && internalOnly;
133+
public boolean isMirrorIgnore() {
134+
return mirrorIgnore != null && mirrorIgnore;
135135
}
136136

137+
137138
@Override
138139
public int getPreviousBindingsVersion() {
139140
return version;
@@ -177,7 +178,7 @@ public RoutingContextImpl clear() {
177178

178179
this.reusable = null;
179180

180-
this.internalOnly = null;
181+
this.mirrorIgnore = null;
181182

182183
// once we set to disabled, we keep it always disabled.
183184
// This is because the routing object used to route commands will disable this
@@ -211,11 +212,11 @@ public void addQueue(final SimpleString address, final Queue queue) {
211212
listing.getNonDurableQueues().add(queue);
212213
}
213214

214-
if (internalOnly == null) {
215-
internalOnly = queue.isInternalQueue();
215+
if (mirrorIgnore == null) {
216+
mirrorIgnore = queue.isInternalQueue() || queue.isTemporary();
216217
} else {
217-
// every queue added has to be internal only
218-
internalOnly = internalOnly && queue.isInternalQueue();
218+
// making sure that every queue added matches the mirrorIgnore
219+
mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() || queue.isTemporary());
219220
}
220221

221222
queueCount++;

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ default boolean isRetryACK() {
3636
void addAddress(AddressInfo addressInfo) throws Exception;
3737
void deleteAddress(AddressInfo addressInfo) throws Exception;
3838
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
39-
void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception;
39+
void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception;
4040
void sendMessage(Transaction tx, Message message, RoutingContext context);
4141

4242
void postAcknowledge(MessageReference ref, AckReason reason) throws Exception;

artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -949,28 +949,28 @@ public void recheckRefCount(OperationContext context) {
949949
@Test
950950
public void testValidateInternal() {
951951
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager()));
952-
assertFalse(context.isInternal());
952+
assertFalse(context.isMirrorIgnore());
953953

954954
context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
955-
assertTrue(context.isInternal());
955+
assertTrue(context.isMirrorIgnore());
956956

957957
context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", false, true));
958-
assertFalse(context.isInternal());
958+
assertFalse(context.isMirrorIgnore());
959959

960960
context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
961-
assertFalse(context.isInternal());
961+
assertFalse(context.isMirrorIgnore());
962962

963963
context.clear();
964-
assertFalse(context.isInternal());
964+
assertFalse(context.isMirrorIgnore());
965965

966966
context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
967-
assertTrue(context.isInternal());
967+
assertTrue(context.isMirrorIgnore());
968968

969969
context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", true, true));
970-
assertTrue(context.isInternal());
970+
assertTrue(context.isMirrorIgnore());
971971

972972
context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
973-
assertTrue(context.isInternal());
973+
assertTrue(context.isMirrorIgnore());
974974
}
975975

976976
}

0 commit comments

Comments
 (0)