Skip to content

Commit 6606896

Browse files
ARTEMIS-5573 and ARTEMIS-5975 Improve AMQP Size estimation
Also making it immutable to avoid races after updates like we had in the past
1 parent 5647ebb commit 6606896

File tree

16 files changed

+622
-192
lines changed

16 files changed

+622
-192
lines changed

artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStreamReader;
2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.function.Consumer;
2728

2829
import org.apache.activemq.artemis.api.core.TransportConfiguration;
2930
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -51,7 +52,11 @@ public static Process startServer(String artemisInstance, String serverName, int
5152
}
5253

5354
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties) throws Exception {
54-
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);
55+
return startServer(artemisInstance, serverName, id, timeout, brokerProperties, null);
56+
}
57+
58+
public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties, Consumer<String> logCallback) throws Exception {
59+
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties, logCallback);
5560

5661
// wait for start
5762
if (timeout > 0) {
@@ -66,7 +71,11 @@ public static Process startServer(String artemisInstance, String serverName, Str
6671
}
6772

6873
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile) throws Exception {
69-
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile);
74+
return startServer(artemisInstance, serverName, uri, timeout, propertiesFile, null);
75+
}
76+
77+
public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile, Consumer<String> logCallback) throws Exception {
78+
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile, logCallback);
7079

7180
// wait for start
7281
if (timeout != 0) {
@@ -78,20 +87,30 @@ public static Process startServer(String artemisInstance, String serverName, Str
7887

7988
private static Process internalStartServer(String artemisInstance,
8089
String serverName) throws IOException, ClassNotFoundException {
81-
return internalStartServer(artemisInstance, serverName, null);
90+
return internalStartServer(artemisInstance, serverName, null, null);
8291
}
8392
private static Process internalStartServer(String artemisInstance,
8493
String serverName,
8594
File propertiesFile) throws IOException, ClassNotFoundException {
95+
return internalStartServer(artemisInstance, serverName, propertiesFile, null);
96+
}
97+
private static Process internalStartServer(String artemisInstance,
98+
String serverName,
99+
File propertiesFile,
100+
Consumer<String> logCallback) throws IOException, ClassNotFoundException {
86101

87102
if (propertiesFile != null) {
88-
return execute(artemisInstance, serverName, "run", "--properties", propertiesFile.getAbsolutePath());
103+
return execute(artemisInstance, serverName, logCallback, "run", "--properties", propertiesFile.getAbsolutePath());
89104
} else {
90-
return execute(artemisInstance, serverName, "run");
105+
return execute(artemisInstance, serverName, logCallback, "run");
91106
}
92107
}
93108

94109
public static Process execute(String artemisInstance, String jobName, String...args) throws IOException, ClassNotFoundException {
110+
return execute(artemisInstance, jobName, null, args);
111+
}
112+
113+
public static Process execute(String artemisInstance, String jobName, Consumer<String> logCallback, String...args) throws IOException, ClassNotFoundException {
95114
try {
96115
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
97116

@@ -117,11 +136,11 @@ public static Process execute(String artemisInstance, String jobName, String...a
117136
final Process process = builder.start();
118137
Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));
119138

120-
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), jobName, false);
139+
ProcessLogger outputLogger = new ProcessLogger(logCallback == null, process.getInputStream(), jobName, false, logCallback);
121140
outputLogger.start();
122141

123142
// Adding a reader to System.err, so the VM won't hang on a System.err.println
124-
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), jobName, true);
143+
ProcessLogger errorLogger = new ProcessLogger(logCallback == null, process.getErrorStream(), jobName, true, logCallback);
125144
errorLogger.start();
126145
return process;
127146
} catch (IOException e) {
@@ -215,14 +234,18 @@ static class ProcessLogger extends Thread {
215234

216235
private final boolean sendToErr;
217236

237+
private final Consumer<String> logCallback;
238+
218239
ProcessLogger(final boolean print,
219240
final InputStream is,
220241
final String logName,
221-
final boolean sendToErr) throws ClassNotFoundException {
242+
final boolean sendToErr,
243+
final Consumer<String> logCallback) throws ClassNotFoundException {
222244
this.is = is;
223245
this.print = print;
224246
this.logName = logName;
225247
this.sendToErr = sendToErr;
248+
this.logCallback = logCallback;
226249
setDaemon(false);
227250
}
228251

@@ -240,6 +263,9 @@ public void run() {
240263
System.out.println(logName + "-out:" + line);
241264
}
242265
}
266+
if (logCallback != null) {
267+
logCallback.accept((sendToErr ? logName + "-err:" : logName + "-out:") + line);
268+
}
243269
}
244270
} catch (IOException e) {
245271
// ok, stream closed

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -876,14 +876,6 @@ default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws
876876

877877
int getMemoryEstimate();
878878

879-
/**
880-
* The first estimate that's been calculated without any updates.
881-
*/
882-
default int getOriginalEstimate() {
883-
// For Core Protocol we always use the same estimate
884-
return getMemoryEstimate();
885-
}
886-
887879
/**
888880
* This is the size of the message when persisted on disk which is used for metrics tracking Note that even if the
889881
* message itself is not persisted on disk (ie non-durable) this value is still used for metrics tracking If a normal

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,18 @@ public void rebuildPageCounters() throws Exception {
109109
simpleManagementVoid("broker", "rebuildPageCounters");
110110
}
111111

112+
public long getAddressSize(String address) throws Exception {
113+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getAddressSize");
114+
}
115+
116+
public long getMessageCountOnAddress(String address) throws Exception {
117+
return simpleManagementLong(ResourceNames.ADDRESS + address, "getMessageCount");
118+
}
119+
120+
public int removeMessagesOnQueue(String queue, String filter) throws Exception {
121+
return simpleManagementInt(ResourceNames.QUEUE + queue, "removeMessages", filter);
122+
}
123+
112124
/**
113125
* Simple helper for management returning a string.
114126
*/

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
4141
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
4242
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
43+
import org.apache.activemq.artemis.utils.DataConstants;
4344
import org.apache.activemq.artemis.utils.collections.TypedProperties;
4445
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
4546
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -80,6 +81,8 @@ public Message getMessage() {
8081

8182
private boolean reencoded = false;
8283

84+
private int applicationPropertiesSize;
85+
8386
/**
8487
* AMQPLargeMessagePersister will save the buffer here.
8588
*/
@@ -264,7 +267,9 @@ protected void readSavedEncoding(ByteBuf buf) {
264267
applicationPropertiesPosition = buf.readInt();
265268
remainingBodyPosition = buf.readInt();
266269

270+
int applicationPropertiesInitialPosition = buf.readerIndex();
267271
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
272+
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;
268273

269274
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
270275
if (!expirationReload) {
@@ -400,6 +405,12 @@ protected void parseLargeMessage(byte[] data, boolean initialHeader) {
400405
}
401406
}
402407

408+
@Override
409+
protected synchronized void resetMessageData() {
410+
super.resetMessageData();
411+
applicationPropertiesSize = 0;
412+
}
413+
403414
private void genericParseLargeMessage() {
404415
try {
405416
parsingBuffer.position(0);
@@ -412,6 +423,17 @@ private void genericParseLargeMessage() {
412423
}
413424
}
414425

426+
@Override
427+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
428+
ApplicationProperties localAP = super.readApplicationProperties(data, position);
429+
if (localAP != null) {
430+
this.applicationPropertiesSize = data.position() - position;
431+
this.applicationPropertiesCount = localAP.getValue().size();
432+
}
433+
return localAP;
434+
}
435+
436+
415437
protected void parseLargeMessage(ReadableBuffer data) {
416438
MessageDataScanningStatus status = getDataScanningStatus();
417439
if (status == MessageDataScanningStatus.NOT_SCANNED) {
@@ -596,16 +618,16 @@ public long getWholeMessageSize() {
596618
return largeBody.getBodySize();
597619
} catch (Exception e) {
598620
logger.warn(e.getMessage());
599-
return -1;
621+
return VALUE_NOT_PRESENT;
600622
}
601623
}
602624

603625

604626
@Override
605627
public synchronized int getMemoryEstimate() {
606-
if (memoryEstimate == -1) {
607-
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
608-
originalEstimate = memoryEstimate;
628+
if (memoryEstimate == VALUE_NOT_PRESENT) {
629+
// This estimation was tested and validated through AMQPGlobalMaxTest on soak-tests
630+
memoryEstimate = MINIMUM_ESTIMATE + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 2 + applicationPropertiesCount * DataConstants.SIZE_INT;
609631
}
610632
return memoryEstimate;
611633
}

0 commit comments

Comments
 (0)