Skip to content

Commit 6e1d328

Browse files
ARTEMIS-6011 Wildcard routing / Address full or dropped is not properly propagated
1 parent 93eaead commit 6e1d328

15 files changed

Lines changed: 473 additions & 83 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.activemq.artemis.api.core.SimpleString;
2828
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
2929
import org.apache.activemq.artemis.core.io.IOCallback;
30+
import org.apache.activemq.artemis.core.paging.PagingStore;
3031
import org.apache.activemq.artemis.core.persistence.OperationContext;
3132
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
3233
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -787,12 +788,19 @@ public static void routeMirrorCommand(ActiveMQServer server, Message message, Tr
787788

788789
static class PagedRouteContext implements RouteContextList {
789790

791+
private final PagingStore addressStore;
790792
private final List<Queue> durableQueues;
791793
private final List<Queue> nonDurableQueues;
792794

795+
@Override
796+
public PagingStore getAddressStore() {
797+
return addressStore;
798+
}
799+
793800
PagedRouteContext(Queue snfQueue) {
794801
List<Queue> queues = new ArrayList<>(1);
795802
queues.add(snfQueue);
803+
this.addressStore = snfQueue.getPagingStore();
796804

797805
if (snfQueue.isDurable()) {
798806
durableQueues = queues;

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
139139

140140
Page usePage(long page);
141141

142+
boolean checkFullPolicy(Message message) throws Exception;
143+
142144
/**
143145
* Use this method when you want to use the cache of used pages. If you are just using offline (e.g. print-data), use
144146
* the newPageObject method.

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java

Lines changed: 37 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,62 +1423,35 @@ public boolean page(Message message,
14231423
return page(message, tx, listCtx, null, false) >= 0;
14241424
}
14251425

1426-
@Override
1427-
public int page(Message message,
1428-
final Transaction tx,
1429-
RouteContextList listCtx,
1430-
Function<Message, Message> pageDecorator,
1431-
boolean useFlowControl) throws Exception {
1432-
1433-
if (!running) {
1434-
return -1;
1435-
}
1436-
1437-
boolean diskFull = pagingManager.isDiskFull();
1426+
public boolean checkFullPolicy(Message message) throws Exception {
14381427

1439-
if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
1440-
if (diskFull) {
1428+
if (isFull() || pagingManager.isDiskFull()) {
1429+
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
14411430
if (message.isLargeMessage()) {
1442-
((LargeServerMessage) message).deleteFile();
1443-
}
1444-
1445-
if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
1446-
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1447-
}
1448-
1449-
// Dist is full, just drop the data
1450-
if (!printedDropMessagesWarning) {
1451-
printedDropMessagesWarning = true;
1452-
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
1431+
try {
1432+
((LargeServerMessage) message).deleteFile();
1433+
} catch (Exception e) {
1434+
// only thing to be done is log on this case
1435+
logger.debug("Error deleting large message file for {}", message, e);
1436+
}
14531437
}
1454-
1455-
return 0;
1456-
}
1457-
}
1458-
1459-
boolean full = isFull();
1460-
1461-
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
1462-
if (full) {
1438+
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1439+
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
14631440
if (message.isLargeMessage()) {
1464-
((LargeServerMessage) message).deleteFile();
1465-
}
1466-
1467-
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
1468-
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
1441+
try {
1442+
((LargeServerMessage) message).deleteFile();
1443+
} catch (Exception e) {
1444+
// only thing to be done is log on this case
1445+
logger.debug("Error deleting large message file for {}", message, e);
1446+
}
14691447
}
1470-
1471-
// Address is full, we just pretend we are paging, and drop the data
1448+
// Dist is full, just drop the data
14721449
if (!printedDropMessagesWarning) {
14731450
printedDropMessagesWarning = true;
14741451
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
14751452
}
1476-
return 0;
1477-
} else {
1478-
return -1;
1453+
return false;
14791454
}
1480-
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
1481-
return -1;
14821455
}
14831456

14841457
if (pageFull) {
@@ -1494,10 +1467,25 @@ public int page(Message message,
14941467
printedDropMessagesWarning = true;
14951468
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
14961469
}
1470+
return false;
1471+
}
1472+
1473+
return true;
1474+
}
1475+
1476+
@Override
1477+
public int page(Message message,
1478+
final Transaction tx,
1479+
RouteContextList listCtx,
1480+
Function<Message, Message> pageDecorator,
1481+
boolean useFlowControl) throws Exception {
14971482

1498-
// we are in page mode, if we got to this point, we are dropping the message while still paging
1499-
// we return 0 as in the storage is in "page mode" however no credits are being taken.
1500-
return 0;
1483+
if (!running) {
1484+
return -1;
1485+
}
1486+
1487+
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
1488+
return -1;
15011489
}
15021490

15031491
return writePage(message, tx, listCtx, pageDecorator, useFlowControl);

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,6 +1685,25 @@ public void processRoute(final Message message,
16851685
final SimpleString messageAddress = message.getAddressSimpleString();
16861686
final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
16871687
message.setOwner(owningStore);
1688+
boolean dropMessages = false;
1689+
if (owningStore != null) {
1690+
if (!owningStore.checkFullPolicy(message)) {
1691+
dropMessages = true;
1692+
}
1693+
}
1694+
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
1695+
final PagingStore store = entry.getValue().getAddressStore();
1696+
if (store != null) {
1697+
if (!store.checkFullPolicy(message)) {
1698+
dropMessages = true;
1699+
}
1700+
}
1701+
}
1702+
1703+
if (dropMessages) {
1704+
return;
1705+
}
1706+
16881707
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
16891708
final PagingStore store;
16901709
if (entry.getKey().equals(messageAddress)) {

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import java.util.List;
2020

21+
import org.apache.activemq.artemis.core.paging.PagingStore;
22+
2123
/**
2224
* This is a simple datatype containing the list of a routing context
2325
*/
2426
public interface RouteContextList {
2527

28+
PagingStore getAddressStore();
29+
2630
int getNumberOfNonDurableQueues();
2731

2832
int getNumberOfDurableQueues();

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.activemq.artemis.api.core.Message;
2424
import org.apache.activemq.artemis.api.core.RoutingType;
2525
import org.apache.activemq.artemis.api.core.SimpleString;
26+
import org.apache.activemq.artemis.core.paging.PagingStore;
2627
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
2728
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
2829
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -78,11 +79,11 @@ public interface RoutingContext {
7879

7980
Map<SimpleString, RouteContextList> getContexListing();
8081

81-
RouteContextList getContextListing(SimpleString address);
82+
RouteContextList getContextListing(SimpleString address, PagingStore addressStore);
8283

83-
List<Queue> getNonDurableQueues(SimpleString address);
84+
List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore);
8485

85-
List<Queue> getDurableQueues(SimpleString address);
86+
List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore);
8687

8788
int getQueueCount();
8889

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void unproposed(SimpleString groupID) {
182182
public void route(final Message message, final RoutingContext context) {
183183
addRouteContextToMessage(message);
184184

185-
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
185+
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());
186186

187187
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
188188
// There can be many remote bindings for the same node, we only want to add the message once to
@@ -195,7 +195,7 @@ public void route(final Message message, final RoutingContext context) {
195195
public void routeWithAck(Message message, RoutingContext context) {
196196
addRouteContextToMessage(message);
197197

198-
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
198+
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());
199199

200200
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
201201
// There can be many remote bindings for the same node, we only want to add the message once to

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.activemq.artemis.api.core.Message;
2828
import org.apache.activemq.artemis.api.core.SimpleString;
29+
import org.apache.activemq.artemis.core.paging.PagingStore;
2930
import org.apache.activemq.artemis.core.server.Queue;
3031
import org.apache.activemq.artemis.core.server.RouteContextList;
3132
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -204,7 +205,7 @@ public RoutingContext setMirrorSource(MirrorController mirrorController) {
204205
@Override
205206
public void addQueue(final SimpleString address, final Queue queue) {
206207

207-
RouteContextList listing = getContextListing(address);
208+
RouteContextList listing = getContextListing(address, queue.getPagingStore());
208209

209210
if (queue.isDurable()) {
210211
listing.getDurableQueues().add(queue);
@@ -257,7 +258,7 @@ public MessageLoadBalancingType getLoadBalancingType() {
257258
@Override
258259
public void addQueueWithAck(SimpleString address, Queue queue) {
259260
addQueue(address, queue);
260-
RouteContextList listing = getContextListing(address);
261+
RouteContextList listing = getContextListing(address, queue.getPagingStore());
261262
listing.addAckedQueue(queue);
262263
}
263264

@@ -316,10 +317,10 @@ public RoutingType getPreviousRoutingType() {
316317
}
317318

318319
@Override
319-
public RouteContextList getContextListing(SimpleString address) {
320+
public RouteContextList getContextListing(SimpleString address, PagingStore addressStore) {
320321
RouteContextList listing = map.get(address);
321322
if (listing == null) {
322-
listing = new ContextListing();
323+
listing = new ContextListing(addressStore);
323324
map.put(address, listing);
324325
}
325326
return listing;
@@ -336,13 +337,13 @@ public void setTransaction(final Transaction tx) {
336337
}
337338

338339
@Override
339-
public List<Queue> getNonDurableQueues(SimpleString address) {
340-
return getContextListing(address).getNonDurableQueues();
340+
public List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore) {
341+
return getContextListing(address, addressStore).getNonDurableQueues();
341342
}
342343

343344
@Override
344-
public List<Queue> getDurableQueues(SimpleString address) {
345-
return getContextListing(address).getDurableQueues();
345+
public List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore) {
346+
return getContextListing(address, addressStore).getDurableQueues();
346347
}
347348

348349
@Override
@@ -368,6 +369,17 @@ public Map<SimpleString, RouteContextList> getContexListing() {
368369

369370
public static class ContextListing implements RouteContextList {
370371

372+
public ContextListing(PagingStore addressStore) {
373+
this.addressStore = addressStore;
374+
}
375+
376+
@Override
377+
public PagingStore getAddressStore() {
378+
return addressStore;
379+
}
380+
381+
private PagingStore addressStore;
382+
371383
private final List<Queue> durableQueue = new ArrayList<>(1);
372384

373385
private final List<Queue> nonDurableQueue = new ArrayList<>(1);

0 commit comments

Comments
 (0)