EventPublisher

Source code: event_collector/publisher.py

This module implements the CollectorEvent and EventPublisher.

An EventPublisher sends CollectorEvents to the collector queue. The publisher is automatically created when the collector queue is initialized.

Constants & Data Structures

event_collector.publisher.DEFAULT_PRIORITY: int = 1000

Default priority level for a priority queue.

class event_collector.publisher.QueueType(*values)

QueueType is an Enum class representing asyncio.Queue types.

FIFO = 0
PRIORITY = 1
LIFO = 2
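As a minimal sketch of how these values could map onto the standard asyncio queue classes: the QueueType class below is a local stand-in that mirrors the documented values, and make_queue and _QUEUE_CLASSES are hypothetical names used only for illustration. The library itself may select the queue class differently.

```python
import asyncio
from enum import Enum


class QueueType(Enum):
    """Local stand-in mirroring the documented enum values."""
    FIFO = 0
    PRIORITY = 1
    LIFO = 2


# Hypothetical lookup table (an assumption, not the library's code).
_QUEUE_CLASSES = {
    QueueType.FIFO: asyncio.Queue,
    QueueType.PRIORITY: asyncio.PriorityQueue,
    QueueType.LIFO: asyncio.LifoQueue,
}


def make_queue(queue_type: QueueType, maxsize: int = 0) -> asyncio.Queue:
    """Create the asyncio queue that matches the given QueueType."""
    return _QUEUE_CLASSES[queue_type](maxsize=maxsize)


async def demo() -> list:
    # Create the queues inside a running event loop.
    return [type(make_queue(qt)).__name__ for qt in QueueType]


queue_class_names = asyncio.run(demo())
print(queue_class_names)
```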

Event Publisher & CollectorEvent

class event_collector.publisher.EventPublisher(event_queue: Queue, *, queue_type: QueueType = QueueType.FIFO, default_priority: int = 1000)

The EventPublisher is used to send events to the event queue.

An EventPublisher is automatically created by the CollectorQueue at initialization, so you do not need to create one manually. The CollectorQueue also sets the queue_type based on its input parameters.

When the CollectorQueue is running, you can access the EventPublisher using the CollectorQueue.get_publisher method. To send event payloads to the queue, use the EventPublisher.send method.

async send(data: Payload) -> CollectorEvent

The send method is used to put an input event payload onto the event queue. The method returns a CollectorEvent instance which can be used to await the result.

Example:

...

publisher = collector_queue.get_publisher()

payload = Payload({"data": "some data"})
event = await publisher.send(payload)

# event.wait() will block until processing finishes
result = await event.wait()

If the result is not needed, as in a “fire-and-forget” use case, call send and ignore the return value:

payload = Payload({"data": "some data"})
await publisher.send(payload)

If using a priority queue, include a priority value (int) in the Payload:

payload = Payload({"data": "some data"}, priority=1)
await publisher.send(payload)

Python’s asyncio.PriorityQueue is backed by a min-heap and retrieves the lowest-valued entries first, so a lower value means a higher priority.
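The lowest-first ordering can be seen with asyncio.PriorityQueue on its own. This sketch does not use the event_collector API. The (priority, sequence, data) tuple layout is an assumption chosen for illustration, with the sequence number acting as a tie-breaker.

```python
import asyncio


async def drain_in_priority_order() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # (priority, sequence, data): tuples compare element by element,
    # so the entry with the lowest priority value is retrieved first.
    entries = [(1000, 0, "default"), (1, 1, "urgent"), (50, 2, "soon")]
    for entry in entries:
        await queue.put(entry)

    order = []
    while not queue.empty():
        _priority, _sequence, data = await queue.get()
        order.append(data)
    return order


order = asyncio.run(drain_in_priority_order())
print(order)  # ['urgent', 'soon', 'default']
```

An entry with priority 1 is therefore handled before one that uses the default priority of 1000.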

async wait() -> None

The wait method blocks until all events sent to the collector queue have been processed.
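One common way to get this "wait until everything is processed" behavior with asyncio queues is to pair Queue.join() with Queue.task_done(). The sketch below assumes that pattern purely for illustration. Whether EventPublisher.wait is implemented this way is not stated here, and consumer and main are hypothetical names.

```python
import asyncio


async def consumer(queue: asyncio.Queue, results: list) -> None:
    """Process items forever, marking each one done after handling it."""
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)
        finally:
            queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    worker = asyncio.create_task(consumer(queue, results))

    for item in (1, 2, 3):
        await queue.put(item)

    # join() blocks until task_done() has been called once per queued item.
    await queue.join()

    # Stop the worker now that the queue is fully processed.
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return results


results = asyncio.run(main())
print(results)  # [2, 4, 6]
```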

class event_collector.publisher.CollectorEvent(event_id: int, payload: Payload, *, priority: int = 1000)

A CollectorEvent is a container for an event payload and the event’s result after processing. It’s created automatically by the EventPublisher for each event payload sent to the collector queue.

get_result() -> ResultList | None
Returns:

A ResultList, or None if the result is not yet available.

async wait() -> ResultList

Return the ResultList if the event is finished, or else create a Future object and await its completion. This will block until the Future is marked as finished by a consumer setting its result.
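The behavior described above (return the stored result if there is one, otherwise await a lazily created Future) can be sketched as follows. SketchEvent and its set_result method are hypothetical stand-ins written for this example, not the library's CollectorEvent, and a plain list stands in for ResultList.

```python
import asyncio
from typing import Any, Optional


class SketchEvent:
    """Hypothetical stand-in for CollectorEvent (illustration only)."""

    def __init__(self) -> None:
        self._result: Optional[Any] = None
        self._future: Optional[asyncio.Future] = None

    def get_result(self) -> Optional[Any]:
        # Non-blocking: None until a consumer has stored a result.
        return self._result

    def set_result(self, result: Any) -> None:
        # Assumed to be called by a consumer when processing finishes.
        self._result = result
        if self._future is not None and not self._future.done():
            self._future.set_result(result)

    async def wait(self) -> Any:
        if self._result is not None:
            return self._result
        if self._future is None:
            # Create the Future only when someone actually waits.
            self._future = asyncio.get_running_loop().create_future()
        return await self._future


async def main() -> tuple:
    event = SketchEvent()
    before = event.get_result()
    # Simulate a consumer finishing shortly after the caller starts waiting.
    asyncio.get_running_loop().call_later(0.01, event.set_result, ["done"])
    first = await event.wait()
    second = await event.wait()  # already finished, returns immediately
    return before, first, second


before, first, second = asyncio.run(main())
print(before, first, second)  # None ['done'] ['done']
```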

service_time() -> MonotonicTime

Return the service time (in nanoseconds). If the event is not yet finished, return 0.

wait_time() -> MonotonicTime

Return total processing time (in nanoseconds) from creation to stop. If event is not yet finished, return 0.
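A minimal sketch of nanosecond timing that returns 0 until the measurement is finished, assuming MonotonicTime is an integer nanosecond count such as the one returned by time.monotonic_ns(). SketchTimer is a hypothetical class used only to illustrate the pattern and the conversion to milliseconds. It does not reproduce how the library separates service time from wait time.

```python
import time
from typing import Optional


class SketchTimer:
    """Hypothetical timer (illustration only, not the library's class)."""

    def __init__(self) -> None:
        self.created_ns: int = time.monotonic_ns()
        self.stopped_ns: Optional[int] = None

    def stop(self) -> None:
        self.stopped_ns = time.monotonic_ns()

    def elapsed_ns(self) -> int:
        # Mirror the documented convention: 0 while not yet finished.
        if self.stopped_ns is None:
            return 0
        return self.stopped_ns - self.created_ns


timer = SketchTimer()
unfinished = timer.elapsed_ns()  # 0, because stop() has not been called

time.sleep(0.05)  # stand-in for the actual processing work
timer.stop()

elapsed_ns = timer.elapsed_ns()
elapsed_ms = elapsed_ns / 1_000_000  # nanoseconds to milliseconds
print(f"{elapsed_ms:.3f} ms")
```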