Hello there,
With Akka classic actors, there is a convenient helper method, ActorFlow.actorRef, in play.api.libs.streams.
I needed an equivalent for typed actors and couldn't find one. Does something like this already exist? If not, this is the implementation I came up with:
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}

private sealed trait EndOfStream
private case object Success extends EndOfStream
private case object Failure extends EndOfStream

/**
 * Creates a [[Flow]] in which incoming elements (of type `In`) are sent to an actor spawned from the given
 * behavior, and messages (of type `Out`) sent to the created output actor go downstream.
 *
 * The `behavior` function receives an `ActorRef[Out]`; every message sent to that reference goes downstream.
 *
 * This is particularly useful for Play WebSockets, where incoming messages from the client are handled by
 * the given behavior, and outgoing messages are sent through that actor reference.
 *
 * @param behavior function that, given the `ActorRef[Out]` whose messages go downstream, returns the behavior
 *                 responsible for receiving (and handling) elements of type `In` coming from upstream
 * @param actorName name of the actor to spawn
 * @param bufferSize size of the buffer for messages sent to the output actor and not yet consumed downstream
 * @param overflowStrategy strategy describing how to handle buffer overflow
 * @param actorSystem surrounding (classic) actor system
 * @tparam In type of elements/messages coming from upstream
 * @tparam Out type of elements/messages going downstream
 * @return a [[Flow]] from `In` to `Out`
 */
def actorRef[In, Out](
    behavior: ActorRef[Out] => Behavior[In],
    actorName: String,
    bufferSize: Int = 16,
    overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(implicit actorSystem: ActorSystem): Flow[In, Out, _] = {
  // Source side: every message sent to `outActor` is emitted downstream via `publisher`.
  // Both matchers never match, so no message completes or fails the source.
  val (outActor, publisher) = ActorSource
    .actorRef[Out](
      { case _ if false => }: PartialFunction[Out, Unit],
      { case e: Any if false => new Exception(e.toString) }: PartialFunction[Any, Throwable],
      bufferSize,
      overflowStrategy
    )
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()

  // Sink side: upstream elements are wrapped in Right and forwarded to a wrapper actor;
  // Left signals that the stream has completed or failed.
  val sink = Flow[In]
    .map(Right[EndOfStream, In])
    .to(
      ActorSink.actorRef[Either[EndOfStream, In]](
        actorSystem.spawn(
          Behaviors.setup[Either[EndOfStream, In]] { context =>
            // The user-supplied behavior runs as a watched child of the wrapper.
            val flowActor = context.spawn(behavior(outActor), "flowActor")
            context.watch(flowActor)
            Behaviors
              .receiveMessage[Either[EndOfStream, In]] {
                case Right(in) =>
                  flowActor ! in
                  Behaviors.same
                case Left(_) =>
                  // End of stream: stop the child; the wrapper stops on Terminated.
                  context.stop(flowActor)
                  Behaviors.same
              }
              .receiveSignal {
                case (_, Terminated(_)) =>
                  Behaviors.stopped
              }
          },
          actorName
        ),
        Left(Success),
        _ => Left(Failure)
      )
    )

  Flow.fromSinkAndSource(
    sink,
    Source.fromPublisher(publisher)
  )
}
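
To show the intended use, here is a rough sketch of a Play WebSocket endpoint built on the method above. It is not tested against a running application. `TypedActorFlow` is a hypothetical name for the object that would hold `actorRef`, and `EchoController` and `echo` are made up for the example:

```scala
import java.util.UUID

import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import play.api.mvc.{AbstractController, ControllerComponents, WebSocket}

// Hypothetical controller; assumes the actorRef method above lives in an object named TypedActorFlow.
class EchoController(cc: ControllerComponents)(implicit system: ActorSystem)
    extends AbstractController(cc) {

  // Every message received from the client is sent back through `out`, i.e. downstream.
  private def echo(out: ActorRef[String]): Behavior[String] =
    Behaviors.receiveMessage[String] { msg =>
      out ! s"echo: $msg"
      Behaviors.same
    }

  def socket: WebSocket = WebSocket.accept[String, String] { _ =>
    // The wrapper actor is spawned as a top-level actor, so its name must be unique per connection.
    TypedActorFlow.actorRef[String, String](
      out => echo(out),
      actorName = s"echo-socket-${UUID.randomUUID()}"
    )
  }
}
```

Note that `actorName` has to differ for each connection, because the wrapper actor is spawned directly on the actor system and a duplicate name would make the spawn fail.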
It probably needs a bit of polishing, but otherwise, would this be suitable for a pull request?
Thanks in advance!