How to persist multiple events to Cassandra and publish them to Kafka

We have an application on Akka 2.5.29 with domain actors (each extends an aggregate root actor) that handle user commands, produce zero or more events, and call the processEvents(events) method of AggregateRootActor, which is effectively implemented as a persistent actor:

abstract class AggregateRootActor () extends ActorPersistence

We need to process events by persisting them to the Cassandra journal and then publishing them to Kafka. Only after ALL events have been stored and published should a user handler be called (for example, to return a response to the client from the actor).
How could this be designed?

Given that persistAll() internally stashes incoming messages, we can send an EventProcessingCompleted message to self and call handler() when it arrives. By that time all events will already be stored.
But we also have to publish them to Kafka, so things get more complex…

A naive design could be as follows:

def processEvents(events: Seq[Event])(handler: Event => Unit): Unit = {
  persistAll(events) { persistedEvent =>
    state = //updateActorState

    publishToKafka(persistedEvent) { publishedEvent =>
      context.become {
        case EventProcessingCompleted => handler(publishedEvent)
        case ... // updateReceive
      }
    }
  }
  self ! EventProcessingCompleted
}

Any ideas are welcome!

The persistAll callback in PersistentActor is invoked once for each event, so you have to count the invocations and act on the last one. EventSourcedBehavior has a better API for this.
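To illustrate the counting approach, here is a minimal sketch in plain Scala, with no Akka dependency. LastEventCallback and counting are hypothetical names made up for this example, not part of any Akka API. The helper wraps a per-event callback so that a completion function runs only after the last expected event has been handled.

```scala
object LastEventCallback {
  // Builds a per-event callback that applies `onEach` to every event and
  // invokes `onAllDone` exactly once, right after the `total`-th invocation.
  // Not thread-safe: it assumes all invocations happen sequentially,
  // as they would inside a single actor.
  def counting[E](total: Int)(onEach: E => Unit)(onAllDone: Vector[E] => Unit): E => Unit = {
    var seen = Vector.empty[E]
    (event: E) => {
      onEach(event)                           // e.g. update the actor state
      seen = seen :+ event
      if (seen.size == total) onAllDone(seen) // fires only on the last event
    }
  }
}
```

Inside the actor, the returned function could be passed as the handler to persistAll, with total set to events.size. Note that with zero events the callback is never invoked, so a command that produces no events would have to call the completion logic directly.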

Another thing to be aware of: you can’t have an atomic transaction spanning Cassandra and Kafka. One of the operations may succeed while the other fails.

Typically, publishing to Kafka would be done with a read-side projection. You can look into the new Akka Projections project, which has a separate section about sending to Kafka: https://doc.akka.io/docs/akka-projection/current/kafka.html#sending-to-kafka
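The idea behind a read-side projection can be sketched in plain Scala. This is only a conceptual illustration under simplifying assumptions, not the Akka Projections API: Stored, OffsetTrackingPublisher and the publish function are hypothetical stand-ins, and the offset is kept in memory, whereas a real projection would store it durably.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for a journal entry that has already been persisted.
final case class Stored[E](sequenceNr: Long, event: E)

// Publishes journal entries in order and remembers how far it got.
final class OffsetTrackingPublisher[E](publish: E => Try[Unit]) {
  private var lastPublished: Long = 0L // assumption: durable storage in practice

  def offset: Long = lastPublished

  // Publishes every entry after the stored offset, in sequence order.
  // Stops at the first failure so that a later run retries from that entry.
  def run(journal: Seq[Stored[E]]): Unit = {
    val pending = journal.filter(_.sequenceNr > lastPublished).sortBy(_.sequenceNr).iterator
    var failed = false
    while (!failed && pending.hasNext) {
      val entry = pending.next()
      publish(entry.event) match {
        case Success(_) => lastPublished = entry.sequenceNr
        case Failure(_) => failed = true
      }
    }
  }
}
```

Because events are stored first and published afterwards from the journal, a failed publish does not lose anything: the next run resumes from the last recorded offset. The trade-off is at-least-once delivery. If a publish succeeds but the offset is not recorded, the event is sent again, so consumers should tolerate duplicates.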
