CollectorQueue
Source code: event_collector.queue.py
This module implements the collector queue.
A collector queue is a publisher-consumer queue implemented using only standard library functions and asyncio primitives.
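The publisher-consumer pattern described above can be sketched with plain asyncio. This is a minimal illustration, not the event_collector implementation: SketchEvent and run_sketch are hypothetical names, the consumer count is fixed (no autoscaling), and each handler stands in for a collector.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SketchEvent:
    """Hypothetical stand-in for the event object a publisher returns."""

    def __init__(self, payload: Any) -> None:
        self.payload = payload
        self.results: list[Any] = []
        self._done = asyncio.Event()

    def set_results(self, results: list[Any]) -> None:
        self.results = results
        self._done.set()

    async def wait(self) -> list[Any]:
        # Block until a consumer has processed this event.
        await self._done.wait()
        return self.results


async def run_sketch(
    handlers: list[Callable[[Any], Awaitable[Any]]],
    payloads: list[Any],
    consumers: int = 4,
) -> list[list[Any]]:
    queue: asyncio.Queue = asyncio.Queue()

    async def consumer() -> None:
        # Each consumer is a background task that pulls events until cancelled.
        while True:
            event = await queue.get()
            try:
                handler_results = []
                for handler in handlers:
                    try:
                        handler_results.append(await handler(event.payload))
                    except Exception as exc:  # report errors as results
                        handler_results.append(exc)
                event.set_results(handler_results)
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(consumer()) for _ in range(consumers)]
    events = []
    for payload in payloads:  # the "publisher" side
        event = SketchEvent(payload)
        await queue.put(event)
        events.append(event)
    results = [await event.wait() for event in events]
    # Shut down: cancel the consumer tasks and wait for them to exit.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results
```

Handler exceptions are captured as results so that a failing handler never leaves wait() blocked forever.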
Constants & Data Structures
- type event_collector.queue.MonotonicTime = int
- event_collector.queue.DEFAULT_MIN_CONSUMERS: int = 1
Default value for min_consumers.
- event_collector.queue.DEFAULT_MAX_CONSUMERS: int = 100000
Default value for max_consumers.
- event_collector.queue.CONSUMER_SHUTDOWN_TIMEOUT: float = 10.0
Default timeout (in seconds) for shutting down consumer tasks. See consumer_shutdown_timeout.
- event_collector.queue.WIP_MARGIN: float = 0.15
Margin (15%) added to the average work-in-progress (WIP) task value when calculating how many consumers to add.
- event_collector.queue.MONITOR_INTERVAL: float = 0.01
Consumer monitor sleep interval (in seconds).
- event_collector.queue.REPORT_INTERVAL: float = 0.0
Collector queue status report log interval (in seconds). A value of 0 disables reporting.
- event_collector.queue.AVG_LOAD_INTERVAL: float = 60.0
Keep load values for this interval (in seconds) to calculate the average load.
- event_collector.queue.METRICS_COLLECTOR_WINDOW_SIZE: float = 60.0
Keep arrival and departure data for this period (in seconds) to calculate arrival and departure rates.
- event_collector.queue.EVENT_SERVICE_DATA_WINDOW_SIZE: int = 1000
Keep this number of event service data points to calculate service and wait times.
- event_collector.queue.METRICS_WARMUP_INTERVAL: float = 3.0
Time (in seconds) to allow metrics to stabilize on startup before reporting actual values. During the warmup period, averages return 0.
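The windowed metrics above (METRICS_COLLECTOR_WINDOW_SIZE and METRICS_WARMUP_INTERVAL) can be illustrated with a small sliding-window rate counter. This is a sketch under assumptions: RateWindow is a hypothetical class, and it uses float seconds from time.monotonic() rather than the module's integer MonotonicTime values. Data points are dropped once they fall outside the window, and the rate reads 0 until the warmup interval has passed.

```python
import time
from collections import deque
from typing import Callable


class RateWindow:
    """Hypothetical sliding-window rate counter (events per second)."""

    def __init__(
        self,
        window_size: float = 60.0,
        warmup: float = 3.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.window_size = window_size
        self.warmup = warmup
        self.clock = clock
        self.started = clock()
        self.timestamps: deque = deque()

    def record(self) -> None:
        # Record one arrival (or departure) at the current time.
        self.timestamps.append(self.clock())

    def rate(self) -> float:
        now = self.clock()
        # Drop data points that have fallen out of the window.
        while self.timestamps and now - self.timestamps[0] > self.window_size:
            self.timestamps.popleft()
        elapsed = now - self.started
        if elapsed < self.warmup:
            return 0.0  # still warming up: report 0 for the average
        # Divide by the window length, or by the elapsed time early on.
        return len(self.timestamps) / min(elapsed, self.window_size)
```

Injecting the clock keeps the class testable without real sleeps.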
CollectorQueue & Support Classes
- class event_collector.queue.Metrics(timestamp: datetime, qsize: int, busy: int, consumers: int, load: float, avg_load: float, service_time: float, wait_time: float, arrival_rate: float, departure_rate: float, wip: float, t_delta: float, event_count: int)
A container for collector queue runtime metrics.
The latest metrics can be retrieved using the CollectorQueue.get_metrics method.
- timestamp: datetime
- qsize: int
- busy: int
- consumers: int
- load: float
- avg_load: float
- service_time: float
- wait_time: float
- arrival_rate: float
- departure_rate: float
- wip: float
- t_delta: float
- event_count: int
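The wip field and the WIP_MARGIN constant suggest how the autoscaler sizes the consumer pool. A plausible rule, shown here as a hypothetical target_consumers function rather than the library's actual code, adds the margin to the average WIP value, rounds up, and clamps the result to the min_consumers/max_consumers range.

```python
import math


def target_consumers(
    avg_wip: float,
    wip_margin: float = 0.15,
    min_consumers: int = 1,
    max_consumers: int = 100_000,
) -> int:
    """Hypothetical autoscaling rule: average WIP plus a margin, clamped."""
    if not 0.0 <= wip_margin <= 1.0:
        raise ValueError("wip_margin must be between 0 and 1")
    # Round up so a fractional WIP value still gets a consumer.
    wanted = math.ceil(avg_wip * (1.0 + wip_margin))
    return max(min_consumers, min(max_consumers, wanted))
```

For example, an average WIP of 10 with the default 15% margin gives a target of 12 consumers, and an idle queue still keeps min_consumers alive.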
- class event_collector.queue.CollectorQueue(collectors: CollectorList, router: type[Router] = <class 'event_collector.router.SimpleRouter'>, autoscale: bool = True, min_consumers: int = 1, max_consumers: int = 100000, wip_margin: float = 0.15, monitor_interval: float = 0.01, report_interval: float = 0.0, avg_load_interval: float = 60.0, metrics_collector_window_size: float = 60.0, metrics_warmup_interval: float = 3.0, event_service_data_window_size: int = 1000, metrics_history_size: int = 1000, queue_type: QueueType = QueueType.FIFO, default_priority: int = 1000, consumer_shutdown_timeout: float = 10.0, logging: bool = True)
The CollectorQueue manages a pool of consumers to process incoming event payloads (sent using the EventPublisher). Each consumer runs as a background asyncio.Task.
The CollectorQueue can be initialized with the following parameters:
- collectors: CollectorList
The collector queue must be initialized with a list of Collector classes. When initializing, the CollectorQueue will create Collector instances and call their setup methods. After the collector queue is up and running, the Collectors will receive event payloads from the consumers as input events are put onto the event queue.
Note: this parameter must be a list of classes, not a list of class instances.
- router: type[Router]
If supplied with a Router subclass, the collector queue will use it as the event handler for all incoming event payloads. If not set, the collector queue will use SimpleRouter.
Note: this parameter must be a class, not an instance of a class.
- autoscale: bool
If autoscale is True, the collector queue will scale the consumer pool up or down (increasing or reducing the number of consumer tasks) to keep the load ratio between input queue events and consumer tasks as low as possible.
If autoscale is False, the collector queue will initialize a pool of max_consumers consumer tasks and use it to handle all incoming events.
- min_consumers: int
The minimum number of consumers to create on startup.
- max_consumers: int
The maximum number of consumers the collector queue should create. When the load increases, the collector queue will not create more than this number of consumers.
- wip_margin: float
Set the “work-in-progress” margin (between 0 and 1). The margin is used as an addition to the calculated size of the consumer pool when the input load increases. The default is 0.15 (15%).
- monitor_interval: float
The periodic interval in seconds for the load monitor task to run. The default interval is 0.01 seconds (or 10ms).
- report_interval: float
The periodic interval in seconds for the reporter task to run. The default interval is 0 (off). Set the interval to a positive number (in seconds) to enable metrics reports on the log output.
- avg_load_interval: float
The interval in seconds to keep load values for calculating the average load of the running collector queue. The default average load interval is 60 seconds.
- metrics_collector_window_size: float
The window size in seconds to keep arrival and departure data for incoming events. The data is used to calculate the arrival and departure rates of the queue events. The default data collector window size is 60 seconds. The minimum window size is 5 seconds.
- event_service_data_window_size: int
The number of event service data points to store when calculating service and wait times. The default data window size is 1000.
- metrics_warmup_interval: float
The interval (in seconds) to wait before reporting valid metric averages. Within this interval, the metrics reporter reports 0 for averages, which allows the collector queue to accumulate enough runtime data to report valid measurements. The default metrics warmup interval is 3 seconds.
- queue_type: QueueType
The queue type to use for the internal event queue. The default is QueueType.FIFO (an asyncio FIFO queue). The collector queue can use any one of the asyncio queue types: asyncio.Queue (FIFO), asyncio.PriorityQueue, or asyncio.LifoQueue (LIFO).
- default_priority: int
If using a priority queue, set the default priority level (int). The default is 1000. asyncio.PriorityQueue uses a min-heap, so the lower the number, the higher the priority when pulling events off the queue.
- consumer_shutdown_timeout: float
The timeout in seconds to use when shutting down the collector queue. When backend processing takes a long time to complete, the shutdown process can block while consumer tasks are still waiting. The collector queue uses this timeout to cancel any unfinished consumers. The default is 10 seconds.
- logging: bool
If logging is set to True (default), the CollectorQueue will output log entries using the event_collector structlog logger.
If logging is set to False, the collector queue will produce no log output. This flag is useful if your application does not need any information produced by the collector queue on the log output.
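The priority ordering described for queue_type and default_priority can be checked directly against asyncio.PriorityQueue. In this sketch, events without an explicit priority get the default of 1000. The insertion counter used as a tie-breaker is an assumption, not a documented detail of the collector queue: it keeps FIFO order within one priority level and avoids comparing payloads (such as dicts) that do not support ordering.

```python
import asyncio
import itertools
from typing import Optional

DEFAULT_PRIORITY = 1000  # mirrors the documented default_priority


async def drain_in_priority_order(items: list[tuple[str, Optional[int]]]) -> list[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()  # hypothetical tie-breaker: insertion order
    for payload, priority in items:
        prio = DEFAULT_PRIORITY if priority is None else priority
        await queue.put((prio, next(counter), payload))
    order = []
    while not queue.empty():
        # Min-heap: the smallest (priority, seq) tuple comes out first.
        _, _, payload = await queue.get()
        order.append(payload)
    return order


print(asyncio.run(drain_in_priority_order(
    [("low", None), ("urgent", 1), ("normal", 500), ("low-2", None)]
)))  # ['urgent', 'normal', 'low', 'low-2']
```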
CollectorQueue public methods:
- get_publisher() → EventPublisher
Return an EventPublisher.
Use the publisher to send event payloads to the event queue.
Example:
collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)
async with collector_queue:
    publisher = collector_queue.get_publisher()
    payload = Payload({
        "data": "payload data",
    })
    event = await publisher.send(payload)
    result_list = await event.wait()
- consumer_count() → int
Get the current consumer pool size.
- current_load() → Load
Get the current load values.
- async init_queue() → None
Initialize the collector queue.
- async shut_down_queue() → None
Shut down the collector queue.
- async __aenter__() → CollectorQueue
Async context manager entry function.
Example usage:
collector = Collector("demo-collector", event_handler)
collectors = [collector]
min_consumers = 1
max_consumers = 1000
collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)
async with collector_queue:
    publisher = collector_queue.get_publisher()
    ...
The async context manager interface is equivalent to:
collector = Collector("demo-collector", event_handler)
collectors = [collector]
min_consumers = 1
max_consumers = 1000
collector_queue = CollectorQueue(
    collectors,
    min_consumers=min_consumers,
    max_consumers=max_consumers,
)
await collector_queue.init_queue()
try:
    publisher = collector_queue.get_publisher()
    ...
except Exception as e:
    # handle exception
    ...
finally:
    # shut down collector queue
    await collector_queue.shut_down_queue()
- async __aexit__(exc_type, exc_value, exc_tb)
Async context manager exit function.
When exiting the context manager, this method calls shut_down_queue().
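The shutdown behavior controlled by consumer_shutdown_timeout, together with the __aenter__/__aexit__ pairing, can be sketched as follows. ShutdownSketch is hypothetical and is not the CollectorQueue implementation: it starts three simulated consumers, waits up to the timeout for them with asyncio.wait, and cancels whatever is still pending.

```python
import asyncio


class ShutdownSketch:
    """Hypothetical async context manager mirroring init/shut-down hooks."""

    def __init__(self, work_seconds: float, shutdown_timeout: float = 10.0) -> None:
        self.work_seconds = work_seconds
        self.shutdown_timeout = shutdown_timeout
        self.tasks: list = []
        self.cancelled = 0

    async def _consumer(self) -> None:
        await asyncio.sleep(self.work_seconds)  # simulated slow backend call

    async def init_queue(self) -> None:
        self.tasks = [asyncio.create_task(self._consumer()) for _ in range(3)]

    async def shut_down_queue(self) -> None:
        # Give consumers up to shutdown_timeout seconds to finish...
        _, pending = await asyncio.wait(self.tasks, timeout=self.shutdown_timeout)
        # ...then cancel whatever is still running and wait for it to exit.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        self.cancelled = len(pending)

    async def __aenter__(self) -> "ShutdownSketch":
        await self.init_queue()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        await self.shut_down_queue()
```

Because __aexit__ returns None, any exception raised inside the async with block still propagates after shutdown completes.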
MessageBus events
- class event_collector.queue.ReportedMetrics(metrics: Metrics)
A message that is sent to the MessageBus after the collector queue outputs a Metrics report to the log output.
- metrics: Metrics
- class event_collector.queue.InitializedQueue(collector_queue: CollectorQueue)
A message that is sent to the MessageBus after the collector queue is initialized and before it processes any event payloads.
- collector_queue: CollectorQueue
- class event_collector.queue.ShutDownQueue(collector_queue: CollectorQueue)
A message that is sent to the MessageBus after the collector queue has shut down.
- collector_queue: CollectorQueue
To attach message handlers to the collector queue events, use the collector queue’s message bus instance and add custom handlers for the bus event:
from event_collector import Collector, CollectorQueue
from your_application import event_handler
# import messagebus module and bus events from event_collector.queue
import event_collector.messagebus as bus
from event_collector.queue import InitializedQueue, ShutDownQueue
# define message handlers for each "BusEvent" we want to listen to
async def handle_init_queue(msg: InitializedQueue, queue: bus.BusEventQueue) -> None:
    print("collector queue initialized")
async def handle_shut_down_queue(msg: ShutDownQueue, queue: bus.BusEventQueue) -> None:
    print("collector queue has shut down")
collector = Collector("demo-collector", event_handler)
collectors = [collector]
collector_queue = CollectorQueue(collectors)
# add message handlers to the collector queue message bus
msgbus = collector_queue.get_message_bus()
msgbus.add_msg_handler(InitializedQueue, handle_init_queue)
msgbus.add_msg_handler(ShutDownQueue, handle_shut_down_queue)
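Conceptually, the message bus dispatches each message to the handlers registered for its type. The sketch below is a hypothetical, minimal type-keyed bus (SketchBus, publish, and QueueStarted are illustrative names; only add_msg_handler mirrors a method name used above), so it shows the dispatch idea rather than the event_collector.messagebus API. Handlers receive the message and the bus, loosely mirroring the (msg, queue) handler signature in the example.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, "SketchBus"], Awaitable[None]]


class SketchBus:
    """Hypothetical type-keyed message bus, not the library's API."""

    def __init__(self) -> None:
        self._handlers: defaultdict = defaultdict(list)

    def add_msg_handler(self, msg_type: type, handler: Handler) -> None:
        self._handlers[msg_type].append(handler)

    async def publish(self, msg: Any) -> None:
        # Run every handler registered for the message's exact type, in order.
        for handler in self._handlers[type(msg)]:
            await handler(msg, self)


@dataclass
class QueueStarted:  # hypothetical stand-in for InitializedQueue
    name: str
```

Messages with no registered handlers are silently ignored in this sketch.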
Using CollectorQueue as an Async Context Manager
When used as an async context manager, the collector queue performs its initialization and shutdown operations automatically:
import asyncio
import sys
from event_collector import Collector, CollectorQueue
from your_application import event_handler
# Ok and Error are the result types carried by collector results;
# their import is not shown in this example.
async def run_collector_queue() -> None:
    collector = Collector("demo-collector", event_handler)
    collectors = [collector]
    min_consumers = 1
    max_consumers = 1000
    collector_queue = CollectorQueue(
        collectors,
        min_consumers=min_consumers,
        max_consumers=max_consumers,
    )
    async with collector_queue:
        publisher = collector_queue.get_publisher()
        collector_events = []
        # publish 100 payloads and keep the returned events
        for i in range(100):
            payload = {
                "data": f"payload-{i}",
            }
            event = await publisher.send(payload)
            collector_events.append(event)
        # wait for each event and inspect its first result
        for event in collector_events:
            result_list = await event.wait()
            result = result_list.first()
            match result:
                case Error(_):
                    # surface the error (unwrap() on an Error is expected to raise)
                    result.unwrap()
                case Ok(value):
                    print(value)
async def main() -> None:
    await run_collector_queue()
if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)