I’m having trouble updating the Read Side Cassandra table. When I send a request to my service using curl, everything goes through and I get the expected response from the service. However, I can see in Cassandra that, although the events are being stored, the Read Side tables aren’t being updated.
The method processUserAdded is responsible for binding values and I know that this method is executed because I can see the logger’s output in the console. I have also checked my create database method and associated data types many times and they seem to be fine.
Here is the implementation of my ReadSide. Any help or tips on how to resolve/diagnose the problem would be much appreciated.
Thanks
private[impl] class UserEventProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext)
extends ReadSideProcessor[UserEvent] {
private val writeUserPromise = Promise[PreparedStatement] // initialized in prepare
private def writeUser: Future[PreparedStatement] = writeUserPromise.future
private val updateUserStatus = Promise[PreparedStatement] // initialized in prepare
private def updateStatus: Future[PreparedStatement] = updateUserStatus.future
override def aggregateTags: Set[AggregateEventTag[UserEvent]] =
UserEvent.Tag.allTags
override def buildHandler = {
readSide.builder[UserEvent](UserRepository.userEventOffset)
.setGlobalPrepare(createTables)
.setPrepare(_ => prepareWriteUser)
.setPrepare(- => prepareUpdateStatus)// this can be useful but leave for later
.setEventHandler[InsuranceStatusUpdated](processStatusUpdated)
.setEventHandler[UserCreated](processUserAdded)
.build
}
private def prepareWriteUser(): Future[Done] = {
val f = session.prepare("INSERT INTO user_main " +
"(id, addedon, email, fname, lname, status) " +
"VALUES (?, ?, ?, ?, ?, ?)")
writeUserPromise.completeWith(f)
f.map(_ => Done)
}
def processStatusUpdated(eventElement: EventStreamElement[InsuranceStatusUpdated]): Future[List[BoundStatement]] = {
updateStatus.map { ps =>
val bindUpdateStatus = ps.bind()
bindUpdateStatus.setString("status", eventElement.event.status.toString)
bindUpdateStatus.setUUID("id", UUID.fromString(eventElement.entityId))
List(bindUpdateStatus)
}
private def processUserAdded(eventElement: EventStreamElement[UserCreated]): Future[List[BoundStatement]] = {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)
LoggerFactory.getLogger("UserAdd").info("In process UserAdded")
writeUser.map { ps =>
val bindWriteUser = ps.bind()
bindWriteUser.setUUID("id", eventElement.event.user.id)
bindWriteUser.setString("email", eventElement.event.user.email)
bindWriteUser.setString("addedon", formatter.format(eventElement.event.user.addedon))
bindWriteUser.setString("fname", eventElement.event.user.fname)
bindWriteUser.setString("lname", eventElement.event.user.lname)
bindWriteUser.setString("status", eventElement.event.user.status.toString)
List(bindWriteUser)
}
}
private def createTables() = for {
_ <- session.executeCreateTable("CREATE TABLE IF NOT EXISTS user_main ( " +
"id UUID, addedon text , email TEXT,fname TEXT, lname TEXT, status TEXT, PRIMARY KEY (id))")
_ <- session.executeCreateTable(
"CREATE INDEX IF NOT EXISTS user_email_idx ON user_main (email)") // HH note secondary indexes are local so may have to go through many nodes before findding - bettter to use materialized views
// consider using materialized view instead
} yield Done
}