To connect two WebSocket connections, I would like to create a source from a sink (and the other way round). The only way I have found so far is via the Reactive Streams types:
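import akka.stream.scaladsl.{Keep, Sink, Source}
// An implicit Materializer must be in scope for run(), runWith and runForeach.
val in = Sink.asPublisher[Int](false)
val out = Source.asSubscriber[Int]
// Materialize the Subscriber/Publisher pair, then wrap each side back into a Source and a Sink.
val (source, sink) =
  out
    .toMat(in)(Keep.both)
    .mapMaterializedValue {
      case (sub, pub) =>
        (Source.fromPublisher(pub), Sink.fromSubscriber(sub))
    }
    .run()
// Whatever is pushed into the sink comes out of the source.
Source(1.to(10)).runWith(sink)
source.runForeach(println)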
Source.preMaterialize is available since 2.5.11, and the other way around (for Sink) was just merged and will be in 2.5.12 when that is released. The combinator does pretty much what you show in your sample, as you can see:
Thanks for the quick reply. Yet I don’t think this is what I need: I need a source that is based upon a sink. Not a prematerialized source. Looking at the impl you have attached, I think my approach is valid.
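For comparison, here is a minimal sketch of what a prematerialized source looks like, assuming Akka 2.5.11 or later. The object name, the system name and the choice of Source.queue are only illustrative. Pre-materializing starts from an existing source and hands back its materialized value plus a source to consume, which is a different starting point from deriving a source out of a sink.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names are placeholders.
object PreMaterializeSketch extends App {
  implicit val system: ActorSystem = ActorSystem("prematerialize-sketch")
  implicit val mat: Materializer = ActorMaterializer()

  // preMaterialize() runs the queue source right away and returns
  // its materialized value (the queue) together with a Source that emits its elements.
  val (queue, source) =
    Source.queue[Int](bufferSize = 16, OverflowStrategy.backpressure).preMaterialize()

  queue.offer(42)

  // Take the single offered element so the stream completes.
  val result = Await.result(source.take(1).runWith(Sink.seq), 5.seconds)
  println(result)

  system.terminate()
}
```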
Yes, I think I drew the wrong conclusion because the two ideas are so similar. I think the RS dance in the question is a good approach. Maybe we should have something pre-packaged for it; I created https://github.com/akka/akka/issues/24853 to track that.
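Until something like that exists, the Reactive Streams dance from the question could be wrapped in a small helper. This is only a sketch assuming Akka 2.5.x; SinkSourcePair and create are hypothetical names, not part of the Akka API.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object SinkSourcePair {
  // Materializes a Subscriber/Publisher pair and wraps each side,
  // so that elements written to the Sink are emitted by the Source.
  def create[T]()(implicit mat: Materializer): (Sink[T, NotUsed], Source[T, NotUsed]) = {
    val (sub, pub) =
      Source.asSubscriber[T]
        .toMat(Sink.asPublisher[T](fanout = false))(Keep.both)
        .run()
    (Sink.fromSubscriber(sub), Source.fromPublisher(pub))
  }
}

object SinkSourcePairDemo extends App {
  implicit val system: ActorSystem = ActorSystem("sink-source-pair")
  implicit val mat: Materializer = ActorMaterializer()

  val (sink, source) = SinkSourcePair.create[Int]()

  Source(1 to 10).runWith(sink)
  val received = Await.result(source.runWith(Sink.seq), 5.seconds)
  println(received)

  system.terminate()
}
```

Because the publisher is created with fanout = false, the returned sink and source can each be materialized only once.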
val in = Sink.asPublisher[Output](false)
val out = Source.asSubscriber[Output]
val (_sink, _source) = out
  .toMat(in)(Keep.both)
  .mapMaterializedValue {
    case (sub, pub) => (Sink.fromSubscriber(sub), Source.fromPublisher(pub))
  }
  .run()
val sink = Flow[Input].map(inputToOutput).to(_sink)
val source = _source
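A self-contained version of the snippet above might look as follows. It is a sketch assuming Akka 2.5.x; the concrete types chosen for Input and Output, the body of inputToOutput, and the object and system names are placeholders invented for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MappedBridgeDemo extends App {
  implicit val system: ActorSystem = ActorSystem("mapped-bridge")
  implicit val mat: Materializer = ActorMaterializer()

  // Placeholder types and conversion, chosen only for this example.
  type Input = String
  type Output = Int
  def inputToOutput(in: Input): Output = in.length

  val (_sink, _source) =
    Source.asSubscriber[Output]
      .toMat(Sink.asPublisher[Output](fanout = false))(Keep.both)
      .mapMaterializedValue {
        case (sub, pub) => (Sink.fromSubscriber(sub), Source.fromPublisher(pub))
      }
      .run()

  // The sink accepts Input and converts it before handing it to the bridge.
  val sink: Sink[Input, NotUsed] = Flow[Input].map(inputToOutput).to(_sink)
  val source: Source[Output, NotUsed] = _source

  Source(List("a", "bb", "ccc")).runWith(sink)
  // Collects the string lengths 1, 2, 3 on the source side.
  val lengths = Await.result(source.runWith(Sink.seq), 5.seconds)
  println(lengths)

  system.terminate()
}
```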