Hello all, I followed the Sink.actorRefWithAck example from the Akka Streams integration docs:
https://doc.akka.io/docs/akka/2.5.12/stream/stream-integrations.html
@Component
@Scope(SCOPE_PROTOTYPE)
public class AckingReceiver extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(StreamInitialized.class, init -> {
                log.info("Stream initialized");
                sender().tell(new Ack(), self());
            })
            .match(String.class, element -> {
                log.info("Received element: {}", element);
                sender().tell(new Ack(), self());
            })
            .match(StreamCompleted.class, completed -> {
                log.info("Stream completed");
            })
            .match(StreamFailure.class, failed -> {
                log.error(failed.getCause(), "Stream failed!");
            })
            .build();
    }
}
@Test
public void actorRefBackPressureTest() {
    try {
        final Materializer materializer = ActorMaterializer.create(actorSystem);
        final ActorRef receiver = actorSystem.actorOf(DI_PROVIDER.get(actorSystem)
            .props("ackingReceiver"), "AckingReceiver");

        final Source<String, NotUsed> words =
            Source.from(Arrays.asList("hello", "hi"));

        final Sink<String, NotUsed> sink = Sink.<String> actorRefWithAck(receiver,
            new StreamInitialized(),
            new Ack(),
            new StreamCompleted(),
            ex -> new StreamFailure(ex));

        words
            .map(el -> el.toLowerCase())
            .runWith(sink, materializer);

        Thread.sleep(300 * 1000);
    } catch (final Exception e) {
        e.printStackTrace();
    }
}
All I see in the log is “Stream initialized”; no elements are ever logged after that. Is anyone else facing a similar problem? I first reported this at https://github.com/akka/akka/issues/22874 but was asked to raise the issue here instead.
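
One thing that may be worth checking (an assumption, since the Ack class is not shown above): as far as I understand, the sink only treats a reply as an acknowledgement if it is equal to the ackMessage passed to actorRefWithAck. If Ack is a plain class that does not override equals(), the `new Ack()` sent by the actor and the `new Ack()` given to the sink are different objects and would never match, which could leave the stream waiting after the init message. The sketch below is plain Java with no Akka dependency; PlainAck, the Ack enum, and matches are hypothetical names used only to show the difference between a fresh instance and an enum singleton.

```java
class AckEqualityDemo {

    // Hypothetical stand-in for an Ack class that does NOT override equals()/hashCode().
    static final class PlainAck {}

    // Enum singleton: every reference to Ack.INSTANCE is the same object.
    enum Ack { INSTANCE }

    // Illustrative equality check between the configured ack and the one received.
    // This is an assumption about how the ack is compared, not Akka's actual code.
    static boolean matches(Object expected, Object received) {
        return expected.equals(received);
    }

    public static void main(String[] args) {
        // Two separate instances without equals(): compared by identity.
        System.out.println(matches(new PlainAck(), new PlainAck())); // prints "false"

        // The enum singleton always compares equal to itself.
        System.out.println(matches(Ack.INSTANCE, Ack.INSTANCE));     // prints "true"
    }
}
```

If this is the cause, using a single shared value (for example an enum constant such as Ack.INSTANCE) both in the actor and in the actorRefWithAck call, or overriding equals() on Ack, would be the thing to try.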