stevenmhl
(Steve H.)
April 24, 2020, 10:07pm
1
Hi …
I’m having an issue where a Cassandra read-side view is not sending events to the event handler method, and I cannot determine why.
A subscriber to the service message topic does receive messages generated from events. However, these events never reach the read side event handler method, even though the offsets for the view are recorded in the offset store.
I’ve pushed an example project to GitHub that recreates this condition.
Can someone help me understand what I’ve done wrong here?
Thank you!
–Steve
stevenmhl
(Steve H.)
April 29, 2020, 12:54pm
2
Hello All …
I’ve gone through this several times comparing to any documentation or examples I could find. Clearly, I am missing something. Can someone provide any hints as to what is wrong?
Thank you!
–Steve
stevenmhl
(Steve H.)
April 30, 2020, 12:28pm
3
Tenacity pays off!!! Drilling down into the Lagom code, CassandraReadSideImpl builds a Map of handlers that is keyed by each event's runtime class, taken from its ClassTag:
override def setPrepare(callback: (AggregateEventTag[Event]) => Future[Done]): ReadSideHandlerBuilder[Event] = {
  prepareCallback = callback
  this
}

override def setEventHandler[E <: Event: ClassTag](
    handler: EventStreamElement[E] => Future[immutable.Seq[BoundStatement]]
): ReadSideHandlerBuilder[Event] = {
  val eventClass = implicitly[ClassTag[E]].runtimeClass.asInstanceOf[Class[Event]]
  handlers += (eventClass -> handler.asInstanceOf[Handler[Event]])
  this
}

override def build(): ReadSideHandler[Event] = {
  new CassandraAutoReadSideHandler[Event](
    session,
    offsetStore,
    handlers,
    globalPrepareCallback,
    prepareCallback,
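So, my original code, which registers a single handler for the base Event type, matches nothing, because the lookup uses each event's concrete class: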
override def buildHandler(): ReadSideProcessor.ReadSideHandler[Event] =
  readSideProcessor.builder[Event]("processes")
    .setGlobalPrepare(() => createTable())
    .setPrepare(_ => prepare())
    .setEventHandler[Event](processEvent _)
    .build()
The solution is to register a handler for each derived Event class:
override def buildHandler(): ReadSideProcessor.ReadSideHandler[Event] =
  readSideProcessor.builder[Event]("processes")
    .setGlobalPrepare(() => createTable())
    .setPrepare(_ => prepare())
    .setEventHandler[event.Added](processAddedEvent _)
    .setEventHandler[event.Dropped](processDroppedEvent _)
    .setEventHandler[event.Updated](processUpdatedEvent _)
    .build()
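For anyone who wants to see the mismatch in isolation, here is a minimal sketch in plain Scala with no Lagom dependency. The event hierarchy, the Builder class and the dispatch method are all hypothetical stand-ins; only the idea of keying handlers by the ClassTag's runtime class and looking them up by the event's concrete class is taken from the Lagom code above.

```scala
import scala.reflect.ClassTag

object HandlerLookupSketch {
  // Hypothetical event hierarchy standing in for the project's Event types.
  sealed trait Event
  final case class Added(id: String) extends Event
  final case class Dropped(id: String) extends Event

  type Handler = Event => String

  // Hypothetical builder: stores each handler under the runtime class of E,
  // mirroring what setEventHandler does in the excerpt above.
  final class Builder {
    private var handlers = Map.empty[Class[_], Handler]

    def setEventHandler[E <: Event: ClassTag](handler: E => String): Builder = {
      val eventClass = implicitly[ClassTag[E]].runtimeClass
      handlers = handlers.updated(eventClass, handler.asInstanceOf[Handler])
      this
    }

    // Looks up by the concrete class of the event; superclasses and traits
    // are not searched, so a handler registered for Event is never found.
    def dispatch(event: Event): String =
      handlers.get(event.getClass).map(h => h(event)).getOrElse("unhandled")
  }

  // One handler for the base trait: never matches a concrete event.
  val baseOnly: Builder =
    new Builder().setEventHandler[Event](e => s"base: $e")

  // One handler per concrete class: matches as expected.
  val perClass: Builder = new Builder()
    .setEventHandler[Added](e => s"added ${e.id}")
    .setEventHandler[Dropped](e => s"dropped ${e.id}")
}
```

With this sketch, baseOnly.dispatch(Added("p1")) falls through to the "unhandled" branch, while perClass.dispatch(Added("p1")) reaches the Added handler.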
ignasi35
(Ignasi Marimon-Clos)
April 30, 2020, 12:55pm
4
Yeah, the log at debug level is probably not enough to help diagnose this issue:
      Flow[EventStreamElement[Event]]
        .mapAsync(parallelism = 1) { elem =>
          val eventClass = elem.event.getClass
          val handler =
            handlers.getOrElse(
              // lookup handler
              eventClass,
              // fallback to empty handler if none
              {
                if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", eventClass.getName)
                CassandraAutoReadSideHandler.emptyHandler.asInstanceOf[Handler]
              }
            )
          invoke(handler, elem).flatMap(executeStatements)
        }
        .withAttributes(ActorAttributes.dispatcher(dispatcher))
    }
  }
How do you think this could be improved?
stevenmhl
(Steve H.)
April 30, 2020, 1:39pm
5
Maybe change the debug message to:
“Unhandled event [{}]. Note: A handler must be set via ‘setEventHandler’ for each derived Event class.”
It seems a bit kludgy, but it may be the easiest way.
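A rough sketch of what that could look like, written as plain Scala rather than as a patch to Lagom. The object, the unhandledMessage and lookup helpers, and the log callback are all hypothetical names used for illustration; only the message wording comes from the suggestion above.

```scala
object UnhandledEventMessageSketch {
  // Hypothetical helper that builds the extended message; not part of Lagom.
  def unhandledMessage(eventClass: Class[_]): String =
    s"Unhandled event [${eventClass.getName}]. Note: A handler must be set via " +
      "'setEventHandler' for each derived Event class."

  // Looks up a handler by concrete class. On a miss it reports the extended
  // message through the supplied `log` callback and returns the empty handler.
  def lookup[H](handlers: Map[Class[_], H], eventClass: Class[_], empty: H)(
      log: String => Unit
  ): H =
    handlers.getOrElse(eventClass, {
      log(unhandledMessage(eventClass))
      empty
    })
}
```

Because the fallback argument of getOrElse is evaluated only on a miss, the message is produced only for events that have no registered handler.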