Hi,
First of all, I’m just beginning to use Akka. What I’m trying to do is distribute a process that used to run on a single server. The process that I distribute takes about 50 ms on average, but some runs take around 1500 ms.
Since this is a request-response pattern, I’m using “AskPattern.ask” with a 25000 ms timeout to get a future, but when it runs I get the following exception:
Caused by: java.util.concurrent.TimeoutException: Ask timed out on [Actor[akka://test-cluster/user/executor_router#-1035294220]] after [25000 ms]. Message of type [...cluster.akka.DistributedExecutorEvent]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.actor.typed.scaladsl.AskPattern$.$anonfun$onTimeout$1(AskPattern.scala:120)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:669)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:476)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:680)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:355)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:306)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:310)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:262)
But none of my processes on any node takes more than 1500 ms. What could be causing this, and what am I doing wrong?
Perhaps you are sending the message to the wrong actor, or the actor has stopped?
Put a debug log entry where you reply and verify that the request is received and the reply is actually sent from that actor, and also what actor you are responding to.
Another problem can be that the actor is correct, but the message is not matched correctly (for example, because of missing brackets on the case class name in the match clause). In that case, make a catch-all and log unmatched messages.
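A plain-Java sketch of the catch-all idea (Akka is not assumed on the classpath here): messages are matched by type, and anything that falls through is logged instead of being silently dropped. `CatchAllDispatcher`, `Process` and `Ping` are hypothetical names; in Akka Typed's Java DSL the same effect comes from ending the receive builder with `onAnyMessage(...)` after the typed `onMessage(...)` cases.

```java
import java.util.ArrayList;
import java.util.List;

final class CatchAllDispatcher {
    // Hypothetical protocol: one message type we handle, one we don't.
    static final class Process {
        final String criteria;
        Process(String criteria) { this.criteria = criteria; }
    }
    static final class Ping { }

    // Stand-in for a logger so the output can be inspected.
    final List<String> log = new ArrayList<>();

    void receive(Object message) {
        if (message instanceof Process) {
            Process p = (Process) message;
            log.add("handled Process(" + p.criteria + ")");
        } else {
            // Catch-all: record anything that did not match a typed case above.
            log.add("unhandled message of type " + message.getClass().getSimpleName());
        }
    }
}
```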
Hi,
Thank you for your quick reply.
I have only one type of actor in the system, as follows:
public class DistributedExecutor<C, R> extends AbstractBehavior<DistributedExecutorEvent>
{
    private String executorRole;
    private DistributedExecutor( ActorContext<DistributedExecutorEvent> context, String executorRole )
    {
        super( context );
        this.executorRole = executorRole;
    }
    static Behavior<DistributedExecutorEvent> create( String executorRole )
    {
        return Behaviors.setup( context -> {
            context.getSystem().receptionist().tell( Receptionist.register( EXECUTOR_SERVICE_KEYS.get( executorRole ), context.getSelf() ) );
            return new DistributedExecutor( context, executorRole );
        } );
    }
    @Override public Receive<DistributedExecutorEvent> createReceive()
    {
        return newReceiveBuilder()
                .onAnyMessage( this::onProcessorReceived )
                .build();
    }
    private Behavior<DistributedExecutorEvent> onProcessorReceived( DistributedExecutorEvent<C, R> event )
    {
        try
        {
            long start = System.currentTimeMillis();
            getContext().getLog().debug( "Got message from : {} with uuid : {} with criteria {}", event.getReplyTo().path(), event.getUuid(), event.getCriteria() );
            SerializableFunction<C, R> processFunction = event.getProcessFunction();
            R result = processFunction.apply( event.getCriteria() );
            getContext().getLog().debug( "Distributed processor took : {}", ( System.currentTimeMillis() - start ) );
            event.getReplyTo().tell( new DistributedResponseEvent<>( event.getUuid(), result ) );
        }
        catch( Exception e )
        {
            getContext().getLog().error( e.getMessage(), e );
        }
        return this;
    }
}
The “Got message” line is printed for most of the messages but not for some; this happens for 40-60 out of 1000 messages.
I can also see in the log that some of the event.getReplyTo().tell() calls end up sending the message to dead letters:
Info(akka://test-cluster/deadLetters,class akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef,Message [...cluster.akka.DistributedResponseEvent] to Actor[akka://test-cluster/deadLetters] was not delivered. [4] dead letters encountered. If this is not an expected behavior then Actor[akka://test-cluster/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.,akka.event.LogMarker@57b7771b)
A common reason for timeouts and delayed processing is thread starvation. Can you capture a stack dump while the problem occurs, to see what the system is doing at that time (e.g. using jstack <pid> on the shell)?
If you are a Lightbend subscriber you can also try to use the Thread Starvation Detector which will detect such a situation automatically.
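If running jstack against the process is inconvenient, a similar dump can be taken from inside the JVM with the standard `ThreadMXBean` API. The sketch below is an assumption-laden helper, not an Akka feature: `ThreadDump.capture` is a hypothetical name, and the name filter is just a substring match on thread names.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

final class ThreadDump {
    /**
     * Returns a jstack-like dump of all live threads whose name contains
     * nameFilter (pass "" to include every thread).
     */
    static String capture(String nameFilter) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // (false, false): skip locked monitors and synchronizers to keep the dump cheap.
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().contains(nameFilter)) {
                continue;
            }
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Print every frame ourselves; ThreadInfo.toString() may cut long stacks short.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```

For example, `ThreadDump.capture("dispatcher")` would keep only threads whose names contain "dispatcher", assuming the usual Akka thread naming (`<system>-<dispatcher-id>-N`). If every thread of the dispatcher running the work is RUNNABLE inside application code when the timeouts occur, that points towards starvation.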
Hi all,
I think I found the issue: I was doing something fundamentally wrong. I thought the ask pattern could be used like an ExecutorService in Java, so I called AskPattern.ask 1000 times in a loop and collected the futures in a list; after doing some other work in the code, I called Future.get on each future. The process that I fire in the cluster takes at most 1500 ms, and the dispatcher I used has a fixed-pool-size of 8, so in the worst case it takes around (1000/8)*1500 ms = 187.5 s to complete all the tasks, far more than the 25000 ms ask timeout. Firing all 1000 requests, however, takes only a small fraction of that time, and the timeout for each request starts as soon as AskPattern.ask is called,
so the requests still waiting in the queue fail with an Ask timeout. I think I’ll have to implement a response aggregator to support my needs ( https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#general-purpose-response-aggregator )
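The arithmetic above can be checked with a small model, under simplifying assumptions: all n asks are sent at t = 0 (so every timeout clock starts then), the tasks run on a fixed pool of `workers` threads in arrival order, every task takes the same `taskMs`, and network and serialization time are ignored. `AskTimeoutEstimate` is a hypothetical helper, not part of Akka.

```java
final class AskTimeoutEstimate {
    /** Total wall-clock time for n equal tasks on a fixed pool, all queued at t = 0. */
    static long totalMs(int n, int workers, long taskMs) {
        long rounds = (n + workers - 1) / workers; // ceil(n / workers)
        return rounds * taskMs;
    }

    /** Number of requests whose reply is only ready after the per-request timeout. */
    static int timedOut(int n, int workers, long taskMs, long timeoutMs) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            // Task i runs in "round" i / workers (integer division) and finishes
            // at the end of that round, measured from the moment it was asked.
            long finish = (i / workers + 1) * taskMs;
            if (finish > timeoutMs) {
                count++;
            }
        }
        return count;
    }
}
```

With the numbers from this thread, the worst case (every task at 1500 ms) gives 187.5 s in total, and 872 of the 1000 replies would be ready only after 25 s; the 50 ms average case finishes in 6.25 s with none timing out. The real mix of durations decides where in between an actual run lands.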
Thank you all for helping me out.
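The response aggregator linked above is an Akka Typed pattern. As a rough plain-JDK analog of the same idea (explicitly not Akka's API), the whole batch can get a single deadline, keeping whatever replies arrived in time, instead of every request getting its own 25000 ms clock. `BatchAggregator.collect` is a hypothetical name, and `completeOnTimeout` needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class BatchAggregator {
    /**
     * Waits for all replies, but with ONE deadline for the whole batch.
     * Replies that have not arrived by then, or that failed, are left out;
     * the caller gets the successful replies in request order.
     */
    static <T> List<T> collect(List<CompletableFuture<T>> replies, long timeoutMs) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(replies.toArray(new CompletableFuture<?>[0]));
        // completeOnTimeout completes `all` normally if the deadline passes first;
        // exceptionally(...) stops one failed reply from failing the whole batch.
        all.completeOnTimeout(null, timeoutMs, TimeUnit.MILLISECONDS)
           .exceptionally(e -> null)
           .join();
        List<T> collected = new ArrayList<>();
        for (CompletableFuture<T> f : replies) {
            if (f.isDone() && !f.isCompletedExceptionally()) {
                collected.add(f.join());
            }
        }
        return collected;
    }
}
```

The single deadline still has to be sized for the whole batch (e.g. from the (1000/8)*1500 ms arithmetic above), and replies that miss it are dropped rather than failing the call.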