Skip to content

Commit b6a510f

Browse files
ARTEMIS-5573 and ARTEMIS-5975 Improve AMQP Size estimation
- making it immutable to avoid races after updates like we had in the past - avoiding Scanning after reloading by persisting extra data on storage
1 parent 066c8ce commit b6a510f

File tree

26 files changed

+1207
-234
lines changed

26 files changed

+1207
-234
lines changed

artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStreamReader;
2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.function.Consumer;
2728

2829
import org.apache.activemq.artemis.api.core.TransportConfiguration;
2930
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -51,7 +52,11 @@ public static Process startServer(String artemisInstance, String serverName, int
5152
}
5253

5354
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties) throws Exception {
54-
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);
55+
return startServer(artemisInstance, serverName, id, timeout, brokerProperties, null);
56+
}
57+
58+
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties, Consumer<String> logCallback) throws Exception {
59+
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties, logCallback);
5560

5661
// wait for start
5762
if (timeout > 0) {
@@ -66,7 +71,11 @@ public static Process startServer(String artemisInstance, String serverName, Str
6671
}
6772

6873
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile) throws Exception {
69-
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile);
74+
return startServer(artemisInstance, serverName, uri, timeout, propertiesFile, null);
75+
}
76+
77+
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile, Consumer<String> logCallback) throws Exception {
78+
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile, logCallback);
7079

7180
// wait for start
7281
if (timeout != 0) {
@@ -78,20 +87,30 @@ public static Process startServer(String artemisInstance, String serverName, Str
7887

7988
private static Process internalStartServer(String artemisInstance,
8089
String serverName) throws IOException, ClassNotFoundException {
81-
return internalStartServer(artemisInstance, serverName, null);
90+
return internalStartServer(artemisInstance, serverName, null, null);
8291
}
8392
private static Process internalStartServer(String artemisInstance,
8493
String serverName,
8594
File propertiesFile) throws IOException, ClassNotFoundException {
95+
return internalStartServer(artemisInstance, serverName, propertiesFile, null);
96+
}
97+
private static Process internalStartServer(String artemisInstance,
98+
String serverName,
99+
File propertiesFile,
100+
Consumer<String> logCallback) throws IOException, ClassNotFoundException {
86101

87102
if (propertiesFile != null) {
88-
return execute(artemisInstance, serverName, "run", "--properties", propertiesFile.getAbsolutePath());
103+
return execute(artemisInstance, serverName, logCallback, "run", "--properties", propertiesFile.getAbsolutePath());
89104
} else {
90-
return execute(artemisInstance, serverName, "run");
105+
return execute(artemisInstance, serverName, logCallback, "run");
91106
}
92107
}
93108

94109
public static Process execute(String artemisInstance, String jobName, String...args) throws IOException, ClassNotFoundException {
110+
return execute(artemisInstance, jobName, null, args);
111+
}
112+
113+
public static Process execute(String artemisInstance, String jobName, Consumer<String> logCallback, String...args) throws IOException, ClassNotFoundException {
95114
try {
96115
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
97116

@@ -117,11 +136,11 @@ public static Process execute(String artemisInstance, String jobName, String...a
117136
final Process process = builder.start();
118137
Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));
119138

120-
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), jobName, false);
139+
ProcessLogger outputLogger = new ProcessLogger(logCallback == null, process.getInputStream(), jobName, false, logCallback);
121140
outputLogger.start();
122141

123142
// Adding a reader to System.err, so the VM won't hang on a System.err.println
124-
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), jobName, true);
143+
ProcessLogger errorLogger = new ProcessLogger(logCallback == null, process.getErrorStream(), jobName, true, logCallback);
125144
errorLogger.start();
126145
return process;
127146
} catch (IOException e) {
@@ -215,14 +234,18 @@ static class ProcessLogger extends Thread {
215234

216235
private final boolean sendToErr;
217236

237+
private final Consumer<String> logCallback;
238+
218239
ProcessLogger(final boolean print,
219240
final InputStream is,
220241
final String logName,
221-
final boolean sendToErr) throws ClassNotFoundException {
242+
final boolean sendToErr,
243+
final Consumer<String> logCallback) throws ClassNotFoundException {
222244
this.is = is;
223245
this.print = print;
224246
this.logName = logName;
225247
this.sendToErr = sendToErr;
248+
this.logCallback = logCallback;
226249
setDaemon(false);
227250
}
228251

@@ -240,6 +263,9 @@ public void run() {
240263
System.out.println(logName + "-out:" + line);
241264
}
242265
}
266+
if (logCallback != null) {
267+
logCallback.accept((sendToErr ? logName + "-err:" : logName + "-out:") + line);
268+
}
243269
}
244270
} catch (IOException e) {
245271
// ok, stream closed

artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public class PersisterIDs {
2525

26-
public static final int MAX_PERSISTERS = 5;
26+
public static final int MAX_PERSISTERS = 7;
2727

2828
public static final byte CoreLargeMessagePersister_ID = (byte)0;
2929

@@ -37,4 +37,8 @@ public class PersisterIDs {
3737

3838
public static final byte AMQPMessagePersisterV3_ID = (byte)5;
3939

40+
public static final byte AMQPMessagePersisterV4_ID = (byte)6;
41+
42+
public static final byte AMQPLargeMessagePersisterV2_ID = (byte)7;
43+
4044
}

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -876,14 +876,6 @@ default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws
876876

877877
int getMemoryEstimate();
878878

879-
/**
880-
* The first estimate that's been calculated without any updates.
881-
*/
882-
default int getOriginalEstimate() {
883-
// For Core Protocol we always use the same estimate
884-
return getMemoryEstimate();
885-
}
886-
887879
/**
888880
* This is the size of the message when persisted on disk which is used for metrics tracking Note that even if the
889881
* message itself is not persisted on disk (ie non-durable) this value is still used for metrics tracking If a normal

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,18 @@ public void rebuildPageCounters() throws Exception {
109109
simpleManagementVoid("broker", "rebuildPageCounters");
110110
}
111111

112+
public long getAddressSize(String address) throws Exception {
113+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getAddressSize");
114+
}
115+
116+
public long getMessageCountOnAddress(String address) throws Exception {
117+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getMessageCount");
118+
}
119+
120+
public int removeMessagesOnQueue(String queue, String filter) throws Exception {
121+
return simpleManagementInt(ResourceNames.QUEUE + queue, "removeMessages", filter);
122+
}
123+
112124
/**
113125
* Simple helper for management returning a string.
114126
*/

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
4141
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
4242
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
43+
import org.apache.activemq.artemis.utils.DataConstants;
4344
import org.apache.activemq.artemis.utils.collections.TypedProperties;
4445
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
4546
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -80,6 +81,8 @@ public Message getMessage() {
8081

8182
private boolean reencoded = false;
8283

84+
private int applicationPropertiesSize;
85+
8386
/**
8487
* AMQPLargeMessagePersister will save the buffer here.
8588
*/
@@ -264,7 +267,9 @@ protected void readSavedEncoding(ByteBuf buf) {
264267
applicationPropertiesPosition = buf.readInt();
265268
remainingBodyPosition = buf.readInt();
266269

270+
int applicationPropertiesInitialPosition = buf.readerIndex();
267271
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
272+
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;
268273

269274
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
270275
if (!expirationReload) {
@@ -400,6 +405,12 @@ protected void parseLargeMessage(byte[] data, boolean initialHeader) {
400405
}
401406
}
402407

408+
@Override
409+
protected synchronized void resetMessageData() {
410+
super.resetMessageData();
411+
applicationPropertiesSize = 0;
412+
}
413+
403414
private void genericParseLargeMessage() {
404415
try {
405416
parsingBuffer.position(0);
@@ -412,6 +423,16 @@ private void genericParseLargeMessage() {
412423
}
413424
}
414425

426+
@Override
427+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
428+
ApplicationProperties localAP = super.readApplicationProperties(data, position);
429+
if (localAP != null) {
430+
this.applicationPropertiesSize = data.position() - position;
431+
this.applicationPropertiesCount = localAP.getValue().size();
432+
}
433+
return localAP;
434+
}
435+
415436
protected void parseLargeMessage(ReadableBuffer data) {
416437
MessageDataScanningStatus status = getDataScanningStatus();
417438
if (status == MessageDataScanningStatus.NOT_SCANNED) {
@@ -596,16 +617,16 @@ public long getWholeMessageSize() {
596617
return largeBody.getBodySize();
597618
} catch (Exception e) {
598619
logger.warn(e.getMessage());
599-
return -1;
620+
return VALUE_NOT_PRESENT;
600621
}
601622
}
602623

603624

604625
@Override
605626
public synchronized int getMemoryEstimate() {
606-
if (memoryEstimate == -1) {
607-
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
608-
originalEstimate = memoryEstimate;
627+
if (memoryEstimate == VALUE_NOT_PRESENT) {
628+
// This estimation was tested and validated through AMQPGlobalMaxTest on soak-tests
629+
memoryEstimate = MINIMUM_ESTIMATE + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 2 + applicationPropertiesCount * DataConstants.SIZE_INT;
609630
}
610631
return memoryEstimate;
611632
}
@@ -637,7 +658,7 @@ public long getPersistentSize() {
637658

638659
@Override
639660
public Persister<Message> getPersister() {
640-
return AMQPLargeMessagePersister.getInstance();
661+
return AMQPLargeMessagePersisterV2.getInstance();
641662
}
642663

643664
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.protocol.amqp.broker;
18+
19+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
20+
import org.apache.activemq.artemis.api.core.Message;
21+
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
22+
import org.apache.activemq.artemis.utils.DataConstants;
23+
24+
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPLargeMessagePersisterV2_ID;
25+
26+
public class AMQPLargeMessagePersisterV2 extends AMQPLargeMessagePersister {
27+
28+
public static final byte ID = AMQPLargeMessagePersisterV2_ID;
29+
30+
public static AMQPLargeMessagePersisterV2 theInstance;
31+
32+
public static AMQPLargeMessagePersisterV2 getInstance() {
33+
if (theInstance == null) {
34+
theInstance = new AMQPLargeMessagePersisterV2();
35+
}
36+
return theInstance;
37+
}
38+
39+
@Override
40+
public byte getID() {
41+
return ID;
42+
}
43+
44+
public AMQPLargeMessagePersisterV2() {
45+
super();
46+
}
47+
48+
49+
protected static final int PERSISTER_SIZE = DataConstants.SIZE_INT + // meemory estimate
50+
DataConstants.SIZE_BYTE + // message priority
51+
DataConstants.SIZE_BOOLEAN; // durable
52+
53+
@Override
54+
public int getEncodeSize(Message record) {
55+
return super.getEncodeSize(record) +
56+
DataConstants.SIZE_INT + // size delimiter for future use to keep compatibility better
57+
PERSISTER_SIZE;
58+
}
59+
60+
/**
61+
* Sub classes must add the first short as the protocol-id
62+
*/
63+
@Override
64+
public void encode(ActiveMQBuffer buffer, Message record) {
65+
super.encode(buffer, record);
66+
67+
AMQPLargeMessage msgEncode = (AMQPLargeMessage) record;
68+
writeSizeDelimiter(buffer);
69+
buffer.writeInt(msgEncode.getMemoryEstimate());
70+
buffer.writeByte(msgEncode.getPriority());
71+
buffer.writeBoolean(msgEncode.isDurable());
72+
}
73+
74+
protected void writeSizeDelimiter(ActiveMQBuffer buffer) {
75+
buffer.writeInt(PERSISTER_SIZE); // how many bytes this persister is using
76+
}
77+
78+
@Override
79+
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools) {
80+
AMQPLargeMessage message = (AMQPLargeMessage) super.decode(buffer, record, pools);
81+
82+
83+
int sizePersister = buffer.readInt();
84+
int lastPosition = buffer.readerIndex() + sizePersister;
85+
86+
{
87+
message.setMemoryEstimate(buffer.readInt());
88+
message.setPriority(buffer.readByte());
89+
message.directSetDurable(buffer.readBoolean());
90+
91+
assert buffer.readerIndex() <= lastPosition;
92+
}
93+
94+
// if a future version of this persister wrote more bytes than what we expected now, this will make sure we skip them.
95+
buffer.readerIndex(lastPosition);
96+
97+
return message;
98+
}
99+
100+
}

0 commit comments

Comments
 (0)