Hi Akka Team, in our project we are facing issues when shutting down an actor system
that has threads blocked in Await.
I know that I should avoid blocking calls inside the actor system, but unfortunately that is not always possible, especially in hosted environments such as Flink or Spark, which already provide their own abstraction for running distributed computations.
In my particular case we have a Flink pipeline in which we use the Akka/Akka HTTP library to fetch data from the backend.
Flink stages (except (Rich)AsyncOperator) usually require a message to be passed through the pipeline, which forces us to block on Await, i.e.
Await.result(Http()(system).singleRequest(request), Duration.Inf)
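One mitigation on our side would be to at least bound the wait. Below is a minimal sketch using only scala.concurrent; the helper name awaitWithin is hypothetical and the timeout values are arbitrary. It does not solve the shutdown problem itself, it only guarantees that the blocked thread is released after a finite time.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object BoundedAwait {
  // Hypothetical helper: blocks for at most `timeout` instead of Duration.Inf,
  // so the calling thread is released even if the future never completes.
  // Note: Try only captures non-fatal errors, so an InterruptedException still propagates.
  def awaitWithin[T](future: Future[T], timeout: FiniteDuration): Try[T] =
    Try(Await.result(future, timeout))

  def main(args: Array[String]): Unit = {
    // A future that never completes stands in for a request whose response never arrives.
    val neverCompleted: Future[String] = Promise[String]().future
    awaitWithin(neverCompleted, 200.millis) match {
      case Failure(_: TimeoutException) => println("timed out, thread released")
      case other                        => println(s"unexpected: $other")
    }
  }
}
```

In the pipeline this would be called as awaitWithin(Http()(system).singleRequest(request), 30.seconds), where 30 seconds is just an example value.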
The problem arises when an exception occurs within the Flink pipeline and Flink tries to restart the job. Initially, Flink tries to shut down the pipeline operators gracefully while keeping the JVM running, but if there are blocking calls the actor system usually does not shut down cleanly.
The issue becomes even worse when Flink times out waiting for the operators to complete and starts removing the temporary files that were used to run the job: the actor system threads that are still running start logging millions of NoClassDefFound error messages and keep the CPUs busy.
I created a small demo that never shuts down the actor system properly:
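import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, StatusCodes, Uri}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object AppConfig {
  val actorSystemConfig = ConfigFactory.parseString(
    """
    akka {
      loglevel = DEBUG
      loggers = ["akka.event.slf4j.Slf4jLogger"]
    }
    """
  )
}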
// Server: answers GET /test after a 10 second delay.
object ReproducerApp extends App {
  implicit val system = ActorSystem("actor-system", AppConfig.actorSystemConfig)
  import system.dispatcher

  val exceptionHandler: ExceptionHandler = ExceptionHandler {
    case ex: Throwable =>
      println(s"Got exception: ${ex.getMessage}")
      complete(StatusCodes.OK)
  }

  val route: Route =
    handleExceptions(exceptionHandler) {
      path("test") {
        println("Got request")
        withoutRequestTimeout {
          val promise = Promise[String]()
          system.scheduler.scheduleOnce(10.seconds) {
            promise.success("OK")
          }
          complete {
            promise.future
          }
        }
      }
    }

  val bindingFuture = Http().newServerAt("127.0.0.1", 8080).bind(route)
  System.in.read()
}
// Client: blocks inside a stream stage, then tries to terminate the actor system.
object ReproducerClientApp extends App {
  implicit val system = ActorSystem("actor-system", AppConfig.actorSystemConfig)

  val request =
    HttpRequest(
      method = HttpMethods.GET,
      uri = Uri("http://127.0.0.1:8080/test")
    )

  Source.fromIterator(() => (1 to 100).toList.iterator)
    .map { _ =>
      // Blocking call on a dispatcher thread; this is what prevents a clean shutdown.
      Await.result(Http()(system).singleRequest(request), Duration.Inf)
    }.map(_.discardEntityBytes()).runWith(Sink.ignore)

  Thread.sleep(4000)

  val result = system.terminate()
  result.andThen {
    case s => println(s)
  }(ExecutionContext.global)
}
ReproducerClientApp never terminates the actor system: some threads are blocked in Await, but no new task will be scheduled to deliver the message that the waiting Akka HTTP stage needs, while the default/internal dispatcher threads are still running. The actors that are still alive prevent the Akka dispatchers from shutting down, so it is a kind of deadlock.
As a workaround I tried to interrupt all threads like this:
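import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._

val threadSet = Thread.getAllStackTraces.keySet
threadSet.asScala.foreach { t =>
  t.interrupt()
}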
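Interrupting every thread in the JVM also hits Flink's own threads, so a narrower variant may be safer. The sketch below interrupts only threads whose name starts with a given prefix. It assumes that the dispatcher threads carry the actor system name as a prefix (for example actor-system-akka.actor.default-dispatcher-1), and the helper name interruptThreadsWithPrefix is hypothetical. A plain thread stands in for a blocked dispatcher thread so that the example runs without Akka.

```scala
import scala.jdk.CollectionConverters._ // Scala 2.13+; assumption about the Scala version

object TargetedInterrupt {
  // Hypothetical helper: interrupts only live threads whose name starts with `prefix`
  // and returns how many threads were interrupted.
  def interruptThreadsWithPrefix(prefix: String): Int = {
    val targets = Thread.getAllStackTraces.keySet.asScala
      .filter(t => t.isAlive && t.getName.startsWith(prefix))
      .toList
    targets.foreach(_.interrupt())
    targets.size
  }

  def main(args: Array[String]): Unit = {
    // A plain sleeping thread stands in for a dispatcher thread blocked in Await.
    val blocked = new Thread(new Runnable {
      def run(): Unit =
        try Thread.sleep(60000)
        catch { case _: InterruptedException => println("worker interrupted") }
    }, "actor-system-akka.actor.default-dispatcher-1")
    blocked.start()

    val count = interruptThreadsWithPrefix("actor-system-")
    blocked.join(5000)
    println(s"interrupted $count thread(s), worker alive: ${blocked.isAlive}")
  }
}
```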
This did the trick: the actor system terminates, logging this error:
[2020-08-17 10:59:00.336 +0000] ERROR - akka://actor-system/system/Materializers/StreamSupervisor-0/flow-0-0-ignoreSink: interruption during creation
akka.actor.ActorInitializationException: akka://actor-system/system/Materializers/StreamSupervisor-0/flow-0-0-ignoreSink: interruption during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:196)
...
Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:207)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
I looked into the Akka shutdown process, which is already quite complicated, but do you think a built-in mechanism to force shutdown by interrupting threads after some timeout would make sense? Or do you have other suggestions for this situation?
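To make the idea concrete, here is a rough sketch of the behaviour I have in mind, written with scala.concurrent only. The helper awaitOrForce is hypothetical and is not an existing Akka API; the promise stands in for the future returned by system.terminate(), and the force callback is where the thread interruption would go.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

object ForcedShutdown {
  // Hypothetical helper: waits up to `grace` for `terminated` to complete. On timeout it
  // runs `force` once (e.g. interrupting blocked threads) and waits up to `grace` again.
  // Returns true if termination completed, false otherwise.
  def awaitOrForce(terminated: Future[Any], grace: FiniteDuration)(force: () => Unit): Boolean = {
    def completedWithin(d: FiniteDuration): Boolean =
      try { Await.ready(terminated, d); true }
      catch { case _: TimeoutException => false }

    completedWithin(grace) || { force(); completedWithin(grace) }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for system.terminate(): never completes unless forced.
    val terminated = Promise[Unit]()
    val ok = awaitOrForce(terminated.future, 200.millis) { () =>
      println("grace period elapsed, forcing shutdown")
      terminated.success(()) // a real `force` would interrupt the blocked threads instead
      ()
    }
    println(s"terminated: $ok")
  }
}
```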
Regards,
Kyrylo