Akka-Streams are quite new to me and I wanted to get some hands-on experience. I was thinking of setting up a Qpid broker and building a Producer and Consumer for it.
While looking at Alpakka-AMQP, I noticed that there doesn’t seem to be (at first glance) a way to setup a listener, which will receive messages from the broker once those have been published by the Producer.
I can successfully run the example from the documentation but once I consume the messages, the connection gets closed and the consumer is no longer listening. Specifically, I refer to the example about receiving:
val amqpSource = AmqpSource.atMostOnceSource(
NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
bufferSize = 10
)
val result = amqpSource.take(5).runWith(Sink.seq)
I am most likely missing something and will appreciate your help.
Hi Toni, the take(n) operator will accept at most n elements before it cancels upstream, so if you want it to continuously process elements from the topic you should use take. Additionally Sink.seq will collect all elements into memory so that would be a pretty bad idea if the stream is “infinite”.
The collect finite number of elements logic in the sample is likely to keep the sample as simple as possible.
Another basic example that will keep running “forever” (until you terminate the ActorSystem backing it), and print each element, would be: