java.lang.OutOfMemoryError: Java heap space when Connecting to Kafka

I’m getting this OOM when connecting to Kafka (AWS MSK) from an Akka Cluster application using Alpakka Kafka with SSL. It works when SSL is off.

marciomarinho@MacBook-Pro-3 trade % kubectl logs trade-84b848bf5d-djb74
SLF4J: A number (4) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
[2020-09-07 00:48:42,812] [INFO] [] [akka.event.slf4j.Slf4jLogger] [Trade-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[2020-09-07 00:48:43,302] [INFO] [akka://Trade@10.62.128.217:36319] [akka.remote.artery.tcp.ArteryTcpTransport] [Trade-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://Trade)] - Remoting started with transport [Artery tcp]; listening on address [akka://Trade@10.62.128.217:36319] with UID [2700251691979606742]
[2020-09-07 00:48:43,328] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-3] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Starting up, Akka version [2.6.8] ...
[2020-09-07 00:48:43,578] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-3] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Registered cluster JMX MBean [akka:type=Cluster]
[2020-09-07 00:48:43,578] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-3] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Started up successfully
[2020-09-07 00:48:43,736] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-7] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - No seed-nodes configured, manual cluster join required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#joining
[2020-09-07 00:48:43,745] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.sbr.SplitBrainResolver] [Trade-akka.actor.default-dispatcher-11] [akka://Trade/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://Trade@10.62.128.217:36319,2700251691979606742), selfDc: default
[2020-09-07 00:48:44,304] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.internal.HealthChecksImpl] [Trade-akka.actor.default-dispatcher-11] [HealthChecksImpl(akka://Trade)] - Loading readiness checks [(cluster-membership,akka.management.cluster.scaladsl.ClusterMembershipCheck)]
[2020-09-07 00:48:44,305] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.internal.HealthChecksImpl] [Trade-akka.actor.default-dispatcher-11] [HealthChecksImpl(akka://Trade)] - Loading liveness checks []
[2020-09-07 00:48:44,408] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.scaladsl.AkkaManagement] [Trade-akka.actor.default-dispatcher-11] [AkkaManagement(akka://Trade)] - Binding Akka Management (HTTP) endpoint to: 10.62.128.217:8558
[2020-09-07 00:48:44,468] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.scaladsl.AkkaManagement] [Trade-akka.actor.default-dispatcher-11] [AkkaManagement(akka://Trade)] - Including HTTP management routes for ClusterHttpManagementRouteProvider
[2020-09-07 00:48:44,514] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.scaladsl.AkkaManagement] [Trade-akka.actor.default-dispatcher-11] [AkkaManagement(akka://Trade)] - Including HTTP management routes for ClusterBootstrap
[2020-09-07 00:48:44,522] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.cluster.bootstrap.ClusterBootstrap] [Trade-akka.actor.default-dispatcher-11] [ClusterBootstrap(akka://Trade)] - Using self contact point address: http://10.62.128.217:8558
[2020-09-07 00:48:44,544] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.scaladsl.AkkaManagement] [Trade-akka.actor.default-dispatcher-11] [AkkaManagement(akka://Trade)] - Including HTTP management routes for HealthCheckRoutes
[2020-09-07 00:48:45,239] [INFO] [akka://Trade@10.62.128.217:36319] [akka.management.scaladsl.AkkaManagement] [Trade-akka.actor.default-dispatcher-12] [AkkaManagement(akka://Trade)] - Bound Akka Management (HTTP) endpoint to: 10.62.128.217:8558
[2020-09-07 00:48:45,364] [INFO] [akka://Trade@10.62.128.217:36319] [akka.kafka.internal.SingleSourceLogic] [Trade-akka.actor.default-dispatcher-12] [SingleSourceLogic(akka://Trade)] - [fe2d3] Starting. StageActor Actor[akka://Trade/system/Materializers/StreamSupervisor-0/$$a#503737864]
[2020-09-07 00:48:45,381] [INFO] [] [org.apache.kafka.clients.consumer.ConsumerConfig] [Trade-akka.kafka.default-dispatcher-14] [] - ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [b-3.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094, b-1.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094, b-2.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094]
	check.crcs = true
	client.dns.lookup = default
	client.id =
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = register-trade-topic-group-id
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

[2020-09-07 00:48:45,382] [INFO] [] [org.apache.kafka.clients.consumer.ConsumerConfig] [Trade-akka.kafka.default-dispatcher-13] [] - ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [b-3.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094, b-1.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094, b-2.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094]
	check.crcs = true
	client.dns.lookup = default
	client.id =
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = register-trade-topic-group-id
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = SSL
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2020-09-07 00:48:45,529] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-14] [] - Kafka version: 2.4.1
[2020-09-07 00:48:45,530] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-14] [] - Kafka commitId: c57222ae8cd7866b
[2020-09-07 00:48:45,531] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-14] [] - Kafka startTimeMs: 1599439725527
[2020-09-07 00:48:45,537] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-13] [] - Kafka version: 2.4.1
[2020-09-07 00:48:45,538] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-13] [] - Kafka commitId: c57222ae8cd7866b
[2020-09-07 00:48:45,540] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [Trade-akka.kafka.default-dispatcher-13] [] - Kafka startTimeMs: 1599439725535
[2020-09-07 00:48:45,547] [INFO] [] [org.apache.kafka.clients.consumer.KafkaConsumer] [Trade-akka.kafka.default-dispatcher-14] [] - [Consumer clientId=consumer-register-trade-topic-group-id-2, groupId=register-trade-topic-group-id] Subscribed to topic(s): register-trade-topic
Uncaught error from thread [Trade-akka.kafka.default-dispatcher-20]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Trade]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:536)
	at akka.kafka.internal.KafkaConsumerActor.commitAndPoll(KafkaConsumerActor.scala:522)
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:506)
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:296)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:212)
	at akka.actor.Timers.aroundReceive(Timers.scala:52)
	at akka.actor.Timers.aroundReceive$(Timers.scala:41)
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:212)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[2020-09-07 00:48:46,149] [ERROR] [akka://Trade@10.62.128.217:36319] [akka.actor.ActorSystemImpl] [Trade-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(Trade)] - Uncaught error from thread [Trade-akka.kafka.default-dispatcher-20]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Trade]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:536)
	at akka.kafka.internal.KafkaConsumerActor.commitAndPoll(KafkaConsumerActor.scala:522)
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:506)
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:296)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:212)
	at akka.actor.Timers.aroundReceive(Timers.scala:52)
	at akka.actor.Timers.aroundReceive$(Timers.scala:41)
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:212)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[2020-09-07 00:48:46,166] [INFO] [akka://Trade@10.62.128.217:36319] [akka.actor.LocalActorRef] [Trade-akka.actor.default-dispatcher-11] [akka://Trade/system/cluster/core/daemon] - Message [akka.cluster.ClusterUserAction$Leave] to Actor[akka://Trade/system/cluster/core/daemon#-1961940792] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[2020-09-07 00:48:46,170] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-11] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Exiting completed
[2020-09-07 00:48:46,174] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-11] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Shutting down...
[2020-09-07 00:48:46,185] [INFO] [akka://Trade@10.62.128.217:36319] [akka.cluster.Cluster] [Trade-akka.actor.default-dispatcher-11] [Cluster(akka://Trade)] - Cluster Node [akka://Trade@10.62.128.217:36319] - Successfully shut down
[2020-09-07 00:48:46,225] [INFO] [akka://Trade@10.62.128.217:36319] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [Trade-akka.actor.default-dispatcher-12] [akka://Trade@10.62.128.217:36319/system/remoting-terminator] - Shutting down remote daemon.
[2020-09-07 00:48:46,235] [INFO] [akka://Trade@10.62.128.217:36319] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [Trade-akka.actor.default-dispatcher-12] [akka://Trade@10.62.128.217:36319/system/remoting-terminator] - Remote daemon shut down; proceeding with flushing remote transports.
[2020-09-07 00:48:46,237] [INFO] [] [org.apache.kafka.clients.Metadata] [Trade-akka.kafka.default-dispatcher-13] [] - [Consumer clientId=consumer-register-trade-topic-group-id-1, groupId=register-trade-topic-group-id] Cluster ID: XDsNaaboR9qOhqe6SH16_g
[2020-09-07 00:48:46,261] [INFO] [akka://Trade@10.62.128.217:36319] [akka.actor.ActorSystemImpl] [Trade-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(Trade)] - Retrieved 128 partitions for topic 'register-trade-topic'
[2020-09-07 00:48:46,272] [INFO] [akka://Trade@10.62.128.217:36319] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [Trade-akka.actor.default-dispatcher-11] [akka://Trade@10.62.128.217:36319/system/remoting-terminator] - Remoting shut down.

This is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>clearpay.systems</groupId>
    <artifactId>trade</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <akka.version>2.6.8</akka.version>
        <akka-http.version>10.1.11</akka-http.version>
        <akka-management.version>1.0.8</akka-management.version>
        <surefire.version>2.22.2</surefire.version>
        <maven.build.timestamp.format>yyyyMMdd-HHmm</maven.build.timestamp.format>
        <timestamp>${maven.build.timestamp}</timestamp>
        <git-commit-id-plugin.version>2.2.4</git-commit-id-plugin.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-sharding-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-serialization-jackson_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http_2.13</artifactId>
            <version>${akka-http.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-jackson_2.13</artifactId>
            <version>${akka-http.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-kafka-cluster-sharding_2.13</artifactId>
            <version>2.0.4</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-persistence-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-discovery_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.lightbend.akka.management</groupId>
            <artifactId>akka-management-cluster-bootstrap_2.13</artifactId>
            <version>${akka-management.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-discovery_2.13</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.lightbend.akka.discovery</groupId>
            <artifactId>akka-discovery-kubernetes-api_2.13</artifactId>
            <version>${akka-management.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-discovery_2.13</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.lightbend.akka.management</groupId>
            <artifactId>akka-management-cluster-http_2.13</artifactId>
            <version>${akka-management.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-discovery_2.13</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>5.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-junit-jupiter</artifactId>
            <version>3.3.3</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

Any ideas?

It works fine when not using SSL.

Unless you are happy to just keep running without TLS, try to capture a heap dump to see what was actually filling up your heap. The fact that the HeapByteBuffer allocation failed does not necessarily mean that allocation is what filled the heap.
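One way to get such a dump (the jar name and paths here are illustrative; `-XX:+HeapDumpOnOutOfMemoryError` and `-XX:HeapDumpPath` are standard HotSpot options):

```shell
# Have the JVM write a heap dump automatically when the OOM occurs:
java -XX:+HeapDumpOnOutOfMemoryError \
     -XX:HeapDumpPath=/tmp/trade.hprof \
     -jar trade.jar

# Then copy it off the pod before it is restarted, e.g.:
kubectl cp trade-84b848bf5d-djb74:/tmp/trade.hprof ./trade.hprof
```

The `.hprof` file can then be opened in a heap analyzer such as Eclipse MAT to see which objects dominate the heap.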

Hi Johan,

This is exactly my problem. I am setting up the consumer / message extractor with TLS/SSL but getting that weird OOM error:

public static final String SECURITY_PROTOCOL_KEY = "akka.kafka-clients.security.protocol";
public static final String SSL_PROTOCOL_KEY = "akka.kafka-clients.ssl.protocol";
public static final String SECURITY_PROTOCOL = "security.protocol";
public static final String SSL_PROTOCOL = "ssl.protocol";

final String securityProtocol = ConfigFactory.load().getString(SECURITY_PROTOCOL_KEY);
final String sslProtocol = ConfigFactory.load().getString(SSL_PROTOCOL_KEY);

CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<Trade.Command>> messageExtractor =
        KafkaClusterSharding.get(system)
                .messageExtractorNoEnvelope(
                        REGISTER_TRADE_TOPIC,
                        Duration.ofSeconds(10),
                        (Trade.Command msg) -> msg.toString(),
                        ConsumerSettings.create(
                                Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                                .withBootstrapServers(kafkaBootstrap)
                                .withProperty(SECURITY_PROTOCOL, securityProtocol)
                                .withProperty(SSL_PROTOCOL, sslProtocol)
                                .withGroupId(ENTITY_TYPE_KEY.name()));

In the configuration:

  kafka-clients {
    bootstrap.servers = ${?KAFKA_BROKER}
    security.protocol = ${?SECURITY_PROTOCOL}
    ssl.protocol = ${?SSL_PROTOCOL}
  }
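If the brokers' certificates are not covered by the JVM's default truststore, the same mechanism can carry the truststore settings. A sketch with hypothetical env var names and paths (MSK's TLS listener uses Amazon-issued certificates, so the JDK's bundled cacerts is often already sufficient):

```
akka {
  kafka-clients {
    bootstrap.servers = ${?KAFKA_BROKER}
    security.protocol = ${?SECURITY_PROTOCOL}
    # Kafka's ssl.protocol expects a JSSE protocol name; TLSv1.2 is the
    # usual choice ("SSL" is accepted by older JVMs but discouraged).
    ssl.protocol = TLSv1.2
    ssl.truststore.location = ${?SSL_TRUSTSTORE_LOCATION}  # e.g. /etc/kafka/kafka.client.truststore.jks
    ssl.truststore.password = ${?SSL_TRUSTSTORE_PASSWORD}
  }
}
```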

And passing the variables through in the EKS deployment:

          - name: KAFKA_BROKER
            value: b-2.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094,b-3.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094,b-1.akkacluster.quhv42.c4.kafka.ap-southeast-2.amazonaws.com:9094
          - name: SECURITY_PROTOCOL
            value: SSL
          - name: SSL_PROTOCOL
            value: SSL

Definitely does not work when SSL is on.

Hi,

This is a known symptom of an SSL misconfiguration. Check that you have a proper configuration for the truststore / keystore.

Igmar
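For context on why a misconfiguration can show up as heap exhaustion rather than a handshake error (this mechanism is an inference from the stack trace above, not something stated in the thread): the Kafka protocol prefixes every response with a 4-byte big-endian size, and `NetworkReceive.readFrom` (visible in the trace) allocates a buffer of that size via the memory pool. If one side speaks TLS and the other plaintext, raw TLS record bytes get misread as that size prefix:

```java
import java.nio.ByteBuffer;

// Sketch: what happens when the first bytes of a TLS record (e.g. an alert,
// which starts 0x15 0x03 0x03 ...) are interpreted as a Kafka message size.
public class ProtocolMismatchSize {
    public static int misreadSize() {
        // First four bytes of a hypothetical TLS 1.2 alert record.
        byte[] tlsRecordStart = {0x15, 0x03, 0x03, 0x00};
        // Kafka reads the size prefix as a big-endian int.
        return ByteBuffer.wrap(tlsRecordStart).getInt();
    }

    public static void main(String[] args) {
        // ~336 MiB requested in a single allocation — an easy OOM on a small heap.
        System.out.println(misreadSize());
    }
}
```

A single allocation of that order on a small container heap is enough to produce the `java.lang.OutOfMemoryError: Java heap space` seen above, which would also fit with everything working when SSL is off.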