CollectorQueue

Source code: event_collector/queue.py

This module implements the collector queue.

A collector queue is a publisher-consumer queue implemented using only standard library functions and asyncio primitives.
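The publisher-consumer pattern that the module is built on can be sketched with nothing but asyncio primitives. This is a minimal illustration under stated assumptions, not the module's code: the consumer and run functions, the fixed pool of three consumers, and the upper-casing "handler" are hypothetical stand-ins for the real consumers, router, and collectors.

```python
import asyncio


async def consumer(queue: asyncio.Queue, results: list[str]) -> None:
    # Each consumer pulls payloads off the queue until it is cancelled.
    while True:
        payload = await queue.get()
        try:
            results.append(payload.upper())  # stand-in for real event handling
        finally:
            queue.task_done()


async def run(payloads: list[str], n_consumers: int = 3) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list[str] = []
    # Start a fixed pool of consumer tasks in the background.
    tasks = [asyncio.create_task(consumer(queue, results)) for _ in range(n_consumers)]
    for payload in payloads:
        await queue.put(payload)  # the "publisher" side
    await queue.join()  # wait until every payload has been processed
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


processed = asyncio.run(run(["a", "b", "c"]))
print(sorted(processed))  # ['A', 'B', 'C']
```

queue.join() returns once every put has a matching task_done(), which is what lets the sketch cancel the idle consumers safely.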

Constants & Data Structures

type event_collector.queue.MonotonicTime = int
event_collector.queue.DEFAULT_MIN_CONSUMERS = 1

Default min_consumers value.

event_collector.queue.DEFAULT_MAX_CONSUMERS = 100000

Default max_consumers value.

event_collector.queue.CONSUMER_SHUTDOWN_TIMEOUT = 10.0

Default timeout (in seconds) to wait for consumer tasks to finish when shutting down the collector queue.

event_collector.queue.WIP_MARGIN: float = 0.15

Margin (15%) used to calculate the additional consumers added on top of the average work-in-progress (WIP) task count.

event_collector.queue.MONITOR_INTERVAL: float = 0.01

Consumer monitor sleep interval (in seconds).

event_collector.queue.REPORT_INTERVAL: float = 0.0

Collector queue status report log interval (in seconds). A value of 0.0 disables status reports.

event_collector.queue.AVG_LOAD_INTERVAL: float = 60.0

Keep load values for this interval (in seconds) to calculate average load.

event_collector.queue.METRICS_COLLECTOR_WINDOW_SIZE: float = 60.0

Keep arrival and departure data for this period (in seconds) to calculate arrival and departure rates.

event_collector.queue.EVENT_SERVICE_DATA_WINDOW_SIZE: int = 1000

Keep this number of event service data points to calculate service and wait times.
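By contrast, the event service data window is sized by count rather than time. A minimal sketch, assuming each data point records when an event was enqueued, when a consumer started it, and when it finished (ServiceData and this tuple layout are hypothetical):

```python
from collections import deque
from statistics import fmean


class ServiceData:
    """Hypothetical store for the last N (enqueued, started, finished) timestamps."""

    def __init__(self, window_size: int = 1000) -> None:
        # deque(maxlen=...) silently discards the oldest entry once full.
        self._points: deque[tuple[float, float, float]] = deque(maxlen=window_size)

    def add(self, enqueued: float, started: float, finished: float) -> None:
        self._points.append((enqueued, started, finished))

    def wait_time(self) -> float:
        # Average time an event spent on the queue before a consumer picked it up.
        return fmean(s - e for e, s, _ in self._points) if self._points else 0.0

    def service_time(self) -> float:
        # Average time a consumer spent processing an event.
        return fmean(f - s for _, s, f in self._points) if self._points else 0.0


data = ServiceData(window_size=2)
data.add(0.0, 1.0, 3.0)
data.add(1.0, 1.5, 2.5)
data.add(2.0, 2.0, 4.0)  # evicts the first point
print(data.wait_time(), data.service_time())  # 0.25 1.5
```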

event_collector.queue.METRICS_WARMUP_INTERVAL: float = 3.0

Time (in seconds) to allow metrics to stabilize on startup before reporting actual values. During the warmup period, averages are reported as 0.

CollectorQueue & Support Classes

class event_collector.queue.Metrics(timestamp: datetime, qsize: int, busy: int, consumers: int, load: float, avg_load: float, service_time: float, wait_time: float, arrival_rate: float, departure_rate: float, wip: float, t_delta: float, event_count: int)

A container for collector queue runtime metrics.

The latest metrics can be retrieved using the CollectorQueue.get_metrics method.

timestamp: datetime
qsize: int
busy: int
consumers: int
load: float
avg_load: float
service_time: float
wait_time: float
arrival_rate: float
departure_rate: float
wip: float
t_delta: float
event_count: int
class event_collector.queue.CollectorQueue(collectors: CollectorList, router: type[Router] = <class 'event_collector.router.SimpleRouter'>, autoscale: bool = True, min_consumers: int = 1, max_consumers: int = 100000, wip_margin: float = 0.15, monitor_interval: float = 0.01, report_interval: float = 0.0, avg_load_interval: float = 60.0, metrics_collector_window_size: float = 60.0, metrics_warmup_interval: float = 3.0, event_service_data_window_size: int = 1000, metrics_history_size: int = 1000, queue_type: QueueType = QueueType.FIFO, default_priority: int = 1000, consumer_shutdown_timeout: float = 10.0, logging: bool = True)

The CollectorQueue manages a pool of consumers to process incoming event payloads (sent using the EventPublisher). Each consumer runs as a background asyncio.Task.

The CollectorQueue can be initialized with the following parameters:

collectors: CollectorList

The collector queue must be initialized with a list of Collector classes. On initialization, the CollectorQueue creates Collector instances and calls their setup methods. Once the collector queue is running, the consumers deliver event payloads to the Collectors as input events are put onto the event queue.

Note: this parameter must be a list of classes, not a list of class instances.

router: type[Router]

If supplied with a Router subclass, the collector queue will use it as the event handler for all incoming event payloads. If not set, the collector queue will use SimpleRouter.

Note: this parameter must be a class, not an instance of a class.

autoscale: bool

If autoscale is True, the collector queue scales the consumer pool up or down (increasing or reducing the number of consumer tasks) to keep the ratio of queued input events to consumer tasks as low as possible.

If autoscale is False, the collector queue creates a fixed pool of max_consumers consumer tasks and uses it to handle all incoming events.

min_consumers: int

The minimum number of consumers to create on startup.

max_consumers: int

The maximum number of consumers the collector queue should create. When the load increases, the collector queue will not create more than this number of consumers.

wip_margin: float

Set the “work-in-progress” margin (between 0 and 1). The margin is used as an addition to the calculated size of the consumer pool when the input load increases. The default is 0.15 (15%).
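One plausible reading of the margin, sketched under assumptions: the target pool size is the average WIP count increased by wip_margin, rounded up, and clamped to the range [min_consumers, max_consumers]. target_consumers is hypothetical, and the module's actual scaling rule may differ.

```python
import math


def target_consumers(
    avg_wip: float,
    wip_margin: float = 0.15,
    min_consumers: int = 1,
    max_consumers: int = 100_000,
) -> int:
    """Hypothetical sizing rule: average WIP plus a margin, clamped to the pool limits."""
    wanted = math.ceil(avg_wip * (1.0 + wip_margin))
    return max(min_consumers, min(max_consumers, wanted))


print(target_consumers(10.0))                       # 12
print(target_consumers(0.0))                        # 1, never below min_consumers
print(target_consumers(900.0, max_consumers=1000))  # 1000, capped at max_consumers
```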

monitor_interval: float

The periodic interval in seconds for the load monitor task to run. The default interval is 0.01 seconds (or 10ms).

report_interval: float

The periodic interval in seconds for the reporter task to run. The default interval is 0 (off). Set the interval to a positive number (in seconds) to enable metrics reports on the log output.
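A periodic reporter that an interval of 0 switches off can be sketched as a background task. report_loop, run_for, the logger name, and the placeholder metrics dict are hypothetical; the real reporter works with Metrics snapshots.

```python
import asyncio
import logging
from typing import Any, Callable

logger = logging.getLogger("reporter-demo")


async def report_loop(interval: float, get_metrics: Callable[[], Any], reports: list) -> None:
    """Hypothetical reporter: log a metrics snapshot every `interval` seconds."""
    if interval <= 0:
        return  # an interval of 0 means reporting is off
    while True:
        await asyncio.sleep(interval)
        snapshot = get_metrics()
        logger.info("metrics: %s", snapshot)
        reports.append(snapshot)


async def run_for(interval: float, seconds: float = 0.1) -> int:
    reports: list = []
    task = asyncio.create_task(report_loop(interval, lambda: {"qsize": 0}, reports))
    await asyncio.sleep(seconds)
    task.cancel()  # stop the background reporter
    await asyncio.gather(task, return_exceptions=True)
    return len(reports)


off_count = asyncio.run(run_for(0.0))  # 0: reporting disabled
on_count = asyncio.run(run_for(0.01))  # several reports; exact count is timing-dependent
print(off_count, on_count)
```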

avg_load_interval: float

The interval in seconds to keep load values for calculating the average load of the running collector queue. The default average load interval is 60 seconds.

metrics_collector_window_size: float

The window size in seconds to keep arrival and departure data for incoming events. The data is used to calculate the arrival and departure rates of the queue events. The default data collector window size is 60 seconds. The minimum window size is 5 seconds.

event_service_data_window_size: int

The number of event service data points to store when calculating service and wait times. The default data window size is 1000.

metrics_warmup_interval: float

The interval (in seconds) to wait before the metrics report valid averages. Within this interval, the metrics reporter reports 0 for averages. This allows the collector queue to accumulate enough runtime data to report valid measurements. The default metrics warmup interval is 3 seconds.

queue_type: QueueType

The queue type to use for the internal event queue. The default is the asyncio FIFO queue (asyncio.Queue). The collector queue can use any of the asyncio queue types: FIFO (asyncio.Queue), priority (asyncio.PriorityQueue), or LIFO (asyncio.LifoQueue).
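The three standard asyncio queue classes differ only in the order in which items come back out. The sketch below puts the same items into each class; drain and compare are hypothetical helpers, and nothing is assumed about the names of the QueueType members.

```python
import asyncio


async def drain(queue_cls: type[asyncio.Queue], items: list[int]) -> list[int]:
    """Put every item on a fresh queue of the given class, then pull them all off."""
    queue = queue_cls()
    for item in items:
        queue.put_nowait(item)
    return [queue.get_nowait() for _ in range(queue.qsize())]


async def compare(items: list[int]) -> dict[str, list[int]]:
    classes = (asyncio.Queue, asyncio.LifoQueue, asyncio.PriorityQueue)
    return {cls.__name__: await drain(cls, items) for cls in classes}


orders = asyncio.run(compare([3, 1, 2]))
print(orders)
# {'Queue': [3, 1, 2], 'LifoQueue': [2, 1, 3], 'PriorityQueue': [1, 2, 3]}
```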

default_priority: int

If using a priority queue, set the default priority level (int). The default is 1000. asyncio.PriorityQueue is backed by a min-heap, so the lower the number, the higher the priority when events are pulled off the queue.
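A common way to use asyncio.PriorityQueue with a default priority is to enqueue (priority, sequence, payload) tuples, where the sequence number breaks ties so payloads are never compared directly and equal priorities keep their insertion order. This tuple layout and make_entry are assumptions for illustration, not necessarily how the collector queue stores events:

```python
import asyncio
import itertools

DEFAULT_PRIORITY = 1000  # mirrors the documented default_priority

_seq = itertools.count()  # tie-breaker so payload dicts are never compared


def make_entry(payload: dict, priority: int = DEFAULT_PRIORITY) -> tuple[int, int, dict]:
    return (priority, next(_seq), payload)


async def demo() -> list[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    await queue.put(make_entry({"name": "normal-1"}))
    await queue.put(make_entry({"name": "urgent"}, priority=1))  # lower number wins
    await queue.put(make_entry({"name": "normal-2"}))
    order = []
    while not queue.empty():
        _, _, payload = await queue.get()
        order.append(payload["name"])
    return order


order = asyncio.run(demo())
print(order)  # ['urgent', 'normal-1', 'normal-2']
```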

consumer_shutdown_timeout: float

The timeout in seconds to use when shutting down the collector queue. When backend processing takes a long time to complete, the shutdown process can block while consumer tasks are still running. The collector queue uses this timeout to cancel any unfinished consumers. The default is 10 seconds.
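The timeout-then-cancel behaviour can be sketched with asyncio.wait. slow_consumer and shut_down are hypothetical, and the sketch assumes consumers are plain asyncio.Task objects:

```python
import asyncio


async def slow_consumer(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for long-running backend work
    return "done"


async def shut_down(tasks: list[asyncio.Task], timeout: float) -> tuple[int, int]:
    # Give consumers up to `timeout` seconds to finish on their own.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    # Cancel anything still running so shutdown cannot block indefinitely.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main() -> tuple[int, int]:
    tasks = [asyncio.create_task(slow_consumer(d)) for d in (0.01, 0.02, 5.0)]
    return await shut_down(tasks, timeout=0.1)


result = asyncio.run(main())
print(result)  # (2, 1): two consumers finished, one was cancelled
```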

logging: bool

If logging is set to True (default), the CollectorQueue will output log entries using the event_collector structlog logger.

If logging is set to False, the collector queue will produce no log output.

This flag is useful if your application does not need any information produced by the collector queue on the log output.

CollectorQueue public methods:

get_publisher() → EventPublisher

Return an EventPublisher.

Use the publisher to send event payloads to the event queue.

Example:

collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)

async with collector_queue:
    publisher = collector_queue.get_publisher()
    payload = Payload({
        "data": "payload data",
    })
    event = await publisher.send(payload)
    result_list = await event.wait()
get_metrics() → Metrics

Return the latest Metrics snapshot.

consumer_count() → int

Get the current consumer pool size.

current_load() → Load

Get current load values.

async init_queue() → None

Initialize the collector queue.

async shut_down_queue() → None

Shut down the collector queue.

async __aenter__() → CollectorQueue

Async context manager entry function.

Example usage:

collector = Collector("demo-collector", event_handler)
collectors = [collector]
min_consumers = 1
max_consumers = 1000

collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)

async with collector_queue:
    publisher = collector_queue.get_publisher()
    ...

Using the async context manager is equivalent to:

collector = Collector("demo-collector", event_handler)
collectors = [collector]
min_consumers = 1
max_consumers = 1000

# create the queue before "try" so collector_queue is always defined in "finally"
collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)

try:
    await collector_queue.init_queue()

    publisher = collector_queue.get_publisher()
    ...

except Exception as e:
    # handle exception
    ...

finally:
    # shut down collector queue
    await collector_queue.shut_down_queue()
async __aexit__(exc_type, exc_value, exc_tb)

Async context manager exit function.

When exiting the context manager, this function calls shut_down_queue().
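The entry/exit pairing can be illustrated with a hypothetical DemoQueue class. This sketch returns None from __aexit__, so an exception raised in the body propagates after shutdown has run; it does not claim that CollectorQueue.__aexit__ handles exceptions the same way.

```python
import asyncio


class DemoQueue:
    """Hypothetical stand-in showing the __aenter__/__aexit__ pattern."""

    def __init__(self) -> None:
        self.log: list[str] = []

    async def init_queue(self) -> None:
        self.log.append("init")

    async def shut_down_queue(self) -> None:
        self.log.append("shutdown")

    async def __aenter__(self) -> "DemoQueue":
        await self.init_queue()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        # Runs on normal exit and when the body raises; returning None
        # lets any exception propagate to the caller.
        await self.shut_down_queue()


async def main() -> list[str]:
    q = DemoQueue()
    try:
        async with q:
            q.log.append("work")
            raise RuntimeError("boom")
    except RuntimeError:
        q.log.append("caught")
    return q.log


log = asyncio.run(main())
print(log)  # ['init', 'work', 'shutdown', 'caught']
```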

MessageBus events

class event_collector.queue.ReportedMetrics(metrics: Metrics)

A message that is sent to the MessageBus after the collector queue outputs a Metrics report to the log output.

metrics: Metrics
class event_collector.queue.InitializedQueue(collector_queue: CollectorQueue)

A message that is sent to the MessageBus after the collector queue is initialized and before processing any event payloads.

collector_queue: CollectorQueue
class event_collector.queue.ShutDownQueue(collector_queue: CollectorQueue)

A message that is sent to the MessageBus after the collector queue has shut down.

collector_queue: CollectorQueue

To attach message handlers to the collector queue events, use the collector queue’s message bus instance and add custom handlers for the bus event:

from event_collector import Collector, CollectorQueue
from your_application import event_handler

# import messagebus module and bus events from event_collector.queue
import event_collector.messagebus as bus
from event_collector.queue import InitializedQueue, ShutDownQueue


# define message handlers for each "BusEvent" we want to listen to
async def handle_init_queue(msg: InitializedQueue, queue: bus.BusEventQueue) -> None:
    print("collector queue initialized")

async def handle_shut_down_queue(msg: ShutDownQueue, queue: bus.BusEventQueue) -> None:
    print("collector queue has shut down")

collector = Collector("demo-collector", event_handler)
collectors = [collector]
collector_queue = CollectorQueue(collectors)

# add message handlers to the collector queue message bus
msgbus = collector_queue.get_message_bus()
msgbus.add_msg_handler(InitializedQueue, handle_init_queue)
msgbus.add_msg_handler(ShutDownQueue, handle_shut_down_queue)
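The dispatch idea behind add_msg_handler can be sketched as a dictionary from message type to a list of async handlers. MiniBus and QueueStarted are hypothetical, and unlike the real handlers shown above, these receive only the message, not a BusEventQueue:

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable

Handler = Callable[[object], Awaitable[None]]


@dataclass
class QueueStarted:
    """Hypothetical message type standing in for InitializedQueue."""
    name: str


class MiniBus:
    """Hypothetical type-keyed bus; not the event_collector.messagebus implementation."""

    def __init__(self) -> None:
        self._handlers: defaultdict[type, list[Handler]] = defaultdict(list)

    def add_msg_handler(self, msg_type: type, handler: Handler) -> None:
        self._handlers[msg_type].append(handler)

    async def publish(self, msg: object) -> None:
        # Run every handler registered for the message's exact type, concurrently.
        await asyncio.gather(*(handler(msg) for handler in self._handlers[type(msg)]))


received: list[str] = []


async def on_started(msg: QueueStarted) -> None:
    received.append(f"started {msg.name}")


demo_bus = MiniBus()
demo_bus.add_msg_handler(QueueStarted, on_started)
asyncio.run(demo_bus.publish(QueueStarted("demo")))
print(received)  # ['started demo']
```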

Using CollectorQueue as an Async Context Manager

When used as an async context manager, the collector queue performs its initialization and shutdown operations automatically:

import asyncio
import sys

from event_collector import Collector, CollectorQueue

from your_application import event_handler


async def run_collector_queue() -> None:
    collector = Collector("demo-collector", event_handler)
    collectors = [collector]
    min_consumers = 1
    max_consumers = 1000

    collector_queue = CollectorQueue(
        collectors,
        min_consumers=min_consumers,
        max_consumers=max_consumers,
    )

    async with collector_queue:
        publisher = collector_queue.get_publisher()

        collector_events = []
        for i in range(100):
            payload = {
                "data": f"payload-{i}",
            }
            event = await publisher.send(payload)
            collector_events.append(event)

        for event in collector_events:
            result_list = await event.wait()
            result = result_list.first()
            match result:
                case Error(_):
                    result.unwrap()
                case Ok(value):
                    print(value)


async def main() -> None:
    await run_collector_queue()

if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)