Topic Subscriber extreme delay in receiving messages in Development

I’ve been working on an application with two services. ServiceA has an EventSourcedBehaviorWithEnforcedReplies entity and publishes a Topic that is sourced from that entity’s events. ServiceB subscribes to the topic to react to those messages, as suggested by both the Lagom documentation on the Message Broker API and the ShoppingCart Java example.

My problem is this: on startup, the topic subscriber does not receive any events until after a significant delay (several minutes) when running the application in development (using mvn lagom:runAll).

This is reproducible with the ShoppingCart demo in the lagom-samples repository. With a breakpoint in the InventoryServiceImpl’s subscriber, it took about 7 minutes after startup and sending a REST call to the shopping cart service before the first event was received! Frustratingly, after that initial delay, messages are received within a second.

I’ve searched for a configuration option that could explain this, but I can’t find anything that accounts for such a significant initial delay.

This discussion is now on Undocumented behavior of Topic Subscription with ShoppingCart-Java · Issue #205 · lagom/lagom-samples · GitHub

Please, don’t comment here.

Sorry, I think the discussion can be brought back here:

In testing the shopping-cart sample, Kafka messages do seem to be pretty much instantaneous. My application, however, seems to be plagued by the akka-typed issue that causes the EventsByTagStage to start at the default beginning (2016-02-25, it seems):

22:06:41.427 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.426UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-33}] - [4e24c6f2-7621-431a-81b9-cf6f5a663e92]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent9] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.427 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.427UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-35}] - [80a96a19-6983-4894-b192-fa5fc73c1e5e]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent6] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.430 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.429UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-7}] - [2fe0be83-bb94-43bf-9fbf-f24e0b32fac6]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent1] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.431 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.430UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-34}] - [c7c94fca-f9c8-4a0d-ab12-bdcfceb51050]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent2] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.433 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.432UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-18}] - [63ef02f9-398b-4ba4-b06f-ae25bd398300]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent5] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.434 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.434UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-30}] - [b7991953-26d3-4549-a08d-69b09b425176]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent4] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.438 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.436UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-20}] - [eabd8cc3-8c25-4e2b-a949-960953fa60ce]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent7] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.441 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.441UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-31}] - [ce3a7e7f-2bfc-4602-b1e5-ba0a22665737]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent8] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.444 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.444UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-32}] - [8a184bd8-1f37-4084-902a-d92093f3f988]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent0] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
22:06:41.445 INFO akka.persistence.cassandra.query.EventsByTagStage [{akkaAddress=akka://systemofadownload-versions-impl-application@127.0.0.1:62958, akkaSource=EventsByTagStage(akka://systemofadownload-versions-impl-application), akkaTimestamp=05:06:41.444UTC, sourceActorSystem=systemofadownload-versions-impl-application, sourceThread=systemofadownload-versions-impl-application-akka.actor.default-dispatcher-19}] - [d3533546-8acc-49f5-ae16-7ea4b5efb31f]: EventsByTag query [org.spongepowered.downloads.versions.api.event.VersionedArtifactEvent3] starting with EC delay 5000ms: fromOffset [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00:000)] toOffset [None]
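
The fromOffset values in these log lines are time-based (version 1) UUIDs, so the date shown in parentheses can be recovered from the UUID itself. Here is a minimal sketch using only the JDK. The class and method names (OffsetDecoder, toInstant) are hypothetical and not part of Akka or Lagom.

```java
import java.time.Instant;
import java.util.UUID;

public class OffsetDecoder {
    // Number of 100 ns intervals between the UUID epoch (1582-10-15)
    // and the Unix epoch (1970-01-01).
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    // Hypothetical helper: converts a version 1 (time-based) UUID string to an Instant.
    public static Instant toInstant(String offset) {
        UUID uuid = UUID.fromString(offset);
        // UUID.timestamp() throws UnsupportedOperationException for non-version-1 UUIDs.
        long hundredNanos = uuid.timestamp() - UUID_EPOCH_OFFSET;
        long seconds = hundredNanos / 10_000_000L;
        long nanos = (hundredNanos % 10_000_000L) * 100L;
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(String[] args) {
        // Offset copied from the log lines above.
        System.out.println(toInstant("b67b4000-db52-11e5-8080-808080808080"));
        // prints 2016-02-25T00:00:00Z
    }
}
```

This makes it easy to check whether a changed setting actually moved the starting offset, without relying on the formatted date in the log.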

The problem, as laid out in the issue, is that the akka-typed docs do mention configuring the initial offset with the following in application.conf (I re-tested today with today’s date):

akka.persistence.cassandra.events-by-tag.first-time-bucket = "20210714T00:00"

But even with that line in all of my application.conf files, I can’t seem to get the EventsByTagStage to start at an appropriate time.

TL;DR: Lagom’s cassandra-core ships a configuration override that changes the path of this setting.

The solution:

cassandra-query-journal {
  first-time-bucket = "20210721T00:00"
}
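
If you would rather generate the value than type it by hand, something like the following could work. It is only a sketch: it assumes the value format is yyyyMMdd'T'HH:mm (inferred from the values used in this thread) and that the bucket should start at midnight. FirstTimeBucket and forDay are hypothetical names, not a Lagom or Akka API.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class FirstTimeBucket {
    // Assumed format, inferred from values such as "20210721T00:00".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HH:mm");

    // Hypothetical helper: formats the start of the given day as a first-time-bucket value.
    public static String forDay(LocalDate day) {
        return day.atStartOfDay().format(FORMAT);
    }

    public static void main(String[] args) {
        // Same day as the configuration above.
        System.out.println(forDay(LocalDate.of(2021, 7, 21))); // prints 20210721T00:00
        // Today's date in UTC (using UTC here is an assumption).
        System.out.println(forDay(LocalDate.now(ZoneOffset.UTC)));
    }
}
```

The printed string would then be pasted into the first-time-bucket setting shown above.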

I spent some time investigating other aspects of setting up a Lagom environment and discovered that, while akka.persistence.cassandra.events-by-tag.first-time-bucket is valid in a pure Akka world, Lagom registers some overrides, specifically in the overrides configuration file shipped by lagom-persistence-cassandra-core.

Having tested the mixed-persistence-java-sample, I was witnessing the same behavior: the EventsByTagStage would start with an offset of 2015..., so I knew it was nothing specific to my setup. After clicking around in IntelliJ with “find usages”, I managed to discover the particular override file! Huzzah: I reloaded the application in local dev and, sure enough, the events-by-tag query started at a more reasonable offset!!

17:59:10.504 [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://hello-impl-application@127.0.0.1:61825, sourceThread=hello-impl-application-lagom.persistence.dispatcher-38, akkaSource=EventsByTagStage(akka://hello-impl-application), sourceActorSystem=hello-impl-application, akkaTimestamp=22:59:10.504UTC] - [38060ffc-816c-44c2-a2f2-8dd2a867fe27]: EventsByTag query [com.lightbend.lagom.samples.mixedpersistence.hello.impl.entity.HelloEvent3] starting with EC delay 5000ms: fromOffset [97800000-e9b6-11eb-8080-808080808080 (2021-07-21 00:00:00:000)] toOffset [None]
17:59:10.509 [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://hello-impl-application@127.0.0.1:61825, sourceThread=hello-impl-application-lagom.persistence.dispatcher-37, akkaSource=EventsByTagStage(akka://hello-impl-application), sourceActorSystem=hello-impl-application, akkaTimestamp=22:59:10.504UTC] - [a6f1ada5-daec-42ba-85e2-c63a27a32f01]: EventsByTag query [com.lightbend.lagom.samples.mixedpersistence.hello.impl.entity.HelloEvent0] starting with EC delay 5000ms: fromOffset [97800000-e9b6-11eb-8080-808080808080 (2021-07-21 00:00:00:000)] toOffset [None]
17:59:10.510 [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://hello-impl-application@127.0.0.1:61825, sourceThread=hello-impl-application-lagom.persistence.dispatcher-33, akkaSource=EventsByTagStage(akka://hello-impl-application), sourceActorSystem=hello-impl-application, akkaTimestamp=22:59:10.504UTC] - [a023ef71-b0be-480b-8c30-134109963906]: EventsByTag query [com.lightbend.lagom.samples.mixedpersistence.hello.impl.entity.HelloEvent2] starting with EC delay 5000ms: fromOffset [97800000-e9b6-11eb-8080-808080808080 (2021-07-21 00:00:00:000)] toOffset [None]
17:59:10.511 [info] akka.persistence.cassandra.query.EventsByTagStage [akkaAddress=akka://hello-impl-application@127.0.0.1:61825, sourceThread=hello-impl-application-lagom.persistence.dispatcher-45, akkaSource=EventsByTagStage(akka://hello-impl-application), sourceActorSystem=hello-impl-application, akkaTimestamp=22:59:10.504UTC] - [88ab8646-e375-4dc1-9cda-8aed828681f0]: EventsByTag query [com.lightbend.lagom.samples.mixedpersistence.hello.impl.entity.HelloEvent1] starting with EC delay 5000ms: fromOffset [97800000-e9b6-11eb-8080-808080808080 (2021-07-21 00:00:00:000)] toOffset [None]