Skip to content

Commit 03b7176

Browse files
ARTEMIS-5573 and ARTEMIS-5975 Improve AMQP Size estimation and make it static
1 parent 90e3db6 commit 03b7176

File tree

13 files changed

+618
-159
lines changed

13 files changed

+618
-159
lines changed

artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStreamReader;
2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.function.Consumer;
2728

2829
import org.apache.activemq.artemis.api.core.TransportConfiguration;
2930
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -51,7 +52,11 @@ public static Process startServer(String artemisInstance, String serverName, int
5152
}
5253

5354
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties) throws Exception {
54-
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);
55+
return startServer(artemisInstance, serverName, id, timeout, brokerProperties, null);
56+
}
57+
58+
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties, Consumer<String> logCallback) throws Exception {
59+
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties, logCallback);
5560

5661
// wait for start
5762
if (timeout > 0) {
@@ -66,7 +71,11 @@ public static Process startServer(String artemisInstance, String serverName, Str
6671
}
6772

6873
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile) throws Exception {
69-
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile);
74+
return startServer(artemisInstance, serverName, uri, timeout, propertiesFile, null);
75+
}
76+
77+
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile, Consumer<String> logCallback) throws Exception {
78+
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile, logCallback);
7079

7180
// wait for start
7281
if (timeout != 0) {
@@ -78,20 +87,30 @@ public static Process startServer(String artemisInstance, String serverName, Str
7887

7988
private static Process internalStartServer(String artemisInstance,
8089
String serverName) throws IOException, ClassNotFoundException {
81-
return internalStartServer(artemisInstance, serverName, null);
90+
return internalStartServer(artemisInstance, serverName, null, null);
8291
}
8392
private static Process internalStartServer(String artemisInstance,
8493
String serverName,
8594
File propertiesFile) throws IOException, ClassNotFoundException {
95+
return internalStartServer(artemisInstance, serverName, propertiesFile, null);
96+
}
97+
private static Process internalStartServer(String artemisInstance,
98+
String serverName,
99+
File propertiesFile,
100+
Consumer<String> logCallback) throws IOException, ClassNotFoundException {
86101

87102
if (propertiesFile != null) {
88-
return execute(artemisInstance, serverName, "run", "--properties", propertiesFile.getAbsolutePath());
103+
return execute(artemisInstance, serverName, logCallback, "run", "--properties", propertiesFile.getAbsolutePath());
89104
} else {
90-
return execute(artemisInstance, serverName, "run");
105+
return execute(artemisInstance, serverName, logCallback, "run");
91106
}
92107
}
93108

94109
public static Process execute(String artemisInstance, String jobName, String...args) throws IOException, ClassNotFoundException {
110+
return execute(artemisInstance, jobName, null, args);
111+
}
112+
113+
public static Process execute(String artemisInstance, String jobName, Consumer<String> logCallback, String...args) throws IOException, ClassNotFoundException {
95114
try {
96115
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
97116

@@ -117,11 +136,11 @@ public static Process execute(String artemisInstance, String jobName, String...a
117136
final Process process = builder.start();
118137
Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));
119138

120-
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), jobName, false);
139+
ProcessLogger outputLogger = new ProcessLogger(logCallback == null, process.getInputStream(), jobName, false, logCallback);
121140
outputLogger.start();
122141

123142
// Adding a reader to System.err, so the VM won't hang on a System.err.println
124-
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), jobName, true);
143+
ProcessLogger errorLogger = new ProcessLogger(logCallback == null, process.getErrorStream(), jobName, true, logCallback);
125144
errorLogger.start();
126145
return process;
127146
} catch (IOException e) {
@@ -215,14 +234,18 @@ static class ProcessLogger extends Thread {
215234

216235
private final boolean sendToErr;
217236

237+
private final Consumer<String> logCallback;
238+
218239
ProcessLogger(final boolean print,
219240
final InputStream is,
220241
final String logName,
221-
final boolean sendToErr) throws ClassNotFoundException {
242+
final boolean sendToErr,
243+
final Consumer<String> logCallback) throws ClassNotFoundException {
222244
this.is = is;
223245
this.print = print;
224246
this.logName = logName;
225247
this.sendToErr = sendToErr;
248+
this.logCallback = logCallback;
226249
setDaemon(false);
227250
}
228251

@@ -240,6 +263,9 @@ public void run() {
240263
System.out.println(logName + "-out:" + line);
241264
}
242265
}
266+
if (logCallback != null) {
267+
logCallback.accept((sendToErr ? logName + "-err:" : logName + "-out:") + line);
268+
}
243269
}
244270
} catch (IOException e) {
245271
// ok, stream closed

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public void rebuildPageCounters() throws Exception {
109109
simpleManagementVoid("broker", "rebuildPageCounters");
110110
}
111111

112+
public long getAddressSize(String address) throws Exception {
113+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getAddressSize");
114+
}
115+
116+
public long getMessageCountOnAddress(String address) throws Exception {
117+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getMessageCount");
118+
}
119+
112120
/**
113121
* Simple helper for management returning a string.
114122
*/

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public Message getMessage() {
8080

8181
private boolean reencoded = false;
8282

83+
private int applicationPropertiesSize;
84+
8385
/**
8486
* AMQPLargeMessagePersister will save the buffer here.
8587
*/
@@ -264,7 +266,9 @@ protected void readSavedEncoding(ByteBuf buf) {
264266
applicationPropertiesPosition = buf.readInt();
265267
remainingBodyPosition = buf.readInt();
266268

269+
int applicationPropertiesInitialPosition = buf.readerIndex();
267270
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
271+
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;
268272

269273
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
270274
if (!expirationReload) {
@@ -412,6 +416,16 @@ private void genericParseLargeMessage() {
412416
}
413417
}
414418

419+
@Override
420+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
421+
applicationProperties = super.readApplicationProperties(data, position);
422+
if (applicationProperties != null) {
423+
this.applicationPropertiesSize = data.position() - position;
424+
}
425+
return applicationProperties;
426+
}
427+
428+
415429
protected void parseLargeMessage(ReadableBuffer data) {
416430
MessageDataScanningStatus status = getDataScanningStatus();
417431
if (status == MessageDataScanningStatus.NOT_SCANNED) {
@@ -604,8 +618,7 @@ public long getWholeMessageSize() {
604618
@Override
605619
public synchronized int getMemoryEstimate() {
606620
if (memoryEstimate == -1) {
607-
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
608-
originalEstimate = memoryEstimate;
621+
memoryEstimate = AMQP_OFFSET + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 4;
609622
}
610623
return memoryEstimate;
611624
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java

Lines changed: 25 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.activemq.artemis.api.core.SimpleString;
4545
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
4646
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
47-
import org.apache.activemq.artemis.core.paging.PagingStore;
4847
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
4948
import org.apache.activemq.artemis.core.persistence.Persister;
5049
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -119,6 +118,12 @@
119118
*/
120119
public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message {
121120

121+
// The basic (minimal) size an AMQP message uses.
122+
// this is an estimate, and it's based on the following test:
123+
// by running AmMQPGlobalMaxTest::testSendUntilOME, you look at the initial memory used by the broker without any messages.
124+
// by the time you get the OME, you can do some bare calculations on how much each message uses and get an AVG
125+
public static final int AMQP_OFFSET = 1300;
126+
122127
private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.of("m.");
123128

124129
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -146,7 +151,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
146151
* developing purposes.
147152
*/
148153
public enum MessageDataScanningStatus {
149-
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
154+
NOT_SCANNED(0), SCANNED(1);
150155

151156
private static final MessageDataScanningStatus[] STATES;
152157

@@ -205,7 +210,6 @@ private static void checkCode(int code) {
205210
protected long messageID;
206211
protected SimpleString address;
207212
protected volatile int memoryEstimate = -1;
208-
protected volatile int originalEstimate = -1;
209213
protected long expiration;
210214
protected boolean expirationReload = false;
211215
protected long scheduledTime = -1;
@@ -546,36 +550,27 @@ protected ApplicationProperties lazyDecodeApplicationProperties() {
546550
// need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate
547551
protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
548552
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
549-
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
550-
if (owner != null && memoryEstimate != -1) {
551-
// the memory has already been tracked and needs to be updated to reflect the new decoding
552-
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
553-
554-
// it is difficult to track the updates for paged messages
555-
// for that reason we won't do it if paged
556-
// we also only do the update if the message was previously routed
557-
// so if a debug method or an interceptor changed the size before routing we would get a different size
558-
if (!isPaged && routed) {
559-
((PagingStore) owner).addSize(addition, false);
560-
final int updatedEstimate = memoryEstimate + addition;
561-
memoryEstimate = updatedEstimate;
562-
}
563-
}
553+
readApplicationProperties(data, applicationPropertiesPosition);
564554
}
565555

566556
return applicationProperties;
567557
}
568558

559+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
560+
applicationProperties = scanForMessageSection(data, position, ApplicationProperties.class);
561+
return applicationProperties;
562+
}
563+
569564
protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
570-
if (applicationProperties != null) {
571-
// they have been unmarshalled, estimate memory usage based on their encoded size
572-
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
573-
return remainingBodyPosition - applicationPropertiesPosition;
574-
} else {
575-
return data.capacity() - applicationPropertiesPosition;
576-
}
565+
// no need to rescan if it's from RELOAD_PERSISTENCE
566+
ensureScanning();
567+
568+
// they have been unmarshalled, estimate memory usage based on their encoded size
569+
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
570+
return remainingBodyPosition - applicationPropertiesPosition;
571+
} else {
572+
return data.capacity() - applicationPropertiesPosition;
577573
}
578-
return 0;
579574
}
580575

581576
@SuppressWarnings("unchecked")
@@ -661,9 +656,6 @@ protected synchronized void ensureMessageDataScanned() {
661656
case NOT_SCANNED:
662657
scanMessageData();
663658
break;
664-
case RELOAD_PERSISTENCE:
665-
lazyScanAfterReloadPersistence();
666-
break;
667659
case SCANNED:
668660
// NO-OP
669661
break;
@@ -686,7 +678,6 @@ protected synchronized void resetMessageData() {
686678
priority = DEFAULT_MESSAGE_PRIORITY;
687679
encodedHeaderSize = 0;
688680
memoryEstimate = -1;
689-
originalEstimate = -1;
690681
scheduledTime = -1;
691682
encodedDeliveryAnnotationsSize = 0;
692683
headerPosition = VALUE_NOT_PRESENT;
@@ -885,12 +876,8 @@ public final void receiveBuffer(ByteBuf buffer) {
885876

886877
@Override
887878
public int getOriginalEstimate() {
888-
if (originalEstimate < 0) {
889-
// getMemoryEstimate should initialize originalEstimate
890-
return getMemoryEstimate();
891-
} else {
892-
return originalEstimate;
893-
}
879+
// getMemoryEstimate should initialize originalEstimate
880+
return getMemoryEstimate();
894881
}
895882

896883
@Override
@@ -1033,13 +1020,9 @@ protected int internalPersistSize() {
10331020
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
10341021

10351022
protected synchronized void lazyScanAfterReloadPersistence() {
1036-
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
10371023
scanMessageData();
10381024
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
10391025
modified = false;
1040-
// reinitialise memory estimate as message will already be on a queue
1041-
// and lazy decode will want to update
1042-
getMemoryEstimate();
10431026
}
10441027

10451028
@Override
@@ -1223,9 +1206,8 @@ public boolean isDurable() {
12231206
if (header != null && header .getDurable() != null) {
12241207
return header.getDurable();
12251208
} else {
1226-
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
1227-
// even though the parsing hasn't happened yet
1228-
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
1209+
// we will assume it's non persistent if no header
1210+
return false;
12291211
}
12301212
}
12311213

0 commit comments

Comments
 (0)