My program (that’s Kotlin, but it should be pretty self-explanatory here):
val consumerSettings = ConsumerSettings.create(config, StringDeserializer(), StringDeserializer())
.withBootstrapServers("kafka-0.kafka:9092")
.withGroupId("test-group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
.withClientId("test-consumer-" + (system.lookupRoot().path().address()))
val committerSink: Graph<SinkShape<ConsumerMessage.CommittableOffset>, CompletionStage<Done>>
= Committer.sink<ConsumerMessage.CommittableOffset>(CommitterSettings.create(system.settings().config().getConfig("akka.kafka.committer")))
@Volatile var lastConsumerControl: Consumer.Control? = null
val consumer: CompletionStage<Done> =
RestartSource.onFailuresWithBackoff(
Duration.ofSeconds(3),
Duration.ofSeconds(30),
0.2,
20) {
Consumer.committableSource(consumerSettings, Subscriptions.topics("test-topic"))
.mapAsync(4) { msg ->
log.info("Processing $msg")
// Do some processing and return a CompletionStage
}
.mapMaterializedValue {
lastConsumerControl = it
}
}
.log("consumer-stream")
.addAttributes(
Attributes.createLogLevels(
Attributes.logLevelOff(), // onElement
Attributes.logLevelError(), // onFailure
Attributes.logLevelWarning())) // onFinish
.toMat(committerSink, Keep.right())
.run(ActorMaterializer.create(system))
.thenApply {
log.warn("Consumer future is done!")
it
}
suspend fun main() {
log.info("Started")
CoordinatedShutdown.get(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate(), "stop-kafka-consumer", Supplier {
log.info("triggered termination")
val completionStage: CompletionStage<Done> = (lastConsumerControl?.drainAndShutdown(consumer, Runnable::run)
?: CompletableFuture.completedFuture(done()))
.thenApply { log.warn("drainAndShutdown future is done!"); it }
completionStage
})
consumer.await()
log.info("Exiting")
}
Scenario: send SIGTERM
to the process.
Expected behavior: the process terminates immediately.
Observed behavior: the process terminates after before-actor-system-terminate
phase times out.
Output:
2019-08-20 17:58:17,107 INFO [TestConsumer] [tcgd-akka.actor.default-dispatcher-3] [NO_DOMAIN] [none] [] - triggered termination
[ERROR] [08/20/2019 17:58:17.115] [tcgd-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://tcgd/system/StreamSupervisor-0)] [consumer-stream] Upstream finished.
2019-08-20 17:58:17,133 WARN [TestConsumer] [Thread-2] [NO_DOMAIN] [none] [] - Consumer future is done!
2019-08-20 17:58:17,138 INFO [TestConsumer] [Thread-2] [NO_DOMAIN] [none] [] - Exiting
[WARN] [08/20/2019 17:58:30.161] [tcgd-akka.actor.default-dispatcher-2] [CoordinatedShutdown(akka://tcgd)] Coordinated shutdown phase [before-actor-system-terminate] timed out after 13000 milliseconds
2019-08-20 17:58:30,202 INFO [o.a.k.c.c.i.AbstractCoordinator] [tcgd-akka.kafka.default-dispatcher-6] [NO_DOMAIN] [none] [] - [Consumer clientId=test-consumer-akka://tcgd, groupId=test-group] Sending LeaveGroup request to coordinator kafka-0.kafka.default.svc.cluster.local:9092 (id: 2147483647 rack: null)
2019-08-20 17:58:30,203 WARN [TestConsumer] [Thread-3] [NO_DOMAIN] [none] [] - drainAndShutdown future is done!
Note: I’ve changed the timeout for that shutdown phase to 13s to be sure that drainAndShutdown
waits until its end.
Why does drainAndShutdown
future resolve only after the actor system is terminated? I’d expect it to shut down immediately.