diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index b9e018c6e48..ea1ff1cc864 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; @@ -787,12 +788,19 @@ public static void routeMirrorCommand(ActiveMQServer server, Message message, Tr static class PagedRouteContext implements RouteContextList { + private final PagingStore addressStore; private final List durableQueues; private final List nonDurableQueues; + @Override + public PagingStore getAddressStore() { + return addressStore; + } + PagedRouteContext(Queue snfQueue) { List queues = new ArrayList<>(1); queues.add(snfQueue); + this.addressStore = snfQueue.getPagingStore(); if (snfQueue.isDurable()) { durableQueues = queues; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 55775c5b53f..898034f7a8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -139,6 +139,8 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf Page usePage(long page); + boolean checkFullPolicy(Message message) throws Exception; + /** * Use this method when you want to use the cache of used pages. If you are just using offline (e.g. print-data), use * the newPageObject method. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 544b329e2e6..4b63e0abc28 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1424,66 +1424,50 @@ public boolean page(Message message, } @Override - public int page(Message message, - final Transaction tx, - RouteContextList listCtx, - Function pageDecorator, - boolean useFlowControl) throws Exception { - - if (!running) { - return -1; - } + public boolean checkFullPolicy(Message message) throws Exception { boolean diskFull = pagingManager.isDiskFull(); - if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) { - if (diskFull) { - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); - } - - if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) { - throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); - } + if (diskFull && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) { + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } - // Dist is full, just drop the data - if (!printedDropMessagesWarning) { - printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); - } + if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) { + throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); + } - return 0; + // Dist is full, just drop the data + if (!printedDropMessagesWarning) { + printedDropMessagesWarning = true; + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); } - } - boolean full = isFull(); + return false; + } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { - if (full) { + if (isFull()) { + if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); + removeLargeMessage(message); } - - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { - throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); + throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); + } else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { + if (message.isLargeMessage()) { + removeLargeMessage(message); } - - // Address is full, we just pretend we are paging, and drop the data + // storage is full, just drop the data if (!printedDropMessagesWarning) { printedDropMessagesWarning = true; ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); } - return 0; - } else { - return -1; + return false; } - } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { - return -1; } if (pageFull) { if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); + removeLargeMessage(message); } if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) { @@ -1494,20 +1478,36 @@ public int page(Message message, printedDropMessagesWarning = true; ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); } - - // we are in page mode, if we got to this point, we are dropping the message while still paging - // we return 0 as in the storage is in "page mode" however no credits are being taken. - return 0; + return false; } - return writePage(message, tx, listCtx, pageDecorator, useFlowControl); + return true; + } + + private static void removeLargeMessage(Message message) { + try { + ((LargeServerMessage) message).deleteFile(); + } catch (Exception e) { + // only thing to be done is log on this case + logger.debug("Error deleting large message file for {}", message, e); + } } - private int writePage(Message message, - Transaction tx, - RouteContextList listCtx, - Function pageDecorator, - boolean useFlowControl) throws Exception { + @Override + public int page(Message message, + final Transaction tx, + RouteContextList listCtx, + Function pageDecorator, + boolean useFlowControl) throws Exception { + + if (!running) { + return -1; + } + + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { + return -1; + } + // We need to use a readLock as we need to keep paging until we scheduled a task // notice that to leave paging you need pending tasks done readLock(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 86275ac2adc..d263295a6ab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1685,6 +1685,25 @@ public void processRoute(final Message message, final SimpleString messageAddress = message.getAddressSimpleString(); final PagingStore owningStore = pagingManager.getPageStore(messageAddress); message.setOwner(owningStore); + boolean dropMessages = false; + if (owningStore != null) { + if (!owningStore.checkFullPolicy(message)) { + dropMessages = true; + } + } + for (Map.Entry entry : context.getContexListing().entrySet()) { + final PagingStore store = entry.getValue().getAddressStore(); + if (store != null) { + if (!store.checkFullPolicy(message)) { + dropMessages = true; + } + } + } + + if (dropMessages) { + return; + } + for (Map.Entry entry : context.getContexListing().entrySet()) { final PagingStore store; if (entry.getKey().equals(messageAddress)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java index b4f9bd761fe..3d1c57cf14f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java @@ -18,11 +18,15 @@ import java.util.List; +import org.apache.activemq.artemis.core.paging.PagingStore; + /** * This is a simple datatype containing the list of a routing context */ public interface RouteContextList { + PagingStore getAddressStore(); + int getNumberOfNonDurableQueues(); int getNumberOfDurableQueues(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index fe9c4f3f1fe..c784f841388 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.mirror.MirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -78,11 +79,11 @@ public interface RoutingContext { Map getContexListing(); - RouteContextList getContextListing(SimpleString address); + RouteContextList getContextListing(SimpleString address, PagingStore addressStore); - List getNonDurableQueues(SimpleString address); + List getNonDurableQueues(SimpleString address, PagingStore addressStore); - List getDurableQueues(SimpleString address); + List getDurableQueues(SimpleString address, PagingStore addressStore); int getQueueCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 6eda81f6cfb..a65866c61ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -182,7 +182,7 @@ public void unproposed(SimpleString groupID) { public void route(final Message message, final RoutingContext context) { addRouteContextToMessage(message); - List durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress()); + List durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore()); if (!durableQueuesOnContext.contains(storeAndForwardQueue)) { // There can be many remote bindings for the same node, we only want to add the message once to @@ -195,7 +195,7 @@ public void route(final Message message, final RoutingContext context) { public void routeWithAck(Message message, RoutingContext context) { addRouteContextToMessage(message); - List durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress()); + List durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore()); if (!durableQueuesOnContext.contains(storeAndForwardQueue)) { // There can be many remote bindings for the same node, we only want to add the message once to diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 1f7a7a9dc6c..55d8318d681 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -204,7 +205,7 @@ public RoutingContext setMirrorSource(MirrorController mirrorController) { @Override public void addQueue(final SimpleString address, final Queue queue) { - RouteContextList listing = getContextListing(address); + RouteContextList listing = getContextListing(address, queue.getPagingStore()); if (queue.isDurable()) { listing.getDurableQueues().add(queue); @@ -257,7 +258,7 @@ public MessageLoadBalancingType getLoadBalancingType() { @Override public void addQueueWithAck(SimpleString address, Queue queue) { addQueue(address, queue); - RouteContextList listing = getContextListing(address); + RouteContextList listing = getContextListing(address, queue.getPagingStore()); listing.addAckedQueue(queue); } @@ -316,10 +317,10 @@ public RoutingType getPreviousRoutingType() { } @Override - public RouteContextList getContextListing(SimpleString address) { + public RouteContextList getContextListing(SimpleString address, PagingStore addressStore) { RouteContextList listing = map.get(address); if (listing == null) { - listing = new ContextListing(); + listing = new ContextListing(addressStore); map.put(address, listing); } return listing; @@ -336,13 +337,13 @@ public void setTransaction(final Transaction tx) { } @Override - public List getNonDurableQueues(SimpleString address) { - return getContextListing(address).getNonDurableQueues(); + public List getNonDurableQueues(SimpleString address, PagingStore addressStore) { + return getContextListing(address, addressStore).getNonDurableQueues(); } @Override - public List getDurableQueues(SimpleString address) { - return getContextListing(address).getDurableQueues(); + public List getDurableQueues(SimpleString address, PagingStore addressStore) { + return getContextListing(address, addressStore).getDurableQueues(); } @Override @@ -368,6 +369,17 @@ public Map getContexListing() { public static class ContextListing implements RouteContextList { + public ContextListing(PagingStore addressStore) { + this.addressStore = addressStore; + } + + @Override + public PagingStore getAddressStore() { + return addressStore; + } + + private PagingStore addressStore; + private final List durableQueue = new ArrayList<>(1); private final List nonDurableQueue = new ArrayList<>(1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index dcae8935a29..bc5b8b99ffa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; @@ -48,6 +49,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.utils.RandomUtil; @@ -105,6 +107,54 @@ public void messageArrived(String topic, MqttMessage message) { assertTrue(latch.await(500, TimeUnit.MILLISECONDS)); } + @Test + @Timeout(DEFAULT_TIMEOUT_SEC) + public void testMaxMessagesSameAddress() throws Exception { + testMaxMessages("a/b", "a/b", "a.#"); + } + + @Test + @Timeout(DEFAULT_TIMEOUT_SEC) + public void testMaxMessagesDifferentAddresses() throws Exception { + testMaxMessages("a/b", "a/#", "a.#"); + } + + private void testMaxMessages(final String publisherTopic, final String subscriptionTopic, final String addressSettingsMatch) throws MqttException { + final int MAX_SIZE_MESSAGES = 1; + + // ensure too many messages will trigger a failure + server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_SIZE_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)); + + // ensure that the subscription should get the proper max size messages + assertEquals(MAX_SIZE_MESSAGES, server.getAddressSettingsRepository().getMatch(MQTTUtil.getCoreAddressFromMqttTopic(subscriptionTopic, WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION)).getMaxSizeMessages()); + + // create and disconnect subscriber and ensure it leaves behind a subscription queue on the address + MqttClient subscriber = createPahoClient("subscriber"); + MqttConnectionOptions subscriberOptions = new MqttConnectionOptionsBuilder() + .cleanStart(false) + .sessionExpiryInterval(999L) + .build(); + subscriber.connect(subscriberOptions); + subscriber.subscribe(subscriptionTopic, AT_LEAST_ONCE); + subscriber.disconnect(); + assertNotNull(getSubscriptionQueue(subscriptionTopic, "subscriber")); + + // send messages and ensure the max-size-messages is enforced + MqttClient producer = createPahoClient("producer"); + producer.connect(); + for (int i = 0; i < MAX_SIZE_MESSAGES; i++) { + producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false); + } + try { + producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false); + fail("Should have failed to publish"); + } catch (MqttException e) { + e.printStackTrace(); + // ignore + } + assertEquals(MAX_SIZE_MESSAGES, getSubscriptionQueue(subscriptionTopic, "subscriber").getMessageCount()); + } + @Test @Timeout(DEFAULT_TIMEOUT_SEC) public void testSimpleRetroSendReceive() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java new file mode 100644 index 00000000000..ca067549026 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class WildcardAddressFullTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + protected ActiveMQServer server; + protected ClientSession session; + protected ClientSessionFactory sf; + protected ServerLocator locator; + + final String addressToSend = "a.b.c.d.e.f.g"; + final String[] queueToReceive = new String[]{"a.b.c.d.e.f.*", "a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*", "a.*.*.*.*.*.*"}; + final String addressSettingsMatch = "a.#"; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, createDefaultNettyConfig()); + server.start(); + locator = createInVMNonHALocator(); + sf = createSessionFactory(locator); + session = addClientSession(sf.createSession(false, true, true)); + } + + @Test + public void testFail() throws Exception { + + final int MAX_MESSAGES = 1; + server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + session.createAddress(SimpleString.of(addressToSend), RoutingType.MULTICAST, false); + for (String q : queueToReceive) { + session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST)); + } + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createTopic(addressToSend)); + for (int i = 0; i < MAX_MESSAGES; i++) { + producer.send(session.createTextMessage("will send")); + } + try { + producer.send(session.createTextMessage("should fail")); + fail("should fail"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + for (String q : queueToReceive) { + MessageConsumer consumer = session.createConsumer(session.createQueue(q + "::" + q)); + for (int i = 0; i < MAX_MESSAGES; i++) { + assertNotNull(consumer.receive(5000)); + } + } + } + + PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(addressToSend)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + + for (String q : queueToReceive) { + store = server.getPagingManager().getPageStore(SimpleString.of(q)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + } + } + + @Test + public void testPaging() throws Exception { + int producerSend = 10; + final int MAX_MESSAGES = 1; + server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + session.createAddress(SimpleString.of(addressToSend), RoutingType.MULTICAST, false); + for (String q : queueToReceive) { + session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST)); + } + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createTopic(addressToSend)); + for (int i = 0; i < producerSend; i++) { + producer.send(session.createTextMessage("will send")); + } + } + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + for (String q : queueToReceive) { + MessageConsumer consumer = session.createConsumer(session.createQueue(q + "::" + q)); + for (int i = 0; i < producerSend; i++) { + assertNotNull(consumer.receive(5000)); + } + assertNull(consumer.receiveNoWait()); + } + } + + PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(addressToSend)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + + for (String q : queueToReceive) { + store = server.getPagingManager().getPageStore(SimpleString.of(q)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + } + } + + @Test + public void testDrop() throws Exception { + int producerSend = 10; + final int MAX_MESSAGES = 1; + server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + session.createAddress(SimpleString.of(addressToSend), RoutingType.MULTICAST, false); + for (String q : queueToReceive) { + session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST)); + } + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createTopic(addressToSend)); + for (int i = 0; i < producerSend; i++) { + producer.send(session.createTextMessage("will send")); + } + } + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + for (String q : queueToReceive) { + MessageConsumer consumer = session.createConsumer(session.createQueue(q + "::" + q)); + for (int i = 0; i < MAX_MESSAGES; i++) { + assertNotNull(consumer.receive(5000)); + } + assertNull(consumer.receiveNoWait()); + } + } + + PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(addressToSend)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + + for (String q : queueToReceive) { + store = server.getPagingManager().getPageStore(SimpleString.of(q)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + } + } + + @Test + public void testBlock() throws Exception { + String addressToSend = "a.b.c.d.e.f.g"; + String[] queueToReceive = new String[]{"a.b.c.d.e.f.*", "a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*", "a.*.*.*.*.*.*"}; + String addressSettingsMatch = "a.#"; + + ExecutorService executorService = Executors.newFixedThreadPool(1 + queueToReceive.length); + runAfter(executorService::shutdownNow); + final int NUMBER_OF_MESSAGES = 100; + final int MAX_MESSAGES = 1; + server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + + CountDownLatch doneSending = new CountDownLatch(1); + AtomicInteger errors = new AtomicInteger(0); + + executorService.execute(() -> { + + try { + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + session.createAddress(SimpleString.of(addressToSend), RoutingType.MULTICAST, false); + for (String q : queueToReceive) { + session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST)); + } + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createTopic(addressToSend)); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + logger.info("Sending message {}", i); + producer.send(session.createTextMessage("a".repeat(10000))); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + doneSending.countDown(); + } + }); + + assertFalse(doneSending.await(500, TimeUnit.MILLISECONDS)); + + CountDownLatch doneConsume = new CountDownLatch(queueToReceive.length); + + for (String q : queueToReceive) { + final String consumerQueue = q; + executorService.execute(() -> { + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(q + "::" + q)); + connection.start(); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + assertNotNull(consumer.receive(5000)); + logger.info("Consumed {} on queue {}", i, q); + } + assertNull(consumer.receiveNoWait()); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + doneConsume.countDown(); + } + }); + } + + assertTrue(doneSending.await(5, TimeUnit.SECONDS)); + assertTrue(doneConsume.await(5, TimeUnit.SECONDS)); + assertEquals(0, errors.get()); + + PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(addressToSend)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + + for (String q : queueToReceive) { + store = server.getPagingManager().getPageStore(SimpleString.of(q)); + assertEquals(0L, store.getAddressElements()); + assertEquals(0, store.getAddressSize()); + } + } + +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 29a8ff9671a..923b8a71adb 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -271,6 +271,11 @@ public void pageFull(PageSubscription subscription) { } + @Override + public boolean checkFullPolicy(Message message) throws Exception { + return true; + } + @Override public Page usePage(long page, boolean createEntry, boolean createFile) { return null; diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java index fda378e755a..b84e62bb806 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java @@ -364,7 +364,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); + assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore))); PagedReference readMessage = iterator.next(); @@ -398,7 +398,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); + assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore))); } PagedReference readMessage = iterator.next(); @@ -429,7 +429,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); + assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore))); } PagedReference readMessage = iterator.next(); @@ -512,7 +512,7 @@ public void testConsumeLivePageMultiThread() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); + assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore))); } if (tx != null) { @@ -728,7 +728,7 @@ private long addMessages(final int start, final int numMessages, final int messa msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); + assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore))); } return pageStore.getNumberOfPages(); @@ -801,7 +801,7 @@ private Transaction pgMessages(StorageManager storage, Message msg = new CoreMessage(storage.generateID(), buffer.writerIndex()); msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); msg.putIntProperty("key", i); - pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)); + pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS, pageStore)); } return txImpl; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java index cc8e11d66d4..3d29de49996 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java @@ -311,7 +311,7 @@ public void ioSync() throws Exception { pageStore.start(); pageStore.startPaging(); - routeContextList = new RoutingContextImpl.ContextListing(); + routeContextList = new RoutingContextImpl.ContextListing(Mockito.mock(PagingStoreImpl.class)); Queue mockQueue = Mockito.mock(Queue.class); Mockito.when(mockQueue.getID()).thenReturn(1L); routeContextList.addAckedQueue(mockQueue); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java index 384b37b5b61..506f363aa8e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java @@ -78,11 +78,11 @@ public void testPagingManager() throws Exception { ICoreMessage msg = createMessage(1L, SimpleString.of("simple-test"), createRandomBuffer(10)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); + assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName(), store))); store.startPaging(); - assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); + assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName(), store))); syncOperationContext(); Page page = depageOnExecutor(store); @@ -102,7 +102,7 @@ public void testPagingManager() throws Exception { assertNull(depageOnExecutor(store)); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); + assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName(), store))); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 78baaca17e4..0a8d4fce175 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -185,7 +185,7 @@ public void testStore() throws Exception { assertTrue(storeImpl.isPaging()); final RoutingContextImpl ctx = new RoutingContextImpl(null); - assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); + assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName(), storeImpl))); syncOperationContext(); assertEquals(1, storeImpl.getNumberOfPages()); @@ -227,7 +227,7 @@ public void testDepageOnCurrentPage() throws Exception { Message msg = createMessage(i, storeImpl, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); + assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName(), storeImpl))); syncOperationContext(); } @@ -295,7 +295,7 @@ public void testRemoveInTheMiddle() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); + assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName(), storeImpl))); syncOperationContext(); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(true); @@ -413,7 +413,7 @@ public void testRemoveCurrentPage() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); + assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName(), storeImpl))); syncOperationContext(); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(true); @@ -510,7 +510,7 @@ public void testReadNumberOfMessages() throws Exception { msg.putIntProperty("page", 1); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); + assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName(), storeImpl))); syncOperationContext(); } @@ -567,7 +567,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(i, store, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); + assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName(), store))); syncOperationContext(); } @@ -601,7 +601,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(1, store, destination, buffers.get(0)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); + assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName(), store))); syncOperationContext(); Page newPage = depageOnExecutor(store); @@ -622,14 +622,14 @@ public void testDepageMultiplePages() throws Exception { { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); + assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName(), store))); } store.startPaging(); { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); + assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName(), store))); syncOperationContext(); } @@ -736,7 +736,7 @@ public void testOrderOnPaging() throws Throwable { msg.putLongProperty("count", i); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName())); + store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName(), store)); } syncOperationContext(); } catch (Throwable e) { @@ -1165,7 +1165,7 @@ protected void writePageMessage(final PagingStore storeImpl, msg.putLongProperty("count", id); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName())); + storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName(), storeImpl)); syncOperationContext(); }