diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 4b63e0abc28..50b930b5fc0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1425,71 +1425,38 @@ public boolean page(Message message, @Override public boolean checkFullPolicy(Message message) throws Exception { - - boolean diskFull = pagingManager.isDiskFull(); - - if (diskFull && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) { - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); - } - - if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) { - throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); - } - - // Dist is full, just drop the data - if (!printedDropMessagesWarning) { - printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); - } - + if (pagingManager.isDiskFull() && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) { + handleDrop(message, diskFullMessagePolicy == DiskFullMessagePolicy.FAIL); return false; } - if (isFull()) { - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { - if (message.isLargeMessage()) { - removeLargeMessage(message); - } - throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); - } else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { - if (message.isLargeMessage()) { - removeLargeMessage(message); - } - // storage is full, just drop the data - if (!printedDropMessagesWarning) { - printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); - } - return false; - } + if (isFull() && (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL)) { + handleDrop(message, addressFullMessagePolicy == AddressFullMessagePolicy.FAIL); + return false; } if (pageFull) { - if (message.isLargeMessage()) { - removeLargeMessage(message); - } - - if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) { - throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); - } - - if (!printedDropMessagesWarning) { - printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); - } + handleDrop(message, pageFullMessagePolicy == PageFullMessagePolicy.FAIL); return false; } return true; } - private static void removeLargeMessage(Message message) { - try { + // message will be dropped, we may throw an Exception if fail + private void handleDrop(Message message, boolean fail) throws Exception { + if (message.isLargeMessage()) { ((LargeServerMessage) message).deleteFile(); - } catch (Exception e) { - // only thing to be done is log on this case - logger.debug("Error deleting large message file for {}", message, e); + } + + if (fail) { + throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); + } + + // System is full, just drop the message + if (!printedDropMessagesWarning) { + printedDropMessagesWarning = true; + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index d263295a6ab..bb2d9a398e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1685,18 +1685,15 @@ public void processRoute(final Message message, final SimpleString messageAddress = message.getAddressSimpleString(); final PagingStore owningStore = pagingManager.getPageStore(messageAddress); message.setOwner(owningStore); + boolean dropMessages = false; - if (owningStore != null) { - if (!owningStore.checkFullPolicy(message)) { - dropMessages = true; - } + if (owningStore != null && !owningStore.checkFullPolicy(message)) { + dropMessages = true; } for (Map.Entry entry : context.getContexListing().entrySet()) { final PagingStore store = entry.getValue().getAddressStore(); - if (store != null) { - if (!store.checkFullPolicy(message)) { - dropMessages = true; - } + if (store != null && store != owningStore && !store.checkFullPolicy(message)) { + dropMessages = true; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java index ca067549026..1aaa3dc7c45 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java @@ -262,7 +262,6 @@ public void testBlock() throws Exception { CountDownLatch doneConsume = new CountDownLatch(queueToReceive.length); for (String q : queueToReceive) { - final String consumerQueue = q; executorService.execute(() -> { ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); try (Connection connection = factory.createConnection()) {