Actors Streams with actorRefWithBackpressure

Hi,
I have an application that needs to interact with 3rd party APIs that have some rate-limiting requirements.

I’ve designed two shard regions: one for business actors and other for worker actors, these will make the calls to the network (3rd party APIs). These actions will be triggered either by message queue (Akka) either by client requests through an api exposed.

I designed the following flow:

  1. Source: Business Actor using ActorSource.actorRefWithBackpressure.
    Question: What actor I need to define in the first parameter and which message in the second parameter?

  2. Flow: from the previous result message to send a message to a worker actor.
    Question: Same. What is the actor for the actorRefWithBackpressure method.

That is a web-application. So I would like to expose this ActorRef<PayerCompanion.Command> in order to whenever I send a message it goes through the stream.

    private ActorRef<PayerCompanion.Command> buildStream(ActorSystem<SpawnProtocol.Command> actorSystem) {

        Materializer mat = Materializer.createMaterializer(actorSystem);

        Source<PayerCompanion.Command, ActorRef<PayerCompanion.Command>> source = ActorSource.actorRefWithBackpressure(actorSystem, null, null, null);

        ActorRef<PayerCompanion.Command> walletActorRef = source.preMaterialize(mat).first();

        Flow<PayerCompanion.Command, PayerCompanion.PaymentInitiated, NotUsed> flow = ActorFlow.ask(
                walletActorRef,
                Duration.ofSeconds(1),
                (PayerCompanion.Command command, ActorRef<PayerCompanion.PaymentInitiated> actorReference) -> command)
                .throttle(2, Duration.ofSeconds(3));

        Source<PayerCompanion.Command, ActorRef<BraintreeWorkerCompanion.Command>> source = ActorSink.actorRefWithBackpressure(null, null, null, null, null, null);

        final Flow<PayerCompanion.PaymentInitiated, BraintreeWorkerCompanion.CreatePayment, NotUsed> flowToWorkers =
                Flow.fromFunction(
                        (PayerCompanion.PaymentInitiated command) ->
                                BraintreeWorkerCompanion.CreatePayment.builder()
                                        .userId(command.getUserId())
                                        .paymentId(command.getPaymentId())
                                        .amount(command.getAmount()).build()
                );

        source
                .via(flow)
                .via(flowToWorkers)
                .run(mat);

        return actorRef;
    }

I added more thoughts about what I have in my mind here:


Sorry, I am not clear. I would be glad if someone could explain. Thanks!

Hi @jlnery,

ActorSource.actorRefWithBackpressure will acknowledge that it emitted the element on to the stream to the actor which you specify as first parameter. The second parameter is the message which is used to acknowledge this emitting.

Sending another message before the previous one has been acknowledged will fail the stream.

But if I understand your use-case correctly, using ActorFlow.ask seems enough for interacting with your actors.

Cheers,
Enno.

Do you even need this? Throttle has a really big usecase cover. You can handle “costs”, add cost calculation functions, min/max burst. If you add a mapAsync after that you can probably do any rate-limit related thing with it, without the use of actors.