Skip to content

engine.middlewares — Ready-to-use middleware for Aiogram

⬅️ Back | 🏠 Docs Root

UserValidationMiddleware

UserValidationMiddleware

Bases: BaseMiddleware

User validation middleware in an incoming event.

For Message and CallbackQuery:

- Blocks events without from_user (returns None).
- Adds data["user"] — an aiogram User object — for handlers.

Other event types (Update, etc.) are passed through without any check, allowing the bot to send messages to channels.

Example
dp.message.middleware(UserValidationMiddleware())
dp.callback_query.middleware(UserValidationMiddleware())
Source code in src/codex_bot/engine/middlewares/user_validation.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class UserValidationMiddleware(BaseMiddleware):
    """
    Middleware that requires a sender on user-originated events.

    Behavior for ``Message`` and ``CallbackQuery`` events:
    - When ``from_user`` is missing, a warning is logged and the event is
      dropped: the handler is not invoked and ``None`` is returned.
    - Otherwise the aiogram ``User`` object is stored under `data["user"]`
      before the handler is invoked.

    Any other event type (``Update``, etc.) is forwarded to the handler
    without inspection.

    Example:
        ```python
        dp.message.middleware(UserValidationMiddleware())
        dp.callback_query.middleware(UserValidationMiddleware())
        ```
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """
        Validate the sender of the event and delegate to the next handler.

        Args:
            handler: Next handler (or middleware) in the chain.
            event: Incoming Telegram object.
            data: Mutable context dictionary shared with handlers.

        Returns:
            The handler's result, or ``None`` when the event was dropped.
        """
        # Only these two event types are validated; the rest go straight through.
        if not isinstance(event, (Message, CallbackQuery)):
            return await handler(event, data)

        sender = event.from_user
        if sender:
            # Expose the sender to downstream handlers.
            data["user"] = sender
            return await handler(event, data)

        log.warning(f"UserValidation | no from_user event_type={type(event).__name__}")
        return None

ThrottlingMiddleware

ThrottlingMiddleware

Bases: BaseMiddleware

Rate limiting middleware via atomic Redis SET NX.

One network request instead of two (EXISTS + SET). Atomicity eliminates race conditions during parallel updates.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `redis` | `Any` | Async Redis client (`redis.asyncio.Redis`). | required |
| `rate_limit` | `float` | Minimum interval between requests in seconds. Supports fractional values (e.g., `0.5`). | `1.0` |
Example
from redis.asyncio import Redis
from codex_bot.engine.middlewares import ThrottlingMiddleware

redis = Redis.from_url("redis://localhost")
builder.add_middleware(ThrottlingMiddleware(redis=redis, rate_limit=0.5))
Source code in src/codex_bot/engine/middlewares/throttling.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class ThrottlingMiddleware(BaseMiddleware):
    """
    Rate limiting middleware via atomic Redis SET NX.

    One network request instead of two (EXISTS + SET).
    Atomicity eliminates race conditions during parallel updates.

    Events that carry no ``from_user`` are never throttled.

    Args:
        redis: Async Redis client (``redis.asyncio.Redis``).
        rate_limit: Minimum interval between requests in seconds.
                    Supports fractional values (e.g., ``0.5``).
                    A value of ``0`` or less disables throttling; positive
                    values below one millisecond are rounded up to 1 ms.

    Example:
        ```python
        from redis.asyncio import Redis
        from codex_bot.engine.middlewares import ThrottlingMiddleware

        redis = Redis.from_url("redis://localhost")
        builder.add_middleware(ThrottlingMiddleware(redis=redis, rate_limit=0.5))
        ```
    """

    def __init__(self, redis: Any, rate_limit: float = 1.0) -> None:
        self.redis = redis
        self.rate_limit = rate_limit

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """
        Drop the event if the same user was served less than ``rate_limit`` seconds ago.

        Args:
            handler: Next handler (or middleware) in the chain.
            event: Incoming Telegram object.
            data: Mutable context dictionary shared with handlers.

        Returns:
            The handler's result, or ``None`` when the event was throttled.
        """
        sender = getattr(event, "from_user", None)
        user_id: int | None = sender.id if sender else None

        if not user_id:
            return await handler(event, data)

        # Redis rejects a zero expire time, so a non-positive interval cannot be
        # expressed as a TTL; treat it as "throttling disabled" instead of
        # failing on every event.
        if self.rate_limit <= 0:
            return await handler(event, data)

        # px = milliseconds, supports fractional rate_limit (0.5 → 500 ms).
        # Clamp to 1 ms so sub-millisecond intervals do not truncate to 0.
        ttl_ms = max(1, int(self.rate_limit * 1000))

        key = f"throttle:{user_id}"

        # Atomic operation: creates a key with TTL only if it didn't exist.
        # Returns True on creation, None if the key already existed.
        is_new = await self.redis.set(key, "1", px=ttl_ms, nx=True)

        if not is_new:
            log.warning(f"Throttling | user={user_id} blocked")
            if isinstance(event, CallbackQuery):
                # Answer the callback with a short notice for the throttled user.
                await event.answer("⏳ Not so fast!", show_alert=False)
            return None

        return await handler(event, data)

ContainerMiddleware

ContainerMiddleware

Bases: BaseMiddleware

Middleware for injecting a DI container into handlers.

Passes any container object via data["container"]. Does not know about the specific container type — works with any object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `container` | `Any` | Project's DI container (any object). | required |
Example
container = BotContainer(settings=settings, redis=redis)
dp.update.middleware(ContainerMiddleware(container=container))

# In a handler:
async def my_handler(callback: CallbackQuery, container: BotContainer):
    result = await container.booking_service.get_slots()
Source code in src/codex_bot/engine/middlewares/container.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ContainerMiddleware(BaseMiddleware):
    """
    Middleware that hands a dependency-injection container to handlers.

    The container given at construction time is placed into
    `data["container"]` for every event. The middleware is agnostic to the
    container's type — any object is accepted.

    Args:
        container: Project's DI container (any object).

    Example:
        ```python
        container = BotContainer(settings=settings, redis=redis)
        dp.update.middleware(ContainerMiddleware(container=container))

        # In a handler:
        async def my_handler(callback: CallbackQuery, container: BotContainer):
            result = await container.booking_service.get_slots()
        ```
    """

    def __init__(self, container: Any) -> None:
        # Stored as-is; the same object is injected for every event.
        self.container = container

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """
        Inject the container into ``data`` and call the next handler.

        Args:
            handler: Next handler (or middleware) in the chain.
            event: Incoming Telegram object.
            data: Mutable context dictionary shared with handlers.

        Returns:
            Whatever the handler returns.
        """
        data["container"] = self.container
        outcome = await handler(event, data)
        return outcome

FSMContextI18nManager

FSMContextI18nManager

Bases: BaseManager

Language manager via FSM storage (Redis).

Locale determination priority:

1. FSM storage (key "locale") — the user explicitly selected a language.
2. Telegram language_code — if it is in the allowed_locales list (or if allowed_locales is empty).
3. default_locale — fallback.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `allowed_locales` | `list[str] \| None` | List of allowed language codes (e.g., `["ru", "en", "de"]`). | `None` |
| `default_locale` | `str` | Default language if nothing matches. | `'en'` |
Example
from aiogram_i18n import I18nMiddleware
from aiogram_i18n.cores import FluentRuntimeCore

i18n = I18nMiddleware(
    core=FluentRuntimeCore(path="locales/{locale}"),
    manager=FSMContextI18nManager(allowed_locales=["ru", "en"], default_locale="en"),
    default_locale="en",
)
i18n.setup(dp)
Source code in src/codex_bot/engine/middlewares/i18n.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class FSMContextI18nManager(BaseManager):
    """
    Language manager via FSM storage (Redis).

    Locale determination priority:
    1. FSM storage (key "locale") — user explicitly selected a language.
    2. Telegram language_code — if it's in the allowed_locales list
       (any code is accepted when allowed_locales is empty).
    3. default_locale — fallback.

    Args:
        allowed_locales: List of allowed language codes (e.g., ["ru", "en", "de"]).
        default_locale: Default language if nothing matches.

    Example:
        ```python
        from aiogram_i18n import I18nMiddleware
        from aiogram_i18n.cores import FluentRuntimeCore

        i18n = I18nMiddleware(
            core=FluentRuntimeCore(path="locales/{locale}"),
            manager=FSMContextI18nManager(allowed_locales=["ru", "en"], default_locale="en"),
            default_locale="en",
        )
        i18n.setup(dp)
        ```
    """

    def __init__(
        self,
        allowed_locales: list[str] | None = None,
        default_locale: str = "en",
    ) -> None:
        # Let the base manager initialise itself first. NOTE(review): in
        # aiogram-i18n, BaseManager.__init__ creates the locale getter/setter
        # wrappers that I18nMiddleware calls — confirm against the installed version.
        super().__init__(default_locale=default_locale)
        # A missing list is normalised to an empty one, which means "no restriction".
        self.allowed_locales: list[str] = allowed_locales or []
        self.default_locale: str = default_locale

    async def get_locale(self, event_from_user: User | None = None, **kwargs: Any) -> str:
        """
        Determines the user's current locale.

        Args:
            event_from_user: Telegram user.
            **kwargs: aiogram-i18n context (includes "state").

        Returns:
            String locale code (e.g., "ru").
        """
        state: FSMContext | None = kwargs.get("state")
        if state:
            # An explicitly saved choice takes precedence over everything else.
            locale = await StateHelper.get_value(state, "locale")
            if isinstance(locale, str):
                return locale

        if event_from_user:
            lang = event_from_user.language_code
            # Empty allowed_locales means any Telegram language code is accepted.
            if lang and (not self.allowed_locales or lang in self.allowed_locales):
                return lang

        return str(self.default_locale)

    async def set_locale(self, locale: str, **kwargs: Any) -> None:
        """
        Saves the selected locale to FSM.

        Does nothing when no "state" is present in the context.

        Args:
            locale: Language code to save.
            **kwargs: Context (includes "state").
        """
        state: FSMContext | None = kwargs.get("state")
        if state:
            await StateHelper.update_value(state, "locale", locale)

Functions

get_locale(event_from_user=None, **kwargs) async

Determines the user's current locale.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event_from_user` | `User \| None` | Telegram user. | `None` |
| `**kwargs` | `Any` | aiogram-i18n context (includes "state"). | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | String locale code (e.g., "ru"). |

Source code in src/codex_bot/engine/middlewares/i18n.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def get_locale(self, event_from_user: User | None = None, **kwargs: Any) -> str:
    """
    Resolve the locale to use for the current user.

    Lookup order: the "locale" value stored in the FSM state, then the
    Telegram ``language_code`` (accepted when ``allowed_locales`` is empty
    or contains it), then ``default_locale``.

    Args:
        event_from_user: Telegram user.
        **kwargs: aiogram-i18n context (includes "state").

    Returns:
        String locale code (e.g., "ru").
    """
    fsm_state: FSMContext | None = kwargs.get("state")
    if fsm_state:
        # A previously saved choice wins over anything Telegram reports.
        saved = await StateHelper.get_value(fsm_state, "locale")
        if isinstance(saved, str):
            return saved

    if event_from_user:
        tg_lang = event_from_user.language_code
        if tg_lang:
            # An empty allow-list means no restriction on language codes.
            if not self.allowed_locales or tg_lang in self.allowed_locales:
                return tg_lang

    return str(self.default_locale)

set_locale(locale, **kwargs) async

Saves the selected locale to FSM.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `locale` | `str` | Language code to save. | required |
| `**kwargs` | `Any` | Context (includes "state"). | `{}` |
Source code in src/codex_bot/engine/middlewares/i18n.py
80
81
82
83
84
85
86
87
88
89
90
async def set_locale(self, locale: str, **kwargs: Any) -> None:
    """
    Persist the chosen locale in the FSM state under the "locale" key.

    Nothing is stored when the context carries no "state".

    Args:
        locale: Language code to save.
        **kwargs: Context (includes "state").
    """
    fsm_state: FSMContext | None = kwargs.get("state")
    if not fsm_state:
        return
    await StateHelper.update_value(fsm_state, "locale", locale)