Collector

Source code: event_collector.collector.py

This module contains definitions for the Collector class.

A Collector is a pipeline object for processing payloads. Collectors are designed for processing incoming data from a message or event queue.

Type Aliases

type event_collector.collector.CollectorList = Sequence[Collector]
type event_collector.collector.CollectorDict = Mapping[str, Collector]
type event_collector.collector.Processor = Callable[[Payload, Context], Awaitable[Result]]
type event_collector.collector.LifecycleManager = Callable[[Context], AbstractAsyncContextManager[None]]

Collector

class event_collector.collector.Collector(key: str, processor: Processor, *, topics: Sequence[str] | None = None, ctx: Context | None = None, lifecycle: LifecycleManager | None = None)

Collector is a container class to store attributes and functions to process event payloads. It accepts the following parameters:

key: str

The collector key is used for routing event payloads by direct key.

topics: Sequence[str]

Topics is a list of topics the collector is listening to.

ctx: Context

The Context object is used for storing application-specific values.

processor: Processor

The processor is the event handler.

lifecycle: LifecycleManager | None

The lifecycle async context manager is used to run setup and shutdown operations.

async process(payload: Payload) Result

Process an event payload.

Usage

Example Collector initialization:

from contextlib import asynccontextmanager

from event_collector import Collector, Context, Payload, Result


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """The event handler."""
    ...

@asynccontextmanager
async def event_handler_lifecycle(ctx: Context) -> None:
    """Perform setup and shutdown operations."""
    try:
        print("setup")
        ...
        yield
    finally:
        print("shutdown")

 """Initialize a Collector."""
 key = "example-collector"
 collector = Collector(
     key, event_handler, lifecycle=event_handler_lifecycle
 )