Greetings all,
I came across some struggle using akka-stream-alpakka-jms_2.13-5.0.0.
The problem
given a ConnectionFactory set with a clientId (due to being a durable topic subscription solution)
var connectionFactory = new ActiveMQConnectionFactory()
connectionFactory.setBrokerURL("vm://localhost:8000")
connectionFactory.setClientID("some-client-id")
and a JmsConsumerSettings
JmsConsumerSettings(system, connectionFactory)
.withDurableTopic("test-topic", "some-subscriber")
.withSessionCount(1)
.withAckTimeout(3.seconds)
.withFailStreamOnAckTimeout(true)
when ack timeout is reached, the exception is thrown and the stream begins to restart
[WARN] [FxTradeBridgeTest-akka.actor.default-dispatcher-26] [RestartWithBackoffSource(akka://FxTradeBridgeTest)] Restarting stream due to failure [1]: akka.stream.alpakka.jms.JmsTxAckTimeout: The TxEnvelope didn't get committed or rolled back within ack-timeout (3 seconds)
when RestartSource kicks off and a new connection is attempted, it throws InvalidClientIDException, for it cannot have multiple connections with the same clientId
javax.jms.InvalidClientIDException: Broker: localhost - Client: some-client-id already connected from vm://localhost#4
Analysis
on stream failure
when JmsConnector>finishStop>closeSessions() gets called
- the session is closed
- the connection remains open
PS: if at this moment I invoke connection.close() via evaluator, the stream is restarted gracefully and a new connection is successfully created!
Could you please advise / share a workaround?
Thanks a bunch!