I am trying to run timer actor in akka cluster sharding but when one node going down which contains the schedular timer… it is not getting replicated to new shard …the actor is getting replilcated to new shard but timer is not getting restarted on new shard
public class ManagerActor extends AbstractBehavior<ManagerActor.Command>{
private static final Object StartTimerKey=new Object();
TimerScheduler<ManagerActor.Command> timer;
public static Logger logger = LoggerFactory.getLogger(ManagerActor.class);
private ManagerActor(ActorContext<Command> context) {
super(context);
}
public static Behavior<ManagerActor.Command> create(){
return Behaviors.setup(ctx->{return new ManagerActor(ctx);});
}
public interface Command extends Serializable{
}
public static class StartSchedular implements ManagerActor.Command{
private static final long serialVersionUID = 1L;
}
public static class CheckAndStartSession implements ManagerActor.Command{
private static final long serialVersionUID = 1L;
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(StartSchedular.class, command -> {
logger.info(getContext().getSelf().path() + " :: Request Received to Start StartSchedularActor "+" | application port = "+applicationContext.getEnvironment().getProperty("server.port"));
return Behaviors.withTimers(timer -> {
timer.startTimerWithFixedDelay(StartTimerKey, new CheckAndStartSession(),
Duration.ofSeconds(0), Duration.ofSeconds(1));
this.timer = timer;
return this;
});
})
.onMessage(CheckAndStartSession.class, command->{
logger.info(getContext().getSelf().path() + " :: Request Received to CheckAndStartSession "+" | application port = "+applicationContext.getEnvironment().getProperty("server.port"));
return this;
})
.build();
}
}
public class ParentActor extends AbstractBehavior<ParentActor.Command>{
public static Logger logger = LoggerFactory.getLogger(ParentActor.class);
private ParentActor(ActorContext<Command> context) {
super(context);
}
public static Behavior<ParentActor.Command> create(){
return Behaviors.setup(context->{
return new ParentActor(context);
});
}
public interface Command extends Serializable{
}
public static class StartManagerActor implements ParentActor.Command{
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(StartManagerActor.class, command->{
logger.info(getContext().getSelf().path()+" :: Request Received to Start StartManagerActor");
ActorRef<ManagerActor.Command> ManagerActor = getContext().spawn(ManagerActor.create(), SESSION_ACTOR");
shardRegion.
tell(new ShardingEnvelope<>("SESSION_ACTOR",new ManagerActor.StartSchedular ()));
return this;
})
.build();
}
}