It seems that a response message, which is sent from a typed actor, skips a protobuf serialization defined in the configuration.
There is a simple akka typed microservice that is written using cluster/sharding and persistence with Cassandra journal, it runs in a local cluster with only 1 machine.
It has a single actor which handles a command (this command is properly serialized using protobuf) and then sends a response message to replyTo
from the command. The problem is that such a message doesn’t get serialized at all.
I forced serialization of all messages, this is a config file application.conf
:
akka.actor.serializers {
shipment-proto = "com.shipment.protobuf.ProtobufSerializer"
}
akka.actor.serialization-bindings {
"com.shipment.Shipment" = shipment-proto
"com.shipment.command.FindShipmentByNumber" = shipment-proto
}
akka.actor.serialize-messages = on
akka.actor.allow-java-serialization = off
There is a class which sends a command to entity (sharding) actor:
class Service {
...
def findShipment(shipmentNumber: String): Future[Shipment] = {
val ref: EntityRef[ShipmentCommand] = sharding.entityRefFor(ShipmentModel.TypeKey, shipmentNumber)
ref.ask { replyTo: ActorRef[Shipment] =>
FindShipmentByNumber(
shipmentNumber = shipmentNumber,
replyTo = replyTo
)
})
}
}
And the actor itself:
case class Shipment(shipmentNumber: String)
object ShipmentModel {
...
val TypeKey: EntityTypeKey[ShipmentCommand] =
EntityTypeKey[ShipmentCommand]("ShipmentActor")
def apply(persistenceId: PersistenceId): Behavior[ShipmentCommand] = {
Behaviors.setup { context: ActorContext[ShipmentCommand] =>
EventSourcedBehavior
.withEnforcedReplies[ShipmentCommand, ShipmentEvent, Option[Shipment]](
persistenceId = persistenceId,
emptyState = None,
commandHandler = (state, cmd) =>
cmd match {
case FindShipmentByNumber(_, replyTo) =>
Effect.reply(replyTo)(Shipment(shipmentNumber = "123"))
},
eventHandler = (state, event) =>
...
)
}
}
}
There is also a protobuf serializer for the Shipment
class:
class ProtobufSerializer(system: ExtendedActorSystem)
extends SerializerWithStringManifest {
val ShipmentManifest = "001"
val FindShipmentManifest = "003"
// should be unique, check other serializers before assigning
def identifier: Int = 1205
def manifest(o: AnyRef): String = {
o match {
case _: Shipment => ShipmentManifest
case _: FindShipmentByNumber => FindShipmentManifest
}
}
def toBinary(o: AnyRef): Array[Byte] = {
println("Serializing new message: " + o)
(o match {
case o: Shipment => toProtobuf(o)
case o: FindShipmentByNumber => toProtobuf(o)
}).toByteArray
}
Why does the FindShipmentByNumber
message get serializied properly (I can see Serializing new message ..
in the console) but Shipment
doesn’t ?
How is it possible to avoid the serialization of actor response message even if akka.actor.serialize-messages
is enabled?
Akka version is 2.6.15
, jdk 14.0.2
Appreciate any ideas.
Thanks in advance!