.. currentmodule:: event_collector.queue

CollectorQueue
==============

**Source code:** `event_collector.queue.py`

.. py:module:: event_collector.queue

This module implements the collector queue.

.. pull-quote::

   A *collector queue* is a publisher-consumer queue implemented using only
   standard library functions and asyncio primitives.

Constants & Data Structures
---------------------------

.. autotype:: event_collector.queue.MonotonicTime

.. autodata:: event_collector.queue.DEFAULT_MIN_CONSUMERS

.. autodata:: event_collector.queue.DEFAULT_MAX_CONSUMERS

.. autodata:: event_collector.queue.CONSUMER_SHUTDOWN_TIMEOUT

.. autodata:: event_collector.queue.WIP_MARGIN

.. autodata:: event_collector.queue.MONITOR_INTERVAL

.. autodata:: event_collector.queue.REPORT_INTERVAL

.. autodata:: event_collector.queue.AVG_LOAD_INTERVAL

.. autodata:: event_collector.queue.METRICS_COLLECTOR_WINDOW_SIZE

.. autodata:: event_collector.queue.EVENT_SERVICE_DATA_WINDOW_SIZE

.. autodata:: event_collector.queue.METRICS_WARMUP_INTERVAL

CollectorQueue & Support Classes
--------------------------------

.. autoclass:: event_collector.queue.Metrics

   .. autoattribute:: timestamp

   .. autoattribute:: qsize

   .. autoattribute:: busy

   .. autoattribute:: consumers

   .. autoattribute:: load

   .. autoattribute:: avg_load

   .. autoattribute:: service_time

   .. autoattribute:: wait_time

   .. autoattribute:: arrival_rate

   .. autoattribute:: departure_rate

   .. autoattribute:: wip

   .. autoattribute:: event_count

.. autoclass:: event_collector.queue.CollectorQueue
   :members: init_queue, shut_down_queue, get_publisher, get_metrics, consumer_count, current_load, get_message_bus
   :special-members: __aenter__, __aexit__
   :member-order: bysource

   The ``CollectorQueue`` can be initialized with the following parameters:

   .. autoattribute:: collectors

      The collector queue must be initialized with a list of ``Collector``
      classes. When initializing, the ``CollectorQueue`` will create
      ``Collector`` instances and call their ``setup`` methods.
      After the collector queue is up and running, the collectors receive
      event payloads from the consumers as input events are put onto the
      event queue.

      Note: this parameter must be a list of *classes*, not a list of class
      instances.

   .. autoattribute:: router

      If supplied with a ``Router`` subclass, the collector queue will use it
      as the event handler for all incoming event payloads. If not set, the
      collector queue will use ``SimpleRouter``.

      Note: this parameter must be a *class*, not an instance of a class.

   .. autoattribute:: autoscale

      If ``autoscale`` is True, the collector queue scales the consumer pool
      up or down (increasing or reducing the number of consumer tasks) to
      keep the load ratio between input queue events and consumer tasks as
      low as possible.

      If ``autoscale`` is False, the collector queue initializes a pool of
      ``max_consumers`` consumer tasks and uses these to handle all incoming
      events.

   .. autoattribute:: min_consumers

      The minimum number of consumers to create on startup.

   .. autoattribute:: max_consumers

      The maximum number of consumers the collector queue should create. When
      the load increases, the collector queue will not create more than this
      number of consumers.

   .. autoattribute:: wip_margin

      Set the "work-in-progress" margin (between 0 and 1). The margin is
      added to the calculated size of the consumer pool when the input load
      increases. The default is 0.15 (15%).

   .. autoattribute:: monitor_interval

      The periodic interval in seconds for the load monitor task to run. The
      default interval is 0.01 seconds (10 ms).

   .. autoattribute:: report_interval

      The periodic interval in seconds for the reporter task to run. The
      default interval is 0 (off). Set the interval to a positive number (in
      seconds) to enable metrics reports on the log output.

   .. autoattribute:: avg_load_interval

      The interval in seconds to keep load values for calculating the average
      load of the running collector queue.
      The default average load interval is 60 seconds.

   .. autoattribute:: metrics_collector_window_size

      The window size in seconds to keep arrival and departure data for
      incoming events. The data is used to calculate the arrival and
      departure rates of the queue events. The default data collector window
      size is 60 seconds. The minimum window size is 5 seconds.

   .. autoattribute:: event_service_data_window_size

      The number of event service data points to store when calculating
      service and wait times. The default data window size is 500.

   .. autoattribute:: metrics_warmup_interval

      The interval for metrics to wait before reporting valid averages.
      Within this interval, the metrics reporter will report 0 for averages.
      This allows the collector queue to accumulate actual runtime metrics
      data so it can report valid measurements. The default "metrics warmup
      interval" is 3 seconds.

   .. autoattribute:: queue_type

      The queue type to use for the internal event queue. The default is the
      ``asyncio`` FIFO queue. The collector queue can use any one of the
      ``asyncio.Queue`` types (FIFO, PriorityQueue, or LIFO).

   .. autoattribute:: default_priority

      If using a Priority Queue, set the default priority level (``int``).
      The default is 100. The ``asyncio.PriorityQueue`` uses a min-heap, so
      the lower the number, the higher the priority when pulling events off
      the queue.

   .. autoattribute:: consumer_shutdown_timeout

      The timeout in seconds to use when shutting down the collector queue.
      When backend processing takes a long time to complete, the shutdown
      process can block if consumer tasks are waiting. The collector queue
      will use this timeout value to cancel any unfinished consumers. The
      default is 10 seconds.

   .. autoattribute:: logging

      If ``logging`` is set to True (default), the ``CollectorQueue`` will
      output log entries using the ``event_collector`` structlog logger. If
      ``logging`` is set to False, the collector queue will produce no log
      output.
      This flag is useful if your application does not need any information
      produced by the collector queue on the log output.

   **CollectorQueue public methods:**

MessageBus events
-----------------

.. autoclass:: event_collector.queue.ReportedMetrics

   .. autoattribute:: metrics

.. autoclass:: event_collector.queue.InitializedQueue

   .. autoattribute:: collector_queue

.. autoclass:: event_collector.queue.ShutDownQueue

   .. autoattribute:: collector_queue

To attach message handlers to the collector queue events, use the collector
queue's message bus instance and add a custom handler for each bus event:

.. code-block:: python

   from event_collector import Collector, CollectorQueue
   from your_application import event_handler

   # import messagebus module and bus events from event_collector.queue
   import event_collector.messagebus as bus
   from event_collector.queue import InitializedQueue, ShutDownQueue

   # define message handlers for each "BusEvent" we want to listen to
   async def handle_init_queue(msg: InitializedQueue, queue: bus.BusEventQueue) -> None:
       print("collector queue initialized")

   async def handle_shut_down_queue(msg: ShutDownQueue, queue: bus.BusEventQueue) -> None:
       print("collector queue has shut down")

   collector = Collector("demo-collector", event_handler)
   collectors = [collector]
   collector_queue = CollectorQueue(collectors)

   # add message handlers to the collector queue message bus
   msgbus = collector_queue.get_message_bus()
   msgbus.add_msg_handler(InitializedQueue, handle_init_queue)
   msgbus.add_msg_handler(ShutDownQueue, handle_shut_down_queue)

Using CollectorQueue as an Async Context Manager
------------------------------------------------

When used as a context manager, the collector queue performs all
initialization and shutdown operations automatically:

.. code-block:: python

   import asyncio
   import sys

   from event_collector import Collector, CollectorQueue
   from your_application import event_handler

   # NOTE: ``Ok`` and ``Error`` are the result types matched below. This
   # example does not import them; import them from wherever your
   # application or the library defines them.


   async def run_collector_queue() -> None:
       collector = Collector("demo-collector", event_handler)
       collectors = [collector]
       min_consumers = 1
       max_consumers = 1000

       collector_queue = CollectorQueue(
           collectors,
           min_consumers=min_consumers,
           max_consumers=max_consumers,
       )

       async with collector_queue:
           publisher = collector_queue.get_publisher()

           # publish 100 events and keep the returned event handles
           collector_events = []
           for i in range(100):
               payload = {
                   "data": f"payload-{i}",
               }
               event = await publisher.send(payload)
               collector_events.append(event)

           # wait for each event to be processed and inspect its first result
           for event in collector_events:
               result_list = await event.wait()
               result = result_list.first()
               match result:
                   case Error(_):
                       result.unwrap()
                   case Ok(value):
                       print(value)


   async def main() -> None:
       await run_collector_queue()


   if __name__ == "__main__":
       asyncio.run(main())
       sys.exit(0)
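
The context-manager behaviour described in this section can be illustrated with a minimal, self-contained sketch. It does not use ``event_collector`` at all: ``DemoQueue`` is a hypothetical stand-in, and the assumption that ``__aenter__`` awaits ``init_queue`` and ``__aexit__`` awaits ``shut_down_queue`` (and that both are coroutines) is ours, not something this page confirms. The sketch only shows how ``async with`` relates to an explicit ``try``/``finally`` around the same two calls.

```python
import asyncio


class DemoQueue:
    """Hypothetical stand-in for CollectorQueue; it only records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def init_queue(self) -> None:
        self.calls.append("init_queue")

    async def shut_down_queue(self) -> None:
        self.calls.append("shut_down_queue")

    async def __aenter__(self) -> "DemoQueue":
        # Assumption: entering the context runs the initialization step.
        await self.init_queue()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Assumption: leaving the context always runs the shutdown step.
        # Returning None means an exception from the body is not suppressed.
        await self.shut_down_queue()


async def with_context_manager() -> list[str]:
    queue = DemoQueue()
    async with queue:
        queue.calls.append("work")
    return queue.calls


async def with_explicit_calls() -> list[str]:
    # The same lifecycle written out by hand.
    queue = DemoQueue()
    await queue.init_queue()
    try:
        queue.calls.append("work")
    finally:
        await queue.shut_down_queue()
    return queue.calls


if __name__ == "__main__":
    print(asyncio.run(with_context_manager()))
    print(asyncio.run(with_explicit_calls()))
```

Both functions record the same sequence of calls. The ``async with`` form is usually preferable because the shutdown step cannot be forgotten when the body raises.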