Process not exit when have fatal error

Hi, when I have fatal error, the process not exit, I have SupervisionStrategy but this excpetion not handled.

When I put errored Kafka Bootstrap, for example:

[error] (run-main-0) org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[error] org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[error] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:429)
[error] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:286)
[error] at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:149)
[error] at akka.kafka.ProducerSettings$$anonfun$apply$3.apply(ProducerSettings.scala:70)
[error] at akka.kafka.ProducerSettings$$anonfun$apply$3.apply(ProducerSettings.scala:70)
[error] at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:316)
[error] at akka.kafka.scaladsl.Producer$$anonfun$3.apply(Producer.scala:197)
[error] at akka.kafka.scaladsl.Producer$$anonfun$3.apply(Producer.scala:197)
[error] at akka.kafka.internal.DefaultProducerStage.createLogic(DefaultProducerStage.scala:37)
[error] at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:106)
[error] at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:50)
[error] at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:674)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:491)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:446)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:443)
[error] at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:629)
[error] at akka.stream.scaladsl.Source.runWith(Source.scala:106)
[error] at com.lojasrenner.distribution.math.engine.Main$.main(Main.scala:52)
[error] at com.lojasrenner.distribution.math.engine.Main.main(Main.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
[error] at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:86)
[error] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:407)
[error] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:286)
[error] at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:149)
[error] at akka.kafka.ProducerSettings$$anonfun$apply$3.apply(ProducerSettings.scala:70)
[error] at akka.kafka.ProducerSettings$$anonfun$apply$3.apply(ProducerSettings.scala:70)
[error] at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:316)
[error] at akka.kafka.scaladsl.Producer$$anonfun$3.apply(Producer.scala:197)
[error] at akka.kafka.scaladsl.Producer$$anonfun$3.apply(Producer.scala:197)
[error] at akka.kafka.internal.DefaultProducerStage.createLogic(DefaultProducerStage.scala:37)
[error] at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:106)
[error] at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:50)
[error] at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:674)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:491)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:446)
[error] at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:443)
[error] at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:629)
[error] at akka.stream.scaladsl.Source.runWith(Source.scala:106)
[error] at com.lojasrenner.distribution.math.engine.Main$.main(Main.scala:52)
[error] at com.lojasrenner.distribution.math.engine.Main.main(Main.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)

I have this error, but process don’t exit.

I need this for Kubernetes check when stream have error and restart pod.

I found this solution, is ok?

try {
      source
        .via(JsonFlow.kafkaParser)
        .via(applyBusinessRules)
        .via(JsonFlow.stringify)
        .via(producerRecord)
        .runWith(sink)
    } catch {
      case e: Throwable => {
        log.error(e.getMessage, e)
        Thread.sleep(DELAY_BEFORE_EXIT)
        System.exit(-1)
      }
    }

Not really…
I think you want something like this:

val stream = source.via(flows).runwith(sink)
stream.onComplete{
  case _ => //some code which will kill your app
}

I think your stream will return with a Future[Something] and you want to handle that future.
Probably you want to stop the actor system before your System.exit with a terminate.

But with this code, I don’t know what happen, not handle error message.

And not work with errors like “Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: any”

I think it starts to go out of scope from here, but I will explain it:

When you start a stream, it will give back the running-context to the creation thread, and start to run on a threadpool (by default in the actor systems threadpool).
Which means if you do try{createStream()} it will only catch “creation” time errors (like you try to do a Broadcast with -1 outputs).
Most of the stream elements have a Materialized value, which is combinable (you can choose the left, or the right Mat value, or combine them with a function). These Mat values can “do things” with a running stream. For example, you can get back Binding info, so after the stream created you know which random udp port get binded. Or you can get back callback objects, if you call some functions on it the stream will die (see killswitch in the docs). The same way most of the Sinks produce some kind of Future[Something], which will complete if the Sink is completed. You can wait for Futures (total antipattern), you can compose them (with map/flatMap), and you can register callbacks to them (like the onComplete). The onComplete waits for a partial function and for your case you want something like:

f.onComplete{
  case Success(_) => //Something strange happened this stream should never complete happily
  case Failure(e: ConfigException) => //config exception
  case Failure(e: SomeOtherException)  => e.printStackTrace
  case Failure(e) => //every other exception
}

I think you should read some tutorials about Future in general. Handling application lifecycle with actor systems was never easy bcs of a lot of different concepts, but without knowing how the big picture works, I think it is even harder.