Skip to content

Commit 3f0297e

Browse files
ARTEMIS-6011 Wildcard routing / Address full or dropped is not properly propagated
Co-authored with Justin Bertram
1 parent 93eaead commit 3f0297e

15 files changed

Lines changed: 473 additions & 84 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.activemq.artemis.api.core.SimpleString;
2828
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
2929
import org.apache.activemq.artemis.core.io.IOCallback;
30+
import org.apache.activemq.artemis.core.paging.PagingStore;
3031
import org.apache.activemq.artemis.core.persistence.OperationContext;
3132
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
3233
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -787,12 +788,19 @@ public static void routeMirrorCommand(ActiveMQServer server, Message message, Tr
787788

788789
static class PagedRouteContext implements RouteContextList {
789790

791+
private final PagingStore addressStore;
790792
private final List<Queue> durableQueues;
791793
private final List<Queue> nonDurableQueues;
792794

795+
@Override
796+
public PagingStore getAddressStore() {
797+
return addressStore;
798+
}
799+
793800
PagedRouteContext(Queue snfQueue) {
794801
List<Queue> queues = new ArrayList<>(1);
795802
queues.add(snfQueue);
803+
this.addressStore = snfQueue.getPagingStore();
796804

797805
if (snfQueue.isDurable()) {
798806
durableQueues = queues;

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
139139

140140
Page usePage(long page);
141141

142+
boolean checkFullPolicy(Message message) throws Exception;
143+
142144
/**
143145
* Use this method when you want to use the cache of used pages. If you are just using offline (e.g. print-data), use
144146
* the newPageObject method.

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java

Lines changed: 37 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,66 +1424,29 @@ public boolean page(Message message,
14241424
}
14251425

14261426
@Override
1427-
public int page(Message message,
1428-
final Transaction tx,
1429-
RouteContextList listCtx,
1430-
Function<Message, Message> pageDecorator,
1431-
boolean useFlowControl) throws Exception {
1432-
1433-
if (!running) {
1434-
return -1;
1435-
}
1436-
1437-
boolean diskFull = pagingManager.isDiskFull();
1438-
1439-
if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
1440-
if (diskFull) {
1427+
public boolean checkFullPolicy(Message message) throws Exception {
1428+
if (isFull() || pagingManager.isDiskFull()) {
1429+
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
14411430
if (message.isLargeMessage()) {
1442-
((LargeServerMessage) message).deleteFile();
1443-
}
1444-
1445-
if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
1446-
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1447-
}
1448-
1449-
// Dist is full, just drop the data
1450-
if (!printedDropMessagesWarning) {
1451-
printedDropMessagesWarning = true;
1452-
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
1431+
removeLargeMessage(message);
14531432
}
1454-
1455-
return 0;
1456-
}
1457-
}
1458-
1459-
boolean full = isFull();
1460-
1461-
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
1462-
if (full) {
1433+
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1434+
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
14631435
if (message.isLargeMessage()) {
1464-
((LargeServerMessage) message).deleteFile();
1436+
removeLargeMessage(message);
14651437
}
1466-
1467-
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
1468-
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1469-
}
1470-
1471-
// Address is full, we just pretend we are paging, and drop the data
1438+
// Dist is full, just drop the data
14721439
if (!printedDropMessagesWarning) {
14731440
printedDropMessagesWarning = true;
14741441
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
14751442
}
1476-
return 0;
1477-
} else {
1478-
return -1;
1443+
return false;
14791444
}
1480-
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
1481-
return -1;
14821445
}
14831446

14841447
if (pageFull) {
14851448
if (message.isLargeMessage()) {
1486-
((LargeServerMessage) message).deleteFile();
1449+
removeLargeMessage(message);
14871450
}
14881451

14891452
if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
@@ -1494,10 +1457,34 @@ public int page(Message message,
14941457
printedDropMessagesWarning = true;
14951458
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
14961459
}
1460+
return false;
1461+
}
14971462

1498-
// we are in page mode, if we got to this point, we are dropping the message while still paging
1499-
// we return 0 as in the storage is in "page mode" however no credits are being taken.
1500-
return 0;
1463+
return true;
1464+
}
1465+
1466+
private static void removeLargeMessage(Message message) {
1467+
try {
1468+
((LargeServerMessage) message).deleteFile();
1469+
} catch (Exception e) {
1470+
// only thing to be done is log on this case
1471+
logger.debug("Error deleting large message file for {}", message, e);
1472+
}
1473+
}
1474+
1475+
@Override
1476+
public int page(Message message,
1477+
final Transaction tx,
1478+
RouteContextList listCtx,
1479+
Function<Message, Message> pageDecorator,
1480+
boolean useFlowControl) throws Exception {
1481+
1482+
if (!running) {
1483+
return -1;
1484+
}
1485+
1486+
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
1487+
return -1;
15011488
}
15021489

15031490
return writePage(message, tx, listCtx, pageDecorator, useFlowControl);

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,6 +1685,25 @@ public void processRoute(final Message message,
16851685
final SimpleString messageAddress = message.getAddressSimpleString();
16861686
final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
16871687
message.setOwner(owningStore);
1688+
boolean dropMessages = false;
1689+
if (owningStore != null) {
1690+
if (!owningStore.checkFullPolicy(message)) {
1691+
dropMessages = true;
1692+
}
1693+
}
1694+
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
1695+
final PagingStore store = entry.getValue().getAddressStore();
1696+
if (store != null) {
1697+
if (!store.checkFullPolicy(message)) {
1698+
dropMessages = true;
1699+
}
1700+
}
1701+
}
1702+
1703+
if (dropMessages) {
1704+
return;
1705+
}
1706+
16881707
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
16891708
final PagingStore store;
16901709
if (entry.getKey().equals(messageAddress)) {

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import java.util.List;
2020

21+
import org.apache.activemq.artemis.core.paging.PagingStore;
22+
2123
/**
2224
* This is a simple datatype containing the list of a routing context
2325
*/
2426
public interface RouteContextList {
2527

28+
PagingStore getAddressStore();
29+
2630
int getNumberOfNonDurableQueues();
2731

2832
int getNumberOfDurableQueues();

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.activemq.artemis.api.core.Message;
2424
import org.apache.activemq.artemis.api.core.RoutingType;
2525
import org.apache.activemq.artemis.api.core.SimpleString;
26+
import org.apache.activemq.artemis.core.paging.PagingStore;
2627
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
2728
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
2829
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -78,11 +79,11 @@ public interface RoutingContext {
7879

7980
Map<SimpleString, RouteContextList> getContexListing();
8081

81-
RouteContextList getContextListing(SimpleString address);
82+
RouteContextList getContextListing(SimpleString address, PagingStore addressStore);
8283

83-
List<Queue> getNonDurableQueues(SimpleString address);
84+
List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore);
8485

85-
List<Queue> getDurableQueues(SimpleString address);
86+
List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore);
8687

8788
int getQueueCount();
8889

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void unproposed(SimpleString groupID) {
182182
public void route(final Message message, final RoutingContext context) {
183183
addRouteContextToMessage(message);
184184

185-
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
185+
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());
186186

187187
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
188188
// There can be many remote bindings for the same node, we only want to add the message once to
@@ -195,7 +195,7 @@ public void route(final Message message, final RoutingContext context) {
195195
public void routeWithAck(Message message, RoutingContext context) {
196196
addRouteContextToMessage(message);
197197

198-
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
198+
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());
199199

200200
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
201201
// There can be many remote bindings for the same node, we only want to add the message once to

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.activemq.artemis.api.core.Message;
2828
import org.apache.activemq.artemis.api.core.SimpleString;
29+
import org.apache.activemq.artemis.core.paging.PagingStore;
2930
import org.apache.activemq.artemis.core.server.Queue;
3031
import org.apache.activemq.artemis.core.server.RouteContextList;
3132
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -204,7 +205,7 @@ public RoutingContext setMirrorSource(MirrorController mirrorController) {
204205
@Override
205206
public void addQueue(final SimpleString address, final Queue queue) {
206207

207-
RouteContextList listing = getContextListing(address);
208+
RouteContextList listing = getContextListing(address, queue.getPagingStore());
208209

209210
if (queue.isDurable()) {
210211
listing.getDurableQueues().add(queue);
@@ -257,7 +258,7 @@ public MessageLoadBalancingType getLoadBalancingType() {
257258
@Override
258259
public void addQueueWithAck(SimpleString address, Queue queue) {
259260
addQueue(address, queue);
260-
RouteContextList listing = getContextListing(address);
261+
RouteContextList listing = getContextListing(address, queue.getPagingStore());
261262
listing.addAckedQueue(queue);
262263
}
263264

@@ -316,10 +317,10 @@ public RoutingType getPreviousRoutingType() {
316317
}
317318

318319
@Override
319-
public RouteContextList getContextListing(SimpleString address) {
320+
public RouteContextList getContextListing(SimpleString address, PagingStore addressStore) {
320321
RouteContextList listing = map.get(address);
321322
if (listing == null) {
322-
listing = new ContextListing();
323+
listing = new ContextListing(addressStore);
323324
map.put(address, listing);
324325
}
325326
return listing;
@@ -336,13 +337,13 @@ public void setTransaction(final Transaction tx) {
336337
}
337338

338339
@Override
339-
public List<Queue> getNonDurableQueues(SimpleString address) {
340-
return getContextListing(address).getNonDurableQueues();
340+
public List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore) {
341+
return getContextListing(address, addressStore).getNonDurableQueues();
341342
}
342343

343344
@Override
344-
public List<Queue> getDurableQueues(SimpleString address) {
345-
return getContextListing(address).getDurableQueues();
345+
public List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore) {
346+
return getContextListing(address, addressStore).getDurableQueues();
346347
}
347348

348349
@Override
@@ -368,6 +369,17 @@ public Map<SimpleString, RouteContextList> getContexListing() {
368369

369370
public static class ContextListing implements RouteContextList {
370371

372+
public ContextListing(PagingStore addressStore) {
373+
this.addressStore = addressStore;
374+
}
375+
376+
@Override
377+
public PagingStore getAddressStore() {
378+
return addressStore;
379+
}
380+
381+
private PagingStore addressStore;
382+
371383
private final List<Queue> durableQueue = new ArrayList<>(1);
372384

373385
private final List<Queue> nonDurableQueue = new ArrayList<>(1);

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.activemq.artemis.api.core.QueueConfiguration;
3636
import org.apache.activemq.artemis.api.core.RoutingType;
3737
import org.apache.activemq.artemis.api.core.SimpleString;
38+
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
3839
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
3940
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
4041
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -48,6 +49,7 @@
4849
import org.apache.activemq.artemis.core.server.Queue;
4950
import org.apache.activemq.artemis.core.server.ServerSession;
5051
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
52+
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
5153
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
5254
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
5355
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -105,6 +107,54 @@ public void messageArrived(String topic, MqttMessage message) {
105107
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
106108
}
107109

110+
@Test
111+
@Timeout(DEFAULT_TIMEOUT_SEC)
112+
public void testMaxMessagesSameAddress() throws Exception {
113+
testMaxMessages("a/b", "a/b", "a.#");
114+
}
115+
116+
@Test
117+
@Timeout(DEFAULT_TIMEOUT_SEC)
118+
public void testMaxMessagesDifferentAddresses() throws Exception {
119+
testMaxMessages("a/b", "a/#", "a.#");
120+
}
121+
122+
private void testMaxMessages(final String publisherTopic, final String subscriptionTopic, final String addressSettingsMatch) throws MqttException {
123+
final int MAX_SIZE_MESSAGES = 1;
124+
125+
// ensure too many messages will trigger a failure
126+
server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_SIZE_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
127+
128+
// ensure that the subscription should get the proper max size messages
129+
assertEquals(MAX_SIZE_MESSAGES, server.getAddressSettingsRepository().getMatch(MQTTUtil.getCoreAddressFromMqttTopic(subscriptionTopic, WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION)).getMaxSizeMessages());
130+
131+
// create and disconnect subscriber and ensure it leaves behind a subscription queue on the address
132+
MqttClient subscriber = createPahoClient("subscriber");
133+
MqttConnectionOptions subscriberOptions = new MqttConnectionOptionsBuilder()
134+
.cleanStart(false)
135+
.sessionExpiryInterval(999L)
136+
.build();
137+
subscriber.connect(subscriberOptions);
138+
subscriber.subscribe(subscriptionTopic, AT_LEAST_ONCE);
139+
subscriber.disconnect();
140+
assertNotNull(getSubscriptionQueue(subscriptionTopic, "subscriber"));
141+
142+
// send messages and ensure the max-size-messages is enforced
143+
MqttClient producer = createPahoClient("producer");
144+
producer.connect();
145+
for (int i = 0; i < MAX_SIZE_MESSAGES; i++) {
146+
producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
147+
}
148+
try {
149+
producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
150+
fail("Should have failed to publish");
151+
} catch (MqttException e) {
152+
e.printStackTrace();
153+
// ignore
154+
}
155+
assertEquals(MAX_SIZE_MESSAGES, getSubscriptionQueue(subscriptionTopic, "subscriber").getMessageCount());
156+
}
157+
108158
@Test
109159
@Timeout(DEFAULT_TIMEOUT_SEC)
110160
public void testSimpleRetroSendReceive() throws Exception {

0 commit comments

Comments
 (0)