Getting Started

To run a collector queue with an event collector, we need to perform the following steps:

  1. create a Collector to process events;

  2. configure a CollectorQueue to run background consumer tasks that will wait for input events;

  3. run the collector queue and publish events to the queue.

1. Create Collector

To create a Collector object to process incoming event payloads, we define a function that takes two parameters: a Payload and a Context. For this example, we will create an event handler function that returns a dict containing the payload id and a timestamp, wrapped in an Ok object. To simulate task processing, the handler sleeps for 0.25 seconds (250 ms).

from typing import Any

import asyncio
import datetime

from event_collector import Collector, Context, Ok, Payload, Result

async def event_handler(payload: Payload, ctx: Context) -> Result:
    """Process incoming payload and return a Result."""
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    data: dict[str, Any] = payload.value

    result = {
        "id": data["id"],
        "received_at": now,
    }
    await asyncio.sleep(0.25)

    return Ok(result)

Next, we instantiate a Collector object, which is a container for the processor or event handler function (defined as a Processor type in the event_collector.collector module). A Collector object must also have a unique identifying key attribute. For our example collector, we’ll use “demo-collector” as the key:

collector = Collector("demo-collector", event_handler)

The collector queue supports various delivery modes and the key is used for direct key delivery to specified collectors. For this getting-started example, we don’t need to worry about the details of how the key is used.

Note

The Collector also accepts a lifecycle parameter that you can define to perform setup and teardown operations – like initializing and closing database connections. In this example, we do not need to perform any setup or shutdown steps.

The event_handler function is called when a consumer receives an event in the queue and routes the event payload to the collector. The function call will have Payload and Context objects as parameters:

  • A Payload is a wrapper for the event payload value,

  • and Context is an object for storing application-specific context values (such as a database connection).

The event handler must return a Result object, which is defined as either Ok or Error in event_collector.data:

type Result = Ok | Error

Ok and Error are containers that wrap a return value or an error, respectively:

result = Ok({"status": "OK", "status_code": 200})

result = Error(ValueError("bad input")) # or just Error("bad input")

2. Configure a CollectorQueue

Next, we’ll create a CollectorQueue to manage a pool of background consumer tasks. To configure the CollectorQueue, we supply the Collector instance from above (as an item in a list) as a parameter and set max_consumers to a value that can handle the input load we’re expecting.

from event_collector import Collector, CollectorQueue

collector = Collector("demo-collector", event_handler)
collector_queue = CollectorQueue(
    [collector],
    min_consumers=1,
    max_consumers=100,
)

The CollectorQueue instance will create and manage a consumer pool and run all asyncio tasks in the background.

The collectors parameter expects a list of Collector instances (because you can have multiple collectors as downstream processors). Event payloads sent to the collector queue will be routed to one or more of the collectors provided at initialization – in this case, since the only collector is the demo-collector, it will receive all input event payloads.

The CollectorQueue is configured with a minimum of one consumer and a maximum of 100 consumers. By default, the collector queue will autoscale the consumer pool based on the input event load.
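
The idea of a consumer pool bounded by a minimum and a maximum can be sketched with the standard library alone. The run_pool() function below is a hypothetical illustration, not the CollectorQueue's actual autoscaling algorithm (which is not described here): it sizes the pool once to the backlog, clamped to the configured limits, and reports how many consumers it started.

```python
import asyncio


async def run_pool(jobs: list[float], min_consumers: int, max_consumers: int) -> int:
    """Process sleep-duration jobs; return the number of consumers started."""
    queue: asyncio.Queue[float] = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)

    consumers: list[asyncio.Task[None]] = []

    async def consumer() -> None:
        while True:
            delay = await queue.get()
            try:
                await asyncio.sleep(delay)  # simulated work
            finally:
                queue.task_done()

    # Naive policy: one consumer per waiting job, clamped to [min, max].
    target = max(min_consumers, min(max_consumers, queue.qsize()))
    while len(consumers) < target:
        consumers.append(asyncio.create_task(consumer()))

    await queue.join()  # returns once every job has been marked done

    # Consumers loop forever, so stop them explicitly.
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return len(consumers)
```

With ten queued jobs and limits of 1 and 4, this sketch starts four consumers; with an empty queue it still starts the configured minimum. A real autoscaler would also resize the pool as load changes over time.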

Note

CollectorQueue accepts several other configuration parameters. For general use cases, it’s enough to set collectors, min_consumers, and max_consumers and use default values for the rest.

3. Run the CollectorQueue

To put everything together, we add two helper functions: one to send events to the collector queue (send_events), and the other to print the returned results (output_results).

import asyncio
import sys
import time

from event_collector import (
    Collector,
    CollectorEvent,
    CollectorQueue,
    EventPublisher,
    Payload,
)

async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
    """
    To send events to the consumers, use publisher.send() and
    supply a Payload object. publisher.send() will return a
    CollectorEvent instance.
    """
    events: list[CollectorEvent] = []

    for i in range(100):
        data = {
            "id": f"payload-{i}",
        }
        payload = Payload(data)
        event = await publisher.send(payload)
        events.append(event)

    return events

async def output_results(events: list[CollectorEvent]) -> None:
    """
    Fetch the result for each CollectorEvent using event.wait()
    and print out the result. event.wait() will block until the
    collector event is processed by a consumer and the result is
    available.
    """
    for event in events:
        result_list = await event.wait()
        result = result_list.first()
        value = result.unwrap()
        print(value)

async def main() -> None:
    """
    Run the CollectorQueue as an async context manager.
    """
    collector = Collector("demo-collector", event_handler)
    collector_queue = CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
    )

    start = time.monotonic_ns()

    async with collector_queue:
        publisher = collector_queue.get_publisher()
        events = await send_events(publisher)
        await output_results(events)

    elapsed = (time.monotonic_ns() - start) / 1.0e9
    print(f"elapsed={elapsed:.3f}")


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)

The CollectorQueue is designed to run as a background service – in a Docker container, for instance – that listens for input events until it’s stopped. For this example, we will run the collector queue in a command-line script’s main function and send events in a loop to the background consumers. After sending a batch of event payloads, we’ll fetch and print the result of each collector event, then exit.

It’s important to note here that after sending an event payload to the collector queue, the EventPublisher will return a CollectorEvent. The CollectorEvent works much like a Future object: it’s a placeholder for a result that may not be ready yet but that we can retrieve later. The CollectorEvent returns a ResultList when the backend processing by the collectors is done:

...

# "event" is a CollectorEvent.
event = await publisher.send(payload)

# event.wait() will block until the ResultList is ready.
# "result_list" is a ResultList.
result_list = await event.wait()

...
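
The Future analogy can be illustrated with asyncio alone. In the sketch below, send() and process() are hypothetical stand-ins for the publisher and a consumer; they are not part of event_collector. send() hands back a placeholder immediately, and awaiting that placeholder later suspends until the background task has set its result.

```python
import asyncio


async def process(value: int, future: asyncio.Future[int]) -> None:
    """Background worker: compute a result and resolve the placeholder."""
    await asyncio.sleep(0.01)  # simulated processing time
    future.set_result(value * 2)


async def send(value: int, tasks: set[asyncio.Task[None]]) -> asyncio.Future[int]:
    """Hypothetical stand-in for publisher.send(): return a placeholder at once."""
    future: asyncio.Future[int] = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(process(value, future))
    tasks.add(task)  # keep a reference so the task is not garbage-collected
    task.add_done_callback(tasks.discard)
    return future


async def demo() -> list[int]:
    tasks: set[asyncio.Task[None]] = set()
    # Submit everything first; no result is available yet.
    futures = [await send(i, tasks) for i in range(3)]
    # Awaiting a future suspends until its result has been set.
    return [await f for f in futures]
```

Separating submission from retrieval in this way is what allows a batch of events to be queued up front and their results collected afterwards.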

A ResultList is a container for a list of Result objects. We use a collection (list) as a return value because it’s valid for the collector queue to deliver the event payload to more than one collector, which means there may be more than one Result object being returned. So the ResultList can contain one or more results depending on the method of delivery.

For this example, we only need the first result since we have one collector, so we use the ResultList.first() method to retrieve the Result object. The value contained in the Result object can be obtained using the Result.unwrap() method.

Example for retrieving a Result from a ResultList:

# Send payload to collector queue.
payload_data = {
   "data": "payload data",
}
payload = Payload(payload_data)
event = await publisher.send(payload)

...

# event.wait() will block until the ResultList is ready.
result_list = await event.wait()

# Get the first result in the result set.
result = result_list.first()

# Check whether the result object is Ok or Error.
match result:
    case Ok(value):
        print(value)
    case Error(_):
        # result.unwrap() will raise exception.
        result.unwrap()

Complete CollectorQueue Example

Here is the complete example script, minus the explanatory comments:

"""
example.py
"""

from typing import Any

import asyncio
import datetime
import sys
import time

from event_collector import (
    Collector,
    CollectorEvent,
    CollectorQueue,
    Context,
    EventPublisher,
    Ok,
    Payload,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    data: dict[str, Any] = payload.value

    result = {
        "id": data["id"],
        "received_at": now,
    }
    await asyncio.sleep(0.25)

    return Ok(result)


async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
    events: list[CollectorEvent] = []

    for i in range(100):
        data = {
            "id": f"payload-{i}",
        }
        payload = Payload(data)
        event = await publisher.send(payload)
        events.append(event)

    return events


async def output_results(events: list[CollectorEvent]) -> None:
    for event in events:
        result_list = await event.wait()
        result = result_list.first()
        value = result.unwrap()
        print(value)


async def main() -> None:
    collector = Collector("demo-collector", event_handler)
    collector_queue = CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
    )

    start = time.monotonic_ns()

    async with collector_queue:
        publisher = collector_queue.get_publisher()
        events = await send_events(publisher)
        await output_results(events)

    elapsed = (time.monotonic_ns() - start) / 1.0e9
    print(f"elapsed={elapsed:.3f}")


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)

To run the script, install the dependencies first with:

# if you do not have uv, install it using the instructions here:
# https://docs.astral.sh/uv/#installation

cd /path/to/event-collector
make depend

Run the example script with:

. .venv/bin/activate
python example.py

The output should look something like this:

(.venv) [event-collector]$ python example.py
{"logger":"CollectorQueue-8f6ad6fc75004f63aa1c17be7a88ef19","metrics_timestamp":"2026-05-01T06:47:14.291420+00:00","qsize":100,"busy":0,"consumers":1,"load":100.0,"avg_load":0.0,"service_time":0.0,"wait_time":0.0,"arrival_rate":147335.9451,"departure_rate":0.0,"wip":0.0,"event_count":0,"event":"collector queue metrics","level":"info","timestamp":"2026-05-01T06:47:14.292887Z"}
{'id': 'payload-0', 'received_at': '2026-05-01T06:47:14.293308+00:00'}
{'id': 'payload-1', 'received_at': '2026-05-01T06:47:14.304057+00:00'}
{'id': 'payload-2', 'received_at': '2026-05-01T06:47:14.304085+00:00'}
...
{'id': 'payload-97', 'received_at': '2026-05-01T06:47:14.304848+00:00'}
{'id': 'payload-98', 'received_at': '2026-05-01T06:47:14.304855+00:00'}
{'id': 'payload-99', 'received_at': '2026-05-01T06:47:14.304862+00:00'}
elapsed=0.267

The collector queue emits a log message as it starts up, then the results returned by the event handler are printed. Finally, the collector queue shuts down and the script exits.

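The output shows that the collector queue needed only 0.267 seconds (267 ms) to process 100 events, which is close to the 0.25-second service time of a single event handler call. This is because the collector queue scaled the consumer pool up to 100 consumers to handle the input load, so the events were processed concurrently. Handled one at a time, the same 100 events would take about 25 seconds (100 × 0.25 s).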
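
The effect of concurrency on elapsed time can be reproduced with plain asyncio, independent of the collector queue. The sketch below uses scaled-down, illustrative numbers (10 events at 0.02 s each) and compares handling the events one at a time with handling them all at once; the function names are hypothetical.

```python
import asyncio
import time

SERVICE_TIME = 0.02  # seconds per event (scaled down from the 0.25 s used above)
EVENTS = 10


async def handle(i: int) -> int:
    await asyncio.sleep(SERVICE_TIME)  # simulated processing
    return i


async def sequential() -> float:
    """Handle events one at a time; elapsed time grows with the event count."""
    start = time.monotonic()
    for i in range(EVENTS):
        await handle(i)
    return time.monotonic() - start


async def concurrent() -> float:
    """Handle all events at once; elapsed time stays near one service time."""
    start = time.monotonic()
    await asyncio.gather(*(handle(i) for i in range(EVENTS)))
    return time.monotonic() - start
```

Running asyncio.run(sequential()) and asyncio.run(concurrent()) should show the sequential run taking roughly EVENTS times longer, which mirrors the gap between 25 seconds and 0.267 seconds in the example.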

This short implementation example shows that we’re able to set up an event-processing workflow, run a producer-consumer queue, and fetch the results – without a lot of external dependencies.