Hello everyone,
I have been stuck on an issue for days and I can't work out the root cause.
I'm using Lagom with the following versions:
- Lagom 1.6.0
- akka 2.6.9
- akka-http 10.1.11
- akka-persistence-cassandra 0.101
Events are stored correctly in the messages and tag_views tables, but my ReadSideProcessor does not receive them.
I have to restart the service before the ReadSideProcessor picks the events up.
Here is the relevant part of my configuration:
cassandra-snapshot-store {
  keyspace = feedbackservice
  write-consistency = "ONE"
  read-consistency = "ONE"
  contact-points = [${CASSANDRA_CONTACT_POINT_ONE},${CASSANDRA_CONTACT_POINT_TWO}]
  session-provider = akka.persistence.cassandra.ConfigSessionProvider
  replication-strategy = "NetworkTopologyStrategy"
  data-center-replication-factors = ["DC3-ChatYamo:2", "DC4-ChatYamo:1"]
  local-datacenter = ""
  local-datacenter = ${?CASSANDRA_LOCAL_DATACENTER}
  keyspace-autocreate = false
  tables-autocreate = false
}

cassandra-journal {
  keyspace = feedbackservice
  write-consistency = "ONE"
  read-consistency = "ONE"
  contact-points = [${CASSANDRA_CONTACT_POINT_ONE},${CASSANDRA_CONTACT_POINT_TWO}]
  session-provider = akka.persistence.cassandra.ConfigSessionProvider
  replication-strategy = "NetworkTopologyStrategy"
  data-center-replication-factors = ["DC3-ChatYamo:2", "DC4-ChatYamo:1"]
  local-datacenter = ""
  local-datacenter = ${?CASSANDRA_LOCAL_DATACENTER}
  keyspace-autocreate = false
  tables-autocreate = false
}

lagom {
  persistence.read-side.cassandra {
    keyspace = feedbackservice
    write-consistency = "ONE"
    read-consistency = "ONE"
    max-result-size = 1000
    contact-points = [${CASSANDRA_CONTACT_POINT_ONE},${CASSANDRA_CONTACT_POINT_TWO}]
    session-provider = akka.persistence.cassandra.ConfigSessionProvider
    replication-strategy = "NetworkTopologyStrategy"
    data-center-replication-factors = ["DC3-ChatYamo:2", "DC4-ChatYamo:1"]
    local-datacenter = ""
    local-datacenter = ${?CASSANDRA_LOCAL_DATACENTER}
    keyspace-autocreate = false
    tables-autocreate = false
  }

  # Circuit breaker settings - add or update
  circuit-breaker {
    default {
      max-failures = 5
      call-timeout = 20s
      reset-timeout = 60s
    }
  }

  cluster.exit-jvm-when-system-terminated = on
}
Is there something I'm missing or doing wrong? Can someone please help?
Adding more context:
We have an event class, CaseEvent, whose tag is sharded into two shards (CaseEntityEvent0 and CaseEntityEvent1 in the logs below). When we start the service, both shards read new events correctly and deliver them to the ReadSideProcessor.
But after an hour or so, one of the shards (read by EventsByTagStage) stops working: the ReadSideProcessor no longer receives any events from it. To recover, we have to either restart the service or run logic that restarts all workers of the registered projection.
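For anyone wanting to detect a stalled shard automatically instead of noticing missing read-side updates, one option is to record, per tag, when the read side last handled an event and flag tags that have been quiet for longer than a threshold. Below is a minimal, JDK-only Java sketch of that idea. The class TagSilenceMonitor, its methods, and the 30-minute threshold are hypothetical; this is not the EventProcessingHeartbeatMonitor from our code base. It clamps the silence duration at zero, so a reference timestamp that ends up ahead of the clock cannot produce negative values like the duration_ms=-15001 lines in the logs below.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical per-tag silence tracker (names are illustrative only).
 * Timestamps are passed in explicitly so the logic is easy to test.
 */
public class TagSilenceMonitor {
    // Last time (epoch millis) an event was handled for each tag, e.g. "CaseEntityEvent0".
    private final Map<String, Long> lastEventAt = new ConcurrentHashMap<>();
    private final long stallThresholdMillis;

    public TagSilenceMonitor(long stallThresholdMillis) {
        this.stallThresholdMillis = stallThresholdMillis;
    }

    /** Call from the read-side event handler whenever an event for `tag` is processed. */
    public void recordEvent(String tag, long nowMillis) {
        // Keep the newest timestamp even if calls arrive out of order.
        lastEventAt.merge(tag, nowMillis, Long::max);
    }

    /** How long the tag has been silent, clamped at zero so it is never negative. */
    public long silenceMillis(String tag, long nowMillis) {
        Long last = lastEventAt.get(tag);
        if (last == null) {
            return 0L; // tag never seen: nothing to compare against
        }
        return Math.max(0L, nowMillis - last);
    }

    /** Tags silent for longer than the threshold, sorted by name. */
    public List<String> stalledTags(long nowMillis) {
        List<String> stalled = new ArrayList<>();
        for (String tag : lastEventAt.keySet()) {
            if (silenceMillis(tag, nowMillis) > stallThresholdMillis) {
                stalled.add(tag);
            }
        }
        Collections.sort(stalled);
        return stalled;
    }

    public static void main(String[] args) {
        TagSilenceMonitor monitor = new TagSilenceMonitor(30 * 60 * 1000L); // 30 minutes (assumed)
        long t0 = 1_747_993_000_000L;
        monitor.recordEvent("CaseEntityEvent0", t0);
        monitor.recordEvent("CaseEntityEvent1", t0 + 3_600_000L);
        long now = t0 + 3_700_000L;
        // Shard 0 has been silent ~62 minutes, shard 1 only ~2 minutes.
        System.out.println(monitor.stalledTags(now)); // prints [CaseEntityEvent0]
    }
}
```

Keep in mind that a silent tag is not necessarily a stuck one: if no new CaseEvents hash to that shard, the tag is legitimately quiet. Restarting the projection when a tag is flagged is also only a workaround for the stall, not a fix for its cause.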
When we restart the projection (CaseEventProcessor), we see the following logs:
2025-05-23T10:42:53.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Found projections to restart: kafkaProducer-NewFeedbackEventV1Topic, kafkaProducer-UserReportedTopic, CaseEventProcessor
2025-05-23T10:42:53.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Found projections to restart: kafkaProducer-NewFeedbackEventV1Topic, kafkaProducer-UserReportedTopic, CaseEventProcessor
2025-05-23T10:42:53.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Stopping projection: kafkaProducer-NewFeedbackEventV1Topic
2025-05-23T10:42:53.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Stopping projection: CaseEventProcessor
2025-05-23T10:42:53.468Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_silence_duration|tag=ReportEvent0|duration_ms=12760414
2025-05-23T10:42:53.468Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_silence_duration|tag=ReportEvent1|duration_ms=20086060
2025-05-23T10:43:08.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Starting projection: kafkaProducer-NewFeedbackEventV1Topic
2025-05-23T10:43:08.258Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_silence_duration|tag=CaseEntityEvent1|duration_ms=-15001
2025-05-23T10:43:08.258Z [info] com.ndolo.feedback.monitor.ReadSideRestarter [] - Starting projection: CaseEventProcessor
2025-05-23T10:43:08.259Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_silence_duration|tag=CaseEntityEvent1|duration_ms=-15001
2025-05-23T10:43:08.417Z [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://feedbackservice@192.168.84.36:25520, sourceThread=feedbackservice-akka.actor.default-dispatcher-10, akkaSource=EventsByTagStage(akka://feedbackservice), sourceActorSystem=feedbackservice, akkaTimestamp=10:43:08.417UTC] - [d5a9c184-b12a-4421-a936-372d571da85d]: EventsByTag query [CaseEntityEvent0] starting with EC delay 2000ms: fromOffset [24cbaa50-37b9-11f0-b342-b5f41f0f03eb (2025-05-23 09:34:37:429)] toOffset [None]
2025-05-23T10:43:08.420Z [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://feedbackservice@192.168.84.36:25520, sourceThread=feedbackservice-akka.actor.default-dispatcher-6, akkaSource=EventsByTagStage(akka://feedbackservice), sourceActorSystem=feedbackservice, akkaTimestamp=10:43:08.420UTC] - [787d70d2-f155-4c18-bda6-342c0e3b18f2]: EventsByTag query [CaseEntityEvent1] starting with EC delay 2000ms: fromOffset [6ed67030-37b9-11f0-b342-b5f41f0f03eb (2025-05-23 09:36:41:651)] toOffset [None]
2025-05-23T10:43:08.426Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_heartbeat|tag=CaseEntityEvent0|timestamp=1747996988426
2025-05-23T10:43:08.427Z [info] com.ndolo.instrumentation.EventProcessingHeartbeatMonitor [] - event_processing_heartbeat|tag=CaseEntityEvent1|timestamp=1747996988427
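The fromOffset values in the two EventsByTag lines are version-1 (time-based) UUIDs, which akka-persistence-cassandra uses as events-by-tag offsets. Decoding them with just the JDK shows how far each tag's stored offset trails the 10:43 restart. A minimal Java sketch follows; the class name OffsetAge is made up for illustration, while the UUIDs and the restart timestamp are copied from the log lines above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

/** Decodes time-based UUID offsets and reports how old they are at a reference time. */
public class OffsetAge {
    // Number of 100-ns intervals between the UUID epoch (1582-10-15T00:00Z)
    // and the Unix epoch (1970-01-01T00:00Z).
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    /** Converts a version-1 UUID's embedded timestamp to Unix epoch milliseconds. */
    public static long toEpochMillis(UUID timeUuid) {
        // UUID.timestamp() throws UnsupportedOperationException for non-time-based UUIDs.
        return (timeUuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000L;
    }

    public static void main(String[] args) {
        // Restart time, taken from the heartbeat line for CaseEntityEvent0.
        Instant restart = Instant.ofEpochMilli(1_747_996_988_426L);
        String[][] offsets = {
            {"CaseEntityEvent0", "24cbaa50-37b9-11f0-b342-b5f41f0f03eb"},
            {"CaseEntityEvent1", "6ed67030-37b9-11f0-b342-b5f41f0f03eb"},
        };
        for (String[] entry : offsets) {
            Instant at = Instant.ofEpochMilli(toEpochMillis(UUID.fromString(entry[1])));
            Duration lag = Duration.between(at, restart);
            System.out.println(entry[0] + " offset " + at + ", " + lag.toMinutes() + " min behind restart");
        }
        // Expected output:
        // CaseEntityEvent0 offset 2025-05-23T09:34:37.429Z, 68 min behind restart
        // CaseEntityEvent1 offset 2025-05-23T09:36:41.651Z, 66 min behind restart
    }
}
```

The decoded times match the timestamps printed in parentheses in the log lines, so both stored offsets are a little over an hour old at restart, which fits the "after an hour or so" pattern. On its own, though, this cannot distinguish a stuck stream from a tag that simply received no new events after that point.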
Unfortunately, there are no logs indicating any problem in EventsByTagStage. I'm starting to think there might be a race condition somewhere in EventsByTagStage.
Also, this started happening after we migrated from Lagom 1.5.5 to Lagom 1.6.0.