Hello there,
I am running into an error while trying to set up an Akka HTTP
WebSocket (the server side of the WebSocket) using Akka Typed.
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._
import scala.util.{Failure, Success, Try}
//Akka Typed code...
/**
 * Typed actor that sits behind a WebSocket connection and exchanges JSON messages.
 *
 * Incoming strict text frames are parsed as JSON and re-sent to the actor as
 * [[WSJSONMessageService.IncomingJSONMessage]]; outgoing JSON is rendered compactly
 * and forwarded to the registered outgoing handler, if one has been registered.
 */
object WSJSONMessageService {
  /** Protocol accepted by this actor. */
  sealed trait Command
  /** A raw WebSocket message received from the client. */
  case class IncomingMessage(v: Message) extends Command
  /** A successfully parsed JSON payload received from the client. */
  case class IncomingJSONMessage(v: JValue) extends Command
  /** A JSON payload that should be sent to the client. */
  case class OutgoingJSONMessage(v: JValue) extends Command
  /** Registers the actor that outgoing text messages are forwarded to. */
  case class OutgoingActorRef(ref: ActorRef[TextMessage]) extends Command
  /** Signals that the incoming stream failed; the actor stops. */
  case class ReportError(err: Throwable) extends Command
  /** Signals that the incoming stream completed; the actor stops. */
  case object Stop extends Command

  /** Initial behavior: no outgoing handler is registered yet. */
  def apply(): Behavior[Command] = handleMessages(None)

  /**
   * Message loop.
   *
   * @param outgoingHandler the actor that receives outgoing text messages, or `None`
   *                        until an [[OutgoingActorRef]] command has been processed
   */
  private def handleMessages(outgoingHandler: Option[ActorRef[TextMessage]]): Behavior[Command] = Behaviors.setup { ctx =>
    Behaviors.receiveMessage {
      case IncomingJSONMessage(v) =>
        ctx.log.info(s"Incoming message: ${compact(v)}")
        //todo: give this message to some useful actor.
        ctx.self ! OutgoingJSONMessage(v) //hack for echo message!
        Behaviors.same

      case IncomingMessage(TextMessage.Strict(text)) =>
        // Parsing may throw on malformed input; log it and keep the connection alive.
        Try(parse(text)) match {
          case Success(input) => ctx.self ! WSJSONMessageService.IncomingJSONMessage(input)
          case Failure(exception) => ctx.log.error(exception, s"Error while processing: '$text'")
        }
        Behaviors.same

      case IncomingMessage(_) =>
        // Anything other than a strict text message is ignored.
        Behaviors.same

      case OutgoingJSONMessage(v) =>
        val json = compact(v)
        ctx.log.info(s"Outgoing message: $json")
        outgoingHandler match {
          case Some(handler) =>
            handler ! TextMessage(json)
          case None =>
            // Previously the code logged this warning and then still sent to a null
            // reference, which threw a NullPointerException. Now the message is skipped.
            ctx.log.warning(s"Outgoing message handler not set. Skipping: $json")
        }
        Behaviors.same

      case OutgoingActorRef(ref) =>
        ctx.log.info("Registration of outgoing handler")
        // Option(...) maps an accidental null reference to None.
        handleMessages(Option(ref))

      case Stop =>
        ctx.log.info("Stopping actor!")
        Behaviors.stopped

      case ReportError(th) =>
        ctx.log.error(th, "Error occurred while receiving message from WS")
        Behaviors.stopped
    }
  }
}
//WS code to return a Flow...
/**
 * Builds the WebSocket handler flow backed by a freshly spawned [[WSJSONMessageService]] actor.
 *
 * The sink side forwards every incoming message (after `removeStream`) to the actor;
 * the source side emits whatever is sent to its materialized actor reference, which is
 * handed to the actor through `OutgoingActorRef`.
 *
 * @param context actor context used to spawn the per-connection actor
 * @param mat     materializer passed implicitly to `removeStream`
 * @return a flow suitable for the `handleWebSocketMessages` directive
 */
private def wsJSONMessageHandler()(implicit context: ActorContext, mat: ActorMaterializer): Flow[Message, Message, Any] = {
  import akka.actor.typed.scaladsl.adapter._
  val actorRef = context.spawnAnonymous(WSJSONMessageService())
  import akka.stream.typed.scaladsl._

  // `5.minutes` instead of the postfix form `5 minutes`, which needs scala.language.postfixOps.
  val incoming = Flow[Message]
    .mapAsync(1)(_.removeStream(5.minutes))
    .collect { case v: Message => WSJSONMessageService.IncomingMessage(v) }
    .to(ActorSink.actorRef[WSJSONMessageService.Command](actorRef, WSJSONMessageService.Stop, WSJSONMessageService.ReportError))

  // Why the typed ActorSource variant failed:
  // ActorSource.actorRef takes a completionMatcher and a failureMatcher. They are partial
  // functions applied to every message sent to the source's actor reference. A matcher
  // written as `{ case v => }` or `{ case input => new RuntimeException(...) }` is defined
  // for ALL messages, so the first regular element is treated as a termination signal
  // (seen as the RuntimeException) instead of being emitted. The matchers must be defined
  // only for dedicated control messages, or for nothing at all:
  //
  //   val outgoing = ActorSource.actorRef[Message](
  //       completionMatcher = PartialFunction.empty,
  //       failureMatcher = PartialFunction.empty,
  //       bufferSize = 10,
  //       overflowStrategy = OverflowStrategy.dropHead)
  //     .mapMaterializedValue(v => actorRef ! WSJSONMessageService.OutgoingActorRef(v))
  //
  // The untyped source below is kept as the active implementation.
  // NOTE(review): relies on the adapter import above to turn the untyped ActorRef into
  // the typed reference expected by OutgoingActorRef — confirm.
  val outgoing = Source.actorRef[Message](10, OverflowStrategy.dropHead)
    .mapMaterializedValue(v => actorRef ! WSJSONMessageService.OutgoingActorRef(v))

  Flow.fromSinkAndSource(incoming, outgoing)
}
/** Extension methods for a WebSocket [[Message]]. */
implicit class RichMessage(value: Message) {
  /**
   * Calls `toStrict` on the wrapped message so a streamed payload is collected
   * within the given timeout.
   *
   * @param duration timeout passed to `toStrict`
   * @param mat      materializer required by `toStrict`
   * @return the future returned by `toStrict` for text and binary messages;
   *         an already-completed future holding `NotUsed` for anything else
   */
  def removeStream(duration: FiniteDuration)(implicit mat: Materializer): Future[Any] =
    value match {
      case text: TextMessage     => text.toStrict(duration)
      case binary: BinaryMessage => binary.toStrict(duration)
      case _                     => Future.successful(NotUsed)
    }
}
I am able to create an HTTP route for the above code using the handleWebSocketMessages
directive. The incoming Sink already uses Akka Typed and works fine.
The outgoing Source does not seem to work with ActorSource (shown commented out above):
the failure handler that creates new RuntimeException(s"error in ${input}") is invoked.
A similar Source built with the untyped API works as expected (shown above).
Please advise on what I might be missing.
Thanks,
Muthu