I’m using Akka Http’s singleWebSocketRequest, and recently I find that, if there is a huge tcp package, network will dispart it as multi package with last one’s fin = 1.
But when messages comes to netty/akka http, the last frame comes first, then akka http transforming it to a BinaryMessage.Strict, but others message transformed as BinaryMessage.Stream. then when I received BinaryMessages, I found my messages disorders. Because the last frame with fin sign comes first.
Now. I’m thinking, how to make Akka’s websocket have any buffer for message to handle this
situation, to make my dispart-message become ordered, or make sure all the frame is be ordered by sequence id.
Here is part of my code for receiving message, I print the message’s serial number when I received, then pipe it to actor to handle messages
Use flatMapConcat instead of firing off a streamed message consumer in a separately running stream, that makes sure you do not consume the next web socket message until that stream has completed, and then fire off the message to the actor downstream of that in a single foreach callback.
Did you mean flat the BinaryMessage.Stream till the stream is full finished, then send data actor. And I want to offer more information about my situation more specific (due to limit that new user only post one pic).
In actually, each data in BinaryMessage.Stream is fully complete, every one can decode by my code correctly.
I received 5 messages, If one returns as BinaryMessage.Strict, will print sink (serial number, task number), If BinaryMessage.Stream, will print stream (serial number, task number).
I sent message 1 to 5, but received as 1 2 3 5 4
Sorry for my limited knowledge about Akka Http. I read the code about Akka’s Websocket, I find that if a message with FIN=1, it will return as Strict message, else it will be Streamed message. So, the message printed with stream flag (2, 5) have no FIN flag. rest of messages have one.
And I use tcpdump to monitor network packages. Client side received 9 packages, and server side send 4 packages, which bring back to the first time I describe it.
And that makes me more confused about Akka’s Websocket message hadling, what doest Streamed message mean exactly ? And why messages not all Strict or Streamd ?
The ordering of the log entries from the Strict entry, logged from the “outer” stream consuming web socket messages, and the streaming consumer started for each Streamed, running in a separate stream started by runFold (so in another thread), is not deterministic.
If you instead completely collect the strict and streaming message into memory/or as individual smaller messages and emit that downstream first, and log/send to actor from there, it should be deterministic and in the order you expect.
You could also do that using mapAsync, if you always collect the entire streaming into memory, rather than process smaller chunks of bytes one at a time (but remember that it is a good idea to put an upper limit to it or else a web socket client could fill your heap and crash the JVM):
val onlyStrict: Flow[Message, BinaryMessage.Strict, NotUsed] =
Flow[Message].mapAsync(1) {
case strict: BinaryMessage.Strict => Future.successful(strict)
case BinaryMessage.Streamed(stream) =>
// collect entire stream into memory, turn that into a strict message
stream.runFold(ByteString.empty)(_ ++ _)
.map(BinaryMessage.Strict.apply)
}
val incomingSink: Sink[Message, NotUsed] =
onlyStrict.to(Sink.foreach(strict =>
println("do stuff...")
))
Oh! I think I’ve got what you say. You mean that the Streamed it self is async, not evaluate data until collecting finished. And before Streamed output its value, Strict data already arrived, so it makes Streamed later than Strict. So I need handle messages on single thread.
Enlightened ! Thanks for anythings helped me.
Now I realized that is a Akka Stream question. Pity for knowledge about it.