So I can successfully forward messages from an Akka HTTP WebSocket to RabbitMQ and back. However, I run into problems as soon as more than one client is connected. Here is the Flow I have:
Rather long-winded, but I basically have a stretched-out WebSocket flow with two separate RabbitMQ queues, one for each direction. My question is: how can I correlate the two legs so that a reply goes back to the same WS client that sent the request? The return leg via RabbitMQ back to the WebSocket doesn’t always know which client it is connected to, so either client might receive the message.
So I’ve simplified the Flow implementation as follows:
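    import akka.NotUsed
    import akka.http.scaladsl.model.ws.{Message, TextMessage}
    import akka.stream.{Graph, SinkShape, SourceShape}
    import akka.stream.alpakka.amqp.IncomingMessage
    import akka.stream.scaladsl.{Flow, GraphDSL}
    import akka.util.ByteString

    // RabbitMQFlows (not shown) holds the AMQP sink and source used below.

    // Outbound leg: publishes each ByteString to RabbitMQ.
    val rabbitMqSink: Graph[SinkShape[ByteString], NotUsed] = GraphDSL.create() { implicit builder =>
      val flow = builder.add {
        RabbitMQFlows.amqpSink
      }
      SinkShape(flow.in)
    }

    // Return leg: consumes reply messages from RabbitMQ.
    def wsResponseSource: Graph[SourceShape[IncomingMessage], NotUsed] = GraphDSL.create() { implicit builder =>
      val flow = builder.add {
        RabbitMQFlows.source
      }
      SourceShape(flow.out)
    }

    // Joins the two legs into one Flow; the sink and the source are not linked to each other.
    val rabbitMqFlow: Flow[ByteString, IncomingMessage, NotUsed] = {
      val in =
        Flow[ByteString]
          .to(rabbitMqSink)
      val out = wsResponseSource
      Flow.fromSinkAndSource(in, out)
    }

    // WebSocket handler: strict text frames in, RabbitMQ replies out as text frames.
    val wsViaFlow: Flow[Message, Message, NotUsed] = {
      Flow[Message]
        .collect {
          case TextMessage.Strict(msg) => ByteString(msg)
        }
        .via(rabbitMqFlow)
        .map {
          case msg: IncomingMessage =>
            TextMessage.Strict(msg.bytes.utf8String)
        }
    }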
During testing, when there is more than one client, the first reply is returned to the last client to connect, and subsequent replies then alternate between the clients in a round-robin fashion, which doesn’t seem right. It seems that routing the flow through RabbitMQ loses the association with the WS connection, so the return leg doesn’t know which client to send the message to.
Is there a way to list connected clients server side? All tips appreciated!
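OK, so I figured this out, or at least I have a usable workaround: use the Alpakka AMQP RPC flow (AmqpRpcFlow). It creates a temporary reply queue per client, which takes care of the correlation. The round-robin behaviour presumably came from every WebSocket connection consuming from the same reply queue, since RabbitMQ hands messages to a queue's consumers in turn. The server side needs an AmqpSink.replyTo, and you copy the message properties from the original IncomingMessage onto the OutgoingMessage so that the reply is routed back to the right queue. And all is well.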
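
To illustrate why the per-client reply queue fixes the routing, here is a minimal, dependency-free sketch of the two setups. It does not use the Alpakka or RabbitMQ APIs at all: `ToyBroker`, `Msg`, `sharedQueue` and `perClientQueue` are hypothetical names made up for this example, and the broker only models round-robin dispatch among consumers of a single queue.

```scala
import scala.collection.mutable

object ReplyRoutingSketch {

  // Hypothetical stand-in for an AMQP message: a body plus the replyTo property.
  final case class Msg(body: String, replyTo: Option[String])

  // Toy broker (an assumption for illustration): each named queue serves its
  // consumers in round-robin order, like competing consumers on one queue.
  final class ToyBroker {
    private val consumers = mutable.Map.empty[String, Vector[String]]
    private val cursor = mutable.Map.empty[String, Int].withDefaultValue(0)

    def subscribe(queue: String, consumer: String): Unit =
      consumers(queue) = consumers.getOrElse(queue, Vector.empty) :+ consumer

    // Returns the consumer that receives the next message published to `queue`.
    def deliver(queue: String): String = {
      val subs = consumers(queue)
      val i = cursor(queue)
      cursor(queue) = (i + 1) % subs.size
      subs(i)
    }
  }

  // Setup from the question: every client consumes from one shared reply queue.
  // Result pairs are (client that sent the request, client that got the reply).
  def sharedQueue(requests: Seq[String]): Seq[(String, String)] = {
    val broker = new ToyBroker
    requests.distinct.foreach(c => broker.subscribe("replies", c))
    requests.map(sender => sender -> broker.deliver("replies"))
  }

  // RPC style: each client owns a private reply queue and names it in replyTo;
  // the server copies replyTo from the request onto the response.
  def perClientQueue(requests: Seq[String]): Seq[(String, String)] = {
    val broker = new ToyBroker
    requests.distinct.foreach(c => broker.subscribe(s"reply-$c", c))
    requests.map { sender =>
      val request = Msg("ping", replyTo = Some(s"reply-$sender"))
      val response = Msg("pong", replyTo = request.replyTo) // server side
      sender -> broker.deliver(response.replyTo.get)
    }
  }
}
```

With requests from clients `a, b, b, a`, `sharedQueue` hands the replies out in turn regardless of who asked, so some land on the wrong client, while `perClientQueue` always returns each reply to its sender. Copying the properties in the server is what carries the `replyTo` value across, which is why that step matters in the real setup.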