Subscribers are programs that connect to the Orchestra server and call wait_event on an outlet. This is how the pub/sub system of Orchestra works.
Outlets subscribed to entities serve as event queues, ready to be consumed by anyone calling wait_event on them.
When publishing an event, the publisher can attach a number of data points to it. The event is the most important data point and represents what kind of event is being published. Possible values of event are dictated by the taxonomy; example values from the Orchestra taxonomy include UPDATE, ERROR, and DELETE. Outlets can choose to subscribe only to a specific event.
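On top of what event is being published, the subscriber will receive further data points, including the entity the event was published on and the outlet (the one the subscriber called wait_event on).
Note that both the entity and the outlet received by the subscriber are real EUIDs, after potential redirections.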
When someone calls publish_event on an entity, any outlet that is subscribed to that entity will receive the event.
From there, if a subscriber is already blocked in wait_event on that outlet, the event is sent directly to that subscriber. If no subscriber is waiting, the outlet stores the event, ready to hand it over to the next subscriber that calls wait_event on it.
This is a Python example of how to publish events and wait for them:
note: requires the avesterra Python package, version 24.02.2
import time
from threading import Thread

import avesterra as av

av.initialize(server="127.0.0.1", directory="/AvesTerra/Certificates/")
auth = av.AvAuthorization("c08d118a-0ebf-4889-b5de-bbabbf841403")


def wait_one_event(outlet: av.AvEntity):
    """
    This function is useful only to demonstrate that we can call `wait_event` before an event
    is published, in which case it will block until it receives the event.
    If the optional timeout parameter is `0` (the default), then it will block forever.
    Otherwise, if no event arrives, it will raise a timeout error after `timeout` seconds.
    """

    def inner():
        evdata = av.wait_event(outlet, authorization=auth)
        print(f"Received an event with name {evdata.name}")

    t = Thread(target=inner)
    t.start()
    return t


try:
    entity = av.create_entity("My entity", authorization=auth)
    outlet = av.create_outlet("my outlet", authorization=auth)
    print(f"Created entity {entity} and outlet {outlet}")

    # After we subscribe the outlet, it will receive any events published on the entity
    av.subscribe_event(entity, outlet, authorization=auth)

    # When we publish an event on the entity, it gets queued up in the outlet
    # until a subscriber waits for it
    av.publish_event(entity, name="First event", authorization=auth)

    # not needed, just to illustrate that time may pass
    time.sleep(1)

    # Waits for an event on the outlet. Since the event we just published is
    # sitting in the outlet's event queue, this call consumes it instantly.
    evdata = av.wait_event(outlet, authorization=auth)
    print(f"Received an event with name {evdata.name}")

    # This background thread will wait to receive the next event we publish
    # and print its name.
    t = wait_one_event(outlet)

    # not needed, just to illustrate that time may pass
    time.sleep(1)

    # We publish the event that the background thread has been patiently
    # waiting for.
    av.publish_event(entity, name="Second event", authorization=auth)

    time.sleep(0.1)
    print("Done")
    t.join()
finally:
    av.finalize()
output:
Created self-subscribed outlet <0|0|397873>
Received an event with name First event
Received an event with name Second event
Done
To subscribe an outlet to an entity, one must use an authorization token that is either the target entity's authority or one of its authorizations.
To wait for events on an outlet, one must use the outlet's authority.
For an event to reach an outlet, the authorization used to publish the event must also be the authority or one of the authorizations of the outlet.
Example:
Entity e1 has authority A and authorizations D, E and F.
Outlet o1 has authority B and authorizations D, F and G.
Outlet o2 has authority C and authorizations E, F and H.
To subscribe o1 or o2 to e1, one can use token A, D, E or F.
To publish an event on e1, one can use token A, D, E or F.
If token A is used to publish an event on e1, neither outlet will receive it.
If token D is used to publish an event on e1, then only outlet o1 will receive it.
If token E is used to publish an event on e1, then only outlet o2 will receive it.
If token F is used to publish an event on e1, then both outlets o1 and o2 will receive it.
To wait for events on outlet o1, one must use token B.
To wait for events on outlet o2, one must use token C.
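The rules in this example can be checked with a small library-free model. This is only a sketch of the rules as stated above, not of how the Orchestra server implements them; the Secured class and the helper names are hypothetical and are not part of the avesterra package.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Secured:
    """Toy stand-in for an entity or outlet (hypothetical, not an avesterra type)."""
    name: str
    authority: str
    authorizations: frozenset

    def accepts(self, token: str) -> bool:
        # A token is accepted if it is the authority or one of the authorizations
        return token == self.authority or token in self.authorizations


def can_wait(outlet: Secured, token: str) -> bool:
    # Waiting for events requires the outlet's authority itself
    return token == outlet.authority


def receivers(entity: Secured, outlets: list, token: str) -> list:
    # The publisher's token must be accepted by the entity, and an outlet
    # only receives the event if it accepts that same token
    if not entity.accepts(token):
        return []
    return [o.name for o in outlets if o.accepts(token)]


e1 = Secured("e1", "A", frozenset("DEF"))
o1 = Secured("o1", "B", frozenset("DFG"))
o2 = Secured("o2", "C", frozenset("EFH"))

for token in "ADEF":
    print(token, receivers(e1, [o1, o2], token))
```

Running it lists, for each publishing token, which outlets would receive the event: none for A, o1 for D, o2 for E, and both for F.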
Similarly to how entities can be connected to outlets with a filter on method/precedence, outlets can be subscribed to entities with a filter on event/precedence.
Entities connected to outlets with a filter on method/precedence will only handle invokes of a certain method and/or precedence (see the [Invoke section] of the Adapters page).
Outlets subscribed to entities with a filter on event/precedence will only receive publishes of a certain event and/or precedence. This event and precedence filter must be set in the call to subscribe_event. If the outlet is subscribed on NULL_EVENT, it will receive publishes of any event, and if the outlet is subscribed on precedence 0, it will receive events published on any precedence. Otherwise, it will only receive events that match the specified event and the specified precedence.
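The matching rule described above can be written as a small predicate. This is a minimal sketch of the rule as stated, with events modeled as plain strings; NULL_EVENT here is a hypothetical stand-in for the taxonomy value, and subscription_matches is an illustrative name, not an avesterra function.

```python
NULL_EVENT = "NULL"  # hypothetical stand-in for the taxonomy's NULL_EVENT
ANY_PRECEDENCE = 0   # precedence 0 means "no restriction on precedence"


def subscription_matches(filter_event: str, filter_precedence: int,
                         event: str, precedence: int) -> bool:
    """Return True if a published event passes the subscription filter."""
    event_ok = filter_event == NULL_EVENT or filter_event == event
    precedence_ok = (filter_precedence == ANY_PRECEDENCE
                     or filter_precedence == precedence)
    return event_ok and precedence_ok


# No restriction at all: everything is delivered
print(subscription_matches(NULL_EVENT, 0, "UPDATE", 3))
# Filter on UPDATE only: a DELETE publish is not delivered
print(subscription_matches("UPDATE", 0, "DELETE", 3))
# Filter on UPDATE at precedence 2: precedence 3 is not delivered
print(subscription_matches("UPDATE", 2, "UPDATE", 3))
```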
It is possible for an entity to have multiple outlets subscribed to it, in which case all the outlets will receive events published on the entity.
However, when a subscriber consumes an event through a wait_event call on an outlet, the outlet removes that event from its queue and sends it to that subscriber. That means that if multiple subscribers share the same outlet, each event goes to only one of them: the subscribers split the events among themselves, with no ordering guarantee.
note: it is possible to subscribe an outlet to itself, since outlets are also entities
In practice, it is common to have multiple subscribers notified of events published on a single entity.
To achieve that, every subscriber needs its own outlet subscribed to the entity, so that it can receive the events and queue them up for consumption.
Note that in order for an outlet to receive an event, the authorization the event is published with must be either the authority of the outlet or one of its authorizations. See the section about required authorization.
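The difference between sharing one outlet and giving each subscriber its own can be illustrated without a server, using Python's standard queue module as a stand-in for an outlet's event queue. This is a toy model under that assumption only; the function names are hypothetical and nothing here calls the avesterra package.

```python
import queue
from threading import Thread

EVENTS = [f"event-{i}" for i in range(6)]


def drain(outlet_queue, received):
    """Consume events until the queue is empty, like repeated wait_event calls."""
    while True:
        try:
            received.append(outlet_queue.get_nowait())
        except queue.Empty:
            return


def consume(queue_per_subscriber):
    """Run one consumer thread per subscriber and return what each received."""
    results = [[] for _ in queue_per_subscriber]
    threads = [Thread(target=drain, args=(q, got))
               for q, got in zip(queue_per_subscriber, results)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def shared_outlet():
    # Two subscribers read from the same queue: each event is consumed once
    shared = queue.Queue()
    for ev in EVENTS:
        shared.put(ev)
    return consume([shared, shared])


def one_outlet_each():
    # Publishing puts the event in every subscribed queue
    outlets = [queue.Queue(), queue.Queue()]
    for ev in EVENTS:
        for q in outlets:
            q.put(ev)
    return consume(outlets)


print("shared outlet:  ", shared_outlet())
print("one outlet each:", one_outlet_each())
```

With a shared queue, the two lists together contain each event exactly once, and how they are divided varies from run to run. With one queue per subscriber, both lists contain every event in publication order.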
This is a Python example of multiple subscribers waiting for events on a single entity:
from threading import Thread
import time

import avesterra as av

av.initialize(server="127.0.0.1", directory="/AvesTerra/Certificates/")
auth = av.AvAuthorization("c08d118a-0ebf-4889-b5de-bbabbf841403")


def wait_two_events(entity: av.AvEntity, subscriber_id: str):
    # Each subscriber creates its own outlet and subscribes it to the entity,
    # so every subscriber receives every event.
    outlet = av.create_outlet(f"Outlet for subscriber {subscriber_id}", authorization=auth)
    av.subscribe_event(entity, outlet, authorization=auth)

    evdata = av.wait_event(outlet, authorization=auth)
    print(f"Subscriber {subscriber_id} received a first event with name {evdata.name}")
    evdata = av.wait_event(outlet, authorization=auth)
    print(f"Subscriber {subscriber_id} received a second event with name {evdata.name}")


try:
    entity = av.create_entity("My entity", authorization=auth)
    print(f"Created entity {entity}")

    t1 = Thread(target=wait_two_events, args=(entity, "#1"))
    t1.start()
    t2 = Thread(target=wait_two_events, args=(entity, "#2"))
    t2.start()
    t3 = Thread(target=wait_two_events, args=(entity, "#3"))
    t3.start()

    time.sleep(1)  # make sure other threads have time to subscribe
    av.publish_event(entity, name="First event", authorization=auth)
    time.sleep(1)
    av.publish_event(entity, name="Second event", authorization=auth)

    time.sleep(0.1)
    print("Done")
    t1.join()
    t2.join()
    t3.join()
finally:
    av.finalize()
output:
Created entity <0|0|397882>
Subscriber #3 received a first event with name First event
Subscriber #2 received a first event with name First event
Subscriber #1 received a first event with name First event
Subscriber #3 received a second event with name Second event
Subscriber #1 received a second event with name Second event
Subscriber #2 received a second event with name Second event
Done
The subscriptions of an entity can be found in the Subscriptions field of its metadata:
Here we can see that the outlet <0|0|397893> is subscribed to this entity with no restriction on the Event or on the Precedence.
We can see how many events an outlet has queued up by looking at the Pending field of its metadata:
Here it has 42 events pending for a subscriber to receive through a wait_event call.