Create Source from Sink (and vice versa)

In order to connect two websocket connections I would like to create a source from a sink (and the other way round). The only way I have found out myself is via the Reactive Streams types:

    val in  = Sink.asPublisher[Int](false)
    val out = Source.asSubscriber[Int]

    val (source, sink) =
      out
        .toMat(in)(Keep.both)
        .mapMaterializedValue {
          case (sub, pub) =>
            (Source.fromPublisher(pub), Sink.fromSubscriber(sub))
        }
        .run()

    Source(1.to(10)).runWith(sink)
    source.runForeach(println)

Is this approach fine?

I think that’d be the preMaterialize combinators:

https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#source-pre-materialization

Source is available since 2.5.11 and the other way around was just merged and will be in 2.5.12 when that is released. The combinator does pretty much what you show in your sample as you can see:

Source:

Sink:

Thanks for the quick reply. Yet I don’t think this is what I need: I need a source that is based upon a sink. Not a prematerialized source. Looking at the impl you have attached, I think my approach is valid.

Yes, I think I drew the wrong conclusion by the idea being so similar. I think that the RS-dance in the question is a good approach. Maybe we should have something pre-packaged for it, I created https://github.com/akka/akka/issues/24853 to track that.

1 Like

I have a similar situation, except my Sink and Source are of two different types:

val in = Sink.asPublisher[Input](fanout = false)
val out = Source.asSubscriber[Output]

My beginner instinct tells me to do something like this:

out
  .toMat(Flow[Input]
    .map(inputToOutput)
    .to(Sink.asPublisher[Output](fanout = false))
  )(Keep.both)

…but I get a type error. Is there an easy way to fix this problem?

Update: I’ve been able to wire it correctly.

val in = Sink.asPublisher[Output](false)
val out = Source.asSubscriber[Output]

val (_sink, _source) = out
  .toMat(in)(Keep.both)
  .mapMaterializedValue {
    case (sub, pub) => (Sink.fromSubscriber(sub), Source.fromPublisher(pub))
  }
  .run()

val sink = Flow[Input].map(inputToOutput).to(_sink)
val source = _source