Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ updates:
versions: [ "[6.0,)" ]
- dependency-name: "org.junit.platform:*"
versions: [ "[6.0,)" ]
- package-ecosystem: "maven"
directory: "/java-amqp"
schedule:
interval: "daily"
open-pull-requests-limit: 20
target-branch: "main"
ignore:
- dependency-name: "org.junit.jupiter:*"
versions: [ "[6.0,)" ]
- dependency-name: "org.junit.platform:*"
versions: [ "[6.0,)" ]
- package-ecosystem: "maven"
directory: "/java-stream-mvn"
schedule:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ target/
.classpath
.project
.settings

/.cursor/plans
13 changes: 13 additions & 0 deletions dotnet-amqp/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Set the default behavior, in case people don't have core.autocrlf set.
* text=auto

# Auto detect text files and perform LF normalization
*.cs text=auto eol=lf
*.txt text=auto

# Declare files that will always have CRLF line endings on checkout.
*.sln text eol=crlf
*.csproj text eol=crlf

# Custom for Visual Studio
*.cs diff=csharp
6 changes: 6 additions & 0 deletions dotnet-amqp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*.dll
*.exe
*.lock.json
packages/
bin/
obj/
14 changes: 14 additions & 0 deletions dotnet-amqp/EmitLog/EmitLog.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.AMQP.Client" Version="1.0.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions dotnet-amqp/EmitLog/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Text;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;

const string brokerUri = "amqp://guest:guest@localhost:5672/%2f";
const string exchangeName = "logs";

string message = args.Length < 1 ? "info: Hello World!" : string.Join(" ", args);

ConnectionSettings settings = ConnectionSettingsBuilder.Create()
.Uri(new Uri(brokerUri))
.ContainerId("tutorial-emitlog")
.Build();

IEnvironment environment = AmqpEnvironment.Create(settings);
IConnection connection = await environment.CreateConnectionAsync();

try
{
IManagement management = connection.Management();
IExchangeSpecification exchangeSpec = management.Exchange(exchangeName).Type("fanout");
await exchangeSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Exchange(exchangeName).BuildAsync();
try
{
var amqpMessage = new AmqpMessage(Encoding.UTF8.GetBytes(message));
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
Environment.Exit(1);
}

Console.WriteLine($" [x] Sent '{message}'");
}
finally
{
await publisher.CloseAsync();
}
}
finally
{
await connection.CloseAsync();
await environment.CloseAsync();
}
14 changes: 14 additions & 0 deletions dotnet-amqp/EmitLogDirect/EmitLogDirect.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.AMQP.Client" Version="1.0.0" />
</ItemGroup>

</Project>
59 changes: 59 additions & 0 deletions dotnet-amqp/EmitLogDirect/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Text;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;

const string brokerUri = "amqp://guest:guest@localhost:5672/%2f";
const string exchangeName = "logs_direct";

string severity = GetSeverity(args);
string message = GetMessage(args);

ConnectionSettings settings = ConnectionSettingsBuilder.Create()
.Uri(new Uri(brokerUri))
.ContainerId("tutorial-emitlogdirect")
.Build();

IEnvironment environment = AmqpEnvironment.Create(settings);
IConnection connection = await environment.CreateConnectionAsync();

try
{
IManagement management = connection.Management();
IExchangeSpecification exchangeSpec = management.Exchange(exchangeName).Type("direct");
await exchangeSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Exchange(exchangeName).Key(severity).BuildAsync();
try
{
var amqpMessage = new AmqpMessage(Encoding.UTF8.GetBytes(message));
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
Environment.Exit(1);
}

Console.WriteLine($" [x] Sent '{severity}':'{message}'");
}
finally
{
await publisher.CloseAsync();
}
}
finally
{
await connection.CloseAsync();
await environment.CloseAsync();
}

static string GetSeverity(string[] strings) => strings.Length < 1 ? "info" : strings[0];

static string GetMessage(string[] strings)
{
if (strings.Length < 2)
{
return "Hello World!";
}

return string.Join(" ", strings.Skip(1));
}
14 changes: 14 additions & 0 deletions dotnet-amqp/EmitLogTopic/EmitLogTopic.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.AMQP.Client" Version="1.0.0" />
</ItemGroup>

</Project>
59 changes: 59 additions & 0 deletions dotnet-amqp/EmitLogTopic/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Text;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;

const string brokerUri = "amqp://guest:guest@localhost:5672/%2f";
const string exchangeName = "logs_topic";

string routingKey = GetRouting(args);
string message = GetMessage(args);

ConnectionSettings settings = ConnectionSettingsBuilder.Create()
.Uri(new Uri(brokerUri))
.ContainerId("tutorial-emitlogtopic")
.Build();

IEnvironment environment = AmqpEnvironment.Create(settings);
IConnection connection = await environment.CreateConnectionAsync();

try
{
IManagement management = connection.Management();
IExchangeSpecification exchangeSpec = management.Exchange(exchangeName).Type("topic");
await exchangeSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Exchange(exchangeName).Key(routingKey).BuildAsync();
try
{
var amqpMessage = new AmqpMessage(Encoding.UTF8.GetBytes(message));
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
Environment.Exit(1);
}

Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
}
finally
{
await publisher.CloseAsync();
}
}
finally
{
await connection.CloseAsync();
await environment.CloseAsync();
}

static string GetRouting(string[] strings) => strings.Length < 1 ? "anonymous.info" : strings[0];

static string GetMessage(string[] strings)
{
if (strings.Length < 2)
{
return "Hello World!";
}

return string.Join(" ", strings.Skip(1));
}
14 changes: 14 additions & 0 deletions dotnet-amqp/NewTask/NewTask.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.AMQP.Client" Version="1.0.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions dotnet-amqp/NewTask/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Text;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;

const string brokerUri = "amqp://guest:guest@localhost:5672/%2f";
const string taskQueueName = "task_queue";

string message = args.Length > 0 ? string.Join(" ", args) : "Hello World!";

ConnectionSettings settings = ConnectionSettingsBuilder.Create()
.Uri(new Uri(brokerUri))
.ContainerId("tutorial-newtask")
.Build();

IEnvironment environment = AmqpEnvironment.Create(settings);
IConnection connection = await environment.CreateConnectionAsync();

try
{
IManagement management = connection.Management();
IQueueSpecification queueSpec = management.Queue(taskQueueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Queue(taskQueueName).BuildAsync();
try
{
var amqpMessage = new AmqpMessage(Encoding.UTF8.GetBytes(message));
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
Environment.Exit(1);
}

Console.WriteLine($" [x] Sent '{message}'");
}
finally
{
await publisher.CloseAsync();
}
}
finally
{
await connection.CloseAsync();
await environment.CloseAsync();
}
69 changes: 69 additions & 0 deletions dotnet-amqp/PublisherConfirms/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System.Text;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;

const string brokerUri = "amqp://guest:guest@localhost:5672/%2f";

ConnectionSettings settings = ConnectionSettingsBuilder.Create()
.Uri(new Uri(brokerUri))
.ContainerId("tutorial-publisherconfirms")
.Build();

IEnvironment environment = AmqpEnvironment.Create(settings);
IConnection connection = await environment.CreateConnectionAsync();

try
{
IManagement management = connection.Management();
string queueName = Guid.NewGuid().ToString();
IQueueSpecification queueSpec = management.Queue(queueName).Exclusive(true).AutoDelete(true);
await queueSpec.DeclareAsync();

IConsumer consumer = await connection.ConsumerBuilder()
.Queue(queueName)
.MessageHandler((ctx, message) =>
{
Console.WriteLine($"Received a message: {message.BodyAsString()}");
ctx.Accept();
return Task.CompletedTask;
})
.BuildAndStartAsync();

try
{
await Task.Delay(300);

IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).BuildAsync();
try
{
const string text = "hello";
PublishResult pr = await publisher.PublishAsync(new AmqpMessage(Encoding.UTF8.GetBytes(text)));
switch (pr.Outcome.State) {
case OutcomeState.Accepted:
Console.WriteLine($" Accepted Message: {pr.Message.BodyAsString()} confirmed");
break;
case OutcomeState.Released: // here the message is not routed
Console.WriteLine($" Released Message: {pr.Message.BodyAsString()} Released");
break;
case OutcomeState.Rejected: // here there is also the error: `pr.Outcome.Error`
Console.WriteLine($"[Publisher] Message: {pr.Message.BodyAsString()} Rejected with error: {pr.Outcome.Error}");
break;
}

await Task.Delay(500);
}
finally
{
await publisher.CloseAsync();
}
}
finally
{
await consumer.CloseAsync();
}
}
finally
{
await connection.CloseAsync();
await environment.CloseAsync();
}
Loading