I’ve crated a two-way tcp stream using akka-stream-typed.
void createSocket(ActorSystem<Void> actorSystem, ActorRef<SomeApi> ownerRef, String serverIp, int serverPort) {
Tcp.get(actorSystem).outgoingConnection(serverIp, serverPort)
.join(createSocketFlow(ownerRef))
.run(actorSystem);
}
Flow<ByteString, ByteString, NotUsed> createSocketFlow(ActorRef<SomeApi> socketOwner) {
return Flow.fromGraph(GraphDSL.create(builder -> {
Source<ByteString, ActorRef<ByteString>> pushSource =
ActorSource.<ByteString>actorRef(elem -> false,
m -> Optional.empty(),
40,
OverflowStrategy.dropBuffer())
.mapMaterializedValue(param -> {
socketOwner.tell(SetSocketPushRef.of(param));
return param;
});
SourceShape<ByteString> source = builder.add(pushSource);
Sink<ByteString, NotUsed> in = Flow.of(ByteString.class)
.to(ActorSink.<ByteString, SomeApi, ByteString>actorRefWithBackpressure(socketOwner,
(socketRef, bytes) -> ReceivedSocketPacket.of(bytes.toArray()),
SocketInitiated::of,
SocketCompleted.INSTANCE,
SocketError::of));
SinkShape<ByteString> input = builder.add(in);
return FlowShape.of(input.in(), source.out());
}));
}
Cause I needed a two-way socket (which sends and receives bytes) I had to create a source-ref.
This flow works fine but when created sockets’ count exceeds 1000, some sockets tend to get completed unexpectedly while no signal from socket-owner has been sent.
Any clue about the reasons?