Hi.
I want to get the current JmsConnectorState, but how can I get it when RestartSource.withBackoff materializes NotUsed instead of the JmsConsumerControl?
My code:
def jmsSource(): Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withQueue(endpoint)
    .withAcknowledgeMode(
      if (mqAutoAck) AcknowledgeMode.AutoAcknowledge
      else AcknowledgeMode.ClientAcknowledge
    )
)
// In the following code I want the materialized value to be **JmsConsumerControl** instead of **NotUsed**
private[this] val restartSource: Source[AckEnvelope, NotUsed] = RestartSource
  .withBackoff(
    RestartSettings(
      minBackoff = 3.seconds,
      maxBackoff = 10.seconds,
      randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
    )
  ) { () =>
    jmsSource()
  }
// Now, for each message I receive, I want to log the current JMS connection state and perform other actions
restartSource.mapMaterializedValue(....) etc
If someone can help, I would appreciate it.
Thanks,
Asael