diff --git a/pom.xml b/pom.xml index b338f28a28..ad96a47367 100644 --- a/pom.xml +++ b/pom.xml @@ -784,6 +784,7 @@ src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/PemReader.java + src/main/java/com/rabbitmq/client/WriteListener.java src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java src/main/java/com/rabbitmq/client/impl/Environment.java src/main/java/com/rabbitmq/client/observation/**/*.java @@ -798,6 +799,8 @@ src/test/java/com/rabbitmq/client/test/TestUtils.java src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java src/test/java/com/rabbitmq/client/test/server/Permissions.java + src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java + src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java ${google-java-format.version} diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index f9544e7580..1c319bf47b 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.AMQP.*; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; @@ -299,6 +300,48 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicPr void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; + /** + * Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for + * write completion notification. + * + *

The buffer's content between its current position and its limit will be sent. + * + *

The listener is called when the network layer has finished writing the body to the + * socket. This is useful for off-heap buffers that must be released after the write completes. + * + *

Using this method will only benefit workloads using off-heap {@link ByteBuffer}s and the Netty + * IO layer. Other workloads can stick to the basicPublish variants that use an array of bytes + * for message payloads. + * + *

Threading: the listener may be called on the Netty event loop thread. + * It must not perform blocking operations. + * + *

This API is experimental and is susceptible to change at any time. + * + * @param exchange the exchange to publish the message to + * @param routingKey the routing key + * @param mandatory true if the 'mandatory' flag is to be set + * @param immediate true if the 'immediate' flag is to be + * set. Note that the RabbitMQ server does not support this flag. + * @param props other properties for the message - routing headers etc + * @param body the message body + * @param listener called when the write completes, may be {@code null} + * @throws java.io.IOException if an error is encountered + * @since 5.30.0 + */ + default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body, WriteListener listener) + throws IOException { + byte[] bytes = null; + if (body != null) { + bytes = new byte[body.remaining()]; + body.get(bytes); + } + basicPublish(exchange, routingKey, mandatory, immediate, props, bytes); + if (listener != null) { + listener.done(true, null); + } + } + /** * Actively declare a non-autodelete, non-durable exchange with no extra arguments * @see com.rabbitmq.client.AMQP.Exchange.Declare diff --git a/src/main/java/com/rabbitmq/client/WriteListener.java b/src/main/java/com/rabbitmq/client/WriteListener.java new file mode 100644 index 0000000000..f0b2675667 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/WriteListener.java @@ -0,0 +1,44 @@ +// Copyright (c) 2007-2026 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client; + +/** + * Listener notified when the network layer has finished writing a message body. + * + *

This is primarily useful when publishing with an off-heap {@link java.nio.ByteBuffer}: the + * callback tells the caller when the buffer is no longer in use and can be safely released. + * + *

This only applies when using Netty as the IO layer and is unnecessary when using the blocking + * IO layer. + * + *

Threading: the callback may be invoked on the Netty event loop thread. + * Implementations must not perform blocking operations. + * + *

This API is experimental and is susceptible to change at any time. + * + * @since 5.30.0 + */ +@FunctionalInterface +public interface WriteListener { + + /** + * Called when the write operation completes. + * + * @param success {@code true} if the data was written to the socket successfully + * @param cause the exception if the write failed, {@code null} on success + */ + void done(boolean success, Throwable cause); +} diff --git a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java index e9efc99376..3056fa6d51 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java @@ -18,11 +18,13 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Command; +import com.rabbitmq.client.WriteListener; /** * AMQP 0-9-1-specific implementation of {@link Command} which accumulates @@ -46,14 +48,17 @@ public class AMQCommand implements Command { /** The assembler for this command - synchronised on - contains all the state */ private final CommandAssembler assembler; private final Lock assemblerLock = new ReentrantLock(); + private final WriteListener writeListener; + /** Construct a command for inbound frame assembly with a max body length. */ AMQCommand(int maxBodyLength) { - this(null, null, null, maxBodyLength); + this.assembler = new CommandAssembler(null, null, maxBodyLength); + this.writeListener = null; } - /** Construct a command ready to fill in by reading frames */ + /** Construct a command ready to fill in by reading frames. */ public AMQCommand() { - this(null, null, null, Integer.MAX_VALUE); + this(Integer.MAX_VALUE); } /** @@ -61,29 +66,31 @@ public AMQCommand() { * @param method the wrapped method */ public AMQCommand(com.rabbitmq.client.Method method) { - this(method, null, null, Integer.MAX_VALUE); + this.assembler = new CommandAssembler((Method) method, null, Integer.MAX_VALUE); + this.writeListener = null; } /** - * Construct a command with a specified method, header and body. + * Construct a command with a ByteBuffer body for transmission. * @param method the wrapped method * @param contentHeader the wrapped content header - * @param body the message body data + * @param body the message body as a ByteBuffer */ - public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) { - this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE); + public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body) { + this(method, contentHeader, body, null); } /** - * Construct a command with a specified method, header and body. + * Construct a command with a ByteBuffer body and a write completion listener. * @param method the wrapped method * @param contentHeader the wrapped content header - * @param body the message body data - * @param maxBodyLength the maximum size for an inbound message body + * @param body the message body as a ByteBuffer + * @param writeListener called when the network layer finishes writing, may be {@code null} */ - public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body, - int maxBodyLength) { - this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength); + public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body, + WriteListener writeListener) { + this.assembler = new CommandAssembler((Method) method, contentHeader, body); + this.writeListener = writeListener; } /** Public API - {@inheritDoc} */ @@ -122,13 +129,13 @@ public void transmit(AMQChannel channel) throws IOException { try { Method m = this.assembler.getMethod(); if (m.hasContent()) { - byte[] body = this.assembler.getContentBody(); - - Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length); + ByteBuffer body = this.assembler.getByteBufferBody(); + int bodySize = body == null ? 0 : body.remaining(); + Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize); int frameMax = connection.getFrameMax(); boolean cappedFrameMax = frameMax > 0; - int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length; + int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : bodySize; if (cappedFrameMax && headerFrame.size() > frameMax) { String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax); @@ -137,14 +144,14 @@ public void transmit(AMQChannel channel) throws IOException { connection.writeFrame(m.toFrame(channelNumber)); connection.writeFrame(headerFrame); - for (int offset = 0; offset < body.length; offset += bodyPayloadMax) { - int remaining = body.length - offset; - - int fragmentLength = (remaining < bodyPayloadMax) ? remaining - : bodyPayloadMax; - Frame frame = Frame.fromBodyFragment(channelNumber, body, - offset, fragmentLength); - connection.writeFrame(frame); + if (body != null) { + int bodyPosition = body.position(); + for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) { + int remaining = bodySize - offset; + int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; + Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength); + connection.writeFrame(frame); + } } } else { connection.writeFrame(m.toFrame(channelNumber)); @@ -153,7 +160,7 @@ public void transmit(AMQChannel channel) throws IOException { assemblerLock.unlock(); } - connection.flush(); + connection.flush(this.writeListener); } @Override public String toString() { @@ -200,7 +207,7 @@ public static void checkPreconditions() { * code in Frame. */ private static void checkEmptyFrameSize() { - Frame f = new Frame(AMQP.FRAME_BODY, 0, new byte[0]); + Frame f = new Frame(AMQP.FRAME_BODY, 0, ByteBuffer.wrap(new byte[0])); ByteArrayOutputStream s = new ByteArrayOutputStream(); try { f.writeTo(new DataOutputStream(s)); diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java index 8810033f54..1b738309c8 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java @@ -655,8 +655,12 @@ void writeFrame(Frame f) throws IOException { * Public API - flush the output buffers */ public void flush() throws IOException { + flush(null); + } + + void flush(WriteListener listener) throws IOException { try { - _frameHandler.flush(); + _frameHandler.flush(listener); } catch (IOException ioe) { this.errorOnWriteListener.handle(this, ioe); } diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 84ba47112c..ea2df2d717 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -714,6 +715,18 @@ public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException + { + basicPublish(exchange, routingKey, mandatory, immediate, props, + body == null ? null : ByteBuffer.wrap(body), null); + } + + /** Public API - {@inheritDoc} */ + @Override + public void basicPublish(String exchange, String routingKey, + boolean mandatory, boolean immediate, + BasicProperties props, ByteBuffer body, + WriteListener listener) + throws IOException { final long deliveryTag; if (nextPublishSeqNo > 0) { @@ -734,7 +747,7 @@ public void basicPublish(String exchange, String routingKey, .build(); try { ObservationCollector.PublishCall publishCall = properties -> { - AMQCommand command = new AMQCommand(publish, properties, body); + AMQCommand command = new AMQCommand(publish, properties, body, listener); transmit(command); }; observationCollector.publish(publishCall, publish, props, body, this.connectionInfo()); diff --git a/src/main/java/com/rabbitmq/client/impl/CommandAssembler.java b/src/main/java/com/rabbitmq/client/impl/CommandAssembler.java index 2e3e977950..ad051ae666 100644 --- a/src/main/java/com/rabbitmq/client/impl/CommandAssembler.java +++ b/src/main/java/com/rabbitmq/client/impl/CommandAssembler.java @@ -16,6 +16,7 @@ package com.rabbitmq.client.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -50,20 +51,41 @@ private enum CAState { /** sum of the lengths of all fragments */ private int bodyLength; + /** Zero-copy body buffer, mutually exclusive with bodyN usage on the outbound path */ + private final ByteBuffer byteBufferBody; + /** No bytes of content body not yet accumulated */ private long remainingBodyBytes; private final int maxBodyLength; - public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body, + public CommandAssembler(Method method, AMQContentHeader contentHeader, int maxBodyLength) { this.method = method; this.contentHeader = contentHeader; this.bodyN = new ArrayList<>(2); this.bodyLength = 0; + this.byteBufferBody = null; this.remainingBodyBytes = 0; this.maxBodyLength = maxBodyLength; - appendBodyFragment(body); + if (method == null) { + this.state = CAState.EXPECTING_METHOD; + } else if (contentHeader == null) { + this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE; + } else { + this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength; + updateContentBodyState(); + } + } + + public CommandAssembler(Method method, AMQContentHeader contentHeader, ByteBuffer body) { + this.method = method; + this.contentHeader = contentHeader; + this.bodyN = new ArrayList<>(0); + this.bodyLength = body == null ? 0 : body.remaining(); + this.byteBufferBody = body; + this.remainingBodyBytes = 0; + this.maxBodyLength = Integer.MAX_VALUE; if (method == null) { this.state = CAState.EXPECTING_METHOD; } else if (contentHeader == null) { @@ -151,9 +173,19 @@ private byte[] coalesceContentBody() { } public synchronized byte[] getContentBody() { + if (byteBufferBody != null) { + ByteBuffer dup = byteBufferBody.duplicate(); + byte[] result = new byte[dup.remaining()]; + dup.get(result); + return result; + } return coalesceContentBody(); } + public ByteBuffer getByteBufferBody() { + return this.byteBufferBody; + } + private void appendBodyFragment(byte[] fragment) { if (fragment == null || fragment.length == 0) return; bodyN.add(fragment); diff --git a/src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java b/src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java index 0026ae51f4..180e0b40ff 100644 --- a/src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java +++ b/src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java @@ -137,7 +137,7 @@ public void run() { if (now > (lastActivityTime + this.heartbeatNanos)) { frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0)); - frameHandler.flush(); + frameHandler.flush(null); } } catch (IOException e) { // ignore diff --git a/src/main/java/com/rabbitmq/client/impl/Frame.java b/src/main/java/com/rabbitmq/client/impl/Frame.java index f38e98db9e..02ff0dc5ad 100644 --- a/src/main/java/com/rabbitmq/client/impl/Frame.java +++ b/src/main/java/com/rabbitmq/client/impl/Frame.java @@ -23,6 +23,7 @@ import java.io.*; import java.math.BigDecimal; import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; import java.util.Date; import java.util.List; import java.util.Map; @@ -44,6 +45,9 @@ public class Frame { /** Frame payload (for outbound frames) */ private final ByteArrayOutputStream accumulator; + /** Frame payload as a ByteBuffer (for basic.publish body frames) */ + private final ByteBuffer byteBufferPayload; + private static final int NON_BODY_SIZE = 1 /* type */ + 2 /* channel */ + 4 /* payload size */ + 1 /* end character */; /** @@ -55,6 +59,7 @@ public Frame(int type, int channel) { this.channel = channel; this.payload = null; this.accumulator = new ByteArrayOutputStream(); + this.byteBufferPayload = null; } /** @@ -66,15 +71,31 @@ public Frame(int type, int channel, byte[] payload) { this.channel = channel; this.payload = payload; this.accumulator = null; + this.byteBufferPayload = null; } - public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset, int length) - throws IOException - { - Frame frame = new Frame(AMQP.FRAME_BODY, channelNumber); - DataOutputStream bodyOut = frame.getOutputStream(); - bodyOut.write(body, offset, length); - return frame; + /** + * For basic.publish frames. + * @param type + * @param channel + * @param byteBufferPayload + */ + public Frame(int type, int channel, ByteBuffer byteBufferPayload) { + this.type = type; + this.channel = channel; + this.payload = null; + this.accumulator = null; + this.byteBufferPayload = byteBufferPayload; + } + + /** + * Body frame from a ByteBuffer slice. The buffer is not copied; + * the caller must not modify it after this call. + */ + public static Frame fromBodyFragment(int channelNumber, ByteBuffer body, int offset, int length) { + ByteBuffer slice = body.duplicate(); + slice.position(offset).limit(offset + length); + return new Frame(AMQP.FRAME_BODY, channelNumber, slice.slice()); } /** @@ -191,12 +212,21 @@ public static void protocolVersionMismatch(DataInputStream is) throws IOExceptio public void writeTo(DataOutputStream os) throws IOException { os.writeByte(type); os.writeShort(channel); - if (accumulator != null) { + if (byteBufferPayload != null) { + ByteBuffer buf = byteBufferPayload.duplicate(); + os.writeInt(buf.remaining()); + if (buf.hasArray()) { + os.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + } else { + byte[] tmp = new byte[buf.remaining()]; + buf.get(tmp); + os.write(tmp); + } + } else if (accumulator != null) { os.writeInt(accumulator.size()); accumulator.writeTo(os); } else { - os.writeInt(payload.length); - os.write(payload); + throw new IllegalStateException("Either a ByteBuffer or an accumulator is used to write a frame"); } os.write(AMQP.FRAME_END); } @@ -222,27 +252,41 @@ public void write(byte[] b, int off, int len) { buf.writeBytes(b, off, len); } }); - } else { - buf.writeInt(payload.length); - buf.writeBytes(payload); } buf.writeByte(AMQP.FRAME_END); } public int size() { - if(accumulator != null) { + if (byteBufferPayload != null) { + return byteBufferPayload.remaining() + NON_BODY_SIZE; + } else if (accumulator != null) { return accumulator.size() + NON_BODY_SIZE; } else { return payload.length + NON_BODY_SIZE; } } + /** + * Returns the ByteBuffer payload if this frame was constructed from one, or null otherwise. + * The returned buffer is a duplicate; its position and limit are independent of the original. + */ + ByteBuffer getByteBufferPayload() { + return byteBufferPayload == null ? null : byteBufferPayload.duplicate(); + } + /** * Public API - retrieves the frame payload */ public byte[] getPayload() { if (payload != null) return payload; + if (byteBufferPayload != null) { + ByteBuffer dup = byteBufferPayload.duplicate(); + byte[] result = new byte[dup.remaining()]; + dup.get(result); + return result; + } + // This is a Frame we've constructed ourselves. For some reason (e.g. // testing), we're acting as if we received it even though it // didn't come in off the wire. @@ -266,7 +310,9 @@ public DataOutputStream getOutputStream() { @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Frame(type=").append(type).append(", channel=").append(channel).append(", "); - if (accumulator == null) { + if (byteBufferPayload != null) { + sb.append(byteBufferPayload.remaining()).append(" bytes of ByteBuffer payload)"); + } else if (accumulator == null) { sb.append(payload.length).append(" bytes of payload)"); } else { sb.append(accumulator.size()).append(" bytes of accumulator)"); diff --git a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java index a3810cfb38..f7b75d4149 100644 --- a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java @@ -15,6 +15,7 @@ package com.rabbitmq.client.impl; +import com.rabbitmq.client.WriteListener; import java.io.IOException; import java.net.SocketException; import java.net.SocketTimeoutException; @@ -74,10 +75,12 @@ default void finishConnectionNegotiation() { void writeFrame(Frame frame) throws IOException; /** - * Flush the underlying data connection. + * Flush the underlying data connection and notify the listener when the write completes. + * The default implementation flushes synchronously and calls the listener immediately. + * @param listener called when the flush completes, may be {@code null} * @throws IOException if there is a problem accessing the connection */ - void flush() throws IOException; + void flush(WriteListener listener) throws IOException; /** Close the underlying data connection (complaint not permitted). */ void close(); diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 483265f3aa..80b821a5a4 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -25,14 +25,18 @@ import com.rabbitmq.client.MalformedFrameException; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.SocketConfigurator; +import com.rabbitmq.client.WriteListener; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -51,6 +55,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -170,11 +175,20 @@ private static final class NettyFrameHandler implements FrameHandler { new byte[] { 'A', 'M', 'Q', 'P', 0, AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION }; + + /** + * A shared, unreleasable buffer for the AMQP FRAME_END. Prevents allocation on every message + * and is safe to share across threads. + */ + private static final ByteBuf FRAME_END_BUFFER = + Unpooled.unreleasableBuffer(Unpooled.directBuffer(1).writeByte(AMQP.FRAME_END)); + private final EventLoopGroup eventLoopGroup; private final Duration enqueuingTimeout; private final Channel channel; private final AmqpHandler handler; private final AtomicBoolean closed = new AtomicBoolean(false); + private final int zeroCopyThreshold = 1024; private NettyFrameHandler( int maxInboundMessageBodySize, @@ -377,14 +391,72 @@ public void writeFrame(Frame frame) throws IOException { } private void doWriteFrame(Frame frame) throws IOException { - ByteBuf bb = this.channel.alloc().buffer(frame.size()); - frame.writeToByteBuf(bb); - this.channel.writeAndFlush(bb); + ByteBuffer bbPayload = frame.getByteBufferPayload(); + if (bbPayload != null) { + int payloadSize = bbPayload.remaining(); + if (bbPayload.isDirect() && payloadSize > this.zeroCopyThreshold) { + // zero-copy (Large Off-Heap Buffers) + ByteBuf header = this.channel.alloc().directBuffer(7); + CompositeByteBuf composite = null; + try { + header.writeByte(frame.getType()); + header.writeShort(frame.getChannel()); + header.writeInt(payloadSize); + + composite = this.channel.alloc().compositeBuffer(3); + composite.addComponents( + true, header, Unpooled.wrappedBuffer(bbPayload), FRAME_END_BUFFER.duplicate()); + this.channel.write(composite, this.channel.voidPromise()); + } catch (Throwable t) { + // If composite exists, releasing it releases the header too. + // If it doesn't exist yet, we must release the header manually. + if (composite != null) { + composite.release(); + } else { + header.release(); + } + throw t; + } + } else { + // contiguous copy (Small or Heap Buffers) + int totalSize = 7 + payloadSize + 1; + ByteBuf bb = this.channel.alloc().directBuffer(totalSize); + try { + bb.writeByte(frame.getType()); + bb.writeShort(frame.getChannel()); + bb.writeInt(payloadSize); + bb.writeBytes(bbPayload); // uses memcpy + bb.writeByte(AMQP.FRAME_END); + this.channel.write(bb, this.channel.voidPromise()); + } catch (Exception e) { + bb.release(); + throw e; + } + } + } else { + ByteBuf bb = this.channel.alloc().buffer(frame.size()); + try { + frame.writeToByteBuf(bb); + this.channel.write(bb); + } catch (RuntimeException | IOException e) { + bb.release(); + throw e; + } + } + if (!this.channel.isWritable()) { + this.channel.flush(); + } } @Override - public void flush() { - this.channel.flush(); + public void flush(WriteListener listener) { + if (listener == null) { + this.channel.flush(); + } else { + ChannelPromise promise = this.channel.newPromise(); + promise.addListener(future -> listener.done(future.isSuccess(), future.cause())); + this.channel.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); + } } @Override diff --git a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java index cb9f06333a..b9d6301fbb 100644 --- a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java @@ -16,6 +16,7 @@ package com.rabbitmq.client.impl; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.WriteListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -213,8 +214,18 @@ public void writeFrame(Frame frame) throws IOException { } @Override - public void flush() throws IOException { - _outputStream.flush(); + public void flush(WriteListener listener) throws IOException { + try { + _outputStream.flush(); + if (listener != null) { + listener.done(true, null); + } + } catch (IOException e) { + if (listener != null) { + listener.done(false, e); + } + throw e; + } } @Override @@ -222,12 +233,9 @@ public void close() { try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {} // async flush if possible // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194 - Callable flushCallable = new Callable() { - @Override - public Void call() throws Exception { - flush(); - return null; - } + Callable flushCallable = () -> { + flush(null); + return null; }; Future flushTask = null; try { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 6fc10ad85a..8f39a319d1 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -221,6 +222,11 @@ public void basicPublish(String exchange, String routingKey, boolean mandatory, delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body); } + @Override + public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, ByteBuffer body, WriteListener listener) throws IOException { + delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body, listener); + } + @Override public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException { return exchangeDeclare(exchange, type, false, false, null); diff --git a/src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java b/src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java index fe28f3da74..1880b90c81 100644 --- a/src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java +++ b/src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import java.io.IOException; +import java.nio.ByteBuffer; final class NoOpObservationCollector implements ObservationCollector { @@ -27,7 +28,7 @@ public void publish( PublishCall call, AMQP.Basic.Publish publish, AMQP.BasicProperties properties, - byte[] body, + ByteBuffer body, ConnectionInfo connectionInfo) throws IOException { call.publish(properties); diff --git a/src/main/java/com/rabbitmq/client/observation/ObservationCollector.java b/src/main/java/com/rabbitmq/client/observation/ObservationCollector.java index e66db1e00b..98c6611e45 100644 --- a/src/main/java/com/rabbitmq/client/observation/ObservationCollector.java +++ b/src/main/java/com/rabbitmq/client/observation/ObservationCollector.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import java.io.IOException; +import java.nio.ByteBuffer; /** * API to instrument operations in the AMQP client. The supported operations are publishing, @@ -40,9 +41,8 @@ public interface ObservationCollector { /** * Decorate message publishing. * - *

Implementations are expected to call {@link PublishCall#publish( PublishCall, - * AMQP.Basic.Publish, AMQP.BasicProperties, byte[], ConnectionInfo)} to make sure the message is - * actually sent. + *

Implementations are expected to call {@link PublishCall#publish(AMQP.BasicProperties)} to + * make sure the message is actually sent. * * @param call * @param publish @@ -55,7 +55,7 @@ void publish( PublishCall call, AMQP.Basic.Publish publish, AMQP.BasicProperties properties, - byte[] body, + ByteBuffer body, ConnectionInfo connectionInfo) throws IOException; diff --git a/src/main/java/com/rabbitmq/client/observation/micrometer/MicrometerObservationCollector.java b/src/main/java/com/rabbitmq/client/observation/micrometer/MicrometerObservationCollector.java index 14cc7e3758..a221ba1e1e 100644 --- a/src/main/java/com/rabbitmq/client/observation/micrometer/MicrometerObservationCollector.java +++ b/src/main/java/com/rabbitmq/client/observation/micrometer/MicrometerObservationCollector.java @@ -21,6 +21,7 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -58,7 +59,7 @@ public void publish( PublishCall call, AMQP.Basic.Publish publish, AMQP.BasicProperties properties, - byte[] body, + ByteBuffer body, ConnectionInfo connectionInfo) throws IOException { Map headers; @@ -67,13 +68,10 @@ public void publish( } else { headers = new HashMap<>(properties.getHeaders()); } + int bodySize = body == null ? 0 : body.remaining(); PublishContext micrometerPublishContext = new PublishContext( - publish.getExchange(), - publish.getRoutingKey(), - headers, - body == null ? 0 : body.length, - connectionInfo); + publish.getExchange(), publish.getRoutingKey(), headers, bodySize, connectionInfo); AMQP.BasicProperties.Builder builder = properties.builder(); builder.headers(headers); Observation observation = diff --git a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java index b3ea3b3198..7e5d54b6cd 100644 --- a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java +++ b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java @@ -264,7 +264,7 @@ public int getPort() { return -1; } - public void flush() throws IOException { + public void flush(WriteListener listener) throws IOException { // no need to implement this: don't bother writing the frame } diff --git a/src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java b/src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java index cadfe6f83f..c127de8f22 100644 --- a/src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java +++ b/src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.UnexpectedFrameError; +import com.rabbitmq.client.WriteListener; import com.rabbitmq.client.impl.AMQConnection; import com.rabbitmq.client.impl.AMQImpl.Basic.Publish; import com.rabbitmq.client.impl.Frame; @@ -30,6 +31,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.SocketException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -79,7 +81,7 @@ public class BrokenFramesTest { Publish method = new Publish(1, "test", "test", false, false); frames.add(method.toFrame(0)); - frames.add(Frame.fromBodyFragment(channelNumber, contentBody, 0, contentBody.length)); + frames.add(Frame.fromBodyFragment(channelNumber, ByteBuffer.wrap(contentBody), 0, contentBody.length)); myFrameHandler.setFrames(frames.iterator()); @@ -151,7 +153,7 @@ public int getPort() { return -1; } - public void flush() throws IOException { + public void flush(WriteListener listener) throws IOException { // no need to implement this: don't bother writing the frame } diff --git a/src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java b/src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java new file mode 100644 index 0000000000..3638c848bb --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java @@ -0,0 +1,95 @@ +// Copyright (c) 2026 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.test; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.impl.AMQCommand; +import com.rabbitmq.client.impl.AMQContentHeader; +import com.rabbitmq.client.impl.Frame; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Random; +import org.junit.jupiter.api.Test; + +public class ByteBufferPublishTest { + + @Test + void frameFromByteBufferHasCorrectSize() { + byte[] data = "hello world".getBytes(StandardCharsets.UTF_8); + ByteBuffer buf = ByteBuffer.wrap(data); + Frame frame = Frame.fromBodyFragment(1, buf, 0, data.length); + assertThat(frame.size()).isEqualTo(data.length + 8); + } + + @Test + void frameFromByteBufferSlice() { + byte[] data = "AAAA hello BBBB".getBytes(StandardCharsets.UTF_8); + ByteBuffer buf = ByteBuffer.wrap(data); + Frame frame = Frame.fromBodyFragment(1, buf, 5, 5); + byte[] payload = frame.getPayload(); + assertThat(new String(payload, StandardCharsets.UTF_8)).isEqualTo("hello"); + } + + @Test + void frameFromDirectByteBuffer() { + byte[] data = "direct buffer test".getBytes(StandardCharsets.UTF_8); + ByteBuffer direct = ByteBuffer.allocateDirect(data.length); + direct.put(data); + direct.flip(); + Frame frame = Frame.fromBodyFragment(1, direct, 0, data.length); + assertThat(frame.getPayload()).isEqualTo(data); + } + + @Test + void frameToString() { + byte[] data = new byte[42]; + ByteBuffer buf = ByteBuffer.wrap(data); + Frame frame = Frame.fromBodyFragment(1, buf, 0, data.length); + assertThat(frame.toString()).contains("42").contains("ByteBuffer"); + } + + @Test + void amqCommandByteBufferConstructor() { + byte[] data = "amqcommand test".getBytes(StandardCharsets.UTF_8); + ByteBuffer buf = ByteBuffer.wrap(data); + + AMQP.Basic.Publish publish = + new AMQP.Basic.Publish.Builder().exchange("").routingKey("test").build(); + AMQContentHeader header = new AMQP.BasicProperties.Builder().build(); + AMQCommand cmd = new AMQCommand(publish, header, buf); + + assertThat(cmd.getMethod()).isNotNull(); + assertThat(cmd.getContentHeader()).isNotNull(); + assertThat(cmd.getContentBody()).isEqualTo(data); + } + + @Test + void frameFromByteBufferWithOffset() { + byte[] data = new byte[100]; + new Random().nextBytes(data); + ByteBuffer buf = ByteBuffer.wrap(data); + + Frame frame = Frame.fromBodyFragment(1, buf, 20, 50); + byte[] payload = frame.getPayload(); + assertThat(payload.length).isEqualTo(50); + + byte[] expected = new byte[50]; + System.arraycopy(data, 20, expected, 0, 50); + assertThat(payload).isEqualTo(expected); + } +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java index 9c8336c6a3..dd7d54025f 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java @@ -74,7 +74,9 @@ BlockedConnectionTest.class, NettyTest.class, IoDeadlockOnConnectionClosing.class, - ProtocolVersionMismatch.class + ProtocolVersionMismatch.class, + ByteBufferPublishTest.class, + PublishWithByteBufferTest.class }) public class ClientTestSuite { diff --git a/src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java b/src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java new file mode 100644 index 0000000000..a8b460a8fe --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java @@ -0,0 +1,102 @@ +// Copyright (c) 2017-2026 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.WriteListener; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class PublishWithByteBufferTest extends BrokerTestCase { + + String queue; + + @Override + protected void createResources() throws IOException, TimeoutException { + queue = channel.queueDeclare(UUID.randomUUID().toString(), true, false, false, null).getQueue(); + } + + @Override + protected void releaseResources() throws IOException { + channel.queueDelete(queue); + } + + @ParameterizedTest + @CsvSource({ + "-1,12", + "-1,1000", + "-1,50000", + "-1,66000", + "-1,550000", + "10000,12", + "10000,1000", + "10000,50000", + "10000,550000" + }) + public void publishByteBuffer(int frameMax, int size) throws Exception { + byte[] masterData = new byte[size]; + ThreadLocalRandom.current().nextBytes(masterData); + ConnectionFactory cf = TestUtils.connectionFactory(); + if (frameMax > 0) { + cf.setRequestedFrameMax(frameMax); + } + try (Connection connection = cf.newConnection()) { + final CountDownLatch consumingLatch = new CountDownLatch(1); + AtomicReference dataRef = new AtomicReference<>(); + String ctag = + this.channel.basicConsume( + queue, + true, + (consumerTag, delivery) -> { + dataRef.set(delivery.getBody()); + consumingLatch.countDown(); + }, + consumerTag -> {}); + Channel publishingChannel = connection.createChannel(); + ByteBuffer buf = ByteBuffer.allocateDirect(size); + buf.put(masterData); + buf.flip(); + AtomicBoolean canBeReleased = new AtomicBoolean(false); + publishingChannel.basicPublish( + "", + queue, + false, + false, + null, + buf, + (WriteListener) (success, cause) -> canBeReleased.set(success)); + assertTrue( + consumingLatch.await(1, TimeUnit.SECONDS), "deliver callback should have been called"); + assertThat(dataRef.get()).isEqualTo(masterData); + assertThat(canBeReleased.get()).isTrue(); + this.channel.basicCancel(ctag); + } + } +} diff --git a/src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java b/src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java index 1edd21befd..a78a9a8d5a 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java +++ b/src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java @@ -27,6 +27,7 @@ import javax.net.SocketFactory; import java.io.IOException; import java.net.Socket; +import java.nio.ByteBuffer; /** * Test that the server correctly handles us when we send it bad frames @@ -112,7 +113,7 @@ public Frame confuse(Frame frame) { // We can't just skip the method as that will lead us to // send 0 bytes and hang waiting for a response. return new Frame(AMQP.FRAME_HEADER, - frame.getChannel(), frame.getPayload()); + frame.getChannel(), ByteBuffer.wrap(frame.getPayload())); } return frame; } @@ -136,7 +137,7 @@ public Frame confuse(Frame frame) { if (frame.getType() == AMQP.FRAME_HEADER) { byte[] payload = frame.getPayload(); Frame confusedFrame = new Frame(AMQP.FRAME_HEADER, - frame.getChannel(), payload); + frame.getChannel(), ByteBuffer.wrap(payload)); // First two bytes = class ID, must match class ID from // method. payload[0] = 12; @@ -164,7 +165,7 @@ public Frame confuse(Frame frame) { public Frame confuse(Frame frame) { if (frame.getType() == AMQP.FRAME_METHOD) { return new Frame(0, frame.getChannel(), - "1234567890\0001234567890".getBytes()); + ByteBuffer.wrap("1234567890\0001234567890".getBytes())); } return frame; }