Sink.actorRefWithAck not working

Hello All, I followed the example

https://doc.akka.io/docs/akka/2.5.12/stream/stream-integrations.html

@Component
@Scope(SCOPE_PROTOTYPE)
public class AckingReceiver extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(StreamInitialized.class, init -> {
                log.info("Stream initialized");
                sender().tell(new Ack(), self());
            })
            .match(String.class, element -> {
                log.info("Received element: {}", element);
                sender().tell(new Ack(), self());
            })
            .match(StreamCompleted.class, completed -> {
                log.info("Stream completed");
            })
            .match(StreamFailure.class, failed -> {
                log.error(failed.getCause(), "Stream failed!");
            })
            .build();
    }

}
 @Test
    public void actorRefBackPressureTest() {
        try {
            final Materializer materializer = ActorMaterializer.create(actorSystem);
            final ActorRef receiver = actorSystem.actorOf(DI_PROVIDER.get(actorSystem)
                .props("ackingReceiver"), "AckingReceiver");

            final Source<String, NotUsed> words =
                Source.from(Arrays.asList("hello", "hi"));

            final Sink<String, NotUsed> sink = Sink.<String> actorRefWithAck(receiver,
                new StreamInitialized(),
                new Ack(),
                new StreamCompleted(),
                ex -> new StreamFailure(ex));

            words
                .map(el -> el.toLowerCase())
                .runWith(sink, materializer);

            Thread.sleep(300 * 1000);

        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

All I see is the “Stream Initialiazed” no other updates, anyone facing similar problems. I escalated here https://github.com/akka/akka/issues/22874 but asked to raise the issue over here.

The problem here is that the expected Ack object specified for actorRefWithAck and the Ack instances returned by the AckingReceiver are different, since the default equals implementation in Java tests for reference equality, and these are different Ack instances.

The easiest fix is to use an Ack singleton object. This mistake was made in the Java version of the docs as well, so we should definitely update those. Thanks for bringing it up!

@raboof Thanks again, singleton Ack solved the problem.

1 Like