
engine.middlewares — Ready-to-use middleware for Aiogram



ContainerMiddleware

Required middleware that injects the DI container into the context of every request.

ContainerMiddleware

Bases: BaseMiddleware

Middleware for injecting the Dependency Injection container.

Parameters:

Name | Type | Description | Default
container | ContainerProtocol | Object implementing ContainerProtocol. | required
Source code in src/codex_bot/engine/middlewares/container.py
class ContainerMiddleware(BaseMiddleware):
    """Middleware for injecting the Dependency Injection container.

    Args:
        container: Object implementing ContainerProtocol.
    """

    def __init__(self, container: ContainerProtocol) -> None:
        self.container = container

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Injects the container into the context data."""
        data["container"] = self.container
        return await handler(event, data)
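
To illustrate what downstream code sees, the sketch below reproduces the injection step with stdlib stand-ins only. `FakeContainer`, `InjectContainer` and `handler` are hypothetical names made up for this example; the real class subclasses aiogram's BaseMiddleware and receives real Telegram events.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, dict[str, Any]], Awaitable[Any]]


class FakeContainer:
    """Hypothetical stand-in for an object implementing ContainerProtocol."""

    name = "demo-container"


class InjectContainer:
    """Stand-in that mirrors ContainerMiddleware.__call__ without aiogram."""

    def __init__(self, container: Any) -> None:
        self.container = container

    async def __call__(self, handler: Handler, event: Any, data: dict[str, Any]) -> Any:
        data["container"] = self.container  # same key the real middleware writes
        return await handler(event, data)


async def handler(event: Any, data: dict[str, Any]) -> str:
    # Downstream code reads the container from the shared context dict.
    return data["container"].name


container_result = asyncio.run(InjectContainer(FakeContainer())(handler, object(), {}))
print(container_result)
```

Because the container is stored under a plain dict key, any later middleware or handler that receives `data` can read it, which is what DirectorMiddleware relies on.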

Functions

__call__(handler, event, data) async

Injects the container into the context data.


UserValidationMiddleware

Initial user check against the database/cache. Manages ban lists and updates activity status.

UserValidationMiddleware

Bases: BaseMiddleware

Middleware for validating users and injecting RBAC data.

Key features:
- Universal: Supports all update types via 'event_from_user'.
- RBAC: Injects is_admin flag using ContainerProtocol.
- Safe: Handles events without an associated user.

Parameters:

Name | Type | Description | Default
container | ContainerProtocol | Object implementing ContainerProtocol for admin checks. | required
Source code in src/codex_bot/engine/middlewares/user_validation.py
class UserValidationMiddleware(BaseMiddleware):
    """Middleware for validating users and injecting RBAC data.

    Key features:
    - **Universal**: Supports all update types via 'event_from_user'.
    - **RBAC**: Injects ``is_admin`` flag using ContainerProtocol.
    - **Safe**: Handles events without an associated user.

    Args:
        container: Object implementing ContainerProtocol for admin checks.
    """

    def __init__(self, container: ContainerProtocol) -> None:
        self.container = container

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Processes the incoming event."""
        # aiogram 3.x automatically extracts the user into 'event_from_user'
        user: User | None = data.get("event_from_user")

        if user:
            data["user"] = user
            data["is_admin"] = self.container.is_admin(user.id)
            log.debug(f"UserValidation | user_id={user.id} is_admin={data['is_admin']}")
        else:
            data["is_admin"] = False
            log.debug("UserValidation | No user found in event")

        return await handler(event, data)
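
The branching above can be exercised without aiogram. This is a minimal sketch: `FakeUser`, `FakeContainer` and `validate` are hypothetical stand-ins, and the admin ID set is invented for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeUser:
    """Hypothetical stand-in for aiogram.types.User (only the id is needed)."""

    id: int


class FakeContainer:
    """Hypothetical container; the real one implements ContainerProtocol."""

    admin_ids = {1}

    def is_admin(self, user_id: int) -> bool:
        return user_id in self.admin_ids


async def validate(container: FakeContainer, data: dict[str, Any]) -> dict[str, Any]:
    """Mirrors the branching in UserValidationMiddleware.__call__."""
    user = data.get("event_from_user")
    if user:
        data["user"] = user
        data["is_admin"] = container.is_admin(user.id)
    else:
        data["is_admin"] = False  # events without a user are never admin
    return data


admin = asyncio.run(validate(FakeContainer(), {"event_from_user": FakeUser(id=1)}))
guest = asyncio.run(validate(FakeContainer(), {"event_from_user": FakeUser(id=2)}))
anonymous = asyncio.run(validate(FakeContainer(), {}))
print(admin["is_admin"], guest["is_admin"], anonymous["is_admin"])
```

Note that `is_admin` is always present in `data`, so handlers can read it without a default, while `user` is only set when the event carries a user.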

Functions

__call__(handler, event, data) async

Processes the incoming event.


ThrottlingMiddleware

Spam protection (rate limiting). Uses Redis to control request frequency.

ThrottlingMiddleware

Bases: BaseMiddleware

Rate limiting middleware via atomic Redis SET NX.

One network request instead of two (EXISTS + SET). Atomicity eliminates race conditions during parallel updates.

Parameters:

Name | Type | Description | Default
redis | Any | Async Redis client (redis.asyncio.Redis). | required
rate_limit | float | Minimum interval between requests in seconds. Supports fractional values (e.g., 0.5). | 1.0
Example
from redis.asyncio import Redis
from codex_bot.engine.middlewares import ThrottlingMiddleware

redis = Redis.from_url("redis://localhost")
builder.add_middleware(ThrottlingMiddleware(redis=redis, rate_limit=0.5))
Source code in src/codex_bot/engine/middlewares/throttling.py
class ThrottlingMiddleware(BaseMiddleware):
    """
    Rate limiting middleware via atomic Redis SET NX.

    One network request instead of two (EXISTS + SET).
    Atomicity eliminates race conditions during parallel updates.

    Args:
        redis: Async Redis client (``redis.asyncio.Redis``).
        rate_limit: Minimum interval between requests in seconds.
                    Supports fractional values (e.g., ``0.5``).

    Example:
        ```python
        from redis.asyncio import Redis
        from codex_bot.engine.middlewares import ThrottlingMiddleware

        redis = Redis.from_url("redis://localhost")
        builder.add_middleware(ThrottlingMiddleware(redis=redis, rate_limit=0.5))
        ```
    """

    def __init__(self, redis: Any, rate_limit: float = 1.0) -> None:
        self.redis = redis
        self.rate_limit = rate_limit

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        user_id: int | None = event.from_user.id if hasattr(event, "from_user") and event.from_user else None

        if not user_id:
            return await handler(event, data)

        key = f"throttle:{user_id}"

        # Atomic operation: creates a key with TTL only if it didn't exist.
        # Returns True on creation, None if the key already existed.
        # px = milliseconds, supports fractional rate_limit (0.5 → 500 ms)
        is_new = await self.redis.set(key, "1", px=int(self.rate_limit * 1000), nx=True)

        if not is_new:
            log.warning(f"Throttling | user={user_id} blocked")
            if isinstance(event, CallbackQuery):
                await event.answer("⏳ Not so fast!", show_alert=False)
            return None

        return await handler(event, data)
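
The SET NX check can be tried without a Redis server. The sketch below uses a hypothetical in-memory `FakeRedis` that imitates only the `set(..., px=..., nx=True)` behaviour the middleware depends on; it is not a substitute for `redis.asyncio.Redis`.

```python
import asyncio
import time
from typing import Optional


class FakeRedis:
    """Hypothetical in-memory stand-in for the SET NX PX behaviour."""

    def __init__(self) -> None:
        self._expires_at: dict[str, float] = {}

    async def set(self, key: str, value: str, px: int, nx: bool = True) -> Optional[bool]:
        now = time.monotonic()
        expires_at = self._expires_at.get(key)
        if nx and expires_at is not None and expires_at > now:
            return None  # key is still alive: SET NX refuses to overwrite it
        self._expires_at[key] = now + px / 1000
        return True


async def allowed(redis: FakeRedis, user_id: int, rate_limit: float) -> bool:
    """Same check as the middleware: True means the event may proceed."""
    is_new = await redis.set(f"throttle:{user_id}", "1", px=int(rate_limit * 1000), nx=True)
    return bool(is_new)


async def throttle_demo() -> list[bool]:
    redis = FakeRedis()
    first = await allowed(redis, 42, rate_limit=0.2)
    second = await allowed(redis, 42, rate_limit=0.2)  # still inside the window
    await asyncio.sleep(0.3)  # wait for the key to expire
    third = await allowed(redis, 42, rate_limit=0.2)
    return [first, second, third]


throttle_results = asyncio.run(throttle_demo())
print(throttle_results)
```

The key is per user, not per chat or per handler, so one throttled user does not affect others, and a blocked event does not extend the window because the existing key is left untouched.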

DirectorMiddleware

Automatically initializes a Director object for the current request.

DirectorMiddleware

Bases: BaseMiddleware

Middleware that injects a Director instance into the handler data.

Uses 'ContextHelper' to normalize IDs from different event types.

Depends on:
- ContainerMiddleware (for data["container"])
- state (provided by aiogram Dispatcher/FSM)

Source code in src/codex_bot/engine/middlewares/director_middleware.py
class DirectorMiddleware(BaseMiddleware):
    """
    Middleware that injects a `Director` instance into the handler data.

    Uses 'ContextHelper' to normalize IDs from different event types.
    Depends on:
    - `ContainerMiddleware` (for `data["container"]`)
    - `state` (provided by aiogram Dispatcher/FSM)
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        # 1. Extract required dependencies from context
        container = data.get("container")
        state: FSMContext | None = data.get("state")

        if not container:
            log.error("DirectorMiddleware | 'container' not found in data. Check middleware order!")
            return await handler(event, data)

        # 2. Extract unified context using Helper
        ctx = ContextHelper.extract_base_context(event)

        # 3. Instantiate Director
        director = Director(
            container=container,
            state=state,
            session_key=ctx.user_id,
            context_id=ctx.chat_id,
            trigger_id=ctx.message_id,
        )

        # 4. Inject into data for handlers
        data["director"] = director

        return await handler(event, data)
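
The "Check middleware order!" log message refers to the dependency on ContainerMiddleware. The sketch below shows why the order matters, using plain functions as hypothetical stand-ins (`container_mw`, `director_mw`, `chain`); it does not use aiogram's registration API, and the string values are placeholders for the real objects.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, dict[str, Any]], Awaitable[Any]]
Middleware = Callable[[Handler, Any, dict[str, Any]], Awaitable[Any]]


async def container_mw(handler: Handler, event: Any, data: dict[str, Any]) -> Any:
    data["container"] = "container"  # placeholder for the real DI container
    return await handler(event, data)


async def director_mw(handler: Handler, event: Any, data: dict[str, Any]) -> Any:
    # Same guard as DirectorMiddleware: skip injection if the container is missing.
    if data.get("container"):
        data["director"] = f"director({data['container']})"
    return await handler(event, data)


def chain(middlewares: list[Middleware], final: Handler) -> Handler:
    """Wrap `final` so that the first middleware in the list runs first."""
    wrapped = final
    for mw in reversed(middlewares):
        def make(mw: Middleware, nxt: Handler) -> Handler:
            async def call(event: Any, data: dict[str, Any]) -> Any:
                return await mw(nxt, event, data)
            return call
        wrapped = make(mw, wrapped)
    return wrapped


async def final(event: Any, data: dict[str, Any]) -> bool:
    return "director" in data


right_order = asyncio.run(chain([container_mw, director_mw], final)(None, {}))
wrong_order = asyncio.run(chain([director_mw, container_mw], final)(None, {}))
print(right_order, wrong_order)
```

In the wrong order the handler still runs, as in the real middleware, but `data["director"]` is absent, so handlers that expect it would fail later.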

DatabaseTransactionMiddleware

Automatically creates and commits a SQLAlchemy transaction for every request.

DatabaseTransactionMiddleware

Bases: BaseMiddleware

Middleware for managing the lifecycle of an async database session.

Injects data["db_session"] into the handler context.

Source code in src/codex_bot/engine/middlewares/database.py
class DatabaseTransactionMiddleware(BaseMiddleware):
    """
    Middleware for managing the lifecycle of an async database session.

    Injects `data["db_session"]` into the handler context.
    """

    def __init__(self, session_maker: async_sessionmaker[AsyncSession]):
        """
        Args:
            session_maker: SQLAlchemy async_sessionmaker instance.
        """
        self.session_maker = session_maker

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        # Start a new session for the current event context
        async with self.session_maker() as session:
            # Add session to data so it's accessible in handlers and other middlewares
            data["db_session"] = session

            try:
                # Pass control to the handler
                result = await handler(event, data)

                # Success: Commit all changes made during the event
                await session.commit()
                return result

            except Exception as e:
                # Error: Rollback changes to keep DB consistent
                log.error(f"Transaction failed, rolling back: {e}")
                await session.rollback()
                # Re-raise the exception for the global error handler
                raise e
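
The commit-or-rollback flow can be checked with a recording stub instead of a database. In this sketch `FakeSession` and `run_in_transaction` are hypothetical stand-ins; a real `AsyncSession` does considerably more, and only the call order is modelled here.

```python
import asyncio
from typing import Any, Awaitable, Callable


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        self.calls.append("close")

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def run_in_transaction(
    session: FakeSession, handler: Callable[[dict[str, Any]], Awaitable[Any]]
) -> Any:
    """Mirrors the commit/rollback flow of DatabaseTransactionMiddleware."""
    async with session:
        data = {"db_session": session}
        try:
            result = await handler(data)
            await session.commit()
            return result
        except Exception:
            await session.rollback()
            raise


async def ok_handler(data: dict[str, Any]) -> str:
    return "ok"


async def failing_handler(data: dict[str, Any]) -> str:
    raise ValueError("boom")


async def transaction_demo() -> tuple[list[str], list[str]]:
    good, bad = FakeSession(), FakeSession()
    await run_in_transaction(good, ok_handler)
    try:
        await run_in_transaction(bad, failing_handler)
    except ValueError:
        pass  # the error still propagates after the rollback
    return good.calls, bad.calls


commit_calls, rollback_calls = asyncio.run(transaction_demo())
print(commit_calls, rollback_calls)
```

Since the middleware commits after the handler returns, handlers using `data["db_session"]` normally do not need to call `commit()` themselves.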

Functions

__init__(session_maker)

Parameters:

Name | Type | Description | Default
session_maker | async_sessionmaker[AsyncSession] | SQLAlchemy async_sessionmaker instance. | required

FSMContextI18nManager

Localization management based on FSM data.

FSMContextI18nManager

Bases: BaseManager

Language manager via FSM storage (Redis).

Locale determination priority:
1. FSM storage (key "locale"): the user explicitly selected a language.
2. Telegram language_code: used if it is in the allowed_locales list, or if that list is empty.
3. default_locale: fallback.

Parameters:

Name | Type | Description | Default
allowed_locales | list[str] | None | List of allowed language codes (e.g., ["ru", "en", "de"]). | None
default_locale | str | Default language if nothing matches. | 'en'
Example
from aiogram_i18n import I18nMiddleware
from aiogram_i18n.cores import FluentRuntimeCore

i18n = I18nMiddleware(
    core=FluentRuntimeCore(path="locales/{locale}"),
    manager=FSMContextI18nManager(allowed_locales=["ru", "en"], default_locale="en"),
    default_locale="en",
)
i18n.setup(dp)
Source code in src/codex_bot/engine/middlewares/i18n.py
class FSMContextI18nManager(BaseManager):
    """
    Language manager via FSM storage (Redis).

    Locale determination priority:
    1. FSM storage (key "locale") — user explicitly selected a language.
    2. Telegram language_code — if it's in the allowed_locales list.
    3. default_locale — fallback.

    Args:
        allowed_locales: List of allowed language codes (e.g., ["ru", "en", "de"]).
        default_locale: Default language if nothing matches.

    Example:
        ```python
        from aiogram_i18n import I18nMiddleware
        from aiogram_i18n.cores import FluentRuntimeCore

        i18n = I18nMiddleware(
            core=FluentRuntimeCore(path="locales/{locale}"),
            manager=FSMContextI18nManager(allowed_locales=["ru", "en"], default_locale="en"),
            default_locale="en",
        )
        i18n.setup(dp)
        ```
    """

    def __init__(
        self,
        allowed_locales: list[str] | None = None,
        default_locale: str = "en",
    ) -> None:
        super().__init__(default_locale=default_locale)
        self.allowed_locales: list[str] = allowed_locales or []

    async def get_locale(self, event_from_user: User | None = None, **kwargs: Any) -> str:
        """
        Determines the user's current locale.

        Args:
            event_from_user: Telegram user.
            **kwargs: aiogram-i18n context (includes "state").

        Returns:
            String locale code (e.g., "ru").
        """
        state: FSMContext | None = kwargs.get("state")
        if state:
            locale = await StateHelper.get_value(state, "locale")
            if isinstance(locale, str):
                return locale

        if event_from_user:
            lang = event_from_user.language_code
            if lang and (not self.allowed_locales or lang in self.allowed_locales):
                return lang

        return str(self.default_locale)

    async def set_locale(self, locale: str, **kwargs: Any) -> None:
        """
        Saves the selected locale to FSM.

        Args:
            locale: Language code to save.
            **kwargs: Context (includes "state").
        """
        state: FSMContext | None = kwargs.get("state")
        if state:
            await StateHelper.update_value(state, "locale", locale)
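
The priority order in get_locale can be written as a pure function, which makes the edge cases easier to see. `resolve_locale` is a hypothetical helper written for this example, not part of the library; `stored` stands for the value read from FSM storage.

```python
from typing import Optional


def resolve_locale(
    stored: Optional[str],
    language_code: Optional[str],
    allowed_locales: list[str],
    default_locale: str = "en",
) -> str:
    """Hypothetical pure-function version of the priority used by get_locale."""
    if isinstance(stored, str):
        return stored  # 1. explicit choice saved in FSM
    if language_code and (not allowed_locales or language_code in allowed_locales):
        return language_code  # 2. Telegram language_code, if allowed
    return default_locale  # 3. fallback


locale_cases = [
    resolve_locale("de", "ru", ["ru", "en"]),  # stored value wins, even if not in the list
    resolve_locale(None, "ru", ["ru", "en"]),  # allowed Telegram language
    resolve_locale(None, "fr", ["ru", "en"]),  # not allowed, falls back to default
    resolve_locale(None, "fr", []),  # empty list accepts any language code
]
print(locale_cases)
```

Two details follow from the source above: a locale stored in FSM is returned without being checked against allowed_locales, and leaving allowed_locales empty (the default) accepts whatever language_code Telegram reports.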

Functions

get_locale(event_from_user=None, **kwargs) async

Determines the user's current locale.

Parameters:

Name | Type | Description | Default
event_from_user | User | None | Telegram user. | None
**kwargs | Any | aiogram-i18n context (includes "state"). | {}

Returns:

Type | Description
str | String locale code (e.g., "ru").


set_locale(locale, **kwargs) async

Saves the selected locale to FSM.

Parameters:

Name | Type | Description | Default
locale | str | Language code to save. | required
**kwargs | Any | Context (includes "state"). | {}