Alpakka JMS crashing on maximum number of allowed connections

Error:
Could not accept connection : org.apache.activemq.transport.tcp.ExceededMaximumConnectionsException: Exceeded the maximum number of allowed client connections. See the ‘maximumConnections’ property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml)
Code:
Sink<JmsTextMessage, CompletionStage<Done>> jmsSink = JmsProducer.sink(producerSettings);
List<String> jsonList = Arrays.asList(json);
return Source.from(jsonList).map(JmsTextMessage::create).runWith(jmsSink, mat);
Questions:
1 - Is the connection pool opaque? It appears to be making 12 connections per server. Is this correct?
2 - How do I control the connection pool? I’ve got a very limited understanding of what it’s doing and how it handles connections under the hood.
3 - What’s the best way to handle these exceptions?
4 - Our connection lives inside a singleton actor. Is this correct?
Sorry for all the questions, but the documentation has 4 examples of Alpakka being a JMS consumer and none of it being a producer. If anyone could help me with these questions or point me to some documentation, that would be fantastic.
Thanks
David

Which version of Alpakka are you using?

The latest version has JMS producer examples as well: https://doc.akka.io/docs/alpakka/current/jms/producer.html

Thanks for getting back. We’re using the latest. I’ve also taken tips from this post (Alpakka JMS connection caching) and used a Source.queue. However, I’m still getting the same issue of many, many connections being created.

From the code you have posted above, it looks like you are calling runWith from some method. How often is that method called? Every time runWith is called on a stream that has JmsProducer.sink, a new Session will be initialized.

You should have the stream that sends messages to JMS materialize only once.
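
To make the difference concrete, here is a minimal stdlib-only sketch. It does not use the Alpakka API: FakeProducer is a hypothetical stand-in for one materialized JmsProducer.sink stream, and the counter only models the cost of opening a session. It compares creating a producer on every call with creating one up front and reusing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Stdlib-only model of "materialize once"; none of these names exist in Alpakka.
class MaterializeOnce {
    // Total number of simulated sessions opened so far.
    static final AtomicInteger sessionsOpened = new AtomicInteger();

    // Hypothetical stand-in for one materialized stream: constructing it opens one session.
    static final class FakeProducer {
        private final List<String> sent = new ArrayList<>();

        FakeProducer() {
            sessionsOpened.incrementAndGet();
        }

        void send(String json) {
            sent.add(json);
        }
    }

    // Pattern to avoid: a new producer (and session) for every message.
    static int sessionsWhenCreatedPerCall(int messages) {
        sessionsOpened.set(0);
        for (int i = 0; i < messages; i++) {
            new FakeProducer().send("{\"id\":" + i + "}");
        }
        return sessionsOpened.get();
    }

    // Suggested pattern: one producer created up front and reused for every message.
    static int sessionsWhenShared(int messages) {
        sessionsOpened.set(0);
        FakeProducer shared = new FakeProducer();
        for (int i = 0; i < messages; i++) {
            shared.send("{\"id\":" + i + "}");
        }
        return sessionsOpened.get();
    }

    public static void main(String[] args) {
        System.out.println("per call: " + sessionsWhenCreatedPerCall(100));
        System.out.println("shared:   " + sessionsWhenShared(100));
    }
}
```

In the real application the shared object would be the materialized value of a long-lived stream, for example the queue returned by running a Source.queue into the JMS sink once at startup, with each call only offering elements to that queue.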

Thanks Martynas. For now we have made it synchronous to get it all to work. Our understanding of back pressure might be wrong, but we’ve got it all going for now by doing this:
QueueOfferResult queueOfferResult = offer.toCompletableFuture().get();
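
For comparison, here is a rough stdlib-only sketch of blocking on the offer result versus reacting to it asynchronously. The OfferResult enum and the offer method are hypothetical stand-ins for Akka's QueueOfferResult and the queue's offer call, and the capacity-1 queue rejects elements when full purely so that both outcomes are visible. It is not a model of how Akka's back pressure strategy behaves.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Stdlib-only model; OfferResult and offer(...) are hypothetical stand-ins, not Akka types.
class OfferHandling {
    enum OfferResult { ENQUEUED, DROPPED }

    // Completes asynchronously with the outcome of trying to enqueue the element.
    static CompletionStage<OfferResult> offer(BlockingQueue<String> queue, String json) {
        return CompletableFuture.supplyAsync(
            () -> queue.offer(json) ? OfferResult.ENQUEUED : OfferResult.DROPPED);
    }

    // Synchronous style: the calling thread waits until the offer has completed.
    // join() blocks like get() but does not throw checked exceptions.
    static OfferResult sendBlocking(BlockingQueue<String> queue, String json) {
        return offer(queue, json).toCompletableFuture().join();
    }

    // Asynchronous style: the caller gets a stage back and reacts when it completes.
    static CompletionStage<String> sendAsync(BlockingQueue<String> queue, String json) {
        return offer(queue, json)
            .thenApply(result -> result == OfferResult.ENQUEUED ? "sent" : "rejected");
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(sendBlocking(queue, "{\"id\":1}"));
        sendAsync(queue, "{\"id\":2}")
            .thenAccept(System.out::println)
            .toCompletableFuture()
            .join();
    }
}
```

Waiting for each offer to complete before making the next one keeps the sender from outrunning the queue, at the cost of tying up the calling thread. Chaining on the returned stage avoids the blocked thread, but the caller then has to make sure it does not offer again before the previous offer has completed.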

I would still like to know more about how we can control the connection pool in Alpakka.