I am sending messages to a sharded actor through a ShardingEnvelope, but some of the messages do not get processed.
I have created two nodes in a sharded cluster with the configuration below:
akka {
  loglevel = DEBUG
  actor {
    provider = "cluster"
    serializers {
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
    }
    allow-java-serialization = off
    serialization-bindings {
      "com.actor.CborSerializer" = jackson-cbor
    }
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 1565
    }
  }
  cluster {
    seed-nodes = [
      "akka://MY_ACTOR_SYSTEM@127.0.0.1:1565",
      "akka://MY_ACTOR_SYSTEM@127.0.0.1:1566"
    ]
    sharding {
      number-of-shards = 1000
      remember-entities = on
      remember-entities-store = ddata
      distributed-data.durable.keys = []
      least-shard-allocation-strategy.rebalance-absolute-limit = 20
      least-shard-allocation-strategy.rebalance-relative-limit = 0.1
    }
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}
The actors are configured in Java as follows:
public class ActorConfiguration {

    ApplicationContext applicationContext;
    ActorSystem<RootActor.Command> actorSystem = null;
    Behavior<RootActor.Command> rootActor = null;
    Config config = null;
    ClusterSharding clusterSharding = null;
    EntityTypeKey<Command> processorEntityKey = null;
    ActorRef<ShardingEnvelope<Command>> shardingEnv = null;

    @Autowired
    public ActorConfiguration(ApplicationContext applicationContext) {
        super();
        try {
            this.applicationContext = applicationContext;
            getConfig();
            getRootActor();
            getActorSystem();
            getClusterShardingObj();
            processorEntityKey = EntityTypeKey.create(Command.class, "ENTITY_KEY");
            shardingEnv = getClusterShardingObj()
                    .init(Entity.of(getProcessorEnitityKey(), ctx -> getProcessorActor(ctx.getEntityId()))
                            .withStopMessage(ProcessorActor.GoodByeCommand.INSTANCE));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    ActorSystem<RootActor.Command> getActorSystem() {
        if (actorSystem == null)
            actorSystem = ActorSystem.create(getRootActor(), "MY_ACTOR_SYSTEM", getConfig());
        return actorSystem;
    }

    Behavior<RootActor.Command> getRootActor() {
        if (rootActor == null)
            rootActor = RootActor.create(applicationContext);
        return rootActor;
    }

    Behavior<Command> getProcessorActor(String entityId) {
        return ProcessorActor.create(applicationContext, entityId);
    }

    public ClusterSharding getClusterShardingObj() {
        if (clusterSharding == null)
            clusterSharding = ClusterSharding.get(getActorSystem());
        return clusterSharding;
    }

    public ActorRef<ShardingEnvelope<Command>> getShardRegionProcessorActor() {
        if (shardingEnv == null)
            shardingEnv = getClusterShardingObj()
                    .init(Entity.of(getProcessorEnitityKey(), ctx -> getProcessorActor(ctx.getEntityId()))
                            .withStopMessage(ProcessorActor.GoodByeCommand.INSTANCE));
        return shardingEnv;
    }

    public EntityTypeKey<Command> getProcessorEnitityKey() {
        if (processorEntityKey == null)
            processorEntityKey = EntityTypeKey.create(Command.class, "ENTITY_KEY");
        return processorEntityKey;
    }

    public Config getConfig() {
        if (config == null)
            config = ConfigFactory.load();
        return config;
    }
}
The application processes 5000 orders by calling the following for each order:
ActorRef<ShardingEnvelope<Command>> processorShardRegion= actorConfiguration.getShardRegionProcessorActor();
processorShardRegion.tell(new ShardingEnvelope<Command>(order.getId()+"_PROCESSOR", new ProcessOrderCommand(order)));
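To make the setup easier to reason about, here is a minimal, standalone sketch of how entity ids of the form `<orderId>_PROCESSOR` would spread over the 1000 configured shards. It is not my application code and does not use Akka. It assumes the default hash-code-based message extractor, which to my understanding derives the shard id roughly as `abs(entityId.hashCode()) % numberOfShards`, and it assumes numeric order ids 1..5000. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Akka: approximates hash-based shard assignment.
final class ShardSpreadSketch {

    // Assumption: shard id = abs(entityId.hashCode()) % numberOfShards.
    // (Math.abs(Integer.MIN_VALUE) stays negative; that edge case is ignored here.)
    static int shardFor(String entityId, int numberOfShards) {
        return Math.abs(entityId.hashCode()) % numberOfShards;
    }

    // Counts how many entity ids land in each shard, for assumed order ids 1..orderCount.
    static Map<Integer, Integer> spread(int orderCount, int numberOfShards) {
        Map<Integer, Integer> perShard = new HashMap<>();
        for (int id = 1; id <= orderCount; id++) {
            String entityId = id + "_PROCESSOR";
            perShard.merge(shardFor(entityId, numberOfShards), 1, Integer::sum);
        }
        return perShard;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> perShard = spread(5000, 1000);
        int total = perShard.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println("shards in use: " + perShard.size() + ", entities: " + total);
    }
}
```

If this assumption holds, the 5000 entities are scattered over many shards, and those shards are split between the two nodes. Any entity whose shard lives on a node that is leaving the cluster would be unreachable until that shard is allocated again on the remaining node.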
Some messages get processed and some do not. For the messages that are not
processed, the following actor log is printed:
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-3|2023-07-06 15:31:24.818|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - PROCESSOR_ENTITY_KEY: Request to start entity [ID3768_PROCESSOR] (in state [RememberedButNotCreated])
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-3|2023-07-06 15:31:24.818|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - PROCESSOR_ENTITY_KEY: Started entity [Actor[akka://MY_ACTOR_SYSTEM/system/sharding/PROCESSOR_ENTITY_KEY/259/3768_PROCESSOR#-1888555062]] with entity id [3768_PROCESSOR] in shard [259]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-23|2023-07-06 15:31:25.403|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Cluster Node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - Receiving gossip from [UniqueAddress(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566,-4576392484291764776)]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.404|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - SBR exited [Member(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566, Exiting)]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.405|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - SBR reset stable deadline when members/unreachable changed
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.409|INFO |Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$3:98 - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.438|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Cluster Node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - Receiving gossip from [UniqueAddress(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566,-4576392484291764776)]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.438|INFO |Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$3:98 - Cluster Node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - Exiting confirmed [akka://MY_ACTOR_SYSTEM@127.0.0.1:1566]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.470|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Using serializer [akka.remote.serialization.ArteryMessageSerializer] for message [akka.remote.artery.ActorSystemTerminatingAck]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.593|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Closing connection due to IO error java.net.SocketException: Connection reset
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.629|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Closing connection due to IO error java.net.SocketException: Connection reset
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.655|INFO |Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$3:98 - Cluster Node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - is the new leader among reachable nodes (more leaders may exist)
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.657|INFO |Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$3:98 - Cluster Node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - Leader is removing confirmed Exiting node [akka://MY_ACTOR_SYSTEM@127.0.0.1:1566]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.658|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Publish AddressTerminated [akka://MY_ACTOR_SYSTEM@127.0.0.1:1566]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.659|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - SBR remove [Member(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566, Removed)]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-27|2023-07-06 15:31:25.660|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - adding removed node [UniqueAddress(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566,-4576392484291764776)] from MemberRemoved
|MY_ACTOR_SYSTEM-akka.actor.internal-dispatcher-2|2023-07-06 15:31:25.663|DEBUG|package$LoggerOps$|debug2$extension:114 - ClusterReceptionist [akka://MY_ACTOR_SYSTEM@127.0.0.1:1565] - Leader node observed removed node [UniqueAddress(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566,-4576392484291764776)]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-23|2023-07-06 15:31:25.665|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Watchee terminated: [akka://MY_ACTOR_SYSTEM@127.0.0.1:1566/system/sharding/PROCESSOR_ENTITY_KEY]
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-23|2023-07-06 15:31:25.666|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - adding removed node [UniqueAddress(akka://MY_ACTOR_SYSTEM@127.0.0.1:1566,-4576392484291764776)] from MemberRemoved
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-23|2023-07-06 15:31:25.739|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - [outbound connection to [akka://MY_ACTOR_SYSTEM@127.0.0.1:1566], message stream] Upstream finished.
|MY_ACTOR_SYSTEM-akka.actor.default-dispatcher-23|2023-07-06 15:31:25.739|DEBUG|Slf4jLogger$$anonfun$receive$1|$anonfun$applyOrElse$4:103 - Last restart attempt was more than 1 second ago, resetting restart count
Please help me understand why these messages are not processed.