Hi all
I am trying to test actor, that is using Akka Typed Persistence. The scenario is, the database is offline and it should send a notification to another actor, that the database is not available.
During the creation of child actor, that will persist the data to the database, it throws the following error:
akka.actor.ActorInitializationException: akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:202)
at akka.actor.ActorCell.create(ActorCell.scala:696)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
at akka.dispatch.Mailbox.run(Mailbox.scala:228)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Default journal plugin is not configured, see 'reference.conf'
at akka.persistence.Persistence$.verifyPluginConfigIsDefined(Persistence.scala:193)
at akka.persistence.typed.internal.EventSourcedSettings$.defaultJournalPluginId$1(EventSourcedSettings.scala:57)
at akka.persistence.typed.internal.EventSourcedSettings$.journalConfigFor(EventSourcedSettings.scala:61)
at akka.persistence.typed.internal.EventSourcedSettings$.apply(EventSourcedSettings.scala:39)
at akka.persistence.typed.internal.EventSourcedSettings$.apply(EventSourcedSettings.scala:22)
at akka.persistence.typed.internal.EventSourcedBehaviorImpl.apply(EventSourcedBehaviorImpl.scala:83)
at akka.actor.typed.Behavior$.start(Behavior.scala:331)
at akka.actor.typed.internal.InterceptorImpl$anon$1.start(InterceptorImpl.scala:45)
at akka.actor.typed.internal.AbstractSupervisor.aroundStart(Supervision.scala:72)
at akka.actor.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:68)
at akka.actor.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:25)
at akka.actor.typed.Behavior$DeferredBehavior$anon$1.apply(Behavior.scala:264)
at akka.actor.typed.Behavior$.start(Behavior.scala:331)
at akka.actor.typed.internal.adapter.ActorAdapter.preStart(ActorAdapter.scala:238)
at akka.actor.Actor.aroundPreStart(Actor.scala:550)
at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
at akka.actor.typed.internal.adapter.ActorAdapter.aroundPreStart(ActorAdapter.scala:51)
at akka.actor.ActorCell.create(ActorCell.scala:676)
... 9 more
[ERROR] [08/30/2019 12:28:15.391] [StoreTestOffline-akka.actor.default-dispatcher-6] [akka://StoreTestOffline/user] death pact with Actor[akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid#-1253371311] was triggered
akka.actor.typed.DeathPactException: death pact with Actor[akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid#-1253371311] was triggered
at akka.actor.typed.Behavior$.interpretSignal(Behavior.scala:402)
at akka.actor.typed.internal.adapter.ActorAdapter.handleSignal(ActorAdapter.scala:128)
at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:87)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
at akka.actor.dungeon.DeathWatch.$anonfun$receivedTerminated$1(DeathWatch.scala:67)
at akka.actor.dungeon.DeathWatch.$anonfun$receivedTerminated$1$adapted(DeathWatch.scala:65)
at scala.Option.foreach(Option.scala:438)
at akka.actor.dungeon.DeathWatch.receivedTerminated(DeathWatch.scala:65)
at akka.actor.dungeon.DeathWatch.receivedTerminated$(DeathWatch.scala:64)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:447)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:597)
at akka.actor.ActorCell.invoke(ActorCell.scala:580)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The way, how I spawn the child is:
object MessageSupervisorSpec {
def create(communicator: Option[ActorRef[RpcCmd]], logger: Option[ActorRef[LogCmd]]): Behavior[MessageCmd] =
Behaviors.setup { context =>
context.log.info("=============> Start MessageSupervisorSpec <=============")
val fault = Behaviors
.supervise(Persistence.create(communicator, logger))
.onFailure[ActorInitializationException](SupervisorStrategy.stop)
val store = context.spawn(fault, "StoreChilid")
context.watch(store)
def loop(): Behavior[MessageCmd] =
Behaviors.receiveMessage {
case SaveMessage(v) =>
println(v)
store ! SaveMessage(v)
Behavior.same
}
loop()
}
}
The code of the child actor:
object Persistence {
val storeName = "connector-store"
/*
* Persist all incoming messages from KAFKA or SAP
*/
private val commandHandler: Option[ActorRef[RpcCmd]] => (MessageState, MessageCmd) => Effect[MessageEvent, MessageState]
= communicator => { (_, command) =>
command match {
case SaveMessage(data) =>
Effect
.persist(MessageSaved(data))
.thenRun(state => communicator.foreach(actor => actor ! SendMessage(state.value)))
}
}
private val eventHandler: (MessageState, MessageEvent) => MessageState
= { (_, event) =>
event match {
case MessageSaved(data) => MessageState(data)
}
}
def create(communicator: Option[ActorRef[RpcCmd]], logger: Option[ActorRef[LogCmd]]): Behavior[MessageCmd] =
Behaviors.setup { context =>
context.log.info("=============> Start PersistenceMessageActor <=============")
EventSourcedBehavior[MessageCmd, MessageEvent, MessageState](
persistenceId = PersistenceId(storeName),
emptyState = MessageState(),
commandHandler = commandHandler(communicator),
eventHandler = eventHandler)
.onPersistFailure(SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
.receiveSignal {
case (_, _) =>
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
context.log.error("PersistenceMessageActor has been terminated. Please check the persistence.")
logger.foreach(actor => actor ! SaveLog(Log(Error, "Message store actor was stopped!")))
}
}
}
I expect to receive a Signal
that I can handle accordingly but I did not get anyone.
The question is, how to handle the case, when the actor can not be started, to receive the signal.
Thanks