I’m trying to implement a web socket GraphQL subscription service on top of akka-streams/akka-http, which should have a simple protocol: upon connection, clients send one or more subscribe(query) messages and get a stream of results in response, with the ability to add and remove subscriptions on the fly.
- a stateful protocol requires either an actor or a
GraphStage[FlowShape[Message, Message]]to manage the state machine, withGraphStagesolution being probably cleaner and safer - this dynamic fan-in configuration suggests that I should try using
MergeHubfor merging all subscriptions into the socket output
My initial idea was pretty simple: code a GraphStage[FlowShape[Message, Message]] with an internal MergeHub, expose its outlet as stage’s and on incoming subscribe(query) messages just do dataSource.subscribe(query).runWith(mergehub). However, this turned out to be a non-starter for a very obvious reason: there’s no Sink to materialize the MergeHub with.
The only potentially working option I came up with is something like
val protocolActor: ActorRef = ???
upgradeToWebSocket.handleMessages(
Flow.fromSinkAndSourceCoupled(Sink.actorRef(protocolActor, "complete"), MergeHub.source.mapMaterializedValue(protocolActor ! _))
)
, but this is clearly a messy solution: passing socket handle as a message, actor’s unbounded message box, reduced type safety, having to manually manage protocolActor lifecycle etc etc. Is there any better way?