Getting Started
===============

To run a collector queue with an event collector, we need to perform the
following steps:

1. create a ``Collector`` to process events;
2. configure a ``CollectorQueue`` to run background consumer tasks that will
   wait for input events;
3. run the collector queue and publish events to the queue.

1. Create Collector
-------------------

To create a ``Collector`` object to process incoming event payloads, we define
a function that takes two parameters: a ``Payload`` and a ``Context``.

For this example, we will create an event handler function that returns a
``dict`` as the result, containing the payload ``id`` and a timestamp. The
result ``dict`` is wrapped in an ``Ok`` object. To simulate task processing, we
will sleep for ``0.25`` seconds (``250ms``).

.. code-block:: python

    from typing import Any
    import asyncio
    import datetime

    from event_collector import Collector, Context, Ok, Payload, Result


    async def event_handler(payload: Payload, ctx: Context) -> Result:
        """Process incoming payload and return a Result."""
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        data: dict[str, Any] = payload.value
        result = {
            "id": data["id"],
            "received_at": now,
        }
        # Simulate 250ms of processing work.
        await asyncio.sleep(0.25)
        return Ok(result)

Next, we instantiate a ``Collector`` object, which is a container for the
``processor`` or event handler function (defined as a ``Processor`` type in the
``event_collector.collector`` module). A ``Collector`` object must also have a
unique identifying ``key`` attribute. For our example collector, we'll use
``"demo-collector"`` as the key:

.. code-block:: python

    collector = Collector("demo-collector", event_handler)

The collector queue supports various delivery modes, and the key is used for
*direct key* delivery to specified collectors. For this getting-started
example, we don't need to worry about the details of how the key is used.

.. note::

    The ``Collector`` also accepts a ``lifecycle`` parameter that you can
    define to perform setup and teardown operations -- like initializing and
    closing database connections. In this example, we do not need to perform
    any setup or shutdown steps.

The ``event_handler`` function is called when a consumer receives an event in
the queue and routes the event payload to the collector. The function is called
with ``Payload`` and ``Context`` objects as parameters:

* ``Payload`` is a wrapper for the event payload value;
* ``Context`` is an object for storing application-specific context values
  (such as a database connection).

The event handler must return a ``Result`` object, which is defined as either
``Ok`` or ``Error`` in ``event_collector.data``:

.. code-block:: python

    type Result = Ok | Error

``Ok`` and ``Error`` are containers that wrap a return value or an error,
respectively:

.. code-block:: python

    result = Ok({"status": "OK", "status_code": 200})

    result = Error(ValueError("bad input"))  # or just Error("bad input")

2. Configure a ``CollectorQueue``
---------------------------------

Next, we'll create a ``CollectorQueue`` to manage a pool of background consumer
tasks. To configure the ``CollectorQueue``, we supply the ``Collector``
instance from above (as an item in a list) as a parameter and set
``max_consumers`` to a value that can handle the input load we're expecting.

.. code-block:: python

    from event_collector import Collector, CollectorQueue

    collector = Collector("demo-collector", event_handler)

    collector_queue = CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
    )

The ``CollectorQueue`` instance will create and manage a consumer pool and run
all ``asyncio`` tasks in the background. The ``collectors`` parameter expects a
list of ``Collector`` instances (because you can have multiple collectors as
downstream processors).
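
To make the idea of a consumer pool concrete, here is a minimal sketch that uses
only the standard library. It is not how ``CollectorQueue`` is implemented:
``TinyConsumerPool``, its ``send`` method, and the ``double`` handler are
hypothetical names, the pool has a fixed size (no autoscaling), and results are
simply appended to a list.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class TinyConsumerPool:
    """Hypothetical fixed-size consumer pool (not the event_collector API)."""

    def __init__(self, handler: Handler, consumers: int) -> None:
        self.results: list[Any] = []
        self._handler = handler
        self._consumers = consumers
        self._queue: asyncio.Queue = asyncio.Queue()
        self._tasks: list[asyncio.Task] = []

    async def __aenter__(self) -> "TinyConsumerPool":
        # Start the background consumer tasks.
        self._tasks = [
            asyncio.create_task(self._consume()) for _ in range(self._consumers)
        ]
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # Wait until every queued payload is processed, then stop the consumers.
        await self._queue.join()
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def send(self, payload: Any) -> None:
        await self._queue.put(payload)

    async def _consume(self) -> None:
        # Each consumer waits for input and passes it to the handler.
        while True:
            payload = await self._queue.get()
            try:
                self.results.append(await self._handler(payload))
            finally:
                self._queue.task_done()


async def double(payload: int) -> int:
    await asyncio.sleep(0.01)  # simulated work
    return payload * 2


async def run_pool() -> list[int]:
    async with TinyConsumerPool(double, consumers=3) as pool:
        for i in range(5):
            await pool.send(i)
    return sorted(pool.results)


print(asyncio.run(run_pool()))  # [0, 2, 4, 6, 8]
```

The async context manager starts the consumers on entry and drains the queue
before stopping them on exit, which mirrors the "run in the background until
stopped" behaviour described in this section.
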
Event payloads sent to the collector queue will be routed to one or more of the
collectors provided at initialization -- in this case, since the only collector
is the ``demo-collector``, it will receive all input event payloads.

The ``CollectorQueue`` is configured with a minimum of one consumer and a
maximum of ``100`` consumers. By default, the collector queue will autoscale
the consumer pool based on the input event load.

.. note::

    ``CollectorQueue`` accepts a number of other configuration parameters. For
    general use cases, it's enough to set ``collectors``, ``min_consumers``,
    and ``max_consumers`` and use default values for the rest.

3. Run the ``CollectorQueue``
-----------------------------

To put everything together, we add two helper functions: one to send events to
the collector queue (``send_events``), and the other to print the returned
results (``output_results``).

.. code-block:: python

    import asyncio
    import sys
    import time

    from event_collector import (
        Collector,
        CollectorEvent,
        CollectorQueue,
        EventPublisher,
        Payload,
    )


    async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
        """
        To send events to the consumers, use publisher.send() and supply a
        Payload object. publisher.send() will return a CollectorEvent instance.
        """
        events: list[CollectorEvent] = []
        for i in range(100):
            data = {
                "id": f"payload-{i}",
            }
            payload = Payload(data)
            event = await publisher.send(payload)
            events.append(event)
        return events


    async def output_results(events: list[CollectorEvent]) -> None:
        """
        Fetch the result for each CollectorEvent using event.wait() and print
        out the result. event.wait() will block until the collector event is
        processed by a consumer and the result is available.
        """
        for event in events:
            result_list = await event.wait()
            result = result_list.first()
            value = result.unwrap()
            print(value)


    async def main() -> None:
        """
        Run the CollectorQueue as an async context manager.
        """
        # event_handler is the function defined in step 1.
        collector = Collector("demo-collector", event_handler)
        collector_queue = CollectorQueue(
            [collector],
            min_consumers=1,
            max_consumers=100,
        )

        start = time.monotonic_ns()
        async with collector_queue:
            publisher = collector_queue.get_publisher()
            events = await send_events(publisher)
            await output_results(events)
        elapsed = (time.monotonic_ns() - start) / 1.0e9
        print(f"elapsed={elapsed:.3f}")


    if __name__ == "__main__":
        asyncio.run(main())
        sys.exit(0)

The ``CollectorQueue`` is designed to run as a background service -- in a
Docker container, for instance -- that listens for input events until it's
stopped. For this example, we will run the collector queue in a command-line
script's ``main`` function and send events in a loop to the background
consumers. After sending a batch of event payloads, we'll fetch and print the
results from each collector event, then exit.

It's important to note here that after sending an event payload to the
collector queue, the ``EventPublisher`` will return a ``CollectorEvent``. The
``CollectorEvent`` works just like a ``Future`` object: it's a placeholder for
a result that may not be ready yet but that we can retrieve later. The
``CollectorEvent`` returns a ``ResultList`` when the backend processing by the
collectors is done:

.. code-block:: python

    ...
    # "event" is a CollectorEvent.
    event = await publisher.send(payload)

    # event.wait() will block until the ResultList is ready.
    # "result_list" is a ResultList.
    result_list = await event.wait()
    ...

A ``ResultList`` is a container for a list of ``Result`` objects. We use a
collection (list) as a return value because it's valid for the collector queue
to deliver the event payload to more than one collector, which means more than
one ``Result`` object may be returned. So the ``ResultList`` can contain one or
more results depending on the method of delivery.
For this example, we only need the first result since we have one collector, so
we use the ``ResultList.first()`` method to retrieve the ``Result`` object. The
value contained in the ``Result`` object can be obtained using the
``Result.unwrap()`` method.

Example for retrieving a ``Result`` from a ``ResultList``:

.. code-block:: python

    # Send payload to collector queue.
    payload_data = {
        "data": "payload data",
    }
    payload = Payload(payload_data)
    event = await publisher.send(payload)
    ...

    # event.wait() will block until the ResultList is ready.
    result_list = await event.wait()

    # Get the first result in the result set.
    result = result_list.first()

    # Check whether the result object is Ok or Error.
    match result:
        case Ok(value):
            print(value)
        case Error(_):
            # result.unwrap() will raise an exception.
            result.unwrap()

Complete ``CollectorQueue`` Example
-----------------------------------

Here is the complete example script, minus the explanatory comments:

.. code-block:: python

    """
    example.py
    """
    from typing import Any
    import asyncio
    import datetime
    import sys
    import time

    from event_collector import (
        Collector,
        CollectorEvent,
        CollectorQueue,
        Context,
        EventPublisher,
        Ok,
        Payload,
        Result,
    )


    async def event_handler(payload: Payload, ctx: Context) -> Result:
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        data: dict[str, Any] = payload.value
        result = {
            "id": data["id"],
            "received_at": now,
        }
        await asyncio.sleep(0.25)
        return Ok(result)


    async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
        events: list[CollectorEvent] = []
        for i in range(100):
            data = {
                "id": f"payload-{i}",
            }
            payload = Payload(data)
            event = await publisher.send(payload)
            events.append(event)
        return events


    async def output_results(events: list[CollectorEvent]) -> None:
        for event in events:
            result_list = await event.wait()
            result = result_list.first()
            value = result.unwrap()
            print(value)


    async def main() -> None:
        collector = Collector("demo-collector", event_handler)
        collector_queue = CollectorQueue(
            [collector],
            min_consumers=1,
            max_consumers=100,
        )

        start = time.monotonic_ns()
        async with collector_queue:
            publisher = collector_queue.get_publisher()
            events = await send_events(publisher)
            await output_results(events)
        elapsed = (time.monotonic_ns() - start) / 1.0e9
        print(f"elapsed={elapsed:.3f}")


    if __name__ == "__main__":
        asyncio.run(main())

To run the script, install the dependencies first with:

.. code-block:: bash

    # if you do not have uv, install it using the instructions here:
    # https://docs.astral.sh/uv/#installation
    cd /path/to/event-collector
    make depend

Run the example script with:

.. code-block:: bash

    . .venv/bin/activate
    python example.py

The output should look something like this::

    (.venv) [event-collector]$ python example.py
    {"logger":"CollectorQueue-8f6ad6fc75004f63aa1c17be7a88ef19","metrics_timestamp":"2026-05-01T06:47:14.291420+00:00","qsize":100,"busy":0,"consumers":1,"load":100.0,"avg_load":0.0,"service_time":0.0,"wait_time":0.0,"arrival_rate":147335.9451,"departure_rate":0.0,"wip":0.0,"event_count":0,"event":"collector queue metrics","level":"info","timestamp":"2026-05-01T06:47:14.292887Z"}
    {'id': 'payload-0', 'received_at': '2026-05-01T06:47:14.293308+00:00'}
    {'id': 'payload-1', 'received_at': '2026-05-01T06:47:14.304057+00:00'}
    {'id': 'payload-2', 'received_at': '2026-05-01T06:47:14.304085+00:00'}
    ...
    {'id': 'payload-97', 'received_at': '2026-05-01T06:47:14.304848+00:00'}
    {'id': 'payload-98', 'received_at': '2026-05-01T06:47:14.304855+00:00'}
    {'id': 'payload-99', 'received_at': '2026-05-01T06:47:14.304862+00:00'}
    elapsed=0.267

The collector queue first emits log messages as it starts up, then the
collectors return the results of the event handler. Finally, the collector
queue shuts down and exits.

The output shows that to process ``100`` events, the collector queue required
only ``0.267`` seconds (``267ms``), which is close to the ``0.25``-second
*service time* of a single event handler call.
This is because the collector queue scaled the consumer pool to ``100``
consumers to handle the input load.

This short implementation example shows that we're able to set up an
event-processing workflow, run a producer-consumer queue, and fetch the results
-- without a lot of external dependencies.
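
As a rough back-of-the-envelope check on the timing above, the sketch below
models the batch as waves of concurrent handler calls. It assumes an idealized
pool in which every consumer is available immediately, and it ignores
scheduling and scale-up overhead; ``estimate_elapsed`` is a hypothetical helper,
not part of ``event_collector``.

```python
import math


def estimate_elapsed(events: int, consumers: int, service_time: float) -> float:
    """Idealized batch time: events are processed in waves of `consumers`."""
    waves = math.ceil(events / consumers)
    return waves * service_time


print(estimate_elapsed(100, 100, 0.25))  # 0.25 (a single wave)
print(estimate_elapsed(100, 1, 0.25))    # 25.0 (one event at a time)
```

With ``100`` consumers the estimate is ``0.25`` seconds, close to the ``0.267``
seconds measured in the example run; under the same assumptions a single
consumer would need about ``25`` seconds for the same batch.
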