There is application on akka 2.5.29 , that has domain actors (extends some aggregate root actor) that can handle user commands, produce 0 to many events, and call processEvents(events)
method of AggregateRootActor
, which effectively is implemented as persistent actor:
abstract class AggregateRootActor () extends ActorPersistence
We need to process events by persisting to Cassandra journal and then publish to Kafka, only after ALL events were stored and published, some user handler should be called ( for let’s say to return response to client from actor).
How can it be designed?
Provided that persistAll()
internally stashes all messages we can send some EventProcessingCompleted
message to self and after that call the handler()
. At that time all messages will be already stored.
But we have also publish it to Kafka ,so things get more complex…
Some silly design could be as follows:
def processEvents(events: Seq[Event])(handler: Event => Unit): Unit = {
persistAll(events) { persistedEvent =>
state = //updateActorState
publishToKafka(persistedEvent) { publishedEvent =>
context.become {
case EventProcessingCompleted => handler(publishedEvent)
case ... // updateReceive
}
}
}
self ! EventProcessingCompleted
}
Any ideas are welcome!