diff --git a/sidebarsTutorials.js b/sidebarsTutorials.js index b4c7e61a7..e7a12f04e 100644 --- a/sidebarsTutorials.js +++ b/sidebarsTutorials.js @@ -96,6 +96,42 @@ const sidebars = { }, ] }, + { + type: 'category', + label: 'Java (AMQP 1.0)', + items: [ + { + type: 'doc', + id: 'tutorial-one-java-amqp10', + label: 'Hello World', + }, + { + type: 'doc', + id: 'tutorial-two-java-amqp10', + label: 'Work Queues', + }, + { + type: 'doc', + id: 'tutorial-three-java-amqp10', + label: 'Publish/Subscribe', + }, + { + type: 'doc', + id: 'tutorial-four-java-amqp10', + label: 'Routing', + }, + { + type: 'doc', + id: 'tutorial-five-java-amqp10', + label: 'Topics', + }, + { + type: 'doc', + id: 'tutorial-six-java-amqp10', + label: 'RPC', + }, + ] + }, { type: 'category', label: 'Kotlin', @@ -255,6 +291,42 @@ const sidebars = { }, ] }, + { + type: 'category', + label: '.NET (AMQP 1.0)', + items: [ + { + type: 'doc', + id: 'tutorial-one-dotnet-amqp10', + label: 'Hello World', + }, + { + type: 'doc', + id: 'tutorial-two-dotnet-amqp10', + label: 'Work Queues', + }, + { + type: 'doc', + id: 'tutorial-three-dotnet-amqp10', + label: 'Publish/Subscribe', + }, + { + type: 'doc', + id: 'tutorial-four-dotnet-amqp10', + label: 'Routing', + }, + { + type: 'doc', + id: 'tutorial-five-dotnet-amqp10', + label: 'Topics', + }, + { + type: 'doc', + id: 'tutorial-six-dotnet-amqp10', + label: 'RPC', + }, + ] + }, { type: 'category', label: 'JavaScript', @@ -327,6 +399,42 @@ const sidebars = { }, ] }, + { + type: 'category', + label: 'Go (AMQP 1.0)', + items: [ + { + type: 'doc', + id: 'tutorial-one-go-amqp10', + label: 'Hello World', + }, + { + type: 'doc', + id: 'tutorial-two-go-amqp10', + label: 'Work Queues', + }, + { + type: 'doc', + id: 'tutorial-three-go-amqp10', + label: 'Publish/Subscribe', + }, + { + type: 'doc', + id: 'tutorial-four-go-amqp10', + label: 'Routing', + }, + { + type: 'doc', + id: 'tutorial-five-go-amqp10', + label: 'Topics', + }, + { + type: 'doc', + id: 'tutorial-six-go-amqp10', + label: 'RPC', + }, + ] + }, { type: 'category', label: 'Elixir', diff --git a/tutorials/index.md b/tutorials/index.md index 8976c32f0..a8344039c 100644 --- a/tutorials/index.md +++ b/tutorials/index.md @@ -18,6 +18,9 @@ See the License for the specific language governing permissions and limitations under the License. --> +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + import T1DiagramToC from '@site/src/components/Tutorials/T1DiagramToC.md'; import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md'; import T3DiagramToC from '@site/src/components/Tutorials/T3DiagramToC.md'; @@ -41,15 +44,111 @@ please see the [installation guide](/docs/download) or use the [community Docker Executable versions of these tutorials [are open source](https://github.com/rabbitmq/rabbitmq-tutorials), as is [this website](https://github.com/rabbitmq/rabbitmq-website). -There are two groups of tutorials: +There are three groups of tutorials: - * [RabbitMQ queues](#queue-tutorials) + * [RabbitMQ queues with AMQP 1.0](#queue-amqp-tutorials) + * [RabbitMQ queues with AMQP 0.9.1](#queue-tutorials) * [RabbitMQ streams](#stream-tutorials) :::info This tutorials target RabbitMQ 4.x. ::: + + + +### Queue tutorials with AMQP 1.0 + +This section covers the modern messaging protocol in RabbitMQ, AMQP 1.0. This protocol works +out of the box, without enabling any plugin. + + + + + + + + + + + + + + + + + + + + + + + +
+ ## 1. "Hello World!" + + The simplest thing that does *something* + + + + * [Java](tutorials/tutorial-one-java-amqp10) + * [C#](tutorials/tutorial-one-dotnet-amqp10) + * [Go](tutorials/tutorial-one-go-amqp10) + + ## 2. Work Queues + + Distributing tasks among workers (the competing consumers pattern) + + + + * [Java](tutorials/tutorial-two-java-amqp10) + * [C#](tutorials/tutorial-two-dotnet-amqp10) + * [Go](tutorials/tutorial-two-go-amqp10) + + ## 3. Publish/Subscribe + + Sending messages to many consumers at once + + + + * [Java](tutorials/tutorial-three-java-amqp10) + * [C#](tutorials/tutorial-three-dotnet-amqp10) + * [Go](tutorials/tutorial-three-go-amqp10) +
+ ## 4. Routing + + Receiving messages selectively + + + + * [Java](tutorials/tutorial-four-java-amqp10) + * [C#](tutorials/tutorial-four-dotnet-amqp10) + * [Go](tutorials/tutorial-four-go-amqp10) + + ## 5. Topics + + Receiving messages based on a pattern (topics) + + + + * [Java](tutorials/tutorial-five-java-amqp10) + * [C#](tutorials/tutorial-five-dotnet-amqp10) + * [Go](tutorials/tutorial-five-go-amqp10) + + ## 6. RPC + + Request/reply pattern example + + + + * [Java](tutorials/tutorial-six-java-amqp10) + * [C#](tutorials/tutorial-six-dotnet-amqp10) + * [Go](tutorials/tutorial-six-go-amqp10) +
+ +
+ + ### Queue tutorials @@ -211,6 +310,8 @@ and take a look at the [Compatibility and Conformance page](/docs/specification) to find relevant resources to learn more about AMQP 1.0 and AMQP 0-9-1, the two core protocols implemented by RabbitMQ. + +
## Stream tutorials diff --git a/tutorials/tutorial-five-dotnet-amqp10.md b/tutorials/tutorial-five-dotnet-amqp10.md new file mode 100644 index 000000000..e31e9f15a --- /dev/null +++ b/tutorials/tutorial-five-dotnet-amqp10.md @@ -0,0 +1,58 @@ +--- +title: RabbitMQ tutorial - Topics (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md'; +import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md'; + +# RabbitMQ tutorial - Topics + +## Topics +### (using the AMQP 1.0 .NET client) + + + + + +In the [previous tutorial](./tutorial-four-dotnet-amqp10) we used a `direct` exchange to route by a single criterion (severity). **Topic** exchanges route by **patterns** in the routing key — useful when messages carry multiple dimensions (for example `kern.critical`). + + + +The sample declares exchange `logs_topic` with type `topic`. The publisher sets the routing key on `PublisherBuilder().Exchange(exchangeName).Key(routingKey)`. The consumer passes one or more binding keys (patterns) on the command line and binds the temporary queue for each. + +Topic binding rules: + +- `*` substitutes exactly one word. +- `#` substitutes zero or more words. +- Words are separated by `.` in routing keys. + +### Running + +```bash +dotnet run --project ReceiveLogsTopic/ReceiveLogsTopic.csproj "kern.*" +dotnet run --project EmitLogTopic/EmitLogTopic.csproj kern.critical "A critical kernel error" +``` + +### Source + +- [`EmitLogTopic/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/EmitLogTopic/Program.cs) +- [`ReceiveLogsTopic/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/ReceiveLogsTopic/Program.cs) + +Now we can move on to [tutorial 6](./tutorial-six-dotnet-amqp10) to learn about the RPC (request/reply) pattern with RabbitMQ. diff --git a/tutorials/tutorial-five-go-amqp10.md b/tutorials/tutorial-five-go-amqp10.md new file mode 100644 index 000000000..4d0f25b5e --- /dev/null +++ b/tutorials/tutorial-five-go-amqp10.md @@ -0,0 +1,109 @@ +--- +title: RabbitMQ tutorial - Topics (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md'; +import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md'; + +# RabbitMQ tutorial - Topics + +## Topics +### (using the AMQP 1.0 Go client) + + + + + +In the [previous tutorial](./tutorial-four-go-amqp10) we improved our logging +system. Instead of using a `fanout` exchange only capable of mindless +broadcasting, we used a `direct` one, and gained the ability to selectively +receive the logs. + +Although using the `direct` exchange improved our system, it still has +limitations - it can't do routing based on multiple criteria. + +In our logging system we might want to subscribe to not only logs based on +severity, but also based on the source which emitted the log. You might know +this concept from the `syslog` Unix tool, which routes logs based on both +severity (info/warn/crit...) and facility (auth/cron/kern...). + +That would give us a lot of flexibility - we may want to listen to just critical +errors coming from 'cron' but also all logs from 'kern'. + +To implement our logging system we're going to use a `topic` exchange. Topic +exchanges implement pattern-based message routing. + + + +The `topic` exchange is powerful and can behave like other exchanges: + +- When a queue is bound with binding key `"*"` (star) it will match any routing key. +- When special characters `"*"` (star) and `"#"` (hash) aren't used in bindings, the topic exchange will behave just like a `direct` one. + +Publishing +---------- + +We'll use a topic exchange named `logs` with routing keys that have the form `"."`: + +```bash +go run emit_log_topic.go kern.critical "A critical kernel error" +go run emit_log_topic.go auth.info "Auth info message" +``` + +See [the full `emit_log_topic.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/emit_log_topic.go) for the implementation using `TopicExchangeSpecification`. + +Subscribing +----------- + +Receiving will show the power of topic exchanges. We'll create bindings with +pattern-based routing keys: + +```bash +go run receive_logs_topic.go "kern.*" +go run receive_logs_topic.go "*.critical" +go run receive_logs_topic.go "kern.critical" "auth.warn" +``` + +- The first consumer will receive all messages from the `kern` facility. +- The second will receive all messages of severity `critical`. +- The third will receive only critical errors from `kern` and warnings from `auth`. + +See [the full `receive_logs_topic.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive_logs_topic.go) for the implementation. + +Topic exchange binding rules +---------------------------- + +The topic exchange binding rules are: + +- `"#"` can substitute for zero or more words. +- `"*"` can substitute for exactly one word. +- Routing keys are separated by dots (e.g., `"kern.critical"`). + +Putting it all together +----------------------- + +The full code examples are available at: + +- [`emit_log_topic.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/emit_log_topic.go) +- [`receive_logs_topic.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive_logs_topic.go) + +The main pattern is using `TopicExchangeSpecification` instead of `DirectExchangeSpecification`, and binding queues with pattern-based routing keys using `"*"` and `"#"` wildcards. + +Now we can move on to [tutorial 6](./tutorial-six-go-amqp10) to learn about the RPC (Request/Reply) pattern with RabbitMQ. diff --git a/tutorials/tutorial-five-java-amqp10.md b/tutorials/tutorial-five-java-amqp10.md new file mode 100644 index 000000000..e015251b6 --- /dev/null +++ b/tutorials/tutorial-five-java-amqp10.md @@ -0,0 +1,27 @@ +--- +title: RabbitMQ tutorial - Topics (AMQP 1.0) +--- + + +# RabbitMQ tutorial - Topics (AMQP 1.0) + +The AMQP 1.0 Java edition of this tutorial is not yet published on this site. + +For the same concepts using the **AMQP 0-9-1** Java client, see [Topics](./tutorial-five-java). + +For other languages, the [tutorials index](./) lists AMQP 1.0 versions where available. diff --git a/tutorials/tutorial-four-dotnet-amqp10.md b/tutorials/tutorial-four-dotnet-amqp10.md new file mode 100644 index 000000000..84f76ca62 --- /dev/null +++ b/tutorials/tutorial-four-dotnet-amqp10.md @@ -0,0 +1,77 @@ +--- +title: RabbitMQ tutorial - Routing (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T4DiagramToC from '@site/src/components/Tutorials/T4DiagramToC.md'; +import T4DiagramDirectX from '@site/src/components/Tutorials/T4DiagramDirectX.md'; +import T4DiagramMultipleBindings from '@site/src/components/Tutorials/T4DiagramMultipleBindings.md'; + +# RabbitMQ tutorial - Routing + +## Routing +### (using the AMQP 1.0 .NET client) + + + + + +In the [previous tutorial](./tutorial-three-dotnet-amqp10) we used a `fanout` exchange. Here we use a **`direct`** exchange so consumers can subscribe to a subset of messages (for example by severity). + +Bindings +-------- + + + + + +The sample uses exchange `logs_direct`. The publisher sets the routing key when building the publisher: + +```csharp +IPublisher publisher = await connection.PublisherBuilder().Exchange(exchangeName).Key(severity).BuildAsync(); +``` + +The consumer declares an exclusive temporary queue and binds it once per severity from the command line: + +```csharp +foreach (string severity in args) +{ + IBindingSpecification binding = management.Binding() + .SourceExchange(exchangeSpec) + .DestinationQueue(queueName) + .Key(severity); + await binding.BindAsync(); +} +``` + +The handler can read the effective routing key from message annotations (see `ReceiveLogsDirect/Program.cs`). + +### Running + +```bash +dotnet run --project ReceiveLogsDirect/ReceiveLogsDirect.csproj warning error +dotnet run --project EmitLogDirect/EmitLogDirect.csproj warning "Run. Run. Or it will explode." +``` + +### Source + +- [`EmitLogDirect/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/EmitLogDirect/Program.cs) +- [`ReceiveLogsDirect/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/ReceiveLogsDirect/Program.cs) + +Now we can move on to [tutorial 5](./tutorial-five-dotnet-amqp10) to learn about pattern-based routing with topic exchanges. diff --git a/tutorials/tutorial-four-go-amqp10.md b/tutorials/tutorial-four-go-amqp10.md new file mode 100644 index 000000000..af92f2b7e --- /dev/null +++ b/tutorials/tutorial-four-go-amqp10.md @@ -0,0 +1,128 @@ +--- +title: RabbitMQ tutorial - Routing (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T4DiagramToC from '@site/src/components/Tutorials/T4DiagramToC.md'; +import T4DiagramDirectX from '@site/src/components/Tutorials/T4DiagramDirectX.md'; +import T4DiagramMultipleBindings from '@site/src/components/Tutorials/T4DiagramMultipleBindings.md'; + +# RabbitMQ tutorial - Routing + +## Routing +### (using the AMQP 1.0 Go client) + + + + + +In the [previous tutorial](./tutorial-three-go-amqp10) we built a simple publish/subscribe system. We were able to broadcast log messages to many receivers. + +In this tutorial we're going to add a feature to it - we're going to make it +possible to subscribe only to a subset of the messages. For example, we will be +able to direct only critical error messages to a log file (to save disk space), +while still being able to print all of the log messages on the console. + +Bindings +-------- + +In previous examples we were already creating bindings. You may recall code like: + +```go +_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: "logs", + DestinationQueue: qInfo.Name(), + BindingKey: "", +}) +``` + +A binding is a relationship between an exchange and a queue. This can be +simply read as: the queue is interested in messages from this exchange. + +The binding key is used differently depending on the exchange type. With a +`direct` exchange, messages are routed to queues whose binding key exactly matches +the routing key of the message. + + + +This logic allows for selective message delivery. Let's illustrate this with an +example: + + + +In this setup, we can see the `direct` exchange `X` with two queues bound to it. +The first queue is bound with binding key `orange`, and the second has two +bindings, one with binding key `black` and one with `green`. + +In such a setup a message published to the exchange with a routing key +`orange` will be routed to queue `Q1`. Messages with a routing key of `black` +or `green` will go to `Q2`. All other messages will be discarded. + +Multiple bindings +----------------- + + + +It is perfectly legal to bind multiple queues with the same binding key. In our +example we could add a binding between `X` and `Q1` with binding key `black`. +In that case, the `direct` exchange will behave like `fanout` and will broadcast +the message to all the matching queues. A message with routing key `black` would +be delivered to both `Q1` and `Q2`. + +Emitting logs +------------- + +We'll use this model for our logging system. Instead of `fanout` we'll send +messages to a `direct` exchange. We'll supply the log level as the `routing key`. +That way the receiving program will be able to select the severity level it +wants to log. Let's start with emitting logs: + +```bash +go run emit_log_direct.go warn "A warning message" +go run emit_log_direct.go error "An error message" +``` + +See [the tutorial source code](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/emit_log_direct.go) for the full implementation. The key difference from the previous tutorial is using `DirectExchangeSpecification` and providing routing keys when binding and publishing. + +Subscribing +----------- + +Receiving will work similarly to the previous tutorial, but with one +difference — we'll create a new binding for each severity level we're +interested in: + +```bash +go run receive_logs_direct.go info warn +``` + +This will receive only messages with severity level `info` or `warn`. + +See [the full `receive_logs_direct.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive_logs_direct.go) for the implementation. The consumer binds to multiple routing keys using `DirectExchangeSpecification` with separate bindings per key. + +Putting it all together +----------------------- + +The full code examples are available at: + +- [`emit_log_direct.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/emit_log_direct.go) +- [`receive_logs_direct.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive_logs_direct.go) + +The main pattern is using `DirectExchangeSpecification` instead of `FanOutExchangeSpecification`, and binding queues with specific routing keys instead of empty binding keys. + +Now we can move on to [tutorial 5](./tutorial-five-go-amqp10) to learn about pattern-based routing with topic exchanges. diff --git a/tutorials/tutorial-four-java-amqp10.md b/tutorials/tutorial-four-java-amqp10.md new file mode 100644 index 000000000..9efb52e0d --- /dev/null +++ b/tutorials/tutorial-four-java-amqp10.md @@ -0,0 +1,27 @@ +--- +title: RabbitMQ tutorial - Routing (AMQP 1.0) +--- + + +# RabbitMQ tutorial - Routing (AMQP 1.0) + +The AMQP 1.0 Java edition of this tutorial is not yet published on this site. + +For the same concepts using the **AMQP 0-9-1** Java client, see [Routing](./tutorial-four-java). + +For other languages, the [tutorials index](./) lists AMQP 1.0 versions where available. diff --git a/tutorials/tutorial-one-dotnet-amqp10.md b/tutorials/tutorial-one-dotnet-amqp10.md new file mode 100644 index 000000000..50b0ff9f9 --- /dev/null +++ b/tutorials/tutorial-one-dotnet-amqp10.md @@ -0,0 +1,188 @@ +--- +title: RabbitMQ tutorial - "Hello World!" (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import TutorialsIntro from '@site/src/components/Tutorials/TutorialsIntro.md'; +import T1DiagramHello from '@site/src/components/Tutorials/T1DiagramHello.md'; +import T1DiagramSending from '@site/src/components/Tutorials/T1DiagramSending.md'; +import T1DiagramReceiving from '@site/src/components/Tutorials/T1DiagramReceiving.md'; + +# RabbitMQ tutorial - "Hello World!" + +## Introduction + + + + +## "Hello World" +### (using the AMQP 1.0 .NET client) + +In this part of the tutorial we'll write two small programs in C#; a +producer that sends a single message, and a consumer that receives +messages and prints them out. We'll gloss over some of the detail in +the .NET AMQP 1.0 client API, concentrating on this very simple thing just to get +started. It's the "Hello World" of messaging. + +In the diagram below, "P" is our producer and "C" is our consumer. The +box in the middle is a queue - a message buffer that RabbitMQ keeps +on behalf of the consumer. + + + +> #### The AMQP 1.0 .NET client library +> +> RabbitMQ speaks multiple protocols. This tutorial uses **AMQP 1.0** over the same port as AMQP 0-9-1 (5672 by default). It requires **RabbitMQ 4.0 or later**. +> +> Use the RabbitMQ **AMQP 1.0** .NET client (`RabbitMQ.AMQP.Client` on NuGet), not the classic AMQP 0-9-1 client (`RabbitMQ.Client`). See [AMQP 1.0 client libraries](/client-libraries/amqp-client-libraries) and the [.NET client API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html). +> +> Add the package to your project: +> +> ```bash +> dotnet add package RabbitMQ.AMQP.Client +> ``` +> +> Runnable sources for this tutorial series are available in the [RabbitMQ tutorials](https://github.com/rabbitmq/rabbitmq-tutorials) repository (`dotnet-amqp` directory) once merged upstream. + +Now we have the client referenced, we can write some code. + +### Sending + + + +We'll call our message publisher (sender) `Send` and our message consumer +`Receive`. The publisher will connect to RabbitMQ, send a single message, +then exit. + +In `Send/Program.cs`, use these namespaces: + +```csharp +using System.Text; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +``` + +Create connection settings, an environment, and a connection. The URI uses the default virtual host (`%2f` is `/`): + +```csharp +const string brokerUri = "amqp://guest:guest@localhost:5672/%2f"; + +ConnectionSettings settings = ConnectionSettingsBuilder.Create() + .Uri(new Uri(brokerUri)) + .ContainerId("tutorial-send") + .Build(); + +IEnvironment environment = AmqpEnvironment.Create(settings); +IConnection connection = await environment.CreateConnectionAsync(); +``` + +Declare a **quorum queue** named `hello`, then create a publisher and publish a message. Check `PublishResult.Outcome.State` for `OutcomeState.Accepted`: + +```csharp +try +{ + IManagement management = connection.Management(); + IQueueSpecification queueSpec = management.Queue("hello").Type(QueueType.QUORUM); + await queueSpec.DeclareAsync(); + + IPublisher publisher = await connection.PublisherBuilder().Queue("hello").BuildAsync(); + try + { + const string body = "Hello World!"; + var message = new AmqpMessage(Encoding.UTF8.GetBytes(body)); + PublishResult pr = await publisher.PublishAsync(message); + if (pr.Outcome.State != OutcomeState.Accepted) + { + Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}"); + Environment.Exit(1); + } + + Console.WriteLine($" [x] Sent {body}"); + } + finally + { + await publisher.CloseAsync(); + } +} +finally +{ + await connection.CloseAsync(); + await environment.CloseAsync(); +} +``` + +[Full `Send/Program.cs` source](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/Send/Program.cs) (once merged upstream). + +> #### Sending doesn't work! +> +> If this is your first time using RabbitMQ and you don't see the " [x] Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 50 MB free) and is therefore refusing to accept messages. Check the broker [log file](/docs/logging/) to see if there is a [resource alarm](/docs/alarms) logged. + +### Receiving + +That's it for our publisher. Our consumer listens for messages from RabbitMQ, so unlike the publisher which publishes a single message, we'll keep the consumer running to listen for messages and print them out. + + + +The code in `Receive/Program.cs` declares the same quorum queue, then builds a consumer with a message handler. Call `ctx.Accept()` to settle the message (AMQP 1.0): + +```csharp +IManagement management = connection.Management(); +IQueueSpecification queueSpec = management.Queue("hello").Type(QueueType.QUORUM); +await queueSpec.DeclareAsync(); + +IConsumer consumer = await connection.ConsumerBuilder() + .Queue("hello") + .MessageHandler((ctx, message) => + { + Console.WriteLine($"Received a message: {Encoding.UTF8.GetString(message.Body()!)}"); + ctx.Accept(); + return Task.CompletedTask; + }) + .BuildAndStartAsync(); +``` + +Use Ctrl+C handling and `Task.Delay(Timeout.Infinite, cts.Token)` to keep the process alive until interrupted. + +[Full `Receive/Program.cs` source](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/Receive/Program.cs) (once merged upstream). + +### Putting it all together + +From the `dotnet-amqp` directory, run the consumer then the publisher: + +```bash +dotnet run --project Receive/Receive.csproj +dotnet run --project Send/Send.csproj +``` + +The consumer will print the message it gets from the publisher via RabbitMQ. + +> #### Listing queues +> +> You may wish to see what queues RabbitMQ has and how many messages are in them. You can do it (as a privileged user) using the `rabbitmqctl` tool: +> +> ```bash +> sudo rabbitmqctl list_queues +> ``` +> +> On Windows, omit the sudo: +> ```PowerShell +> rabbitmqctl.bat list_queues +> ``` + +Time to move on to [part 2](./tutorial-two-dotnet-amqp10) and build a simple _work queue_. diff --git a/tutorials/tutorial-one-go-amqp10.md b/tutorials/tutorial-one-go-amqp10.md new file mode 100644 index 000000000..56dbc075a --- /dev/null +++ b/tutorials/tutorial-one-go-amqp10.md @@ -0,0 +1,256 @@ +--- +title: RabbitMQ tutorial - "Hello World!" (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import TutorialsIntro from '@site/src/components/Tutorials/TutorialsIntro.md'; +import T1DiagramHello from '@site/src/components/Tutorials/T1DiagramHello.md'; +import T1DiagramSending from '@site/src/components/Tutorials/T1DiagramSending.md'; +import T1DiagramReceiving from '@site/src/components/Tutorials/T1DiagramReceiving.md'; + +# RabbitMQ tutorial - "Hello World!" + +## Introduction + + + + +## "Hello World" +### (using the AMQP 1.0 Go client) + +In this part of the tutorial we'll write two small programs in Go; a +producer that sends a single message, and a consumer that receives +messages and prints them out. We'll gloss over some of the detail in +the Go AMQP 1.0 client API, concentrating on this very simple thing just to get +started. It's the "Hello World" of messaging. + +In the diagram below, "P" is our producer and "C" is our consumer. The +box in the middle is a queue - a message buffer that RabbitMQ keeps +on behalf of the consumer. + + + +> #### The AMQP 1.0 Go client library +> +> RabbitMQ speaks multiple protocols. This tutorial uses **AMQP 1.0** over the same port as AMQP 0-9-1 (5672 by default). It requires **RabbitMQ 4.0 or later**. +> +> Use the RabbitMQ **AMQP 1.0** Go client (`rabbitmq-amqp-go-client`), not the classic AMQP 0-9-1 client (`amqp091-go`). See [AMQP 1.0 client libraries](/client-libraries/amqp-client-libraries) and the [Go client package](https://pkg.go.dev/github.com/rabbitmq/rabbitmq-amqp-go-client). +> +> Install with: +> +> ```go +> go get github.com/rabbitmq/rabbitmq-amqp-go-client +> ``` + +Now we have the client installed, we can write some code. + +### Sending + + + +We'll call our message publisher (sender) `send.go` and our message consumer +`receive.go`. The publisher will connect to RabbitMQ, send a single message, +then exit. + +In `send.go`, we need to import the library first: + +```go +package main + +import ( + "context" + "log" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) +``` + +Set up the connection to the broker using an environment and connection: + +```go +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() +``` + +The environment holds shared settings, and the connection represents a socket to the broker. Deferring `CloseConnections` ensures resources are released when the program exits. + +Declare a **quorum queue** named `"hello"`: + +```go + _, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "hello"}) + if err != nil { + log.Panicf("Failed to declare a queue: %v", err) + } +``` + +Declaring a queue is idempotent: it will only be created if it does not already exist. Quorum queues are durable and replicated across RabbitMQ nodes. + +Create a publisher and send a message: + +```go + publisher, err := conn.NewPublisher(ctx, &rmq.QueueAddress{Queue: "hello"}, nil) + if err != nil { + log.Panicf("Failed to create publisher: %v", err) + } + defer func() { _ = publisher.Close(context.Background()) }() + + body := "Hello World!" + res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body))) + if err != nil { + log.Panicf("Failed to publish a message: %v", err) + } + switch res.Outcome.(type) { + case *rmq.StateAccepted: + default: + log.Panicf("Unexpected publish outcome: %v", res.Outcome) + } + log.Printf(" [x] Sent %s\n", body) +} +``` + +With AMQP 1.0, `Publish` returns an outcome. Check for `StateAccepted` to confirm the broker accepted the message. + +[Here's the whole `send.go` file](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/send.go). + +> #### Sending doesn't work! +> +> If this is your first time using RabbitMQ and you don't see the " [x] Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 50 MB free) and is therefore refusing to accept messages. Check the broker [log file](/docs/logging/) to see if there is a [resource alarm](/docs/alarms) logged. + +### Receiving + +That's it for our publisher. Our consumer listens for messages from RabbitMQ, so unlike the publisher which publishes a single message, we'll keep the consumer running to listen for messages and print them out. + + + +The code in `receive.go` is similar. Set up the environment and connection, declare the same queue, then create a consumer: + +```go +package main + +import ( + "context" + "errors" + "log" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() + + _, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "hello"}) + if err != nil { + log.Panicf("Failed to declare a queue: %v", err) + } + + consumer, err := conn.NewConsumer(ctx, "hello", nil) + if err != nil { + log.Panicf("Failed to create consumer: %v", err) + } + defer func() { _ = consumer.Close(context.Background()) }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + for { + delivery, err := consumer.Receive(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Panicf("Failed to receive a message: %v", err) + } + msg := delivery.Message() + var body string + if len(msg.Data) > 0 { + body = string(msg.Data[0]) + } + log.Printf("Received a message: %s", body) + err = delivery.Accept(ctx) + if err != nil { + log.Panicf("Failed to accept message: %v", err) + } + } +} +``` + +Note that we declare the queue here as well. Because we might start the consumer before the publisher, we want to make sure the queue exists before we try to consume messages from it. + +With AMQP 1.0, the consumer **must settle** each message by calling `Accept` (or `Discard` / `Requeue`). Here we call `delivery.Accept(ctx)` after printing the message. + +[Here's the whole `receive.go` file](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive.go). + +### Putting it all together + +Create a `go.mod` file for your project: + +``` +module rabbitmq-tutorials + +go 1.21 + +require github.com/rabbitmq/rabbitmq-amqp-go-client v0.7.0 +``` + +Then compile and run the examples. In one terminal, run the consumer: + +```bash +go run receive.go +``` + +Then run the publisher: + +```bash +go run send.go +``` + +The consumer will print the message it gets from the publisher via RabbitMQ. The consumer will keep running, waiting for messages (use Ctrl+C to stop it), so try running the publisher from another terminal. + +> #### Listing queues +> +> You may wish to see what queues RabbitMQ has and how many messages are in them. You can do it (as a privileged user) using the `rabbitmqctl` tool: +> +> ```bash +> sudo rabbitmqctl list_queues +> ``` +> +> On Windows, omit the sudo: +> ```PowerShell +> rabbitmqctl.bat list_queues +> ``` + +Time to move on to [part 2](./tutorial-two-go-amqp10) and build a simple _work queue_. diff --git a/tutorials/tutorial-one-java-amqp10.md b/tutorials/tutorial-one-java-amqp10.md new file mode 100644 index 000000000..2567380de --- /dev/null +++ b/tutorials/tutorial-one-java-amqp10.md @@ -0,0 +1,296 @@ +--- +title: RabbitMQ tutorial - "Hello World!" (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import TutorialsIntro from '@site/src/components/Tutorials/TutorialsIntro.md'; +import T1DiagramHello from '@site/src/components/Tutorials/T1DiagramHello.md'; +import T1DiagramSending from '@site/src/components/Tutorials/T1DiagramSending.md'; +import T1DiagramReceiving from '@site/src/components/Tutorials/T1DiagramReceiving.md'; + +# RabbitMQ tutorial - "Hello World!" + +## Introduction + + + + +## "Hello World" +### (using the AMQP 1.0 Java client) + +In this part of the tutorial we'll write two programs in Java; a +producer that sends a single message, and a consumer that receives +messages and prints them out. We'll gloss over some of the detail in +the Java API, concentrating on this very simple thing just to get +started. It's the "Hello World" of messaging. + +In the diagram below, "P" is our producer and "C" is our consumer. The +box in the middle is a queue - a message buffer that RabbitMQ keeps +on behalf of the consumer. + + + +> #### The AMQP 1.0 Java client library +> +> RabbitMQ speaks multiple protocols. This tutorial uses **AMQP 1.0** over the same port as AMQP 0-9-1 (5672 by default). It requires **RabbitMQ 4.0 or later**. +> +> Use the RabbitMQ **AMQP 1.0** Java client (`com.rabbitmq.client:amqp-client`), not the classic AMQP 0-9-1 client (`com.rabbitmq:amqp-client`). See [AMQP 1.0 client libraries](/client-libraries/amqp-client-libraries) and the [client reference](https://rabbitmq.github.io/rabbitmq-amqp-java-client/stable/htmlsingle/). +> +> Add the dependency to your build, for example with Maven: +> +> ```xml +> +> com.rabbitmq.client +> amqp-client +> 0.10.0 +> +> ``` +> +> Runnable sources for this tutorial series live alongside the other ports in the [RabbitMQ tutorials](https://github.com/rabbitmq/rabbitmq-tutorials) repository (`java-amqp` directory). + +Now we have the client on the classpath, we can write some +code. + +### Sending + + + +We'll call our message publisher (sender) `Send` and our message consumer (receiver) +`Recv`. The publisher will connect to RabbitMQ, send a single message, +then exit. + +In +[`Send.java`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/Send.java), +we need some classes imported: + +```java +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.Publisher; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +``` + +Set up the class and name the queue: + +```java +public class Send { + private static final String QUEUE_NAME = "hello"; + public static void main(String[] argv) throws Exception { + ... + } +} +``` + +Then create an [environment](https://rabbitmq.github.io/rabbitmq-amqp-java-client/stable/htmlsingle/#connection-settings-at-the-environment-level) +and a **connection**. The environment holds shared settings; each connection targets the broker. +Here the URI points at a broker on the local machine with the default virtual host (`%2f` is `/`). + +```java +Environment environment = new AmqpEnvironmentBuilder() + .connectionSettings() + .uri("amqp://guest:guest@localhost:5672/%2f") + .environmentBuilder() + .build(); +Connection connection = environment.connectionBuilder().build(); +``` + +The connection abstracts the socket connection and takes care of protocol +negotiation and authentication. To connect to a different host, change the host (and credentials) in the URI. + +```java +connection.management().queue(QUEUE_NAME).quorum().queue().declare(); +``` + +RabbitMQ still exposes the **AMQ 0.9.1 model** (queues, exchanges, bindings) for topology. Declare a +**quorum queue** before publishing. The declare API uses a fluent chain; for quorum queues it must end +with `.quorum().queue().declare()`. Declaring a queue is idempotent: it is only created if it does not +already exist. + +To send a message, create a **publisher** addressed at the queue, build a message, and call `publish`. +The broker reports the outcome asynchronously; wait on a latch so the program does not exit before feedback +arrives. A successful publish has status `Publisher.Status.ACCEPTED`: + +```java +try (Publisher publisher = connection.publisherBuilder().queue(QUEUE_NAME).build()) { + String message = "Hello World!"; + CountDownLatch latch = new CountDownLatch(1); + publisher.publish( + publisher.message(message.getBytes(StandardCharsets.UTF_8)), + context -> { + if (context.status() == Publisher.Status.ACCEPTED) { + System.out.println(" [x] Sent '" + message + "'"); + } + latch.countDown(); + }); + if (!latch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for publish outcome"); + } +} +``` + +[Here's the whole Send.java +class](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/Send.java). + +> #### Sending doesn't work! +> +> If this is your first time using RabbitMQ and you don't see the "Sent" +> message then you may be left scratching your head wondering what could +> be wrong. Maybe the broker was started without enough free disk space +> (by default it needs at least 50 MB free) and is therefore refusing to +> accept messages. Check the broker [log file](/docs/logging/) to see if there +> is a [resource alarm](/docs/alarms) logged and reduce the +> free disk space threshold if necessary. +> The [Configuration guide](/docs/configure#config-items) +> will show you how to set disk_free_limit. + + +### Receiving + +That's it for our publisher. Our consumer listens for messages from +RabbitMQ, so unlike the publisher which publishes a single message, we'll +keep the consumer running to listen for messages and print them out. + + + +The code in [`Recv.java`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/Recv.java) uses the same environment and connection setup. Open a connection, then declare the same queue so the consumer can start before the publisher: + +```java +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +``` + +```java +public class Recv { + + private static final String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + Environment environment = new AmqpEnvironmentBuilder() + .connectionSettings() + .uri("amqp://guest:guest@localhost:5672/%2f") + .environmentBuilder() + .build(); + Connection connection = environment.connectionBuilder().build(); + + connection.management().queue(QUEUE_NAME).quorum().queue().declare(); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + Consumer consumer = connection.consumerBuilder() + .queue(QUEUE_NAME) + .messageHandler((context, message) -> { + String text = new String(message.body(), StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + text + "'"); + context.accept(); + }) + .build(); + + new CountDownLatch(1).await(); + } +} +``` + +Note that we declare the queue here as well. Because we might start +the consumer before the publisher, we want to make sure the queue exists +before we try to consume messages from it. + +Why not use try-with-resources on `Environment` and `Connection` in the consumer? Closing them would stop the process as soon as the try block ends. The sample keeps the consumer running; use **Ctrl+C** to stop the JVM (or extend the example to close resources on shutdown). + +With AMQP 1.0, the consumer **must settle** each message (`accept`, `discard`, or `requeue`). Here we call `context.accept()` after printing the body. + +[Here's the whole Recv.java +class](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/Recv.java). + +### Putting it all together + +Create a `pom.xml` that includes the client and the [Exec Maven Plugin](https://www.mojohaus.org/exec-maven-plugin/) so you can run the classes by name: + +```xml + + 4.0.0 + com.rabbitmq.examples + amqp10-tutorials + 1.0-SNAPSHOT + + 11 + + + + com.rabbitmq.client + amqp-client + 0.10.0 + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + ${exec.mainClass} + + + + + +``` + +Place `Send.java` and `Recv.java` under `src/main/java/`, then in one terminal run the consumer: + +```bash +mvn -q compile exec:java -Dexec.mainClass=Recv +``` + +Then run the publisher: + +```bash +mvn -q compile exec:java -Dexec.mainClass=Send +``` + +The consumer will print the message it gets from the publisher via +RabbitMQ. The consumer will keep running, waiting for messages (use Ctrl+C to stop it), so try running +the publisher from another terminal. + +> #### Listing queues +> +> You may wish to see what queues RabbitMQ has and how many +> messages are in them. You can do it (as a privileged user) using the `rabbitmqctl` tool: +> +> ```bash +> sudo rabbitmqctl list_queues +> ``` +> +> On Windows, omit the sudo: +> ```PowerShell +> rabbitmqctl.bat list_queues +> ``` + + +Time to move on to [part 2](./tutorial-two-java-amqp10) and build a simple _work queue_. diff --git a/tutorials/tutorial-six-dotnet-amqp10.md b/tutorials/tutorial-six-dotnet-amqp10.md new file mode 100644 index 000000000..ba5277fc1 --- /dev/null +++ b/tutorials/tutorial-six-dotnet-amqp10.md @@ -0,0 +1,93 @@ +--- +title: RabbitMQ tutorial - Remote procedure call (RPC) (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T6DiagramToC from '@site/src/components/Tutorials/T6DiagramToC.md'; + +# RabbitMQ tutorial - Remote procedure call (RPC) + +## Remote procedure call (RPC) +### (using the AMQP 1.0 .NET client) + + + + + +In the [previous tutorial](./tutorial-five-dotnet-amqp10) we used topic exchanges. In this tutorial we implement **request/reply** (RPC): a client sends a request and waits for a response. + +The sample RPC service computes Fibonacci numbers on the server. The server uses **`IResponder`** on a quorum queue `rpc_queue`; the client uses **`IRequester`** to publish requests and await replies. + +### Server (`RPCServer`) + +```csharp +IResponder responder = await connection.ResponderBuilder() + .RequestQueue(rpcQueueName) + .Handler((ctx, request) => + { + string response = ""; + try + { + string message = Encoding.UTF8.GetString(request.Body()!); + int n = int.Parse(message); + Console.WriteLine($" [.] fib({message})"); + response += Fib(n); + } + catch (Exception e) + { + Console.WriteLine($" [.] {e.Message}"); + } + + return Task.FromResult(ctx.Message(Encoding.UTF8.GetBytes(response))); + }) + .BuildAsync(); +``` + +### Client (`RPCClient`) + +```csharp +IRequester requester = await connection.RequesterBuilder() + .RequestAddress() + .Queue("rpc_queue") + .Requester() + .BuildAsync(); + +IMessage request = new AmqpMessage(Encoding.UTF8.GetBytes(iStr)); +IMessage reply = await requester.PublishAsync(request); +``` + +The tutorial client requests `fib(0)` through `fib(31)` in a loop, matching the other AMQP 1.0 tutorial ports. + +### Running + +```bash +dotnet run --project RPCServer/RPCServer.csproj +dotnet run --project RPCClient/RPCClient.csproj +``` + +### Source + +- [`RPCServer/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/RPCServer/Program.cs) +- [`RPCClient/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/RPCClient/Program.cs) + +> #### A note on RPC +> +> RPC is common but easy to misuse: keep clear which calls are local vs remote, document dependencies, and handle broker or server outages. When in doubt, prefer asynchronous pipelines over blocking RPC. + +For more on AMQP 1.0 and RabbitMQ, see [AMQP in RabbitMQ](/docs/amqp) and [AMQP 1.0 client libraries](/client-libraries/amqp-client-libraries). diff --git a/tutorials/tutorial-six-go-amqp10.md b/tutorials/tutorial-six-go-amqp10.md new file mode 100644 index 000000000..40187f712 --- /dev/null +++ b/tutorials/tutorial-six-go-amqp10.md @@ -0,0 +1,125 @@ +--- +title: RabbitMQ tutorial - RPC (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T6DiagramToC from '@site/src/components/Tutorials/T6DiagramToC.md'; + +# RabbitMQ tutorial - RPC + +## RPC +### (using the AMQP 1.0 Go client) + + + + + +In the [previous tutorial](./tutorial-five-go-amqp10) we learned how to use +topic exchanges to broadcast messages across a distributed system. + +In this tutorial we're going to do something completely different - we're going +to implement a request/response pattern using RabbitMQ. This is useful when you +need a remote procedure call between processes. We'll show how to make an RPC +call using RabbitMQ. + +Our RPC system will consist of a client and a scalable RPC server. We won't be +using JSON - instead we'll encode a function number and use that to decide what +function to invoke on the RPC server. + +Remote procedure call implementation +------------------------------------ + +If you look at the previous tutorials, each tutorial program was pretty +self-contained and didn't depend on anything else. In this one we'll write two +programs; a client that calls a remote function and a server that implements +the remote functions. + +**RPC Server** + +The server exposes a function we can call - it computes Fibonacci numbers. + +```bash +go run rpc_server.go +# => [x] Awaiting RPC requests +``` + +The server handles requests from a queue named `rpc_queue`. See +[the full `rpc_server.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/rpc_server.go) for the implementation. + +**RPC Client** + +The client sends a request and waits for the response: + +```bash +go run rpc_client.go 30 +# => [x] Requesting fib(30) +# => [.] Got 832040 +``` + +The client sends the request to the `rpc_queue`, and the server responds on a +reply queue. See [the full `rpc_client.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/rpc_client.go) +for the implementation. + +Design pattern +-------------- + +The RPC pattern works like this: + +1. The client sends a request message with a `reply_to` address where it expects the response. +2. The RPC worker (server) performs the work and sends the response to the `reply_to` queue. +3. The client receives the response and processes it. + +The `rabbitmq-amqp-go-client` handles the message flow using the management +API to declare queues and the consumer/publisher APIs to send/receive messages. + +Correlation IDs +--------------- + +In the implementation, each RPC request includes a correlation ID. This ID is +echoed back in the response so the client can match requests with responses in +case multiple RPC calls are in flight. + +Putting it all together +----------------------- + +The full code examples are available at: + +- [`rpc_server.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/rpc_server.go) +- [`rpc_client.go`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/rpc_client.go) + +To try this, start the RPC server: + +```bash +go run rpc_server.go +# => [x] Awaiting RPC requests +``` + +And in another terminal, call the RPC function: + +```bash +go run rpc_client.go 10 +# => [x] Requesting fib(10) +# => [.] Got 55 +``` + +The RPC pattern demonstrates how RabbitMQ can be used for synchronous +request/response communication in a distributed system, complementing the +asynchronous patterns shown in earlier tutorials. + +Congratulations! We've covered the basics of RabbitMQ messaging patterns with AMQP 1.0. For more advanced topics, see the [RabbitMQ documentation](/docs/amqp) and [AMQP 1.0 client libraries](/client-libraries/amqp-client-libraries). diff --git a/tutorials/tutorial-six-java-amqp10.md b/tutorials/tutorial-six-java-amqp10.md new file mode 100644 index 000000000..3762748b5 --- /dev/null +++ b/tutorials/tutorial-six-java-amqp10.md @@ -0,0 +1,27 @@ +--- +title: RabbitMQ tutorial - RPC (AMQP 1.0) +--- + + +# RabbitMQ tutorial - RPC (AMQP 1.0) + +The AMQP 1.0 Java edition of this tutorial is not yet published on this site. + +For the same concepts using the **AMQP 0-9-1** Java client, see [RPC](./tutorial-six-java). + +For other languages, the [tutorials index](./) lists AMQP 1.0 versions where available. diff --git a/tutorials/tutorial-three-dotnet-amqp10.md b/tutorials/tutorial-three-dotnet-amqp10.md new file mode 100644 index 000000000..e112e3376 --- /dev/null +++ b/tutorials/tutorial-three-dotnet-amqp10.md @@ -0,0 +1,89 @@ +--- +title: RabbitMQ tutorial - Publish/Subscribe (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T3DiagramToC from '@site/src/components/Tutorials/T3DiagramToC.md'; +import T3DiagramExchange from '@site/src/components/Tutorials/T3DiagramExchange.md'; +import T3DiagramBinding from '@site/src/components/Tutorials/T3DiagramBinding.md'; + +# RabbitMQ tutorial - Publish/Subscribe + +## Publish/Subscribe +### (using the AMQP 1.0 .NET client) + + + + + +In the [previous tutorial](./tutorial-two-dotnet-amqp10) we created a work +queue. In this tutorial we'll deliver a message to **multiple** consumers — the "publish/subscribe" pattern. + +We'll build a simple logging system: one program emits logs, and one or more receivers print them. + +Exchanges +--------- + +The producer sends messages to an _exchange_, not directly to a queue. The exchange routes messages to queues according to its type. + + + +Declare a `fanout` exchange named `logs`: + +```csharp +IExchangeSpecification exchangeSpec = management.Exchange(exchangeName).Type("fanout"); +await exchangeSpec.DeclareAsync(); +``` + +Bindings +-------- + +Bind a temporary exclusive queue to the exchange: + + + +```csharp +IQueueSpecification tempQueue = management.Queue().Exclusive(true).AutoDelete(true); +IQueueInfo queueInfo = await tempQueue.DeclareAsync(); +string queueName = queueInfo.Name(); + +IBindingSpecification binding = management.Binding() + .SourceExchange(exchangeSpec) + .DestinationQueue(queueName) + .Key(string.Empty); +await binding.BindAsync(); +``` + +The publisher uses `PublisherBuilder().Exchange(exchangeName)`; the consumer subscribes on `queueName` and calls `ctx.Accept()` in the handler. + +### Running + +From `dotnet-amqp`: + +```bash +dotnet run --project ReceiveLogs/ReceiveLogs.csproj +dotnet run --project EmitLog/EmitLog.csproj +``` + +### Source + +- [`EmitLog/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/EmitLog/Program.cs) +- [`ReceiveLogs/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/ReceiveLogs/Program.cs) + +Now we can move on to [tutorial 4](./tutorial-four-dotnet-amqp10) and learn how to route messages based on routing keys. diff --git a/tutorials/tutorial-three-go-amqp10.md b/tutorials/tutorial-three-go-amqp10.md new file mode 100644 index 000000000..2ad3ddf9d --- /dev/null +++ b/tutorials/tutorial-three-go-amqp10.md @@ -0,0 +1,303 @@ +--- +title: RabbitMQ tutorial - Publish/Subscribe (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T3DiagramToC from '@site/src/components/Tutorials/T3DiagramToC.md'; +import T3DiagramExchange from '@site/src/components/Tutorials/T3DiagramExchange.md'; +import T3DiagramBinding from '@site/src/components/Tutorials/T3DiagramBinding.md'; + +# RabbitMQ tutorial - Publish/Subscribe + +## Publish/Subscribe +### (using the AMQP 1.0 Go client) + + + + + +In the [previous tutorial](./tutorial-two-go-amqp10) we created a work +queue. We assumed that the tasks were distributed among multiple workers. In this tutorial we'll do something completely different — we'll deliver a +message to multiple consumers. This pattern is known as "publish/subscribe". + +To illustrate the pattern, we're going to build a simple logging system. It +will consist of two programs — the first will emit log messages, and the +second will receive and print them. + +In our logging system every running copy of the receiver program will get +the messages. That way we'll be able to run one receiver and direct the +logs to the disk, and at the same time run another receiver in the terminal +and see the logs printed there. + +Essentially, published log messages are going to be broadcast to all the +receivers. + +Exchanges +--------- + +In the previous tutorials we sent and received messages to and from a queue. +Now it's time to introduce the full messaging model in RabbitMQ. + +Let's quickly cover what we've learned: + +- A _producer_ is a user application that sends messages. +- A _queue_ is a buffer that stores messages. +- A _consumer_ is a user application that receives messages. + +The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to a queue at all. + +Instead, the producer can only send messages to an _exchange_. An exchange is a +very simple thing. On one side it receives messages from producers and the +other side it pushes them to queues. The exchange must know exactly what to do +with a message it receives. Should it be appended to a particular queue? Should +it be appended to many queues? Or should it get discarded? The rules for that +are defined by the _exchange type_. + + + +There are a few exchange types available: `direct`, `topic`, `headers` and `fanout`. We'll focus on the +last one — the `fanout`. Let's create an exchange of this type: + +```go +_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"}) +if err != nil { + log.Panicf("Failed to declare an exchange: %v", err) +} +``` + +The `fanout` exchange broadcasts all the messages it receives to all the +queues it knows about. That's exactly what we need for our logger. + +Bindings +-------- + +We've already created a `fanout` exchange and a queue. Now we need to tell the +exchange to send messages to our queue. That relationship between exchange and a +queue is called a _binding_. + + + +```go +qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{ + IsExclusive: true, + IsAutoDelete: true, +}) +if err != nil { + log.Panicf("Failed to declare a queue: %v", err) +} + +_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: "logs", + DestinationQueue: qInfo.Name(), + BindingKey: "", +}) +if err != nil { + log.Panicf("Failed to bind a queue: %v", err) +} +``` + +From now on the `logs` exchange will append messages to our queue. An exclusive queue is one which is only accessible by the declaring consumer and will be deleted when it disconnects. + +Let's run this code. We'll use `go run` to create the logger consumer: + +```bash +go run receive_logs.go +``` + +Now let's emit some logs and publish a message to the `logs` exchange: + +```bash +go run emit_log.go "Here is the first log" +``` + +Putting it all together +----------------------- + +The producer `emit_log.go`: + +```go +package main + +import ( + "context" + "log" + "os" + "strings" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() + + _, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"}) + if err != nil { + log.Panicf("Failed to declare an exchange: %v", err) + } + + publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{Exchange: "logs", Key: ""}, nil) + if err != nil { + log.Panicf("Failed to create publisher: %v", err) + } + defer func() { _ = publisher.Close(context.Background()) }() + + body := bodyFrom(os.Args) + res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body))) + if err != nil { + log.Panicf("Failed to publish a message: %v", err) + } + switch res.Outcome.(type) { + case *rmq.StateAccepted: + default: + log.Panicf("Unexpected publish outcome: %v", res.Outcome) + } + log.Printf(" [x] Sent %s", body) +} + +func bodyFrom(args []string) string { + var s string + if (len(args) < 2) || args[1] == "" { + s = "hello" + } else { + s = strings.Join(args[1:], " ") + } + return s +} +``` + +[(emit_log.go source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/emit_log.go) + +And the consumer `receive_logs.go`: + +```go +package main + +import ( + "context" + "errors" + "log" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() + + _, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"}) + if err != nil { + log.Panicf("Failed to declare an exchange: %v", err) + } + + qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{ + IsExclusive: true, + IsAutoDelete: true, + }) + if err != nil { + log.Panicf("Failed to declare a queue: %v", err) + } + + _, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: "logs", + DestinationQueue: qInfo.Name(), + BindingKey: "", + }) + if err != nil { + log.Panicf("Failed to bind a queue: %v", err) + } + + consumer, err := conn.NewConsumer(ctx, qInfo.Name(), nil) + if err != nil { + log.Panicf("Failed to create consumer: %v", err) + } + defer func() { _ = consumer.Close(context.Background()) }() + + log.Printf(" [*] Waiting for logs. To exit press CTRL+C") + for { + delivery, err := consumer.Receive(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Panicf("Failed to receive a message: %v", err) + } + msg := delivery.Message() + var body string + if len(msg.Data) > 0 { + body = string(msg.Data[0]) + } + log.Printf(" [x] %s", body) + err = delivery.Accept(ctx) + if err != nil { + log.Panicf("Failed to accept message: %v", err) + } + } +} +``` + +[(receive_logs.go source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/receive_logs.go) + +To try this, open a terminal and run the receiver: + +```bash +go run receive_logs.go +# => [*] Waiting for logs. To exit press CTRL+C +``` + +Then, in other terminal(s), run the emitter: + +```bash +go run emit_log.go "Here is the first log" +# => [x] Sent Here is the first log +go run emit_log.go "Here is the second log" +# => [x] Sent Here is the second log +``` + +The consumer receives every message the publisher sends. Run as many receivers as +you like — they'll each get their own copy of the messages. + +Received logs will appear in the receiving terminals as: + +```bash +go run receive_logs.go +# => [*] Waiting for logs. To exit press CTRL+C +# => [x] Here is the first log +# => [x] Here is the second log +``` + +Now we can move on to [tutorial 4](./tutorial-four-go-amqp10) and learn how to route messages based on routing keys. diff --git a/tutorials/tutorial-three-java-amqp10.md b/tutorials/tutorial-three-java-amqp10.md new file mode 100644 index 000000000..051fd02ce --- /dev/null +++ b/tutorials/tutorial-three-java-amqp10.md @@ -0,0 +1,27 @@ +--- +title: RabbitMQ tutorial - Publish/Subscribe (AMQP 1.0) +--- + + +# RabbitMQ tutorial - Publish/Subscribe (AMQP 1.0) + +The AMQP 1.0 Java edition of this tutorial is not yet published on this site. + +For the same concepts using the **AMQP 0-9-1** Java client, see [Publish/Subscribe](./tutorial-three-java). + +For other languages, the [tutorials index](./) lists AMQP 1.0 versions where available. diff --git a/tutorials/tutorial-two-dotnet-amqp10.md b/tutorials/tutorial-two-dotnet-amqp10.md new file mode 100644 index 000000000..d09a13eb6 --- /dev/null +++ b/tutorials/tutorial-two-dotnet-amqp10.md @@ -0,0 +1,133 @@ +--- +title: RabbitMQ tutorial - Work Queues (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md'; +import T2DiagramPrefetch from '@site/src/components/Tutorials/T2DiagramPrefetch.md'; + +# RabbitMQ tutorial - Work Queues + +## Work Queues +### (using the AMQP 1.0 .NET client) + + + + + +In the [first tutorial](./tutorial-one-dotnet-amqp10) we +wrote programs to send and receive messages from a named queue. In this +one we'll create a _Work Queue_ that will be used to distribute +time-consuming tasks among multiple workers. + +The main idea behind Work Queues (aka: _Task Queues_) is to avoid +doing a resource-intensive task immediately and having to wait for +it to complete. Instead we schedule the task to be done later. We encapsulate a +_task_ as a message and send it to a queue. A worker process running +in the background will pop the tasks and eventually execute the +job. When you run many workers the tasks will be shared between them. + +This concept is especially useful in web applications where it's +impossible to handle a complex task during a short HTTP request +window. + +This tutorial uses the [RabbitMQ AMQP 1.0 .NET client](/client-libraries/amqp-client-libraries) (`RabbitMQ.AMQP.Client`). It requires RabbitMQ **4.0 or later**. + +Preparation +----------- + +In the previous part of this tutorial we sent a message containing +"Hello World!". Now we'll be sending strings that stand for complex +tasks. We fake work by sleeping: each `.` in the string adds one second. + +The producer is `NewTask/Program.cs`; the consumer is `Worker/Program.cs`. + +`NewTask` publishes to a quorum queue `task_queue` and checks `OutcomeState.Accepted`: + +```csharp +IQueueSpecification queueSpec = management.Queue(taskQueueName).Type(QueueType.QUORUM); +await queueSpec.DeclareAsync(); + +IPublisher publisher = await connection.PublisherBuilder().Queue(taskQueueName).BuildAsync(); +// ... +PublishResult pr = await publisher.PublishAsync(amqpMessage); +if (pr.Outcome.State != OutcomeState.Accepted) +{ + Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}"); + Environment.Exit(1); +} +``` + +`Worker` uses `InitialCredits(1)` for fair dispatch and calls `ctx.Accept()` after `DoWork` in a `finally` block: + +```csharp +IConsumer consumer = await connection.ConsumerBuilder() + .Queue(taskQueueName) + .InitialCredits(1) + .MessageHandler((ctx, message) => + { + string body = Encoding.UTF8.GetString(message.Body()!); + Console.WriteLine($" [x] Received '{body}'"); + try + { + DoWork(body); + } + finally + { + Console.WriteLine(" [x] Done"); + ctx.Accept(); + } + + return Task.CompletedTask; + }) + .BuildAndStartAsync(); +``` + +Round-robin dispatching +----------------------- + +Run two workers and publish tasks from a third terminal (from `dotnet-amqp`): + +```bash +dotnet run --project Worker/Worker.csproj +dotnet run --project Worker/Worker.csproj +dotnet run --project NewTask/NewTask.csproj "First message." +dotnet run --project NewTask/NewTask.csproj "Second message.." +``` + +By default, RabbitMQ sends each message to the next consumer in sequence (round-robin). + +Message acknowledgment +---------------------- + +With AMQP 1.0, the consumer must **settle** each message (`Accept`, etc.). Settle **after** work completes so a crash mid-task allows redelivery. + +Fair dispatch +------------- + + + +Use **`InitialCredits(1)`** on the consumer builder so only one un-settled message is in flight per consumer (similar to prefetch 1 / `basicQos` in AMQP 0-9-1). + +Putting it all together +----------------------- + +See [`NewTask/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/NewTask/Program.cs) and [`Worker/Program.cs`](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/dotnet-amqp/Worker/Program.cs) for the full sources (once merged upstream). + +Now we can move on to [tutorial 3](./tutorial-three-dotnet-amqp10) and learn how to deliver the same message to many consumers. diff --git a/tutorials/tutorial-two-go-amqp10.md b/tutorials/tutorial-two-go-amqp10.md new file mode 100644 index 000000000..bcea377a4 --- /dev/null +++ b/tutorials/tutorial-two-go-amqp10.md @@ -0,0 +1,393 @@ +--- +title: RabbitMQ tutorial - Work Queues (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md'; +import T2DiagramPrefetch from '@site/src/components/Tutorials/T2DiagramPrefetch.md'; + +# RabbitMQ tutorial - Work Queues + +## Work Queues +### (using the AMQP 1.0 Go client) + + + + + +In the [first tutorial](./tutorial-one-go-amqp10) we +wrote programs to send and receive messages from a named queue. In this +one we'll create a _Work Queue_ that will be used to distribute +time-consuming tasks among multiple workers. + +The main idea behind Work Queues (aka: _Task Queues_) is to avoid +doing a resource-intensive task immediately and having to wait for +it to complete. Instead we schedule the task to be done later. We encapsulate a +_task_ as a message and send it to a queue. A worker process running +in the background will pop the tasks and eventually execute the +job. When you run many workers the tasks will be shared between them. + +This concept is especially useful in web applications where it's +impossible to handle a complex task during a short HTTP request +window. + +This tutorial uses the [RabbitMQ AMQP 1.0 Go client](/client-libraries/amqp-client-libraries) (`rabbitmq-amqp-go-client`). It requires RabbitMQ **4.0 or later**. + +Preparation +----------- + +In the previous part of this tutorial we sent a message containing +"Hello World!". Now we'll be sending strings that stand for complex +tasks. We don't have a real-world task, like images to be resized or +pdf files to be rendered, so let's fake it by just pretending we're +busy - by using the `time.Sleep` function. We'll take the number of dots +in the string as its complexity; every dot will account for one second +of "work". For example, a fake task described by `Hello...` +will take three seconds. + +We will slightly modify the _send.go_ code from our previous example, +to allow arbitrary messages to be sent from the command line. This +program will schedule tasks to our work queue, so let's name it +`new_task.go`: + +```go +body := bodyFrom(os.Args) +res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body))) +if err != nil { + log.Panicf("Failed to publish a message: %v", err) +} +switch res.Outcome.(type) { +case *rmq.StateAccepted: +default: + log.Panicf("Unexpected publish outcome: %v", res.Outcome) +} +log.Printf(" [x] Sent %s", body) + +func bodyFrom(args []string) string { + var s string + if (len(args) < 2) || args[1] == "" { + s = "hello" + } else { + s = strings.Join(args[1:], " ") + } + return s +} +``` + +Our old consumer program also requires some changes: it needs to +fake a second of work for every dot in the message body. It will handle +delivered messages and perform the task, so let's call it `worker.go`: + +```go +consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1}) +if err != nil { + log.Panicf("Failed to create consumer: %v", err) +} +defer func() { _ = consumer.Close(context.Background()) }() + +log.Printf(" [*] Waiting for messages. To exit press CTRL+C") +for { + delivery, err := consumer.Receive(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Panicf("Failed to receive a message: %v", err) + } + msg := delivery.Message() + var payload []byte + if len(msg.Data) > 0 { + payload = msg.Data[0] + } + log.Printf("Received a message: %s", payload) + dotCount := bytes.Count(payload, []byte(".")) + t := time.Duration(dotCount) + time.Sleep(t * time.Second) + log.Printf("Done") + err = delivery.Accept(ctx) + if err != nil { + log.Panicf("Failed to accept message: %v", err) + } +} +``` + +Note the use of `ConsumerOptions{InitialCredits: 1}` to limit messages in flight (the AMQP 1.0 analog to prefetch). + +Run each program using `go run`: + +```bash +go run new_task.go hello world +go run worker.go +``` + +Round-robin dispatching +----------------------- + +One of the advantages of using a Task Queue is the ability to easily +parallelise work. If we are building up a backlog of work, we can just +add more workers and that way, scale easily. + +First, let's try to run two worker instances at the same time. They +will both get messages from the queue, but how exactly? Let's see. + +You need three terminals open. Two will run the worker +program. These terminals will be our two consumers - C1 and C2. + +```bash +# shell 1 +go run worker.go +# => [*] Waiting for messages. To exit press CTRL+C +``` + +```bash +# shell 2 +go run worker.go +# => [*] Waiting for messages. To exit press CTRL+C +``` + +In the third one we'll publish new tasks. Once you've started +the consumers you can publish a few messages: + +```bash +# shell 3 +go run new_task.go First message. +# => [x] Sent 'First message.' +go run new_task.go Second message.. +# => [x] Sent 'Second message..' +go run new_task.go Third message... +# => [x] Sent 'Third message...' +``` + +Let's see what is delivered to our workers: + +```bash +go run worker.go +# => [*] Waiting for messages. To exit press CTRL+C +# => Received a message: First message. +# => Done +# => Received a message: Third message... +# => Done +``` + +```bash +go run worker.go +# => [*] Waiting for messages. To exit press CTRL+C +# => Received a message: Second message.. +# => Done +``` + +By default, RabbitMQ will send each message to the next consumer, +in sequence. On average every consumer will get the same number of +messages. This way of distributing messages is called round-robin. Try +this out with three or more workers. + + +Message acknowledgment +---------------------- + +Doing a task can take a few seconds, you may wonder what happens if +a consumer starts a long task and it terminates before it completes. +With AMQP 1.0, the consumer must **settle** each message (`Accept`, `Discard`, or `Requeue`). Until you settle, the broker can redeliver the message if the worker stops. + +In order to make sure a message is not lost when a worker dies after receiving it but before finishing processing, settle only **after** the task is done. Here we call `delivery.Accept(ctx)` after `time.Sleep` completes. + +If a consumer dies without settling, RabbitMQ will redeliver the message to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die. + +[The complete `worker.go` file](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/worker.go) uses `delivery.Accept` after work completes. + +> #### Forgotten settlement +> +> It's a common mistake to omit `delivery.Accept()` (or to call it before work finishes). Messages will be redelivered when your client quits (which may look like random redelivery), and unacknowledged messages accumulate on the broker. +> +> You can use `rabbitmqctl` to inspect queues: +> +> ```bash +> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged +> ``` +> +> On Windows, drop the sudo: +> ```bash +> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged +> ``` + +Fair dispatch +---------------- + +You might have noticed that the dispatching still doesn't work exactly +as we want. For example in a situation with two workers, when all +odd messages are heavy and even messages are light, one worker will be +constantly busy and the other one will do hardly any work. Well, +RabbitMQ doesn't know anything about that and will still dispatch +messages evenly. + +This happens because the broker may deliver multiple messages before earlier ones are settled. + + + +With the AMQP 1.0 Go client, limit how many messages are **in flight** per consumer by setting **`InitialCredits`** to `1` on the consumer builder. This is the analogue of prefetch 1 in AMQP 0-9-1: + +```go +consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1}) +``` + +This ensures that RabbitMQ won't prefetch messages; instead, it will only deliver a new message after the current one is settled. + +> #### Note about queue size +> +> If all the workers are busy, your queue can fill up. You will want to keep an +> eye on that, and maybe add more workers, or have some other strategy. + +Putting it all together +----------------------- + +Final outline of `new_task.go`: + +```go +package main + +import ( + "context" + "log" + "os" + "strings" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() + + _, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "task_queue"}) + if err != nil { + log.Panicf("Failed to declare a queue: %v", err) + } + + publisher, err := conn.NewPublisher(ctx, &rmq.QueueAddress{Queue: "task_queue"}, nil) + if err != nil { + log.Panicf("Failed to create publisher: %v", err) + } + defer func() { _ = publisher.Close(context.Background()) }() + + body := bodyFrom(os.Args) + res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body))) + if err != nil { + log.Panicf("Failed to publish a message: %v", err) + } + switch res.Outcome.(type) { + case *rmq.StateAccepted: + default: + log.Panicf("Unexpected publish outcome: %v", res.Outcome) + } + log.Printf(" [x] Sent %s", body) +} + +func bodyFrom(args []string) string { + var s string + if (len(args) < 2) || args[1] == "" { + s = "hello" + } else { + s = strings.Join(args[1:], " ") + } + return s +} +``` + +[(new_task.go source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/new_task.go) + +And `worker.go`: + +```go +package main + +import ( + "bytes" + "context" + "errors" + "log" + "time" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +const brokerURI = "amqp://guest:guest@localhost:5672/" + +func main() { + ctx := context.Background() + env := rmq.NewEnvironment(brokerURI, nil) + conn, err := env.NewConnection(ctx) + if err != nil { + log.Panicf("Failed to connect to RabbitMQ: %v", err) + } + defer func() { + _ = env.CloseConnections(context.Background()) + }() + + _, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "task_queue"}) + if err != nil { + log.Panicf("Failed to declare a queue: %v", err) + } + + consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1}) + if err != nil { + log.Panicf("Failed to create consumer: %v", err) + } + defer func() { _ = consumer.Close(context.Background()) }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + for { + delivery, err := consumer.Receive(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Panicf("Failed to receive a message: %v", err) + } + msg := delivery.Message() + var payload []byte + if len(msg.Data) > 0 { + payload = msg.Data[0] + } + log.Printf("Received a message: %s", payload) + dotCount := bytes.Count(payload, []byte(".")) + t := time.Duration(dotCount) + time.Sleep(t * time.Second) + log.Printf("Done") + err = delivery.Accept(ctx) + if err != nil { + log.Panicf("Failed to accept message: %v", err) + } + } +} +``` + +[(worker.go source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go-amqp/worker.go) + +Using explicit settlement and `InitialCredits: 1` you can set up a work queue. Quorum queues ensure tasks survive broker restarts. + +Now we can move on to [tutorial 3](./tutorial-three-go-amqp10) and learn how to deliver the same message to many consumers. diff --git a/tutorials/tutorial-two-java-amqp10.md b/tutorials/tutorial-two-java-amqp10.md new file mode 100644 index 000000000..8b7a73272 --- /dev/null +++ b/tutorials/tutorial-two-java-amqp10.md @@ -0,0 +1,407 @@ +--- +title: RabbitMQ tutorial - Work Queues (AMQP 1.0) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md'; +import T2DiagramPrefetch from '@site/src/components/Tutorials/T2DiagramPrefetch.md'; + +# RabbitMQ tutorial - Work Queues + +## Work Queues +### (using the AMQP 1.0 Java client) + + + + + +In the [first tutorial](./tutorial-one-java-amqp10) we +wrote programs to send and receive messages from a named queue. In this +one we'll create a _Work Queue_ that will be used to distribute +time-consuming tasks among multiple workers. + +The main idea behind Work Queues (aka: _Task Queues_) is to avoid +doing a resource-intensive task immediately and having to wait for +it to complete. Instead we schedule the task to be done later. We encapsulate a +_task_ as a message and send it to a queue. A worker process running +in the background will pop the tasks and eventually execute the +job. When you run many workers the tasks will be shared between them. + +This concept is especially useful in web applications where it's +impossible to handle a complex task during a short HTTP request +window. + +This tutorial uses the [RabbitMQ AMQP 1.0 Java client](/client-libraries/amqp-client-libraries) (`com.rabbitmq.client:amqp-client`). It requires RabbitMQ **4.0 or later**. Runnable sources live in the [RabbitMQ tutorials](https://github.com/rabbitmq/rabbitmq-tutorials) repository (`java-amqp` directory). + +Preparation +----------- + +In the previous part of this tutorial we sent a message containing +"Hello World!". Now we'll be sending strings that stand for complex +tasks. We don't have a real-world task, like images to be resized or +pdf files to be rendered, so let's fake it by just pretending we're +busy - by using the `Thread.sleep()` function. We'll take the number of dots +in the string as its complexity; every dot will account for one second +of "work". For example, a fake task described by `Hello...` +will take three seconds. + +We will slightly modify the _Send.java_ pattern from our previous example, +to allow arbitrary messages to be sent from the command line. This +program will schedule tasks to our work queue, so let's name it +`NewTask.java`: + +```java +String message = String.join(" ", argv); +// ... obtain a Publisher for the task queue, then: +publisher.publish( + publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true), + context -> { /* wait for broker outcome; print on ACCEPTED */ }); +``` + +Our old consumer program also requires some changes: it needs to +fake a second of work for every dot in the message body. It will handle +delivered messages and perform the task, so let's call it `Worker.java`: + +```java +connection.consumerBuilder() + .queue(TASK_QUEUE_NAME) + .messageHandler((context, message) -> { + String text = new String(message.body(), StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + text + "'"); + try { + doWork(text); + } finally { + System.out.println(" [x] Done"); + context.accept(); + } + }) + .build(); +``` + +Settlement (`accept`) is covered in more detail below; the worker must **not** use `preSettled()` delivery if tasks must survive worker failures. + +Our fake task to simulate execution time: + +```java +private static void doWork(String task) throws InterruptedException { + for (char ch: task.toCharArray()) { + if (ch == '.') Thread.sleep(1000); + } +} +``` + +Build and run with Maven as in [tutorial one](./tutorial-one-java-amqp10) (same `com.rabbitmq.client:amqp-client` dependency and Exec plugin). + +Round-robin dispatching +----------------------- + +One of the advantages of using a Task Queue is the ability to easily +parallelise work. If we are building up a backlog of work, we can just +add more workers and that way, scale easily. + +First, let's try to run two worker instances at the same time. They +will both get messages from the queue, but how exactly? Let's see. + +You need three consoles open. Two will run the worker +program. These consoles will be our two consumers - C1 and C2. + +```bash +# shell 1 +mvn -q compile exec:java -Dexec.mainClass=Worker +# => [*] Waiting for messages. To exit press CTRL+C +``` + +```bash +# shell 2 +mvn -q compile exec:java -Dexec.mainClass=Worker +# => [*] Waiting for messages. To exit press CTRL+C +``` + +In the third one we'll publish new tasks. Once you've started +the consumers you can publish a few messages: + +```bash +# shell 3 +mvn -q compile exec:java -Dexec.mainClass=NewTask -Dexec.args='First message.' +# => [x] Sent 'First message.' +mvn -q compile exec:java -Dexec.mainClass=NewTask -Dexec.args='Second message..' +# => [x] Sent 'Second message..' +``` + +Use your shell's quoting rules so the task string is passed as program arguments to `NewTask` (the examples above work with `-Dexec.args` under Unix; on Windows you may prefer running from your IDE or a small script). + +Let's see what is delivered to our workers: + +```bash +mvn -q compile exec:java -Dexec.mainClass=Worker +# => [*] Waiting for messages. To exit press CTRL+C +# => [x] Received 'First message.' +# => [x] Received 'Third message...' +# => [x] Received 'Fifth message.....' +``` + +```bash +mvn -q compile exec:java -Dexec.mainClass=Worker +# => [*] Waiting for messages. To exit press CTRL+C +# => [x] Received 'Second message..' +# => [x] Received 'Fourth message....' +``` + +By default, RabbitMQ will send each message to the next consumer, +in sequence. On average every consumer will get the same number of +messages. This way of distributing messages is called round-robin. Try +this out with three or more workers. + + +Message acknowledgment +---------------------- + +Doing a task can take a few seconds, you may wonder what happens if +a consumer starts a long task and it terminates before it completes. +With the default **at-least-once** consumption mode, the consumer must **settle** each message (`accept`, `discard`, or `requeue`). Until you settle, the broker can redeliver the message if the worker stops. + +In order to make sure a message is not lost when a worker dies after receiving it but before finishing processing, settle only **after** the task is done. Here we call `context.accept()` in a `finally` block after `doWork` returns. + +If a consumer dies without settling, RabbitMQ will redeliver the message. If there are other consumers online at the same time, it will quickly redeliver it +to another consumer. That way you can be sure that no message is lost, +even if the workers occasionally die. + +A timeout is enforced on consumer delivery acknowledgement (see [Delivery Acknowledgement Timeout](/docs/consumers#acknowledgement-timeout)). + +Do **not** enable `ConsumerBuilder.preSettled()` for this tutorial: that mode delivers messages already settled and they cannot be redelivered if the worker crashes. + +```java +connection.consumerBuilder() + .queue(TASK_QUEUE_NAME) + .messageHandler((context, message) -> { + String text = new String(message.body(), StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + text + "'"); + try { + doWork(text); + } finally { + System.out.println(" [x] Done"); + context.accept(); + } + }) + .build(); +``` + +Using this pattern, if you terminate a worker using +CTRL+C while it was processing a message, the message can be redelivered to another consumer once the acknowledgement timeout elapses. + +> #### Forgotten settlement +> +> It's a common mistake to omit `context.accept()` (or to call it before work finishes). Messages will be redelivered +> when your client quits (which may look like random redelivery), and unacked deliveries accumulate on the broker. +> +> You can use `rabbitmqctl` to inspect queues: +> +> ```bash +> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged +> ``` +> +> On Windows, drop the sudo: +> ```bash +> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged +> ``` + +Message durability +------------------ + +We have learned how to make sure that even if the consumer dies, the +task isn't lost before settlement. But our tasks will still be lost if the RabbitMQ server stops. + +When RabbitMQ quits or crashes it will forget queues and messages unless you configure durability. +In modern versions of RabbitMQ, that is 4.0 or later, transient non-exclusive queues are deprecated. +In virtually all uses cases, you should use durable queues. Quorum Queues are **always** durable. +Classic Queues can be made durable, which is a strong recommendation. + +This library makes durable queues by default. For messages, mark outbound messages as **durable** when publishing. Set `durable(true)` on the `Message` to mark messages as durable. This is important for +Classic Queues. In Quorum Queues, messages are always durable. + +First, declare a **quorum** queue named `task_queue` (not `hello`, to avoid clashing with an existing non-durable queue from experiments): + +```java +connection.management().queue("task_queue").quorum().queue().declare(); +``` + +Apply the same declaration in both producer and consumer before publishing or consuming. + +When publishing, build a durable message: + +```java +publisher.publish( + publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true), + callback); +``` + +> #### Note on message persistence +> +> Marking messages as durable does not fully guarantee that a message +> will never be lost. There is still a small window around broker I/O. For stronger guarantees consider [publisher confirms](/docs/confirms) and the AMQP 1.0 client's per-publish callbacks (`Publisher.Status`). + + +Fair dispatch +---------------- + +You might have noticed that the dispatching still doesn't work exactly +as we want. For example in a situation with two workers, when all +odd messages are heavy and even messages are light, one worker will be +constantly busy and the other one will do hardly any work. Well, +RabbitMQ doesn't know anything about that and will still dispatch +messages evenly. + +This happens because the broker may dispatch several messages before earlier ones are settled. + + + +With the AMQP 1.0 Java client, limit how many messages are **in flight** per consumer by setting **initial credits** to `1` on the consumer builder. This is the analogue of `basicQos(1)` / prefetch 1 in AMQP 0-9-1: + +```java +connection.consumerBuilder() + .queue(TASK_QUEUE_NAME) + .initialCredits(1) + .messageHandler((context, message) -> { ... }) + .build(); +``` + +> #### Note about queue size +> +> If all the workers are busy, your queue can fill up. You will want to keep an +> eye on that, and maybe add more workers, or have some other strategy. + +Putting it all together +----------------------- + +Final outline of `NewTask.java`: + +```java +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.Publisher; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class NewTask { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] argv) throws Exception { + try (Environment environment = new AmqpEnvironmentBuilder() + .connectionSettings() + .uri("amqp://guest:guest@localhost:5672/%2f") + .environmentBuilder() + .build(); + Connection connection = environment.connectionBuilder().build()) { + + connection.management().queue(TASK_QUEUE_NAME).quorum().queue().declare(); + + String message = String.join(" ", argv); + + try (Publisher publisher = connection.publisherBuilder().queue(TASK_QUEUE_NAME).build()) { + CountDownLatch latch = new CountDownLatch(1); + publisher.publish( + publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true), + context -> { + if (context.status() == Publisher.Status.ACCEPTED) { + System.out.println(" [x] Sent '" + message + "'"); + } + latch.countDown(); + }); + if (!latch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for publish outcome"); + } + } + } + } +} +``` + +[(NewTask.java source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/NewTask.java) + +And `Worker.java`: + +```java +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; + +public class Worker { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] argv) throws Exception { + Environment environment = new AmqpEnvironmentBuilder() + .connectionSettings() + .uri("amqp://guest:guest@localhost:5672/%2f") + .environmentBuilder() + .build(); + Connection connection = environment.connectionBuilder().build(); + + connection.management().queue(TASK_QUEUE_NAME).quorum().queue().declare(); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + Consumer consumer = connection.consumerBuilder() + .queue(TASK_QUEUE_NAME) + .initialCredits(1) + .messageHandler((context, message) -> { + String text = new String(message.body(), StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + text + "'"); + try { + doWork(text); + } finally { + System.out.println(" [x] Done"); + context.accept(); + } + }) + .build(); + + new CountDownLatch(1).await(); + } + + private static void doWork(String task) { + for (char ch : task.toCharArray()) { + if (ch == '.') { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} +``` + +[(Worker.java source)](https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java-amqp/Worker.java) + +Using explicit settlement and `initialCredits(1)` you can set up a +work queue. Quorum queues and durable messages let tasks survive broker restarts as in the AMQP 0-9-1 tutorial. + +For API details see the [AMQP 1.0 Java client Javadoc](https://rabbitmq.github.io/rabbitmq-amqp-java-client/stable/api/). + +Now we can move on to [tutorial 3](./tutorial-three-java-amqp10) and learn how +to deliver the same message to many consumers.