Skip to content

notifications.registry

registry

Deprecated compatibility imports for :mod:`codex_platform.messaging.registry`.

Classes

ChannelRegistry

Registry for config-driven delivery channel factories.

Source code in src/codex_platform/messaging/registry.py
class ChannelRegistry:
    """Collects named channel factories and builds channels from a config.

    Factories are kept in registration order as ``(name, factory)`` pairs.
    Each factory receives the config object and returns either a channel
    or ``None``.
    """

    def __init__(self) -> None:
        # Ordered (name, factory) pairs; duplicates are permitted.
        self._factories: list[tuple[str, Callable[[Any], DeliveryChannel | None]]] = []

    def register(
        self,
        name: str,
        factory: Callable[[Any], DeliveryChannel | None],
    ) -> None:
        """Append ``factory`` to the registry under ``name``.

        A name that is already registered produces a warning, but the new
        factory is still added next to the earlier one.
        """

        for known_name, _ in self._factories:
            if known_name == name:
                log.warning("ChannelRegistry | duplicate registration for %s", name)
                break
        self._factories.append((name, factory))

    def build_channels(self, config: Any) -> list[DeliveryChannel]:
        """Call every registered factory with ``config`` and collect channels.

        A channel is kept only when its factory returned a non-``None``
        object whose ``is_available()`` is truthy. A factory (or
        availability check) that raises is logged and skipped, so one
        failure does not stop the remaining factories from running.
        """

        built: list[DeliveryChannel] = []
        for label, make_channel in self._factories:
            try:
                candidate = make_channel(config)
                if candidate is None or not candidate.is_available():
                    log.debug("ChannelRegistry | %s skipped (not configured)", label)
                    continue
                built.append(candidate)
                log.info("ChannelRegistry | %s enabled", label)
            except Exception:
                # Best effort: record the traceback and move on.
                log.exception("ChannelRegistry | %s factory failed", label)
        return built
Functions
register(name, factory)

Register a channel factory.

Source code in src/codex_platform/messaging/registry.py
def register(
    self,
    name: str,
    factory: Callable[[Any], DeliveryChannel | None],
) -> None:
    """Append ``factory`` to the registry under ``name``.

    A name that is already registered produces a warning, but the new
    factory is still added next to the earlier one.
    """

    for known_name, _ in self._factories:
        if known_name == name:
            log.warning("ChannelRegistry | duplicate registration for %s", name)
            break
    self._factories.append((name, factory))
build_channels(config)

Build available channels from registered factories.

Source code in src/codex_platform/messaging/registry.py
def build_channels(self, config: Any) -> list[DeliveryChannel]:
    """Call every registered factory with ``config`` and collect channels.

    A channel is kept only when its factory returned a non-``None`` object
    whose ``is_available()`` is truthy. A factory (or availability check)
    that raises is logged and skipped, so one failure does not stop the
    remaining factories from running.
    """

    built: list[DeliveryChannel] = []
    for label, make_channel in self._factories:
        try:
            candidate = make_channel(config)
            if candidate is None or not candidate.is_available():
                log.debug("ChannelRegistry | %s skipped (not configured)", label)
                continue
            built.append(candidate)
            log.info("ChannelRegistry | %s enabled", label)
        except Exception:
            # Best effort: record the traceback and move on.
            log.exception("ChannelRegistry | %s factory failed", label)
    return built