Skip to content

Commit a718bc5

Browse files
AntonRoskvistclebertsuconic
authored andcommitted
ARTEMIS-5530 Some handling of compressed messages can throw NegativeArraySizeException
co-author: In collaboration with Clebert Suconic
1 parent 7195400 commit a718bc5

File tree

11 files changed

+187
-19
lines changed

11 files changed

+187
-19
lines changed

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
*/
3030
public interface ICoreMessage extends Message {
3131

32+
@Override
33+
ICoreMessage copy();
3234
/**
3335
* The buffer will belong to this message, until release is called.
3436
*/

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -643,9 +643,6 @@ private void handleCompressedMessageSentAsRegular(final ClientMessageInternal cl
643643
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
644644
// this is refeeding the packet after decompressed, hence the flow control must be 0
645645
currentLargeMessageController.addPacket(body, 0, false);
646-
largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
647-
//make sure the message is decompressed before it is handled
648-
largeMessage.checkCompletion();
649646
currentLargeMessageController = null;
650647

651648
handleRegularMessage(largeMessage);
@@ -685,7 +682,6 @@ public synchronized void handleLargeMessage(final ClientLargeMessageInternal cli
685682

686683
if (clientLargeMessage.isCompressed()) {
687684
clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
688-
clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
689685
} else {
690686
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
691687
}

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,14 @@ private void checkBuffer() throws ActiveMQException {
149149
writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
150150

151151
largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));
152+
153+
unsetCompressionPropertyIfNeeded();
154+
}
155+
}
156+
157+
private void unsetCompressionPropertyIfNeeded() {
158+
if (largeMessageController instanceof CompressedLargeMessageControllerImpl) {
159+
putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
152160
}
153161
}
154162

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.activemq.artemis.api.core.ActiveMQException;
2626
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
27+
import org.apache.activemq.artemis.api.core.ICoreMessage;
2728
import org.apache.activemq.artemis.api.core.Message;
2829
import org.apache.activemq.artemis.api.core.SimpleString;
2930
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -396,7 +397,7 @@ public int readInto(final ByteBuffer bufferRead) {
396397
}
397398

398399
@Override
399-
public Message copy() {
400+
public ICoreMessage copy() {
400401
return new ClientMessageImpl(this);
401402
}
402403

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ private void largeMessageSendBuffered(final boolean sendBlocking,
396396
}
397397

398398
private void largeMessageSendStreamed(final boolean sendBlocking,
399-
final ICoreMessage msgI,
399+
ICoreMessage msgI,
400400
final InputStream inputStreamParameter,
401401
final ClientProducerCredits credits,
402402
SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -411,6 +411,8 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
411411
DeflaterReader deflaterReader = null;
412412

413413
if (session.isCompressLargeMessages()) {
414+
// We need to change properties the compressed message as we send it
415+
msgI = msgI.copy();
414416
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
415417
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
416418
deflaterReader.setLevel(session.getCompressionLevel());

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,10 @@ protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
486486
}
487487
if (other.buffer != null) {
488488
this.buffer = other.buffer.copy();
489+
if (this.buffer.capacity() == 0) {
490+
// we are copying an empty buffer probably, we need to set the proper capacity
491+
this.buffer.capacity(other.buffer.capacity());
492+
}
489493
}
490494
}
491495
}
@@ -515,7 +519,7 @@ public void moveHeadersAndProperties(final Message msg) {
515519
}
516520

517521
@Override
518-
public Message copy() {
522+
public ICoreMessage copy() {
519523
getProperties();
520524
checkEncode();
521525
return new CoreMessage(this);

artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public void setStorageManager(StorageManager storageManager) {
324324
}
325325

326326
@Override
327-
public Message copy() {
327+
public ICoreMessage copy() {
328328
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
329329
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
330330
newMessage.setParentRef(this);

artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.Unpooled;
2020
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
21+
import org.apache.activemq.artemis.api.core.ICoreMessage;
2122
import org.apache.activemq.artemis.api.core.Message;
2223
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
2324
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -135,7 +136,7 @@ public String toString() {
135136
}
136137

137138
@Override
138-
public Message copy() {
139+
public ICoreMessage copy() {
139140
// This is a simple copy, used only to avoid changing original properties
140141
return new NullStorageLargeServerMessage(this);
141142
}

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,68 @@ public void testPreviouslyCompressedMessageCleanup() throws Exception {
569569
locator2.close();
570570
}
571571

572+
@TestTemplate
573+
public void testCompressedMessageRouting() throws Exception {
574+
SimpleString DATA = SimpleString.of(RandomUtil.randomAlphaNumericString(110 * 1024));
575+
576+
ActiveMQServer server = createServer(true, isNetty());
577+
server.start();
578+
579+
server.createQueue(QueueConfiguration.of(ADDRESS).setRoutingType(RoutingType.ANYCAST));
580+
581+
locator.setAckBatchSize(0);
582+
583+
try (ServerLocator locator2 = createFactory(isNetty()); ServerLocator locator3 = createFactory(isNetty())) {
584+
locator2.setMinLargeMessageSize(10240);
585+
//Any sufficiently large value here causes a "java.lang.NegativeArraySizeException"
586+
locator3.setMinLargeMessageSize(1024000);
587+
588+
ClientSessionFactory sf = locator.createSessionFactory();
589+
ClientSession session = sf.createSession(true, true);
590+
ClientProducer producer = session.createProducer(ADDRESS);
591+
ClientConsumer consumer = session.createConsumer(ADDRESS);
592+
593+
ClientMessage message = session.createMessage(true);
594+
message.getBodyBuffer().writeNullableSimpleString(DATA);
595+
producer.send(message);
596+
597+
session.start();
598+
message = consumer.receive(2000);
599+
assertNotNull(message);
600+
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
601+
message.checkCompletion();
602+
message.acknowledge();
603+
604+
ClientSessionFactory sf2 = locator2.createSessionFactory();
605+
ClientSessionFactory sf3 = locator3.createSessionFactory();
606+
ClientSession session2 = sf2.createSession(true, true);
607+
ClientSession session3 = sf3.createSession(true, true);
608+
ClientProducer producer2 = session2.createProducer(ADDRESS);
609+
ClientProducer producer3 = session3.createProducer(ADDRESS);
610+
ClientMessage receivedMessage;
611+
612+
//Notice the _AMQ_LARGE_SIZE value changing part way through
613+
for (int i = 0; i < 3; i++) {
614+
producer.send(message);
615+
producer2.send(message);
616+
producer3.send(message);
617+
}
618+
619+
for (int i = 0; i < 9; i++) {
620+
receivedMessage = consumer.receive(2000);
621+
assertNotNull(receivedMessage);
622+
assertEquals(DATA, receivedMessage.getBodyBuffer().readNullableSimpleString());
623+
receivedMessage.acknowledge();
624+
}
625+
626+
consumer.close();
627+
}
628+
629+
}
630+
572631
@TestTemplate
573632
public void testLargeMessageCompressionLevel() throws Exception {
633+
SimpleString DATA = SimpleString.of(RandomUtil.randomAlphaNumericString(1024 * 1024));
574634

575635
SimpleString address1 = SimpleString.of("address1");
576636
SimpleString address2 = SimpleString.of("address2");
@@ -602,16 +662,16 @@ public void testLargeMessageCompressionLevel() throws Exception {
602662
session2.createQueue(QueueConfiguration.of(address2));
603663
session3.createQueue(QueueConfiguration.of(address3));
604664

605-
String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
606-
for (int i = 0; i < 20; i++) {
607-
inputString = inputString + inputString;
608-
}
665+
ClientMessage message1 = session1.createMessage(true);
666+
ClientMessage message2 = session2.createMessage(true);
667+
ClientMessage message3 = session3.createMessage(true);
668+
message1.getBodyBuffer().writeNullableSimpleString(DATA);
669+
message2.getBodyBuffer().writeNullableSimpleString(DATA);
670+
message3.getBodyBuffer().writeNullableSimpleString(DATA);
609671

610-
ClientMessage message = session1.createMessage(true);
611-
message.getBodyBuffer().writeString(inputString);
612-
producer1.send(message);
613-
producer2.send(message);
614-
producer3.send(message);
672+
producer1.send(message1);
673+
producer2.send(message2);
674+
producer3.send(message3);
615675

616676
QueueControl queueControl1 = (QueueControl)server.getManagementService().
617677
getResource(ResourceNames.QUEUE + address1);
@@ -623,10 +683,34 @@ public void testLargeMessageCompressionLevel() throws Exception {
623683
assertEquals(1, queueControl1.getMessageCount());
624684
assertEquals(1, queueControl2.getMessageCount());
625685
assertEquals(1, queueControl3.getMessageCount());
626-
assertTrue(message.getPersistentSize() > queueControl1.getPersistentSize());
686+
687+
assertTrue(message1.getPersistentSize() > queueControl1.getPersistentSize());
627688
assertTrue(queueControl1.getPersistentSize() > queueControl2.getPersistentSize());
628689
assertTrue(queueControl2.getPersistentSize() > queueControl3.getPersistentSize());
629690

691+
ClientConsumer consumer1 = session1.createConsumer(address1);
692+
ClientConsumer consumer2 = session2.createConsumer(address2);
693+
ClientConsumer consumer3 = session3.createConsumer(address3);
694+
session1.start();
695+
session2.start();
696+
session3.start();
697+
698+
ClientMessage message;
699+
message = consumer1.receive(2000);
700+
assertNotNull(message);
701+
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
702+
message.acknowledge();
703+
704+
message = consumer2.receive(2000);
705+
assertNotNull(message);
706+
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
707+
message.acknowledge();
708+
709+
message = consumer3.receive(2000);
710+
assertNotNull(message);
711+
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
712+
message.acknowledge();
713+
630714
sf1.close();
631715
sf2.close();
632716
sf3.close();

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import javax.jms.MessageProducer;
2525
import javax.jms.Queue;
2626
import javax.jms.Session;
27+
import javax.jms.TextMessage;
2728
import javax.jms.Topic;
2829

2930
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -37,9 +38,11 @@
3738
import org.apache.activemq.artemis.core.server.transformer.Transformer;
3839
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
3940
import org.apache.activemq.artemis.tests.util.Wait;
41+
import org.apache.activemq.artemis.utils.RandomUtil;
4042
import org.junit.jupiter.api.BeforeEach;
4143
import org.junit.jupiter.api.Test;
4244

45+
import static org.junit.jupiter.api.Assertions.assertEquals;
4346
import static org.junit.jupiter.api.Assertions.assertNotNull;
4447
import static org.junit.jupiter.api.Assertions.assertNull;
4548
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -647,6 +650,65 @@ public void testFederatedAddressChainOfBrokers() throws Exception {
647650
}
648651
}
649652

653+
@Test
654+
public void testUpstreamFederatedAddressWithCompressedMessage() throws Exception {
655+
final String DATA_REGULAR = RandomUtil.randomAlphaNumericString(10);
656+
final String DATA_COMPRESSED_REGULAR = "Compresses easily".repeat(500 * 1024);
657+
final String DATA_COMPRESSED_LARGE = RandomUtil.randomAlphaNumericString(1024 * 1024);
658+
final String address = getName();
659+
660+
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
661+
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
662+
getServer(0).getFederationManager().deploy();
663+
664+
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server0", address);
665+
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
666+
getServer(1).getFederationManager().deploy();
667+
668+
ActiveMQConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://" + 0);
669+
ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://" + 1);
670+
cf0.setCompressLargeMessage(true);
671+
cf1.setCompressLargeMessage(true);
672+
673+
try (Connection connection1 = cf1.createConnection();
674+
Connection connection0 = cf0.createConnection()) {
675+
connection1.start();
676+
connection0.start();
677+
678+
Session session0 = connection0.createSession();
679+
Session session1 = connection1.createSession();
680+
Topic topic0 = session0.createTopic(address);
681+
Topic topic1 = session1.createTopic(address);
682+
MessageConsumer consumer0 = session0.createConsumer(topic0);
683+
MessageConsumer consumer1 = session1.createConsumer(topic1);
684+
685+
MessageProducer producer = session0.createProducer(topic0);
686+
687+
sendAndConsume(producer, session0, DATA_REGULAR, "regular", consumer0, consumer1);
688+
sendAndConsume(producer, session0, DATA_COMPRESSED_LARGE, "compressedLarge", consumer0, consumer1);
689+
sendAndConsume(producer, session0, DATA_COMPRESSED_REGULAR, "compressedRegular", consumer0, consumer1);
690+
}
691+
}
692+
693+
private void sendAndConsume(MessageProducer producer,
694+
Session session0,
695+
String data,
696+
String identification,
697+
MessageConsumer consumer0,
698+
MessageConsumer consumer1) throws Exception {
699+
{
700+
TextMessage message = session0.createTextMessage(data);
701+
message.setStringProperty("identification", identification);
702+
producer.send(message);
703+
}
704+
TextMessage message0 = (TextMessage) consumer0.receive(1000);
705+
TextMessage message1 = (TextMessage) consumer1.receive(1000);
706+
assertNotNull(message0);
707+
assertNotNull(message1);
708+
assertEquals(data, message0.getText());
709+
assertEquals(data, message1.getText());
710+
}
711+
650712
private Message createTextMessage(Session session1, String group) throws JMSException {
651713
Message message = session1.createTextMessage("hello");
652714
message.setStringProperty("JMSXGroupID", group);

0 commit comments

Comments
 (0)