Creating the Cassandra Table and Registering Codecs without relying on the ReadSide Db Handler

This does not use one of the standard functional interfaces, but it will do.

/**
 * A single asynchronous preparation step.
 *
 * <p>The interface has exactly one abstract method, so it can be implemented with a
 * lambda or a method reference.
 */
@FunctionalInterface
public interface PrepareAction {
	/**
	 * Starts the preparation step.
	 *
	 * @return a stage that completes with {@code Done} on success, or completes
	 *         exceptionally when the step fails
	 */
	CompletionStage<Done> prepare();
}

/**
 * Actor that invokes every configured {@link PrepareAction} when it starts and pipes the
 * combined outcome back to itself.
 *
 * <p>A successful outcome arrives as a {@code Done} message and is only logged. A failed
 * outcome arrives as a {@code Status.Failure} and is rethrown as a {@link RuntimeException}
 * from the message handler.
 */
public class PrepareRegisterActor extends AbstractActor {
	private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
	private final List<PrepareAction> actions;

	/**
	 * Builds the {@code Props} for this actor.
	 *
	 * @param actions the preparation steps to invoke on start
	 * @return props that create a {@code PrepareRegisterActor} with the given actions
	 */
	public static Props props(PrepareAction... actions) {
		return Props.create(PrepareRegisterActor.class, Arrays.asList(actions));
	}

	/**
	 * @param actions the preparation steps to invoke on start
	 */
	public PrepareRegisterActor(List<PrepareAction> actions) {
		this.actions = actions;
	}

	/** Invokes all actions and pipes the combined result to this actor. */
	@Override
	public void preStart() {
		logger.info("Started...");
		pipe(doAll(actions.stream().map(PrepareAction::prepare).collect(Collectors.toList()))).to(getSelf());
	}

	@Override
	public Receive createReceive() {
		return receiveBuilder()
				.match(Done.class, message -> handleDone())
				.match(Status.Failure.class, this::handleFailure)
				.build();
	}

	/** Logs that the combined preparation completed. */
	private void handleDone() {
		logger.info("Done");
	}

	/**
	 * Rethrows the piped failure so that it surfaces as a failure of this actor.
	 *
	 * @param failure the failure message produced by the piped stage
	 * @throws RuntimeException always, wrapping the original cause
	 */
	private void handleFailure(Status.Failure failure) {
		// Rethrow instead of handling here: the actor itself must fail for the
		// failure to be visible outside of it.
		throw new RuntimeException(failure.cause());
	}

	/** Wraps a stage so its result can be sent to an actor, using this actor's dispatcher. */
	private <T> PipeToSupport.PipeableCompletionStage<T> pipe(CompletionStage<T> completionStage) {
		return PatternsCS.pipe(completionStage, getContext().dispatcher());
	}

	@Override
	public void postStop() {
		logger.info("Stopped");
	}

	/**
	 * Combines the given stages into one that completes once all of them have completed.
	 *
	 * @param stages the stages to wait for
	 * @param <T> the result type of the stages; the results themselves are discarded
	 * @return a stage that completes with {@code Done} once every stage has completed
	 */
	// Safe: the array is only wrapped in a list view and read, never written to.
	@SafeVarargs
	public static <T> CompletionStage<Done> doAll(CompletionStage<T>... stages) {
		return doAll(Arrays.asList(stages));
	}

	/**
	 * Combines the given stages into one that completes once all of them have completed.
	 * An empty list yields an already completed stage.
	 *
	 * @param stages the stages to wait for
	 * @param <T> the result type of the stages; the results themselves are discarded
	 * @return a stage that completes with {@code Done} once every stage has completed
	 */
	public static <T> CompletionStage<Done> doAll(List<CompletionStage<T>> stages) {
		CompletionStage<Done> result = CompletableFuture.completedFuture(Done.getInstance());
		for (CompletionStage<?> stage : stages) {
			// thenCombine waits for both sides; both values are dropped in favour of Done.
			result = result.thenCombine(stage, (d1, d2) -> Done.getInstance());
		}
		return result;
	}

}

/**
 * Entry points for starting a {@code PrepareRegisterActor} under a
 * {@code BackoffSupervisor}, either directly on the given actor system or wrapped in a
 * {@code ClusterSingletonManager}.
 */
public class PrepareRegister {

	/**
	 * Starts the prepare actor wrapped in a {@code ClusterSingletonManager}.
	 *
	 * @param system the actor system to create the actors in
	 * @param prepareActorName name of the prepare actor; also the prefix of the created top-level actor's name
	 * @param backOffMin minimum back-off passed to {@code Backoff.onFailure}
	 * @param backOffMax maximum back-off passed to {@code Backoff.onFailure}
	 * @param backOffRandomFactor random factor passed to {@code Backoff.onFailure}
	 * @param actions the preparation steps handed to the prepare actor
	 */
	public static void global(ActorSystem system, String prepareActorName, FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction... actions) {
		apply(system, true, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
	}

	/**
	 * Starts the prepare actor directly on the given actor system.
	 *
	 * @param system the actor system to create the actors in
	 * @param prepareActorName name of the prepare actor; also the prefix of the created top-level actor's name
	 * @param backOffMin minimum back-off passed to {@code Backoff.onFailure}
	 * @param backOffMax maximum back-off passed to {@code Backoff.onFailure}
	 * @param backOffRandomFactor random factor passed to {@code Backoff.onFailure}
	 * @param actions the preparation steps handed to the prepare actor
	 */
	public static void local(ActorSystem system, String prepareActorName, FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction... actions) {
		apply(system, false, prepareActorName, backOffMin, backOffMax, backOffRandomFactor, actions);
	}

	/** Shared implementation behind {@code global} and {@code local}. */
	private static void apply(ActorSystem system, boolean singleton, String prepareActorName, FiniteDuration backOffMin, FiniteDuration backOffMax, double backOffRandomFactor, PrepareAction... actions) {
		final Props workerProps = PrepareRegisterActor.props(actions);
		final Props supervisedProps = BackoffSupervisor.props(
				Backoff.onFailure(workerProps, prepareActorName, backOffMin, backOffMax, backOffRandomFactor));

		// Local case: the supervisor is created directly on the system.
		if (!singleton) {
			system.actorOf(supervisedProps, prepareActorName + "-BackOff");
			return;
		}

		// Singleton case: the supervisor is handed to a cluster singleton manager.
		final ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system);
		final Props managerProps = ClusterSingletonManager.props(supervisedProps, PoisonPill.getInstance(), settings);
		system.actorOf(managerProps, prepareActorName + "-Singletone");
	}

}
// Simple test (the global variant is used in the same way)
/**
 * Test {@link PrepareAction} whose outcome can be toggled between success and failure.
 */
public class TestPrepareAction implements PrepareAction {

	private final Integer id;
	// volatile: setFail() may be called from a different thread than the one that
	// invokes prepare(), so the write must be visible across threads.
	private volatile boolean fail;

	/**
	 * @param id identifier included in the failure message
	 * @param fail whether {@link #prepare()} should initially fail
	 */
	public TestPrepareAction(Integer id, boolean fail) {
		this.id = id;
		this.fail = fail;
	}

	/**
	 * Switches the outcome of subsequent {@link #prepare()} calls.
	 *
	 * @param fail {@code true} to make {@link #prepare()} fail, {@code false} to make it succeed
	 */
	public void setFail(boolean fail) {
		this.fail = fail;
	}

	/**
	 * @return an already completed stage: {@code Done} when not failing, otherwise a stage
	 *         completed exceptionally with an {@link Exception} whose message contains the id
	 */
	@Override
	public CompletionStage<Done> prepare() {
		if (!fail) {
			return CompletableFuture.completedFuture(Done.getInstance());
		}
		CompletableFuture<Done> failed = new CompletableFuture<>();
		failed.completeExceptionally(new Exception("(" + id + ")Error"));
		return failed;
	}

}

/**
 * Manual demo: registers one succeeding and one failing action locally, waits three
 * seconds, then switches the failing action to succeed.
 */
public class Test {

	public static void main(String[] args) {
		final ActorSystem system = ActorSystem.create();
		TestPrepareAction localFail = new TestPrepareAction(2, true);
		PrepareRegister.local(system,
				"localPrepare",
				FiniteDuration.create(1, TimeUnit.SECONDS),
				FiniteDuration.create(10, TimeUnit.SECONDS),
				0.2d, new TestPrepareAction(1, false), localFail);

		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// Do not swallow the interrupt: restore the flag and carry on, so the
			// action below is still switched to succeed.
			Thread.currentThread().interrupt();
		}
		System.out.println("Setting #2 to not fail");
		localFail.setFail(false);
	}

}
1 Like