I’m having trouble with my producer process apparently exhausting its resources.
After sending a large batch of messages, the JVM process is left with a large number of threads named like this:
kafka-producer-network-thread | producer-xx
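For reference, here is a minimal sketch of how such threads can be counted from inside the JVM. It uses only the standard `Thread` API. The object and method names (`ProducerThreadCount`, `countThreads`) are hypothetical, chosen for illustration, and the prefix is taken from the thread name above.

```scala
object ProducerThreadCount {
  // Prefix taken from the thread names observed in the process.
  val Prefix = "kafka-producer-network-thread"

  // Counts live JVM threads whose name starts with the given prefix.
  // A plain Java iterator is used so the snippet does not depend on
  // a particular Scala collection-converter import.
  def countThreads(prefix: String): Int = {
    val it = Thread.getAllStackTraces().keySet().iterator()
    var n = 0
    while (it.hasNext) {
      if (it.next().getName.startsWith(prefix)) n += 1
    }
    n
  }

  def main(args: Array[String]): Unit =
    println(s"$Prefix threads: ${countThreads(Prefix)}")
}
```

Calling `countThreads` before and after a batch of `send` calls should show whether the number of network threads grows with the number of messages sent.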
My producer instance is shared across threads and looks similar to this:
    class MyKafkaProducer @Inject()(actorSystem: ActorSystem)(implicit mat: Materializer) {

      val config = actorSystem.settings.config.getConfig("akka.kafka.producer")
      val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)
      val producer = Producer.plainSink(producerSettings)

      def send(key: String, msg: String) = {
        val record = new ProducerRecord[String, String]("topic", key, msg)
        Source.single(record).runWith(producer)
      }
    }
Do I need to release producer resources somehow after invoking send?