"Typed"ActorFlow

Hello there,

For Akka classic actors, there is a convenient helper method, ActorFlow.actorRef, in play.api.libs.streams.
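
For reference, this is roughly how the classic helper is used for a Play WebSocket. It is a minimal sketch along the lines of the Play WebSocket documentation, assuming Play 2.8 with Akka classic; `Application` and `MyWebSocketActor` are illustrative names only:

```scala
import javax.inject.Inject

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.Materializer
import play.api.libs.streams.ActorFlow
import play.api.mvc.{AbstractController, ControllerComponents, WebSocket}

// Illustrative classic actor: every String received from the client is answered via `out`.
object MyWebSocketActor {
  def props(out: ActorRef): Props = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {
  def receive: Receive = {
    case msg: String =>
      // Whatever is sent to `out` goes downstream, i.e. back to the WebSocket client.
      out ! s"I received your message: $msg"
  }
}

// Illustrative controller: ActorFlow.actorRef needs an implicit ActorRefFactory and Materializer.
class Application @Inject() (cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
    extends AbstractController(cc) {

  def socket: WebSocket = WebSocket.accept[String, String] { _ =>
    ActorFlow.actorRef(out => MyWebSocketActor.props(out))
  }
}
```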

I needed the equivalent for typed actors and didn’t find one. Does something like this already exist? If not, this is the implementation I came up with:

import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}
import akka.stream.OverflowStrategy

  private sealed trait EndOfStream
  private case object Success extends EndOfStream
  private case object Failure extends EndOfStream

  /**
   * Creates a [[Flow]] in which incoming elements (of type `In`) are sent to an actor spawned from the given
   * behavior, and messages (of type `Out`) sent to the `ActorRef[Out]` handed to that behavior go downstream.
   *
   * This is particularly useful for Play WebSockets, where incoming messages from the client are handled by
   * the given behavior, and outgoing messages come from the actor.
   *
   * @param behavior factory for the behavior that receives (and handles) elements of type `In` coming from
   *                 upstream. Its argument is the actor whose received messages go downstream.
   * @param actorName name of the actor to spawn
   * @param bufferSize size of the buffer holding messages that are waiting to go downstream
   * @param overflowStrategy strategy describing how to handle buffer overflow
   * @param actorSystem surrounding actor system
   * @tparam In type of elements/messages coming from upstream
   * @tparam Out type of elements/messages going downstream
   * @return a [[Flow]] from In to Out.
   */
  def actorRef[In, Out](
      behavior: ActorRef[Out] => Behavior[In],
      actorName: String,
      bufferSize: Int                    = 16,
      overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
  )(implicit actorSystem: ActorSystem): Flow[In, Out, _] = {
    // Materialize the downstream side right away: messages sent to `outActor` are emitted by `publisher`.
    // Both matchers are empty, so no message of type `Out` ever completes or fails the source.
    val (outActor, publisher) = ActorSource
      .actorRef[Out](
        PartialFunction.empty[Out, Unit],
        PartialFunction.empty[Out, Throwable],
        bufferSize,
        overflowStrategy
      )
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

    val sink = Flow[In]
      .map(Right[EndOfStream, In])
      .to(
        ActorSink.actorRef[Either[EndOfStream, In]](
          actorSystem.spawn(
            Behaviors.setup[Either[EndOfStream, In]] { context =>
              val flowActor = context.spawn(behavior(outActor), "flowActor")
              context.watch(flowActor)

              Behaviors
                .receiveMessage[Either[EndOfStream, In]] {
                  case Right(in) =>
                    flowActor ! in
                    Behaviors.same
                  case Left(_) =>
                    context.stop(flowActor)
                    Behaviors.same
                }
                .receiveSignal {
                  case (_, Terminated(_)) =>
                    Behaviors.stopped
                }
            },
            actorName
          ),
          Left(Success),
          _ => Left(Failure)
        )
      )

    Flow.fromSinkAndSource(
      sink,
      Source.fromPublisher(publisher)
    )

  }
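
A minimal usage sketch, assuming the method above lives in an object with the hypothetical name `TypedActorFlow`, and that a Play controller has an implicit classic ActorSystem in scope. `Echo` and `freshName` are made up for illustration. The actor is spawned at the top level of the actor system, where names have to be unique, so the sketch derives a fresh name per connection:

```scala
import java.util.UUID

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Illustrative typed counterpart of a classic WebSocket actor.
object Echo {
  // Has the shape expected by `actorRef`: it receives the actor whose messages go downstream.
  def apply(out: ActorRef[String]): Behavior[String] =
    Behaviors.receiveMessage { msg =>
      out ! s"I received your message: $msg"
      Behaviors.same
    }

  // Two live top-level actors cannot share a name, so generate one per WebSocket connection.
  def freshName(): String = s"echo-${UUID.randomUUID()}"
}

// Inside the controller (commented out because `TypedActorFlow` is hypothetical):
//
//   def socket: WebSocket = WebSocket.accept[String, String] { _ =>
//     TypedActorFlow.actorRef[String, String](out => Echo(out), Echo.freshName())
//   }
```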

It probably needs a bit of polishing, but otherwise, is this something that would be suitable for a pull request?

Thanks in advance!
