Skip to content

[HELP] How to properly implement the dead_letter logic using Taskiq? #578

@Hspu1

Description

@Hspu1

Hey there, I've encountered a problem while implementing the dead_letter logic using Taskiq.
The consumer always acks the message, even when the task fails with an error. I've tried to nack manually, tried to implement a middleware, and tried passing arguments in the config such as

ack_on_error=False

but without success. Here's the code:

taskiq_broker.py

from contextlib import asynccontextmanager
from aio_pika import ExchangeType, connect_robust
from asyncio import run

from taskiq_aio_pika import AioPikaBroker
from pydantic import BaseModel


class RabbitConfig(BaseModel):
    """Connection, topology and tuning settings for the RabbitMQ-backed broker.

    Every field currently carries a hard-coded default. Author's note: the
    intended next step is ``BaseSettings`` with
    ``model_config = SettingsConfigDict`` (+ ``.env``).
    """

    # Address, credentials and reconnect policy. host/port/username/password
    # are interpolated into the AMQP URL by the code that uses this model.
    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    reconnect_on_fail: bool = True
    reconnect_interval: int = 5  # presumably seconds -- TODO confirm
    reconnect_max_attempts: int = 10

    # Main exchange/queue that tasks are published to and consumed from.
    main_exchange: str = "main_x"
    main_queue: str = "main_q"

    # Dead-letter exchange, its queue, and the routing key binding them.
    dlx_exchange: str = "dlx"
    dlx_queue: str = "dlq"
    dlx_routing_key: str = "dlq"

    # Declaration and durability flags forwarded to the broker constructor.
    declare_queues: bool = True
    declare_exchange: bool = True
    queue_durable: bool = True
    exchange_durable: bool = True

    # Pooling, prefetch and timeout tuning.
    # NOTE(review): timeouts/heartbeat are presumably in seconds -- confirm.
    max_connection_pool_size: int = 3
    prefetch_count: int = 1
    socket_timeout: int = 30
    heartbeat: int = 60
    blocked_connection_timeout: int = 60


@asynccontextmanager
async def get_connection(config: RabbitConfig):
    """Yield a robust aio_pika connection built from ``config``.

    The AMQP URL is assembled from the username, password, host and port in
    ``config``; ``socket_timeout`` and ``heartbeat`` are passed through to
    ``connect_robust``. The connection is closed in a ``finally`` clause, so
    it is released even if the ``async with`` body raises.
    """
    amqp_url = f"amqp://{config.username}:{config.password}@{config.host}:{config.port}"
    conn = await connect_robust(
        url=amqp_url,
        timeout=config.socket_timeout,
        heartbeat=config.heartbeat,
    )
    try:
        yield conn
    finally:
        await conn.close()


async def declare_dlx(config: RabbitConfig):
    """Declare the dead-letter exchange and queue, then bind them together.

    A durable direct exchange named ``config.dlx_exchange`` and a durable
    quorum queue named ``config.dlx_queue`` are declared on a short-lived
    connection, and the queue is bound to the exchange with
    ``config.dlx_routing_key``.
    """
    dead_queue_arguments = {
        "x-queue-type": "quorum",
        # 24 h * 60 min * 60 s * 1000 = one day expressed in milliseconds.
        "x-message-ttl": 24 * 60 * 60 * 1000,
    }

    async with get_connection(config) as conn, conn.channel() as ch:
        dead_exchange = await ch.declare_exchange(
            config.dlx_exchange,
            ExchangeType.DIRECT,
            durable=True,
        )
        dead_queue = await ch.declare_queue(
            config.dlx_queue,
            durable=True,
            arguments=dead_queue_arguments,
        )
        await dead_queue.bind(dead_exchange, config.dlx_routing_key)


async def setup_broker_async() -> AioPikaBroker:
    """Declare the dead-letter topology and build the ``AioPikaBroker``.

    A default ``RabbitConfig`` is created, ``declare_dlx`` is awaited with
    it, and the broker is then constructed from the same config. The main
    queue is given arguments that point its dead-lettered messages at
    ``config.dlx_exchange`` with ``config.dlx_routing_key``.

    NOTE(review): whether ``AioPikaBroker`` recognises every keyword name
    passed below depends on the installed taskiq-aio-pika version -- confirm
    against its documentation.
    """
    config = RabbitConfig()

    # Same order as before: dead-letter exchange/queue first, broker second.
    await declare_dlx(config)

    amqp_url = f"amqp://{config.username}:{config.password}@{config.host}:{config.port}"
    main_queue_arguments = {
        "x-dead-letter-exchange": config.dlx_exchange,
        "x-dead-letter-routing-key": config.dlx_routing_key,
        "x-queue-type": "quorum",
        "x-max-priority": 3,
    }

    return AioPikaBroker(
        # Connection and reconnect policy.
        url=amqp_url,
        reconnect_on_fail=config.reconnect_on_fail,
        reconnect_interval=config.reconnect_interval,
        reconnect_max_attempts=config.reconnect_max_attempts,
        # Main queue / exchange topology.
        queue_name=config.main_queue,
        exchange_name=config.main_exchange,
        declare_queues=config.declare_queues,
        declare_exchange=config.declare_exchange,
        queue_durable=config.queue_durable,
        exchange_durable=config.exchange_durable,
        queue_arguments=main_queue_arguments,
        # Pooling and timeouts.
        max_connection_pool_size=config.max_connection_pool_size,
        prefetch_count=config.prefetch_count,
        socket_timeout=config.socket_timeout,
        heartbeat=config.heartbeat,
        blocked_connection_timeout=config.blocked_connection_timeout,
    )


def setup_broker() -> AioPikaBroker:
    """Synchronous wrapper: run ``setup_broker_async`` to completion and return its broker."""
    configured_broker = run(setup_broker_async())
    return configured_broker


# Module-level broker instance, built (via asyncio's ``run``) as a side effect
# of importing this module.
broker = setup_broker()

lifespan.py

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.core.taskiq_broker import broker


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Start the Taskiq broker for the app's lifetime and shut it down after.

    When ``broker.is_worker_process`` is true the broker is neither started
    nor shut down here. In both cases the broker is exposed to the
    application as ``app.state.broker``.

    Shutdown runs in a ``finally`` clause so the broker is still shut down
    when an exception is raised into this generator at the ``yield``.
    """
    if not broker.is_worker_process:
        await broker.startup()
    app.state.broker = broker

    try:
        yield
    finally:
        # Mirror the startup condition: only shut down what this process started.
        if not broker.is_worker_process:
            await broker.shutdown()

task example:

from app.core.taskiq_broker import broker


# NOTE(review): the retry_* keyword arguments are passed to ``broker.task`` as-is;
# whether they take effect depends on broker middleware not shown here -- confirm.
@broker.task(task_name="save_email", timeout=40, priority=0, retry_count=2, retry_backoff=True, retry_backoff_delay=60, retry_jitter=True)
async def send_email_async(recipient: EmailStr, subject: str, body: str):
    """Task registered as ``save_email``; currently a stub that always fails.

    The real send call is commented out and the body raises unconditionally
    so that the dead-letter path can be exercised.
    """
    # await send_email(recipient=recipient, subject=subject, body=body)
    raise Exception("Test DLQ")

As you can see, I'm only testing my dead_letter implementation for now. After resetting RabbitMQ to a clean state, I run the command below:

taskiq worker app.core.taskiq_broker:broker app.google_mailing.send_email

And I receive this:

taskiq worker app.core.taskiq_broker:broker app.google_mailing.send_email
[2026-01-07 18:56:07,388][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 1476
[2026-01-07 18:56:07,389][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2026-01-07 18:56:07,395][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 19976 
[2026-01-07 18:56:07,396][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 21072 
[2026-01-07 18:56:08,411][taskiq.worker][INFO   ][worker-0] Importing tasks from module app.google_mailing.send_email
[2026-01-07 18:56:08,433][taskiq.worker][INFO   ][worker-1] Importing tasks from module app.google_mailing.send_email
[2026-01-07 18:56:08,484][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2026-01-07 18:56:08,485][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2026-01-07 18:56:19,176][taskiq.receiver.receiver][INFO   ][worker-0] Executing task save_email with ID: 7002250d91b044fcba9ca2cfa7f64077
[2026-01-07 18:56:19,187][taskiq.receiver.receiver][ERROR  ][worker-0] Exception found while executing function: Test DLQ
Traceback (most recent call last):
  File "C:\Users\Макс\Notifications-P\venv\Lib\site-packages\taskiq\receiver\receiver.py", line 263, in run_task
    target_future = await target_future
                    ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Макс\Notifications-P\app\google_mailing\send_email.py", line 32, in send_email_async
    raise Exception("Test DLQ")
Exception: Test DLQ

Next, this is what I see in the RabbitMQ UI:
Image

What am I supposed to do and what is your implementation of dead_letter logic?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions