Data Structures¶
Source code: event_collector.data.py
This module contains definitions for basic data structures used in event_collector.
Type Aliases¶
The following are type aliases used in the event_collector codebase for
common data structures.
- type event_collector.data.Record = dict[str, Any]¶
Payload¶
- class event_collector.data.Payload(value: T, delivery: Delivery = Delivery.BROADCAST, keys: Sequence[str] | None = None, topics: Sequence[str] | None = None, priority: int | None = None)¶
The
Payloadobject is a wrapper for a value. The Payload also carries metadata about how to deliver the payload.- value¶
valueis the payload’s initialized value. The type isAnyand depends on what the caller passes as a parameter.
- delivery: Delivery¶
deliveryis theDeliverymode (Delivery.BROADCAST,Delivery.KEY, orDelivery.TOPIC). Default isDelivery.BROADCAST.
- keys: Sequence[str]¶
If
deliverymode isDelivery.KEY,keysshould be a list of collector keys that the event payload should be routed to.
- topics: Sequence[str]¶
If
deliverymode isDelivery.TOPIC,topicsshould be a list of topics that the event payload should be routed to.
- priority: int | None¶
The priority level of the event if using a priority queue.
- set_delivery(delivery: Delivery, keys: Sequence[str] | None = None, topics: Sequence[str] | None = None) None¶
Set the delivery mode, the collector keys (if delivery mode is
Delivery.KEY), and topics (if delivery mode isDelivery.TOPIC).
- set_priority(priority: int) None¶
Set the priority level (if using a priority queue).
Context¶
- class event_collector.data.Context(data: dict[str, Any] | None)¶
An object to store application-specific values.
Based on the
Stateclass in Starlette. https://github.com/Kludex/starlette/blob/main/starlette/datastructures.pyThe
Contextclass implements an object that can store other objects or application-specific data. Stored values in theContextobject are accessed like any attribute of an object:Example:
ctx = Context() ctx.database_dsn = "postgresql://postgres:postgres@localhost:5432/mydb" ctx.repository = MyDBRepository(ctx.database_dsn)
The
Collectorclass automatically creates aContextobject at initialization.Example usage:
key = "demo-collector" ctx = Context() ctx.config_value = os.getenv("CONFIG_VALUE") collector = Collector( key, event_handler, lifecycle=lifecycle, ctx=ctx, )
Result¶
- type event_collector.data.Result = Ok | Error¶
A
Resultis either anOkobject or anErrorobject. It’s used to wrap a value or an error (depending on the output of a function). Using theResultobject allowsevent_collector’s asynchronous tasks to pass valid values or errors/exceptions up the function call stack in a clean, unified way without having to deal with complex exception and error handling semantics.
- class event_collector.data.Ok(value)¶
This is a class that defines a wrapper for an OK, successful result.
- error() bool¶
Returns False.
- ok() bool¶
Returns True.
- unwrap() T¶
The
unwrapfunction returns the value set when theOkobject was initialized.
- class event_collector.data.Error(err: E, details: dict[str, Any] | None = None)¶
This is a class that defines a wrapper for an Error result.
- error() bool¶
Returns True.
- ok() bool¶
Returns False.
- unwrap() NoReturn¶
The
unwrapfunction will raise aRuntimeErroror the exception that was set when theErrorobject was initialized.
Result Example Usage¶
Ok indicates success and Error indicates failure or an
exception. The Ok and Error objects wrap a value. In the case
of Ok the value is the payload being returned. If the case of an
Error, the wrapped value should be an error message or an
Exception.
Example:
async def func(payload: Payload, ctx: Context) -> Result:
"""
Return an Ok result if processing is successful. The
Ok object will wrap the "payload" as a return value.
"""
value = {
"status": "successful",
}
return Ok(value)
or:
async def func(payload: Payload, ctx: Context) -> Result:
"""
Return an Error result if processing encounters an error.
The Error object will wrap an error message and the error
will unwrap as a RuntimeError.
"""
...
return Ok("event payload is missing required field")
# or
return Error(ValueError)
Example results:
>>> from event_collector.data import Ok, Error, Result
>>> result = Ok({"status": "successful"})
>>> result.unwrap()
{'status': 'successful'}
>>> result = Error("payload is missing required field")
>>> result.unwrap()
Traceback (most recent call last):
File "<python-input-4>", line 1, in <module>
result.unwrap()
~~~~~~~~~~~~~--
File ".../event-collector/src/event_collector/data.py", line 55, in unwrap
raise RuntimeError(self.err)
RuntimeError: payload is missing required field
ResultList¶
- class event_collector.data.ResultList(results: Sequence[Result] | None = None)¶
A
ResultListis a container for a collection of Result objects.ResultListis a subclass of collections.UserList, so it has the behavior of a list. In addition, “ResultList.iter” returns a generator that the caller can iterate with to get each Result. The “ResultList.first” method will return the first Result in the list. This is a convenience for callers that expect only one Result being returned. And “ResultList.size” is the same aslen(ResultList).- size() int¶
Return the size of the Result list.