I’m trying to implement a web socket GraphQL subscription service on top of akka-streams/akka-http, which should have a simple protocol: upon connection, clients send one or more subscribe(query)
messages and get a stream of results in response, with the ability to add and remove subscriptions on the fly.
- a stateful protocol requires either an actor or a
GraphStage[FlowShape[Message, Message]]
to manage the state machine, withGraphStage
solution being probably cleaner and safer - this dynamic fan-in configuration suggests that I should try using
MergeHub
for merging all subscriptions into the socket output
My initial idea was pretty simple: code a GraphStage[FlowShape[Message, Message]]
with an internal MergeHub
, expose its outlet as stage’s and on incoming subscribe(query)
messages just do dataSource.subscribe(query).runWith(mergehub)
. However, this turned out to be a non-starter for a very obvious reason: there’s no Sink
to materialize the MergeHub
with.
The only potentially working option I came up with is something like
val protocolActor: ActorRef = ???
upgradeToWebSocket.handleMessages(
Flow.fromSinkAndSourceCoupled(Sink.actorRef(protocolActor, "complete"), MergeHub.source.mapMaterializedValue(protocolActor ! _))
)
, but this is clearly a messy solution: passing socket handle as a message, actor’s unbounded message box, reduced type safety, having to manually manage protocolActor
lifecycle etc etc. Is there any better way?