Hello,
I have a question about the delay between a persistent entity (typed actor) persisting an event and the source provider and sharded daemon process running the projection. I have a simple service that takes a Create command and emits a Created event. About 6 to 10 seconds later, the projection emits its logs for persisting the event to the read side (in this case PostgreSQL).
My source provider uses CassandraReadJournal, since I am using WriteSideCassandraPersistenceComponents for entity persistence. My projection is a SlickProjection using exactly-once delivery.
def sourceProvider(tag: String, system: ActorSystem[_]): SourceProvider[Offset, EventEnvelope[Post.Event]] =
  EventSourcedProvider
    .eventsByTag[Post.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag
    )

def projection(tag: String): ExactlyOnceProjection[Offset, EventEnvelope[Event]] =
  SlickProjection
    .exactlyOnce[Offset, EventEnvelope[Post.Event], PostgresProfile](
      projectionId = ProjectionId("Posts", tag),
      sourceProvider(tag, actorSystem.toTyped),
      dbConfig,
      handler = () => new PostProjectionProcessor(postRepository)
    )

ShardedDaemonProcess(actorSystem.toTyped)
  .init[ProjectionBehavior.Command](
    name = "posts",
    numberOfInstances = Post.Event.tags.size,
    behaviorFactory = (i: Int) => ProjectionBehavior(projection(Event.tags(i))),
    stopMessage = ProjectionBehavior.Stop
  )
The logs show:
[2021-06-29 16:36:24,487] [INFO] [post] [] [posts-impl-application-akka.actor.default-dispatcher-31] - Entity received Create command. MDC: {persistencePhase=replay-evt, akkaAddress=akka://posts-impl-application@127.0.0.1:46031, akkaSource=akka://posts-impl-application/system/sharding/Post/946/a4b21293-4e43-47e4-b34b-336b118d0f8c, sourceActorSystem=posts-impl-application, persistenceId=Post|a4b21293-4e43-47e4-b34b-336b118d0f8c}
[2021-06-29 16:36:24,502] [INFO] [post] [] [posts-impl-application-akka.actor.default-dispatcher-31] - Entity applying Event. MDC: {persistencePhase=running-cmd, akkaAddress=akka://posts-impl-application@127.0.0.1:46031, persistenceId=Post|a4b21293-4e43-47e4-b34b-336b118d0f8c, akkaSource=akka://posts-impl-application/system/sharding/Post/946/a4b21293-4e43-47e4-b34b-336b118d0f8c, sourceActorSystem=posts-impl-application}
[2021-06-29 16:36:35,407] [INFO] [org.ygoty.posts.impl.readside.PostProjectionProcessor] [] [posts-impl-application-akka.actor.default-dispatcher-27] - Created(Posted(a4b21293-4e43-47e4-b34b-336b118d0f8c,5c6ae850-977f-43e2-aac3-f8ae52ede4bf,testing the body,None,2021-06-29T16:36:23.888025Z,None)) MDC: {}
[2021-06-29 16:36:35,408] [INFO] [org.ygoty.posts.impl.readside.PostProjectionProcessor] [] [posts-impl-application-akka.actor.default-dispatcher-27] - Creating post -> Posted(a4b21293-4e43-47e4-b34b-336b118d0f8c,5c6ae850-977f-43e2-aac3-f8ae52ede4bf,testing the body,None,2021-06-29T16:36:23.888025Z,None) MDC: {}
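For reference, the gap in these logs can be worked out from the timestamp of the "Entity applying Event" line and the first PostProjectionProcessor line. Below is a minimal sketch using only java.time; the names LogDelay and delayMillis are made up for illustration and are not part of the service.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not part of the service): measures the gap between two log timestamps.
object LogDelay {
  // Matches the timestamp layout of the log lines above, e.g. "2021-06-29 16:36:24,502".
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")

  // Milliseconds elapsed between two timestamps written in that layout.
  def delayMillis(from: String, to: String): Long =
    Duration
      .between(LocalDateTime.parse(from, format), LocalDateTime.parse(to, format))
      .toMillis

  def main(args: Array[String]): Unit = {
    val persisted = "2021-06-29 16:36:24,502" // "Entity applying Event"
    val projected = "2021-06-29 16:36:35,407" // first PostProjectionProcessor line
    println(s"${delayMillis(persisted, projected)} ms") // prints "10905 ms"
  }
}
```

For this particular request the projection handler ran roughly 10.9 seconds after the entity applied the event.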
The application.conf file is:
akka {
  loglevel = "INFO"
  actor {
    serialization-bindings {
      "org.ygoty.posts.impl.entities.Post$CborSerializer" = jackson-cbor
    }
    enable-additional-serialization-bindings = on
  }
  cluster {
    sharding {
      state-store-mode = ddata
    }
  }
  extensions = [akka.persistence.Persistence]
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal {
      plugin = cassandra-journal
    }
    snapshot-store {
      plugin = cassandra-snapshot-store
    }
  }
  projection.slick {
    profile = "slick.jdbc.PostgresProfile$"
    # add here your Slick db settings
    db {
      url = "jdbc:postgresql://localhost:5432/post"
      driver = "org.postgresql.Driver"
      # connectionPool = disabled
      keepAliveConnection = on
      user = "postgres"
      password = "postgres"
    }
    offset-store {
      # set this to your database schema if applicable, empty by default
      schema = ""
      # the database table name for the offset store
      table = "akka_projection_offset_store"
      # the database table name for the projection management data
      management-table = "akka_projection_management"
      # Use lowercase table and column names.
      # This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
      # Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
      use-lowercase-schema = true
    }
  }
}

post-service.cassandra.keyspace = "postservice"
cassandra-journal.keyspace = ${post-service.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${post-service.cassandra.keyspace}
cassandra-query-journal.eventual-consistency-delay = 200ms
cassandra-query-journal.delayed-event-timeout = 30s
cassandra-query-journal.eventual-consistency-delay = 1s

play {
  application {
    loader = org.ygoty.posts.impl.PostApplicationLoader
  }
  server.pidfile.path = /dev/null
}
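One detail in this configuration: cassandra-query-journal.eventual-consistency-delay is assigned twice (200ms, then 1s). In HOCON a later assignment of the same key overrides an earlier one, so the effective value should be 1s rather than 200ms. Below is a minimal sketch of checking that with the Typesafe Config library that Akka already depends on. The object name EffectiveDelay is hypothetical, and the string reproduces only the three relevant lines, not the whole file.

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical check (not part of the service): which duplicate assignment wins?
object EffectiveDelay {
  // Only the three cassandra-query-journal lines from the file, in the same order.
  val snippet: String =
    """cassandra-query-journal.eventual-consistency-delay = 200ms
      |cassandra-query-journal.delayed-event-timeout = 30s
      |cassandra-query-journal.eventual-consistency-delay = 1s
      |""".stripMargin

  // Parses the snippet and reads back the value that survived the override.
  def effectiveDelayMillis: Long =
    ConfigFactory
      .parseString(snippet)
      .getDuration("cassandra-query-journal.eventual-consistency-delay")
      .toMillis

  def main(args: Array[String]): Unit =
    println(effectiveDelayMillis) // prints 1000
}
```

This only shows which of the two values is in effect; it does not by itself account for the 6 to 10 second gap.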
I have been trying to find documentation that clarifies how to configure the read-side eventual consistency delay. I also could not find the relevant reference.conf values under projections/akka.persistence.
How can I improve the response time of the projection?
Kindly,
Robert