I’m simulating in a test the conversation between three actors (A, B, C)
A ---> MessageA2B ---> B ---> MessageB2C ---> C
When MessageB2C
is successfully arrived to C then the acknowledgement is sent back to the origin.
C ---> MessageB2C_Ack ---> B ---> MessageA2B_Ack --> A
The only peculiarity of this conversation is the message MessageB2C
.
MessageB2C
is sent at least every 100 ms until C does not answer with its acknowledgement.
I’ve implemented this simple conversation with scala testkit framework, but the test fail in a particular situation.
When ActorB retries to send MessageB2C more then once time, then is unable to recognize the answers from ActorC. And the answers from ActorC go to deadLetters.
test("expectNoMessage-case: actorB retries MessageB2C every 50 milliseconds") {
val actorA = TestProbe()
val actorC = TestProbe()
val actorB = system.actorOf(ActorB.props(Props(classOf[TestRefWrappingActor], actorC)), "step1-case2-primary")
actorA.send(actorB, MessageA2B())
actorA.expectNoMessage(100.milliseconds)
actorC.expectMsg(MessageB2C())
// Retries form above
actorC.expectMsg(200.milliseconds, MessageB2C())
// Never reach this point with 100 ms frequency
actorC.expectMsg(200.milliseconds, MessageB2C())
actorA.expectNoMessage(100.milliseconds)
actorC.reply(MessageB2C_Ack())
// Never reach this point with MessageB2C 50 ms frequency
actorA.expectMsg(MessageA2B_Ack())
}
This is the ActorB
code:
class ActorB(actorCProps: Props) extends Actor {
import ActorB._
import context.dispatcher
val log = Logging(context.system, this)
val actorC = context.actorOf(actorCProps)
def receive = {
case r:MessageB2C_Ack => {
log.info("ActorB - secondary received UNHANDLED message: MessageB2C_Ack")
}
case r:MessageA2B => {
val client = context.sender()
implicit val timeout = Timeout(100.milliseconds)
log.info("ActorB received message MessageA2B from client " + client)
implicit val scheduler=context.system.scheduler
val p = MessageB2C()
RetrySupport.retry(() => {
log.info("ActorB - sent message MessageB2C to ActorC " + actorC)
Patterns.ask(actorC, p, 50.millisecond)
}, 10, 50.millisecond)
.onSuccess({
case p: MessageB2C_Ack => {
log.info("ActorB - Received MessageB2C_Ack so now sending an MessageA2B_Ack to client " + client)
client ! MessageA2B_Ack()
}
})
}
case r => {
log.info("ActorB - received UNHANDLED message " + r)
}
}
}