The doc on defining a read side processor, uses a hypothetical MyDatabase
trait to explain important guidelines:
trait MyDatabase { ... /** * Load the offset of the last event processed. */ def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset] ... }
My question is about the guidelines for the loadOffset
method:
The
loadOffset
method reads the lastOffset
that was processed by this read side processor for the particular tag. Typically this will be stored in a table that has the tag name and the eventProcessorId as a compound primary key. Offsets come in two varieties, aakka.persistence.query.Sequence
offset represented using along
, and aakka.persistence.query.TimeBasedUUID
offset represented using aUUID
. Your database will need to be able to persist both of these types. If there is no offset stored for a particular tag, such as when the processor runs for the very first time, then you can returnakka.persistence.query.NoOffset
.
Where does the eventProcessorId
come from?
It would be very helpful to have an non-trivial example of this MyDatabase
.
I found in the source a trivial implementation here:
object MyDatabase extends MyDatabase { def createTables(): Future[Done] = Future.successful(Done) def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset] = Future.successful(NoOffset) def handleEvent(event: BlogEvent, offset: Offset): Future[Done] = Future.successful(Done) }
Unfortunately, this trivial implementation doesn’t shed any light on where the eventProcessorId
comes from.
If we’re sharding events, then we will have multiple instances of the same read-side processor. I makes sense for each to store their own offset but it is unclear where the eventProcessorId
comes from in the sharded case.
- Nicolas.