This is not done with a functional interface, but it will do.
/**
 * A single preparation step whose outcome is reported through a
 * {@link CompletionStage}.
 */
public interface PrepareAction {

    /**
     * Starts this preparation step.
     *
     * @return a stage that yields {@code Done} when the step has finished
     */
    CompletionStage<Done> prepare();
}
/**
 * Actor that runs every registered {@link PrepareAction} when it starts.
 *
 * <p>The combined outcome of all actions is piped back to the actor itself:
 * a {@code Done} message is only logged, while a {@code Status.Failure} is
 * rethrown so that the actor fails and its supervisor can react.
 */
public class PrepareRegisterActor extends AbstractActor {

    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
    private final List<PrepareAction> actions;

    /**
     * Creates the {@code Props} used to start this actor.
     *
     * @param actions the preparation steps to run on every start
     * @return props that construct the actor with its own copy of {@code actions}
     */
    public static Props props(PrepareAction... actions) {
        // Arrays.asList is only a view over the array it is given, so copy the
        // varargs array first; otherwise a caller that keeps the array could
        // change the registered actions after the props were created.
        return Props.create(PrepareRegisterActor.class, Arrays.asList(actions.clone()));
    }

    /**
     * @param actions the preparation steps to run in {@link #preStart()}
     */
    public PrepareRegisterActor(List<PrepareAction> actions) {
        this.actions = actions;
    }

    /**
     * Invokes {@code prepare()} on every action and pipes the combined result
     * (either {@code Done} or a {@code Status.Failure}) to this actor.
     */
    @Override
    public void preStart() {
        logger.info("Started...");
        List<CompletionStage<Done>> started = actions.stream()
                .map(PrepareAction::prepare)
                .collect(Collectors.toList());
        pipe(doAll(started)).to(getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Done.class, message -> handleDone())
                .match(Status.Failure.class, this::handleFailure)
                .build();
    }

    /** Called when all actions have completed successfully. */
    private void handleDone() {
        logger.info("Done");
    }

    /**
     * Called when at least one action failed.
     *
     * @param failure the piped failure carrying the original cause
     * @throws RuntimeException always, wrapping the cause of {@code failure}
     */
    private void handleFailure(Status.Failure failure) {
        // Rethrow instead of handling the failure here: the actor has to fail
        // for its supervisor to get a chance to react.
        throw new RuntimeException(failure.cause());
    }

    /** Wraps a stage so that its result can be piped to an actor on this context's dispatcher. */
    private <T> PipeToSupport.PipeableCompletionStage<T> pipe(CompletionStage<T> completionStage) {
        return PatternsCS.pipe(completionStage, getContext().dispatcher());
    }

    @Override
    public void postStop() {
        logger.info("Stopped");
    }

    /**
     * Varargs convenience for {@link #doAll(List)}.
     *
     * @param stages the stages to wait for
     * @return a stage that yields {@code Done} once all {@code stages} have completed
     */
    @SafeVarargs // the array is only read (wrapped by Arrays.asList), never written to
    public static <T> CompletionStage<Done> doAll(CompletionStage<T>... stages) {
        return doAll(Arrays.asList(stages));
    }

    /**
     * Combines the given stages into a single one.
     *
     * <p>The stages are chained with {@code thenCombine}, so the result
     * completes normally only after every stage has completed normally and
     * completes exceptionally if any of them does. An empty list yields an
     * already completed stage.
     *
     * @param stages the stages to wait for; their values are discarded
     * @return a stage that yields {@code Done} once all {@code stages} have completed
     */
    public static <T> CompletionStage<Done> doAll(List<CompletionStage<T>> stages) {
        CompletionStage<Done> result = CompletableFuture.completedFuture(Done.getInstance());
        for (CompletionStage<?> stage : stages) {
            result = result.thenCombine(stage, (d1, d2) -> Done.getInstance());
        }
        return result;
    }
}
/**
 * Static entry points that start a {@link PrepareRegisterActor} underneath a
 * {@code BackoffSupervisor} configured with {@code Backoff.onFailure}.
 */
public class PrepareRegister {

    /**
     * Starts the supervised prepare actor wrapped in a
     * {@code ClusterSingletonManager}; the manager is named
     * {@code prepareActorName + "-Singletone"}.
     *
     * @param system              actor system in which the actors are created
     * @param prepareActorName    name given to the supervised prepare actor
     * @param backOffMin          minimum back-off passed to {@code Backoff.onFailure}
     * @param backOffMax          maximum back-off passed to {@code Backoff.onFailure}
     * @param backOffRandomFactor random factor passed to {@code Backoff.onFailure}
     * @param actions             preparation steps handed to the prepare actor
     */
    public static void global(ActorSystem system, String prepareActorName, FiniteDuration backOffMin,
            FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction... actions) {
        startSupervised(system, true, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
    }

    /**
     * Starts the supervised prepare actor directly in {@code system}; the
     * supervisor is named {@code prepareActorName + "-BackOff"}.
     *
     * @param system              actor system in which the actors are created
     * @param prepareActorName    name given to the supervised prepare actor
     * @param backOffMin          minimum back-off passed to {@code Backoff.onFailure}
     * @param backOffMax          maximum back-off passed to {@code Backoff.onFailure}
     * @param backOffRandomFactor random factor passed to {@code Backoff.onFailure}
     * @param actions             preparation steps handed to the prepare actor
     */
    public static void local(ActorSystem system, String prepareActorName, FiniteDuration backOffMin,
            FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction... actions) {
        startSupervised(system, false, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
    }

    /** Builds the back-off supervised props and starts them, optionally behind a cluster singleton manager. */
    private static void startSupervised(ActorSystem system, boolean asSingleton, String prepareActorName,
            FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor,
            PrepareAction... actions) {
        Props workerProps = PrepareRegisterActor.props(actions);
        Props supervisedProps = BackoffSupervisor.props(
                Backoff.onFailure(workerProps, prepareActorName, backOffMin, backOffMax, backOffRandomFactor));

        if (!asSingleton) {
            system.actorOf(supervisedProps, prepareActorName + "-BackOff");
            return;
        }

        ClusterSingletonManagerSettings singletonSettings = ClusterSingletonManagerSettings.create(system);
        Props managerProps =
                ClusterSingletonManager.props(supervisedProps, PoisonPill.getInstance(), singletonSettings);
        system.actorOf(managerProps, prepareActorName + "-Singletone");
    }
}
// Simple test of the local variant (the global variant is called in the same way)
/**
 * Test double for {@link PrepareAction} that either succeeds immediately or
 * fails immediately, depending on a flag that can be switched at runtime.
 */
public class TestPrepareAction implements PrepareAction {

    private final Integer id;

    // volatile: the flag is flipped through setFail() by one thread (the demo's
    // main thread) while prepare() is invoked from another (the actor), so
    // the write has to be made visible across threads.
    private volatile boolean fail;

    /**
     * @param id   identifier included in the failure message
     * @param fail whether {@link #prepare()} should initially fail
     */
    public TestPrepareAction(Integer id, boolean fail) {
        this.id = id;
        this.fail = fail;
    }

    /**
     * @param fail {@code true} to make subsequent {@link #prepare()} calls fail,
     *             {@code false} to make them succeed
     */
    public void setFail(boolean fail) {
        this.fail = fail;
    }

    /**
     * @return an already completed stage: {@code Done} when the fail flag is
     *         off, otherwise one completed exceptionally with an
     *         {@code Exception} whose message contains the id
     */
    @Override
    public CompletionStage<Done> prepare() {
        if (!fail) {
            return CompletableFuture.completedFuture(Done.getInstance());
        }
        CompletableFuture<Done> failed = new CompletableFuture<>();
        failed.completeExceptionally(new Exception("(" + id + ")Error"));
        return failed;
    }
}
/**
 * Small demo: registers one succeeding and one failing action locally, then
 * switches the failing action to succeed after three seconds.
 */
public class Test {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        TestPrepareAction localFail = new TestPrepareAction(2, true);
        PrepareRegister.local(system,
                "localPrepare",
                FiniteDuration.create(1, TimeUnit.SECONDS),
                FiniteDuration.create(10, TimeUnit.SECONDS),
                0.2d, new TestPrepareAction(1, false), localFail);
        try {
            // Leave action #2 failing for a while before switching it over.
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so that code
            // running later on this thread can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Setting #2 to not fail");
        localFail.setFail(false);
    }
}