Skip to content

Feat: Improved implementation of NNG broker support#612

Open
alexted wants to merge 5 commits into
taskiq-python:feature/nngfrom
alexted:feature/add-support-NNG-as-broker
Open

Feat: Improved implementation of NNG broker support#612
alexted wants to merge 5 commits into
taskiq-python:feature/nngfrom
alexted:feature/add-support-NNG-as-broker

Conversation

@alexted
Copy link
Copy Markdown

@alexted alexted commented Apr 26, 2026

Refactoring the #607

@alexted alexted force-pushed the feature/add-support-NNG-as-broker branch from 78a2868 to b8c7212 Compare April 26, 2026 15:37
… affinity policy, and scheduler abstraction.
@alexted alexted mentioned this pull request Apr 26, 2026
Copy link
Copy Markdown
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a whole broker-server implementation, which I'm not quite sure if we want to have it in the main repo.

I thought that it would be like ZeroMQ, but it's much more impressive. Maybe we might consider moving it to another package to keep the main lib thin.

Comment thread taskiq/brokers/nng/hub.py

# ── standalone CLI entry point ────────────────────────────────────────────────

def _build_config() -> HubConfig:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of defining a stand-alone file to run, let's make it a sub-command of the taskiq itself.

You can do it by following our CLI extending guide.

https://taskiq-python.github.io/extending-taskiq/cli.html

Comment thread taskiq/brokers/nng/hub.py
the least-loaded worker instead of relying on NNG round-robin.

**State** — :class:`~taskiq.brokers.nng.storage.InMemoryStore`. All
store operations are synchronous and execute directly on the asyncio event
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
store operations are synchronous and execute directly on the asyncio event
store operations are asynchronous and execute directly on the asyncio event

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods are actually def, not async def, so the wording is correct.

Comment thread taskiq/brokers/nng/broker.py Outdated
"labels": message.labels,
"lease_id": "", # hub assigns the real lease_id at dispatch time
"attempts": int(message.labels.get("attempts", 0)),
"max_retries": int(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have retry-middlewares that do exactly that, I guess we can remove retry functionality from the broker and the hub.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

lease_id = envelope.lease_id # hub-assigned; correct by construction

async def _ack(
_task_id: str = task_id,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating those parameters as defaults, just use them inside the function. In that case your closure will borrow variables from the outer environment. It will make it more rock solid to use.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default arguments here are the canonical fix for late binding in Python. task_id/lease_id/worker_id are reassigned on every iteration, and without snapshotting via defaults, a delayed ack will capture someone else’s values.

Comment thread taskiq/brokers/nng/protocol.py Outdated
payload_b64: str
labels: dict[str, Any] = field(default_factory=dict)
lease_id: str = ""
attempts: int = 0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry-related again.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

status: str = "starting"
version: str = "unknown"

def as_dict(self) -> dict[str, Any]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to define a function for that. There's a built-in asdict function for dataclasses.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reworked

Comment on lines +135 to +136
separators=(",", ":"),
ensure_ascii=False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separators will save you like few bytes. But it might make messages more readable.

Suggested change
separators=(",", ":"),
ensure_ascii=False,
ensure_ascii=False,

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the wire format, compactness is the norm, not a micro-optimization. These are bytes on the socket, not logs.

payload: dict[str, Any] = {
"task_id": message.task_id,
"task_name": message.task_name,
"payload_b64": base64.b64encode(message.message).decode("ascii"),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"payload_b64": base64.b64encode(message.message).decode("ascii"),
"payload_b64": base64.b64encode(message.message).decode("utf-8"),

UTF-8 is used in all other places. Let's keep it everywhere to not have any encoding\decoding issues.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base64 output is ASCII by definition, .decode("ascii") is stricter and semantically more accurate. "Consistency" is not relevant here.

Comment thread taskiq/brokers/nng/hub.py
# ── control plane ─────────────────────────────────────────────────────────

async def _control_handler(self, ctx: Any) -> None:
"""Run one Rep0 context: receive → dispatch → reply, in a loop."""
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Run one Rep0 context: receive dispatch reply, in a loop."""
"""Run one Rep0 context: receive -> dispatch -> reply, in a loop."""

@codecov
Copy link
Copy Markdown

codecov Bot commented May 6, 2026

Codecov Report

❌ Patch coverage is 70.53942% with 213 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.08%. Comparing base (2fd7a3c) to head (fe3b0a5).

Files with missing lines Patch % Lines
taskiq/brokers/nng/broker.py 0.00% 129 Missing ⚠️
taskiq/brokers/nng/hub.py 71.61% 65 Missing ⚠️
taskiq/brokers/nng/storage.py 94.11% 16 Missing ⚠️
taskiq/brokers/nng/protocol.py 96.62% 3 Missing ⚠️
Additional details and impacted files
@@               Coverage Diff               @@
##           feature/nng     #612      +/-   ##
===============================================
- Coverage        78.30%   77.08%   -1.23%     
===============================================
  Files               70       74       +4     
  Lines             2485     3186     +701     
===============================================
+ Hits              1946     2456     +510     
- Misses             539      730     +191     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@alexted
Copy link
Copy Markdown
Author

alexted commented May 7, 2026

That's a whole broker-server implementation, which I'm not quite sure if we want to have it in the main repo.

I thought that it would be like ZeroMQ, but it's much more impressive. Maybe we might consider moving it to another package to keep the main lib thin.

I intended this PR as a full implementation proposal, rather than a requirement to merge it as-is. Feel free to adopt it fully, partially, or not at all, depending on what aligns best with the project’s direction.

@alexted alexted changed the title Feature: add support nng as broker Feat: Full-featured NNG broker with hub-worker architecture May 23, 2026
@alexted alexted changed the title Feat: Full-featured NNG broker with hub-worker architecture Feat: Improved implementation of NNG broker support May 23, 2026
Replace as_dict with dataclasses.asdict;
Remove user-level retry orchestration from broker/hub/protocol
@alexted
Copy link
Copy Markdown
Author

alexted commented May 23, 2026

That's a whole broker-server implementation, which I'm not quite sure if we want to have it in the main repo.

I thought that it would be like ZeroMQ, but it's much more impressive. Maybe we might consider moving it to another package to keep the main lib thin.

Strategically valid, you are right.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants