-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Expand file tree
/
Copy pathPublisherConfirmsAsync.java
More file actions
74 lines (61 loc) · 3.08 KB
/
PublisherConfirmsAsync.java
File metadata and controls
74 lines (61 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
import com.rabbitmq.client.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Example demonstrating asynchronous publisher confirmations with ConfirmationChannel.
*/
public class PublisherConfirmsAsync {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Create ConfirmationChannel with optional rate limiting
RateLimiter rateLimiter = new ThrottlingRateLimiter(100); // Max 100 in-flight
ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, rateLimiter);
String queueName = confirmChannel.queueDeclare().getQueue();
// Collect futures for all publishes
List<CompletableFuture<String>> futures = new ArrayList<>();
// Publish messages asynchronously with context for correlation
for (int i = 0; i < 10; i++) {
String messageId = "msg-" + i;
String message = "Message " + i;
CompletableFuture<String> future = confirmChannel.basicPublishAsync(
"",
queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(),
messageId // Context parameter for correlation
);
// Handle confirmation
future.thenAccept(id -> System.out.println("Confirmed: " + id))
.exceptionally(ex -> {
if (ex.getCause() instanceof PublishException) {
PublishException pe = (PublishException) ex.getCause();
System.err.println("Failed: " + pe.getContext() + " - " + ex.getMessage());
}
return null;
});
futures.add(future);
}
// Wait for all confirmations
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
System.out.println("All messages published and confirmed");
}
}
}