I am writing an application for proof-of-concept purposes (the PoC is not about Akka Persistence per se) using persistent actors with Cassandra.
I run this on my laptop (8 GB of memory, 4 CPUs).
I use a parent actor spawning children that are persistent actors. Each persistent actor models a simple aggregate root containing a couple of attributes and a Map of value-type objects. The persistent actor is called CustomerActor and the parent actor EntryActor.
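For orientation, CustomerActor is shaped roughly like this (a trimmed sketch, not the full class; the field names and the Account type are approximations, and the real command handlers are shown further down):

import akka.persistence.PersistentActor
import scala.collection.mutable

class CustomerActor(country: String, id: Long) extends PersistentActor {
  // one instance per aggregate root
  override def persistenceId: String = CustomerActor.name(country, id.toString)

  private var attributes: Option[CustomerAttributes] = None // the "couple of attributes"
  private val accounts = mutable.Map.empty[String, Account] // the Map of value-type objects

  // trimmed: receiveCommand is shown further down in this post; receiveRecover mirrors it
}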
The application reads data using Slick, creates one EntryActor, and sends it command-style messages, e.g. CreateCustomer and CreateAccount.
What I basically do is load data into memory using persistent actors and then execute some functionality on them.
My problem is that when I create more than a small number of CustomerActors, the system becomes slow and produces WARN log messages that I don't know how to interpret correctly (and I have done some searching before posting here).
The log messages from Cassandra:
cassandra_1 | WARN [Native-Transport-Requests-2] 2019-05-29 19:24:25,392 BatchStatement.java:301 - Batch for [attachments_journal.tag_scanning] is of size 10.547KiB, exceeding specified threshold of 5.000KiB by 5.547KiB.
and
cassandra_1 | WARN [Native-Transport-Requests-11] 2019-05-29 19:24:23,918 NoSpamLogger.java:94 - Unlogged batch covering 500 partitions detected against table [attachments_journal.tag_scanning]. You should use a logged batch for atomicity, or asynchronous writes for performance.
and from Akka:
19:23:08.235UTC WARN a.p.cassandra.journal.TagWriter akka://poc-attachments/system/cassandra-journal/tagWrites/customer - Buffer for tagged events is getting too large (202), is Cassandra responsive? Are writes failing? If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: 30758d95-8247-11e9-9f0c-0b5fa4cbc546 (2019-05-29 19:23:07:881)
My application.conf:
akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
cassandra-journal.contact-points = ["127.0.0.1"]
cassandra-journal.keyspace = attachments_journal
cassandra-snapshot-store.keyspace = attachments__snapshot
cassandra-journal.events-by-tag.max-message-batch-size = 50
akka.actor.default-mailbox.stash-capacity=100
op-rabbit {
  topic-exchange-name = "poc.attachments"
  channel-dispatcher = "op-rabbit.default-channel-dispatcher"
  default-channel-dispatcher {
    # Dispatcher is the name of the event-based dispatcher
    type = Dispatcher
    # What kind of ExecutionService to use
    executor = "fork-join-executor"
    # Configuration for the fork join pool
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 2
      # Parallelism (threads) ... ceil(available processors * factor)
      parallelism-factor = 2.0
      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 10
    }
    # Throughput defines the maximum number of messages to be
    # processed per actor before the thread jumps to the next actor.
    # Set to 1 for as fair as possible.
    throughput = 100
  }
  connection {
    hosts = ["localhost"]
    username = "mds"
    password = "***"
    connection-timeout = 1s
    port = 5672
  }
}

akka {
  loglevel = "DEBUG"
  //stdout-loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

poc.attachments {
  default_rabbit_concurrency = 1
}
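As far as I can tell, the 5.000KiB figure in the first warning is Cassandra's batch_size_warn_threshold_in_kb default from cassandra.yaml. On the Akka side, these are the tag-related settings I have been experimenting with (a sketch; the values are guesses, and I am assuming these keys exist under these paths in my plugin version):

# what I have been experimenting with (values are guesses, not recommendations)
cassandra-journal.events-by-tag.max-message-batch-size = 20            # smaller tag batches, to stay under the 5 KiB batch warning
cassandra-journal.events-by-tag.flush-interval = 250ms                 # assumed key: how long tagged events buffer before a flush
cassandra-query-journal.events-by-tag.eventual-consistency-delay = 5s  # assumed path for the delay the TagWriter warning mentions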
Some code in the persistent actor:
val receiveCommand: Receive = {
  case attr: CustomerAttributes =>
    log.debug("CustomerActor at {} received CustomerAttributes command: {}", self.path.name, attr)
    persist(Tagged(attr, Set("customer"))) { tagged =>
      updateAttributes(tagged.payload.asInstanceOf[CustomerAttributes])
    }

  case cce: CustomerCreated =>
    if (cce.id % 100 == 0) {
      log.info("I {} got CustomerCreated message: {}", self.path.name, cce)
    }
    persistAsync(Tagged(cce, Set("customer"))) { tagged =>
      setInitialState(tagged.payload.asInstanceOf[CustomerCreated])
    }

  case acc: AccountCreated =>
    log.debug("CustomerActor got AccountCreated message: {}", acc)
    if (acc.customerId % 100 == 0) {
      log.info("Added account {} to customer {}", acc.accountNumber, acc.customerId)
    }
    persistAsync(Tagged(acc, Set("customer"))) { tagged =>
      addAccount(tagged.payload.asInstanceOf[AccountCreated])
    }
}
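For completeness: recovery is the mirror image of the command handlers. As I understand it, the journal replays the payload unwrapped (not the Tagged envelope), so receiveRecover looks roughly like this (trimmed):

import akka.persistence.RecoveryCompleted

val receiveRecover: Receive = {
  case attr: CustomerAttributes => updateAttributes(attr) // replayed payload, no Tagged wrapper
  case cce: CustomerCreated     => setInitialState(cce)
  case acc: AccountCreated      => addAccount(acc)
  case RecoveryCompleted        => log.debug("CustomerActor {} finished recovery", self.path.name)
}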
From the application, running the DB query and sending messages to the EntryActor:
val joinQuery1 = for {
  a <- accounts
  c <- customers if c.customerType === "F" && c.id < highestCustomerId + 1 && c.country === a.country && c.id === a.customerId
} yield (c, a)

println(s"The join query is: ${joinQuery1.result.statements}")

val accountsAction = joinQuery1.result
val publisher = db.stream(accountsAction)
val fut1 = publisher.foreach { it =>
  entryActor ! accountCreatedForVAccountC(it._2)
}
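publisher.foreach pushes rows to the EntryActor as fast as Slick emits them. One variation I have been considering is running the publisher through an Akka Stream with a throttle so the CustomerActors (and Cassandra) are not flooded all at once; a sketch, assuming akka-stream is on the classpath and an implicit Materializer is in scope, with an arbitrary rate:

import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// sketch: turn the Slick publisher into a throttled stream of actor messages
val fut2 = Source.fromPublisher(db.stream(accountsAction))
  .throttle(200, 1.second) // 200 rows/second is a placeholder, not a recommendation
  .runForeach(it => entryActor ! accountCreatedForVAccountC(it._2))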
From the EntryActor, creating the CustomerActor (the persistent actor):
override def receive: Receive = {
  case c: CustomerCreated =>
    createCustomer(c)
}

def createCustomer(c: CustomerCreated): Unit = {
  context.child(CustomerActor.name(c.country, c.id.toString)) match {
    case Some(_) =>
      log.warning("**************************** Create customer called for already existing customer: {}", c)
    case None =>
      val cc = context.actorOf(Props(classOf[CustomerActor], c.country, c.id), CustomerActor.name(c.country, c.id.toString))
      customers(CustomerActor.name(c.country, c.id.toString)) = cc
      cc ! c
      context.watch(cc)
  }
}
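Since the children are watched, the EntryActor also has a Terminated case that removes the child from the customers map, roughly like this (trimmed):

case Terminated(ref) =>
  // a watched CustomerActor stopped; drop it from the bookkeeping map
  log.warning("CustomerActor {} terminated", ref.path.name)
  customers -= ref.path.name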
I realize the code is probably not that important, but I figured I'd include a bit too much rather than a bit too little.
I'm running Cassandra using the Docker image cassandra:3.11.