Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
Expand Down Expand Up @@ -787,12 +788,19 @@ public static void routeMirrorCommand(ActiveMQServer server, Message message, Tr

static class PagedRouteContext implements RouteContextList {

private final PagingStore addressStore;
private final List<Queue> durableQueues;
private final List<Queue> nonDurableQueues;

@Override
public PagingStore getAddressStore() {
return addressStore;
}

PagedRouteContext(Queue snfQueue) {
List<Queue> queues = new ArrayList<>(1);
queues.add(snfQueue);
this.addressStore = snfQueue.getPagingStore();

if (snfQueue.isDurable()) {
durableQueues = queues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf

Page usePage(long page);

boolean checkFullPolicy(Message message) throws Exception;

/**
* Use this method when you want to use the cache of used pages. If you are just using offline (e.g. print-data), use
* the newPageObject method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,66 +1424,50 @@ public boolean page(Message message,
}

@Override
public int page(Message message,
final Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator,
boolean useFlowControl) throws Exception {

if (!running) {
return -1;
}
public boolean checkFullPolicy(Message message) throws Exception {

boolean diskFull = pagingManager.isDiskFull();

if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
if (diskFull) {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}

if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
}
if (diskFull && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}

// Dist is full, just drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}
if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
}

return 0;
// Dist is full, just drop the data
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inherited typo:

Suggested change
// Dist is full, just drop the data
// Disk is full, just drop the data

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks.. I had fixed it at some point.. but then I reverted things..

will send a commit fixing it

if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}
}

boolean full = isFull();
return false;
}

if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (full) {
if (isFull()) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
removeLargeMessage(message);
}

if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
if (message.isLargeMessage()) {
removeLargeMessage(message);
}

// Address is full, we just pretend we are paging, and drop the data
// storage is full, just drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}
return 0;
} else {
return -1;
return false;
}
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return -1;
}

if (pageFull) {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
removeLargeMessage(message);
}

if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
Expand All @@ -1494,20 +1478,36 @@ public int page(Message message,
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}

// we are in page mode, if we got to this point, we are dropping the message while still paging
// we return 0 as in the storage is in "page mode" however no credits are being taken.
return 0;
return false;
}

return writePage(message, tx, listCtx, pageDecorator, useFlowControl);
return true;
}

private static void removeLargeMessage(Message message) {
try {
((LargeServerMessage) message).deleteFile();
} catch (Exception e) {
// only thing to be done is log on this case
logger.debug("Error deleting large message file for {}", message, e);
Comment thread
brusdev marked this conversation as resolved.
}
}

private int writePage(Message message,
Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator,
boolean useFlowControl) throws Exception {
@Override
public int page(Message message,
final Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator,
boolean useFlowControl) throws Exception {

if (!running) {
return -1;
}

if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return -1;
}

// We need to use a readLock as we need to keep paging until we scheduled a task
// notice that to leave paging you need pending tasks done
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,25 @@ public void processRoute(final Message message,
final SimpleString messageAddress = message.getAddressSimpleString();
final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
message.setOwner(owningStore);
boolean dropMessages = false;
if (owningStore != null) {
if (!owningStore.checkFullPolicy(message)) {
dropMessages = true;
}
}
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
final PagingStore store = entry.getValue().getAddressStore();
if (store != null) {
if (!store.checkFullPolicy(message)) {
dropMessages = true;
}
}
}

if (dropMessages) {
return;
}

for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
final PagingStore store;
if (entry.getKey().equals(messageAddress)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

import java.util.List;

import org.apache.activemq.artemis.core.paging.PagingStore;

/**
* This is a simple datatype containing the list of a routing context
*/
public interface RouteContextList {

PagingStore getAddressStore();

int getNumberOfNonDurableQueues();

int getNumberOfDurableQueues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
Expand Down Expand Up @@ -78,11 +79,11 @@ public interface RoutingContext {

Map<SimpleString, RouteContextList> getContexListing();

RouteContextList getContextListing(SimpleString address);
RouteContextList getContextListing(SimpleString address, PagingStore addressStore);

List<Queue> getNonDurableQueues(SimpleString address);
List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore);

List<Queue> getDurableQueues(SimpleString address);
List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore);

int getQueueCount();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void unproposed(SimpleString groupID) {
public void route(final Message message, final RoutingContext context) {
addRouteContextToMessage(message);

List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());

if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
// There can be many remote bindings for the same node, we only want to add the message once to
Expand All @@ -195,7 +195,7 @@ public void route(final Message message, final RoutingContext context) {
public void routeWithAck(Message message, RoutingContext context) {
addRouteContextToMessage(message);

List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress(), storeAndForwardQueue.getPagingStore());

if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
// There can be many remote bindings for the same node, we only want to add the message once to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
Expand Down Expand Up @@ -204,7 +205,7 @@ public RoutingContext setMirrorSource(MirrorController mirrorController) {
@Override
public void addQueue(final SimpleString address, final Queue queue) {

RouteContextList listing = getContextListing(address);
RouteContextList listing = getContextListing(address, queue.getPagingStore());

if (queue.isDurable()) {
listing.getDurableQueues().add(queue);
Expand Down Expand Up @@ -257,7 +258,7 @@ public MessageLoadBalancingType getLoadBalancingType() {
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {
addQueue(address, queue);
RouteContextList listing = getContextListing(address);
RouteContextList listing = getContextListing(address, queue.getPagingStore());
listing.addAckedQueue(queue);
}

Expand Down Expand Up @@ -316,10 +317,10 @@ public RoutingType getPreviousRoutingType() {
}

@Override
public RouteContextList getContextListing(SimpleString address) {
public RouteContextList getContextListing(SimpleString address, PagingStore addressStore) {
RouteContextList listing = map.get(address);
if (listing == null) {
listing = new ContextListing();
listing = new ContextListing(addressStore);
map.put(address, listing);
}
return listing;
Expand All @@ -336,13 +337,13 @@ public void setTransaction(final Transaction tx) {
}

@Override
public List<Queue> getNonDurableQueues(SimpleString address) {
return getContextListing(address).getNonDurableQueues();
public List<Queue> getNonDurableQueues(SimpleString address, PagingStore addressStore) {
return getContextListing(address, addressStore).getNonDurableQueues();
}

@Override
public List<Queue> getDurableQueues(SimpleString address) {
return getContextListing(address).getDurableQueues();
public List<Queue> getDurableQueues(SimpleString address, PagingStore addressStore) {
return getContextListing(address, addressStore).getDurableQueues();
}

@Override
Expand All @@ -368,6 +369,17 @@ public Map<SimpleString, RouteContextList> getContexListing() {

public static class ContextListing implements RouteContextList {

public ContextListing(PagingStore addressStore) {
this.addressStore = addressStore;
}

@Override
public PagingStore getAddressStore() {
return addressStore;
}

private PagingStore addressStore;

private final List<Queue> durableQueue = new ArrayList<>(1);

private final List<Queue> nonDurableQueue = new ArrayList<>(1);
Expand Down
Loading