Getting Started¶
To run a collector queue with an event collector, we need to perform the following steps:
- create a Collector to process events;
- configure a CollectorQueue to run background consumer tasks that will wait for input events;
- run the collector queue and publish events to the queue.
1. Create Collector¶
To create a Collector object to process incoming event payloads,
we define a function that takes two parameters: a Payload and a
Context. For this example, we will create an event handler
function that returns a dict as the result, containing the payload
id and a timestamp. The result dict is wrapped in an Ok
object. To simulate task processing, we will sleep for 0.25 seconds
(250 ms).
from typing import Any
import asyncio
import datetime

from event_collector import Collector, Context, Ok, Payload, Result


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """Process incoming payload and return a Result."""
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    data: dict[str, Any] = payload.value
    result = {
        "id": data["id"],
        "received_at": now,
    }
    # Simulate 250 ms of processing work.
    await asyncio.sleep(0.25)
    return Ok(result)
Next, we instantiate a Collector object, which is a container for
the processor or event handler function (defined as a
Processor type in the event_collector.collector module). A
Collector object must also have a unique identifying key
attribute. For our example collector, we’ll use “demo-collector”
as the key:
collector = Collector("demo-collector", event_handler)
The collector queue supports various delivery modes and the key is used for direct key delivery to specified collectors. For this getting-started example, we don’t need to worry about the details of how the key is used.
Note
The Collector also accepts a lifecycle parameter that you can
define to perform setup and teardown operations – like initializing
and closing database connections. In this example, we do not need to
perform any setup or shutdown steps.
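As a rough illustration of the kind of setup and teardown work a lifecycle hook usually wraps, here is a minimal sketch built only on the standard library's contextlib.asynccontextmanager. The FakeConnection class and the lifecycle and lifecycle_demo functions are hypothetical names made up for this sketch. This guide does not show the signature that the Collector's lifecycle parameter expects, so the shape below is an assumption; check the API reference before adapting it.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeConnection:
    """Hypothetical stand-in for a database connection."""

    def __init__(self) -> None:
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def close(self) -> None:
        self.is_open = False


@asynccontextmanager
async def lifecycle() -> AsyncIterator[FakeConnection]:
    """Open a resource on entry and always close it on exit."""
    conn = FakeConnection()
    await conn.open()  # setup
    try:
        yield conn
    finally:
        await conn.close()  # teardown, runs even if the body raises


async def lifecycle_demo() -> tuple[bool, bool]:
    """Return the connection state inside and after the managed block."""
    async with lifecycle() as conn:
        open_inside = conn.is_open
    return open_inside, conn.is_open


if __name__ == "__main__":
    print(asyncio.run(lifecycle_demo()))
```

The try/finally around the yield is what guarantees that the teardown step runs even when the code using the resource fails.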
The event_handler function is called when a consumer receives an
event in the queue and routes the event payload to the collector. The
function call will have Payload and Context objects as
parameters:
- A Payload is a wrapper for the event payload value, and
- a Context is an object for storing application-specific context values (such as a database connection).
The event handler must return a Result object, which is
defined as either Ok or Error in event_collector.data:
type Result = Ok | Error
Ok and Error are containers to wrap a return value or an error, respectively:
result = Ok({"status": "OK", "status_code": 200})
result = Error(ValueError("bad input")) # or just Error("bad input")
2. Configure a CollectorQueue¶
Next, we’ll create a CollectorQueue to manage a pool of background
consumer tasks. To configure the CollectorQueue, we supply the
Collector instance from above (as an item in a list) as a
parameter and set max_consumers to a value that can handle the
input load we’re expecting.
from event_collector import Collector, CollectorQueue

collector = Collector("demo-collector", event_handler)
collector_queue = CollectorQueue(
    [collector],
    min_consumers=1,
    max_consumers=100,
)
The CollectorQueue instance will create and manage a consumer pool
and run all asyncio tasks in the background.
The collectors parameter expects a list of Collector instances
(because you can have multiple collectors as downstream processors).
Event payloads sent to the collector queue will be routed to one or
more of the collectors provided at initialization – in this case,
since the only collector is the demo-collector, it will receive
all input event payloads.
The CollectorQueue is configured with a minimum of one consumer
and a maximum of 100 consumers. By default, the collector queue
will autoscale the consumer pool based on the input event load.
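For readers new to the producer-consumer pattern, the following sketch shows the general idea of a pool of background consumer tasks reading from a shared queue, using only asyncio from the standard library. It is not how CollectorQueue is implemented: the pool size here is fixed rather than autoscaled, and the worker and run_pool names are hypothetical.

```python
import asyncio


async def worker(queue: asyncio.Queue[int], results: list[int]) -> None:
    """Consume items forever; the pool owner cancels this task on shutdown."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulated work
            results.append(item * 2)
        finally:
            queue.task_done()


async def run_pool(items: list[int], consumers: int) -> list[int]:
    """Process items with a fixed-size pool of background consumer tasks."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    results: list[int] = []
    tasks = [
        asyncio.create_task(worker(queue, results)) for _ in range(consumers)
    ]
    for item in items:
        queue.put_nowait(item)
    await queue.join()  # wait until every queued item is marked done
    for task in tasks:
        task.cancel()
    # Collect the cancelled tasks so no cancellation error leaks out.
    await asyncio.gather(*tasks, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_pool(list(range(10)), consumers=3)))
```

With more consumers, more items are in flight at once, which is the effect that the min_consumers and max_consumers bounds control in the real collector queue.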
Note
CollectorQueue accepts various configuration parameters.
For general use cases, it’s enough to set collectors,
min_consumers, and max_consumers and use default values for
the rest.
3. Run the CollectorQueue¶
To put everything together, we add two helper functions: one to send
events to the collector queue (send_events), and the other
to print the returned results (output_results).
import asyncio
import sys
import time

from event_collector import (
    Collector,
    CollectorEvent,
    CollectorQueue,
    EventPublisher,
    Payload,
)


async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
    """
    To send events to the consumers, use publisher.send() and
    supply a Payload object. publisher.send() will return a
    CollectorEvent instance.
    """
    events: list[CollectorEvent] = []
    for i in range(100):
        data = {
            "id": f"payload-{i}",
        }
        payload = Payload(data)
        event = await publisher.send(payload)
        events.append(event)
    return events


async def output_results(events: list[CollectorEvent]) -> None:
    """
    Fetch the result for each CollectorEvent using event.wait()
    and print out the result. event.wait() will block until the
    collector event is processed by a consumer and the result is
    available.
    """
    for event in events:
        result_list = await event.wait()
        result = result_list.first()
        value = result.unwrap()
        print(value)


async def main() -> None:
    """
    Run the CollectorQueue as an async context manager.
    """
    # event_handler is the function defined in step 1.
    collector = Collector("demo-collector", event_handler)
    collector_queue = CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
    )
    start = time.monotonic_ns()
    async with collector_queue:
        publisher = collector_queue.get_publisher()
        events = await send_events(publisher)
        await output_results(events)
    elapsed = (time.monotonic_ns() - start) / 1.0e9
    print(f"elapsed={elapsed:.3f}")


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)
The CollectorQueue is designed to run as a background service –
in a Docker container, for instance – that listens for input events
until it’s stopped. For this example we will run the collector queue
in a command-line script’s main function and send events in a loop
to the background consumers. After sending a batch of event payloads,
we’ll fetch and print the results from each collector event, then exit.
It’s important to note here that after sending an event payload to the
collector queue, the EventPublisher will return a
CollectorEvent. The CollectorEvent works just like a Future object.
It’s a placeholder for a result that we can retrieve but may not be
ready right now. The CollectorEvent returns a ResultList when
the backend processing by the collectors is done:
...
# "event" is a CollectorEvent.
event = await publisher.send(payload)
# event.wait() will block until the ResultList is ready.
# "result_list" is a ResultList.
result_list = await event.wait()
...
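The placeholder behaviour described above can be seen with a plain asyncio.Future from the standard library. This sketch only illustrates the Future analogy. It does not use CollectorEvent, and the fulfil_later and future_demo names are hypothetical.

```python
import asyncio


async def fulfil_later(future: asyncio.Future[str], delay: float) -> None:
    """Background task that produces the result after a delay."""
    await asyncio.sleep(delay)
    future.set_result("processed")


async def future_demo() -> tuple[bool, str, bool]:
    """Create a placeholder, wait for it, and report its state."""
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()
    task = asyncio.create_task(fulfil_later(future, 0.05))
    ready_before = future.done()  # the background task has not run yet
    value = await future  # suspends this coroutine until set_result()
    await task
    return ready_before, value, future.done()


if __name__ == "__main__":
    print(asyncio.run(future_demo()))
```

Awaiting the placeholder suspends only the current coroutine, so other tasks on the event loop keep running while the result is being produced.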
A ResultList is a container for a list of Result objects. We
use a collection (list) as a return value because it’s valid for the
collector queue to deliver the event payload to more than one
collector, which means there may be more than one Result object
being returned. So the ResultList can contain one or more results
depending on the method of delivery.
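To see why a list is a natural return type when one payload can reach several collectors, consider this standard-library sketch. It fans one payload out to two handlers with asyncio.gather and collects one result per handler. The audit, index, and deliver_to_all names are hypothetical, and the sketch says nothing about how the collector queue's delivery modes are actually implemented.

```python
import asyncio
from typing import Awaitable, Callable

# A handler takes a payload dict and returns a string result.
Handler = Callable[[dict], Awaitable[str]]


async def audit(payload: dict) -> str:
    return f"audit:{payload['id']}"


async def index(payload: dict) -> str:
    return f"index:{payload['id']}"


async def deliver_to_all(payload: dict, handlers: list[Handler]) -> list[str]:
    """Send one payload to every handler and collect one result per handler."""
    # gather() returns results in the same order as the awaitables passed in.
    return list(await asyncio.gather(*(h(payload) for h in handlers)))


if __name__ == "__main__":
    print(asyncio.run(deliver_to_all({"id": "p-1"}, [audit, index])))
```

With a single handler the returned list has exactly one element, which mirrors the single-collector case in this guide.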
For this example, we only need the first result since we have one
collector, so we use the ResultList.first() method to retrieve the
Result object. The value contained in the Result object can be
obtained using the Result.unwrap() method.
Example for retrieving a Result from a ResultList:
# Send payload to collector queue.
payload_data = {
    "data": "payload data",
}
payload = Payload(payload_data)
event = await publisher.send(payload)
...
# event.wait() will block until the ResultList is ready.
result_list = await event.wait()
# Get the first result in the result set.
result = result_list.first()
# Check whether the result object is Ok or Error.
match result:
    case Ok(value):
        print(value)
    case Error(_):
        # result.unwrap() will raise an exception.
        result.unwrap()
Complete CollectorQueue Example¶
Here is the complete example script, minus the explanatory comments:
"""
example.py
"""
from typing import Any
import asyncio
import datetime
import sys
import time

from event_collector import (
    Collector,
    CollectorEvent,
    CollectorQueue,
    Context,
    EventPublisher,
    Ok,
    Payload,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    data: dict[str, Any] = payload.value
    result = {
        "id": data["id"],
        "received_at": now,
    }
    await asyncio.sleep(0.25)
    return Ok(result)


async def send_events(publisher: EventPublisher) -> list[CollectorEvent]:
    events: list[CollectorEvent] = []
    for i in range(100):
        data = {
            "id": f"payload-{i}",
        }
        payload = Payload(data)
        event = await publisher.send(payload)
        events.append(event)
    return events


async def output_results(events: list[CollectorEvent]) -> None:
    for event in events:
        result_list = await event.wait()
        result = result_list.first()
        value = result.unwrap()
        print(value)


async def main() -> None:
    collector = Collector("demo-collector", event_handler)
    collector_queue = CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
    )
    start = time.monotonic_ns()
    async with collector_queue:
        publisher = collector_queue.get_publisher()
        events = await send_events(publisher)
        await output_results(events)
    elapsed = (time.monotonic_ns() - start) / 1.0e9
    print(f"elapsed={elapsed:.3f}")


if __name__ == "__main__":
    asyncio.run(main())
To run the script, install the dependencies first with:
# if you do not have uv, install it using the instructions here:
# https://docs.astral.sh/uv/#installation
cd /path/to/event-collector
make depend
Run the example script with:
. .venv/bin/activate
python example.py
The output should look something like this:
(.venv) [event-collector]$ python example.py
{"logger":"CollectorQueue-8f6ad6fc75004f63aa1c17be7a88ef19","metrics_timestamp":"2026-05-01T06:47:14.291420+00:00","qsize":100,"busy":0,"consumers":1,"load":100.0,"avg_load":0.0,"service_time":0.0,"wait_time":0.0,"arrival_rate":147335.9451,"departure_rate":0.0,"wip":0.0,"event_count":0,"event":"collector queue metrics","level":"info","timestamp":"2026-05-01T06:47:14.292887Z"}
{'id': 'payload-0', 'received_at': '2026-05-01T06:47:14.293308+00:00'}
{'id': 'payload-1', 'received_at': '2026-05-01T06:47:14.304057+00:00'}
{'id': 'payload-2', 'received_at': '2026-05-01T06:47:14.304085+00:00'}
...
{'id': 'payload-97', 'received_at': '2026-05-01T06:47:14.304848+00:00'}
{'id': 'payload-98', 'received_at': '2026-05-01T06:47:14.304855+00:00'}
{'id': 'payload-99', 'received_at': '2026-05-01T06:47:14.304862+00:00'}
elapsed=0.267
The collector queue emits log messages as it starts up, then the script prints the results returned by the event handler. Finally, the collector queue shuts down and the script exits.
The output shows that the collector queue needed only 0.267 seconds
(267 ms) to process 100 events, which is close to the 0.25-second
service time of a single event handler call. This is because the
collector queue scaled the consumer pool to 100 consumers to
handle the input load, so the events were processed concurrently
rather than one after another.
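The arithmetic behind that observation can be sketched with a simple idealized model, which is an assumption of this example and not a formula taken from the library: if every consumer handles one event at a time and each event takes the same service time, a batch finishes in roughly ceil(events / consumers) rounds. The model ignores scheduling overhead and the time it takes to scale the pool up. The estimate_elapsed name is hypothetical.

```python
import math


def estimate_elapsed(events: int, consumers: int, service_time: float) -> float:
    """Idealized batch time: each consumer handles one event at a time."""
    if consumers < 1:
        raise ValueError("consumers must be at least 1")
    rounds = math.ceil(events / consumers)
    return rounds * service_time


if __name__ == "__main__":
    for consumers in (1, 10, 100):
        estimate = estimate_elapsed(100, consumers, 0.25)
        print(f"consumers={consumers} estimate={estimate:.2f}s")
```

Under this model, 100 events at 0.25 seconds each take 25 seconds with one consumer, 2.5 seconds with 10 consumers, and 0.25 seconds with 100 consumers. The measured 0.267 seconds is the ideal 0.25 seconds plus a small amount of overhead.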
This short implementation example shows that we’re able to set up an event-processing workflow, run a producer-consumer queue, and fetch the results – without a lot of external dependencies.