I have a simple actor functioning as a proxy to a model serving service.
object ModelService {
sealed trait Command extends NoSerializationVerificationNeeded
sealed trait Request[R <: Reply] extends Command {
def replyTo: ActorRef[R]
}
case class GetPrediction(replyTo: ActorRef[Reply], htmlInput: String, probabilityThreshold: Float)
extends Request[Reply]
sealed trait Reply extends NoSerializationVerificationNeeded
case object ModelOffline extends Reply
case class Prediction(probIndexVec: Vector[(Float, Int)]) extends Reply
case class ModelError(error: String) extends Reply
// case class Context(replyTo: ActorRef[Reply])
def apply(modelServiceHost: String, port: Int, path: String): Behavior[Command] = {
val QueueSize = 10
Behaviors.setup { context =>
implicit val system = context.system.toClassic
import system.dispatcher // to get an implicit ExecutionContext into scope
val poolClientFlow =
Http()(system).cachedHostConnectionPool[ActorRef[Reply]](modelServiceHost, port)
def createRequest(predictionCommand: GetPrediction): (HttpRequest, ActorRef[Reply]) = ???
def parseResponse(response: HttpResponse): Either[ModelError, Prediction] = ???
val queue = Source
.queue[GetPrediction](QueueSize, OverflowStrategy.dropNew)
.map(createRequest)
.via(poolClientFlow)
.to(Sink.foreach({
case (Success(resp), replyTo) => parseResponse(resp).fold(replyTo ! _, replyTo ! _)
case (Failure(e), replyTo) => replyTo ! ModelError("failed to get a response from the model service")
}))
.run()
Behaviors.receiveMessage {
case cmd @ GetPrediction(replyTo, htmlInput, probabilityThreshold) =>
queue.offer(cmd)
Behaviors.same
}
}
}
}
when I want to test it in this
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import com.typesafe.config.ConfigFactory
class ModelServiceSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike with BeforeAndAfterAll with Matchers {
override def afterAll(): Unit = testKit.shutdownTestKit()
"the model service" when {
val modelService = testKit.spawn(ModelService("127.0.0.1", 8080, "/model"), "model-service")
val probe = testKit.createTestProbe[ModelService.Reply]()
"a valid request" should {
"get response from the model-serving server" in {
modelService ! ModelService.GetPrediction(probe.ref, "this is a test scala question", 0.7f)
probe.expectMessage(ModelService.ModelError("still testing"))
}
}
}
}
I saw
[2020-11-20 22:08:17,219] [DEBUG] [akka.remote.artery.Decoder] [] [ModelServiceSpec-akka.actor.default-dispatcher-5] - Decoded message but unable to record hits for compression as no remoteAddress known. No association yet? {akkaAddress=akka://ModelServiceSpec@127.0.0.1:2551, sourceThread=ModelServiceSpec-akka.remote.default-remote-dispatcher-11, akkaSource=Decoder(akka://ModelServiceSpec), sourceActorSystem=ModelServiceSpec, akkaTimestamp=21:08:17.218UTC}
[2020-11-20 22:08:17,219] [WARN] [akka.remote.artery.InboundHandshake$$anon$2] [] [ModelServiceSpec-akka.actor.default-dispatcher-5] - Dropping Handshake Request from [akka://ModelServiceSpec@127.0.0.1:2551#1138052464865087236] addressed to unknown local address [akka://nt-ui@127.0.0.1:2551]. Local address is [akka://ModelServiceSpec@127.0.0.1:2551]. Check that the sending system uses the same address to contact recipient system as defined in the 'akka.remote.artery.canonical.hostname' of the recipient system. The name of the ActorSystem must also match. {akkaAddress=akka://ModelServiceSpec@127.0.0.1:2551, sourceThread=ModelServiceSpec-akka.actor.internal-dispatcher-4, akkaSource=InboundHandshake$$anon$2(akka://ModelServiceSpec), sourceActorSystem=ModelServiceSpec, akkaTimestamp=21:08:17.218UTC}
it seems to me that the prob is using address akka://ModelServiceSpec@127.0.0.1:2551
where the real actor is using akka://nt-ui@127.0.0.1:2551
.
ânt-uiâ is my projectâs name.
can someone explain the meaning of âCheck that the sending system uses the same address to contact recipient system as defined in the âakka.remote.artery.canonical.hostnameâ of the recipient system. The name of the ActorSystem must also match.â
thanks!