Data Structures

Source code: event_collector/data.py

This module contains definitions for basic data structures used in event_collector.

Type Aliases

The following are type aliases used in the event_collector codebase for common data structures.

type event_collector.data.Record = dict[str, Any]

Payload

class event_collector.data.Payload(value: T, delivery: Delivery = Delivery.BROADCAST, keys: Sequence[str] | None = None, topics: Sequence[str] | None = None, priority: int | None = None)

A Payload wraps a value together with metadata that describes how the value should be delivered.

value

The value passed when the payload was created. It can be of any type; the type depends on what the caller passes.

delivery: Delivery

delivery is the Delivery mode (Delivery.BROADCAST, Delivery.KEY, or Delivery.TOPIC). Default is Delivery.BROADCAST.

keys: Sequence[str] | None

If the delivery mode is Delivery.KEY, keys should list the collector keys that the payload is routed to.

topics: Sequence[str] | None

If the delivery mode is Delivery.TOPIC, topics should list the topics that the payload is routed to.

priority: int | None

The priority level of the event if using a priority queue.

set_delivery(delivery: Delivery, keys: Sequence[str] | None = None, topics: Sequence[str] | None = None) -> None

Set the delivery mode, the collector keys (if delivery mode is Delivery.KEY), and topics (if delivery mode is Delivery.TOPIC).

set_priority(priority: int) -> None

Set the priority level (if using a priority queue).
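The attributes and setters above can be illustrated with a small stand-in. This is a sketch only: the Payload and Delivery classes below are simplified stand-ins that mirror the documented signature, not the library's own classes, and the collector key "billing-collector" is a made-up name.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any, Sequence


class Delivery(Enum):
    # Stand-in mirroring the documented enum values.
    BROADCAST = 0
    KEY = 1
    TOPIC = 2


@dataclass
class Payload:
    # Stand-in with the documented constructor parameters; not the library class.
    value: Any
    delivery: Delivery = Delivery.BROADCAST
    keys: Sequence[str] | None = None
    topics: Sequence[str] | None = None
    priority: int | None = None

    def set_delivery(
        self,
        delivery: Delivery,
        keys: Sequence[str] | None = None,
        topics: Sequence[str] | None = None,
    ) -> None:
        # Assumption: the three routing fields are replaced together.
        self.delivery = delivery
        self.keys = keys
        self.topics = topics

    def set_priority(self, priority: int) -> None:
        self.priority = priority


# A payload is broadcast unless told otherwise.
payload = Payload({"user_id": 42})

# Re-route it to a single (hypothetical) collector and give it a priority.
payload.set_delivery(Delivery.KEY, keys=["billing-collector"])
payload.set_priority(1)
```

In this sketch, switching modes with set_delivery clears whichever of keys or topics is not passed. Whether the real class behaves the same way is not stated in this reference.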

class event_collector.data.Delivery(*values)

An enum that specifies the delivery mode of a Payload.

BROADCAST = 0
KEY = 1
TOPIC = 2
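To make the three modes concrete, here is a hedged sketch of how a dispatcher might pick recipients for each mode. The select_targets function and the subscriptions mapping are hypothetical, and the sketch assumes that BROADCAST means every registered collector and that collectors subscribe to topics. The library's actual routing logic is not described in this reference.

```python
from enum import Enum


class Delivery(Enum):
    # Stand-in mirroring the documented enum values.
    BROADCAST = 0
    KEY = 1
    TOPIC = 2


def select_targets(delivery, keys, topics, subscriptions):
    """Return the sorted collector keys that should receive a payload.

    `subscriptions` is a hypothetical mapping of collector key -> set of topics.
    """
    if delivery is Delivery.BROADCAST:
        # Assumption: broadcast reaches every registered collector.
        return sorted(subscriptions)
    if delivery is Delivery.KEY:
        wanted = set(keys or ())
        return sorted(k for k in subscriptions if k in wanted)
    if delivery is Delivery.TOPIC:
        wanted = set(topics or ())
        # A collector matches if it subscribes to at least one wanted topic.
        return sorted(k for k, subs in subscriptions.items() if subs & wanted)
    raise ValueError(f"unknown delivery mode: {delivery!r}")


subscriptions = {
    "audit": {"orders", "payments"},
    "billing": {"payments"},
    "search": {"catalog"},
}

everyone = select_targets(Delivery.BROADCAST, None, None, subscriptions)
by_key = select_targets(Delivery.KEY, ["billing", "missing"], None, subscriptions)
by_topic = select_targets(Delivery.TOPIC, None, ["payments"], subscriptions)
```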

Context

class event_collector.data.Context(data: dict[str, Any] | None)

An object to store application-specific values.

Based on the State class in Starlette. https://github.com/Kludex/starlette/blob/main/starlette/datastructures.py

The Context class implements an object that can store other objects or application-specific data. Stored values in the Context object are accessed like any attribute of an object:

Example:

ctx = Context()
ctx.database_dsn = "postgresql://postgres:postgres@localhost:5432/mydb"
ctx.repository = MyDBRepository(ctx.database_dsn)
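Attribute-style storage of this kind is usually built on __setattr__ and __getattr__. The following is a minimal sketch of the pattern, modeled on Starlette's State class. The _data attribute name and the default of None for data are assumptions made for illustration, and the class is a stand-in rather than the event_collector implementation.

```python
from __future__ import annotations

from typing import Any


class Context:
    """Illustrative stand-in modeled on Starlette's State class."""

    def __init__(self, data: dict[str, Any] | None = None) -> None:
        # Bypass our own __setattr__ so the backing dict is a real attribute.
        super().__setattr__("_data", {} if data is None else data)

    def __setattr__(self, name: str, value: Any) -> None:
        # Every attribute assignment is stored in the backing dict.
        self._data[name] = value

    def __getattr__(self, name: str) -> Any:
        # Called only when normal attribute lookup fails.
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

    def __delattr__(self, name: str) -> None:
        del self._data[name]


ctx = Context({"retries": 3})
ctx.database_dsn = "postgresql://localhost:5432/mydb"
```

Raising AttributeError for unknown names keeps hasattr() and getattr() with a default working as they do for ordinary objects.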

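The Collector class automatically creates a Context object at initialization. To supply your own, for example one pre-populated with configuration values, pass it as the ctx argument.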

Example usage:

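import os

from event_collector.data import Context

# Collector, event_handler, and lifecycle are assumed to be
# defined or imported elsewhere.
key = "demo-collector"
ctx = Context()
ctx.config_value = os.getenv("CONFIG_VALUE")

collector = Collector(
    key,
    event_handler,
    lifecycle=lifecycle,
    ctx=ctx,
)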

Result

type event_collector.data.Result = Ok | Error

A Result is either an Ok object or an Error object. It wraps the value a function produced or the error it ran into. Returning a Result lets event_collector's asynchronous tasks pass values and errors up the call stack in one uniform way, so callers check the result instead of catching exceptions at every level.

class event_collector.data.Ok(value)

A wrapper for a successful result.

error() -> bool

Returns False.

ok() -> bool

Returns True.

unwrap() -> T

Returns the value that the Ok object was initialized with.

class event_collector.data.Error(err: E, details: dict[str, Any] | None = None)

A wrapper for a failed result.

error() -> bool

Returns True.

ok() -> bool

Returns False.

unwrap() -> NoReturn

unwrap always raises. It raises the exception that the Error object was initialized with, or a RuntimeError wrapping err if err is not an exception (for example, an error message string).
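The documented behavior of Ok and Error can be sketched with two small stand-in classes. The err attribute matches the traceback shown later in this section; the value and details attribute names, the isinstance check in unwrap, and the describe helper are assumptions made for illustration.

```python
from __future__ import annotations

from typing import Any, Generic, NoReturn, TypeVar

T = TypeVar("T")
E = TypeVar("E")


class Ok(Generic[T]):
    """Stand-in for a successful result."""

    def __init__(self, value: T) -> None:
        self.value = value

    def ok(self) -> bool:
        return True

    def error(self) -> bool:
        return False

    def unwrap(self) -> T:
        return self.value


class Error(Generic[E]):
    """Stand-in for a failed result."""

    def __init__(self, err: E, details: dict[str, Any] | None = None) -> None:
        self.err = err
        self.details = details

    def ok(self) -> bool:
        return False

    def error(self) -> bool:
        return True

    def unwrap(self) -> NoReturn:
        # Assumption: exceptions are re-raised as-is; anything else is
        # wrapped in RuntimeError, matching the documented behavior.
        if isinstance(self.err, BaseException):
            raise self.err
        raise RuntimeError(self.err)


def describe(result: Ok | Error) -> str:
    # Check the variant first so unwrap() is only called when it is safe.
    if result.ok():
        return f"ok: {result.unwrap()}"
    return f"failed: {result.err}"
```

Checking ok() or error() before calling unwrap() lets a caller handle failures without a try/except block.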

Result Example Usage

Ok indicates success and Error indicates a failure or an exception. Both objects wrap a value. In the case of Ok, the value is the payload being returned. In the case of an Error, the wrapped value should be an error message or an Exception.

Example:

async def func(payload: Payload, ctx: Context) -> Result:
    """
    Return an Ok result if processing is successful. The
    Ok object will wrap the "payload" as a return value.
    """
    value = {
        "status": "successful",
    }
    return Ok(value)

or:

async def func(payload: Payload, ctx: Context) -> Result:
    """
    Return an Error result if processing encounters an error.
    The Error object will wrap an error message and the error
    will unwrap as a RuntimeError.
    """
    ...

    return Error("event payload is missing required field")

    # or wrap an exception instance, which unwrap() raises instead

    return Error(ValueError("event payload is missing required field"))

Example results:

>>> from event_collector.data import Ok, Error, Result

>>> result = Ok({"status": "successful"})
>>> result.unwrap()
{'status': 'successful'}

>>> result = Error("payload is missing required field")
>>> result.unwrap()
Traceback (most recent call last):
  File "<python-input-4>", line 1, in <module>
    result.unwrap()
    ~~~~~~~~~~~~~^^
  File ".../event-collector/src/event_collector/data.py", line 55, in unwrap
    raise RuntimeError(self.err)
RuntimeError: payload is missing required field

ResultList

class event_collector.data.ResultList(results: Sequence[Result] | None = None)

A ResultList is a container for a collection of Result objects. ResultList is a subclass of collections.UserList, so it supports the usual list operations such as indexing, len(), and iteration. It also adds three convenience methods. ResultList.iter returns a generator that yields each Result in turn. ResultList.first returns the first Result in the list, which is convenient for callers that expect only one Result. ResultList.size returns the same value as len() applied to the list.

property results: list[Result]

The list of Result objects.

first() -> Result

Return the first Result in the Result list, or an Error if the list is empty.

iter() -> Iterator[Result]

Return a generator to access the Result list.

size() -> int

Return the size of the Result list.
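As an illustration of the interface described in this section, the following sketch builds a stand-in ResultList on collections.UserList. The Ok and Error classes are reduced to bare value holders, and the error message returned by first() for an empty list is an assumption; the real class may differ in both respects.

```python
from __future__ import annotations

from collections import UserList
from typing import Iterator, Sequence


class Ok:
    # Bare stand-in: only holds the wrapped value.
    def __init__(self, value):
        self.value = value


class Error:
    # Bare stand-in: only holds the wrapped error.
    def __init__(self, err):
        self.err = err


class ResultList(UserList):
    """Stand-in container; UserList keeps the items in self.data."""

    def __init__(self, results: Sequence[Ok | Error] | None = None) -> None:
        super().__init__(results)  # UserList treats None as an empty list

    @property
    def results(self) -> list:
        return self.data

    def first(self) -> Ok | Error:
        if not self.data:
            # Assumption: the message text is made up for this sketch.
            return Error("result list is empty")
        return self.data[0]

    def iter(self) -> Iterator[Ok | Error]:
        yield from self.data

    def size(self) -> int:
        return len(self.data)


results = ResultList([Ok("a"), Ok("b")])
values = [r.value for r in results.iter()]
empty_first = ResultList().first()
```

Because first() returns an Error instead of raising IndexError on an empty list, a caller expecting a single Result can treat "no result" the same way as any other failure.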