Hey guys,
I have a persistent actor that uses Akka Durable State.
I can access it with the following code:
sharding.init(Entity.of(PersistentActor.ENTITY_TYPE_KEY,
    entityContext -> PersistentActor.create(entityContext.getEntityId(),
        PersistenceId.of(entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
.....
Source.from(...)
    .mapAsync(buffer, message -> {
        EntityRef<Command> ref = sharding.entityRefFor(PersistentActor.ENTITY_TYPE_KEY,
            String.valueOf(message.getId()));
        return ref.ask(replyTo -> new Command(message, replyTo), askTimeout);
    })
That works very well.
If I replace the source with an Alpakka Kafka consumer (Consumer.committableSource(…)), I get the following error:
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Recipient shard region of [EntityRef(EntityTypeKey[scheduler.Command](PersistentActor), 2)] had already been terminated.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674)
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
at java.base/java.util.concurrent.CompletableFuture.thenApplyAsync(CompletableFuture.java:2173)
at scala.concurrent.java8.FuturesConvertersImpl$CF.thenApply(FutureConvertersImpl.scala:35)
at scala.concurrent.java8.FuturesConvertersImpl$CF.thenApply(FutureConvertersImpl.scala:26)
at Consumer.lambda$consume2$60ab0450$1(Consumer.java:77)
at akka.stream.javadsl.Source.$anonfun$mapAsync$1(Source.scala:2288)
at akka.stream.impl.fusing.MapAsync$$anon$30.onPush(Ops.scala:1307)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
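One thing I noticed in the trace: as far as I can tell, the CompletionException is created synchronously inside thenApply (uniApplyStage -> uniApplyNow -> encodeThrowable), called from my lambda at Consumer.java:77. That would mean the future returned by ref.ask was already failed when I attached to it, so the ask was rejected immediately and did not wait for askTimeout. Here is a minimal stdlib-only sketch of that behaviour. It uses a plain CompletableFuture as a stand-in for the ask result; the class name, method names and the TimeoutException are placeholders of mine, not Akka code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

public class FailedAskSketch {

    // Hypothetical stand-in for ref.ask(...) when the recipient is already gone:
    // the future is failed before any stage is attached to it.
    static CompletableFuture<String> alreadyFailedAsk() {
        return CompletableFuture.failedFuture(
            new TimeoutException("recipient had already been terminated"));
    }

    // Attaches a thenApply stage to the already-failed future and
    // returns the exception that a downstream consumer would observe.
    static Throwable failureAfterThenApply() {
        CompletableFuture<Integer> mapped = alreadyFailedAsk().thenApply(String::length);
        try {
            mapped.join();
            return null; // not reached: the mapped future is failed as well
        } catch (CompletionException e) {
            return e; // wrapper created by thenApply; the original failure is its cause
        }
    }

    public static void main(String[] args) {
        Throwable t = failureAfterThenApply();
        System.out.println(t.getClass().getName());
        System.out.println(t.getCause().getClass().getName());
    }
}
```

The mapping function is never invoked here; the wrapper exception simply carries the original failure as its cause, which matches the CompletionException wrapping the AskTimeoutException above.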
The application.conf looks like this:
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "OFF"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor {
    provider = "cluster"
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "DurableStateMessage" = jackson-json
    }
  }
  remote {
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://Test@127.0.0.1:2551"
    ]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    split-brain-resolver {
      active-strategy = keep-majority
    }
  }
  management {
    http {
      hostname = "127.0.0.1"
      port = 8558
    }
  }
  persistence {
    state {
      plugin = "jdbc-durable-state-store"
      // Enable the line below to automatically start the snapshot-store when the actorsystem is started
      auto-start-snapshot-stores = ["jdbc-durable-state-store"]
    }
  }
}
Is it possible that the Kafka source cannot resolve the shard? I didn't use KafkaClusterSharding in this example, but maybe it is incompatible with the durable state store?