My use case is I want to create an Akka stream in which the source is a java Queue. The stream should keep on the pooling values from the queue and if the queue is empty then wait for the values in the queue. Akka stream queue is another option but if in case of any failure I want to store the values that are there in the queue (I don’t know how to do that with Akka stream Queue). I tried the following:
Note that java.util.Queue won’t be safe in general to push values onto from one thread and then consume on a different thread like it will do with Akka Stream. You will need to specifically use a thread safe one for that to work, such as an implementation of java.util.BlockingQueue. It will still mean you need to block a thread just waiting for elements though, so if you can avoid it, it is better to use for example Source.queue.
If you have to use a Java Queue, and can make it be a thread safe queue, you can implement that with Source.unfoldResource and block on poll (in a loop) to produce elements.
Hey john,
Thank you for your response. Can you explain java blocking queue as stream source with an example?Also Is there any function in Akka stream to set a polling interval on source?
//if you can pull 0 or more data at once
def getNDataFromTheQueue(n: Int): Seq[Data]
val source: Source[Data, NotUsed] = Source.tick(1.millis, 250.millis, 1).mapConcat(_ => getNDataFromTheQueue(10))
(I think kinda same can be achived with Source.repeate + throttle.)
I would be more happy if you could complete the remaining part of the puzzle, but I’m always up for the challenge:
import java.util.concurrent._
import akka.stream.scaladsl._
import akka.actor.Cancellable
import concurrent.duration._
type Data = String
val bq: BlockingQueue[Data] = new ArrayBlockingQueue[Data](1000)
def getNDataFromTheQueue(n: Int): List[Data] = {
import scala.collection.JavaConverters._
val arrayList = new java.util.ArrayList[Data]()
bq.drainTo(arrayList, n)
arrayList.asScala.toList
}
val source: Source[Data, Cancellable] = Source.tick(1.millis, 250.millis, 1).mapConcat(_ => getNDataFromTheQueue(10))
This compiles. Also your bq would be come from somewhere else, and the Data type alias can be used to your concrete type, or can be totally find-and-replaced. (Ofc. this code can be written in java too, the main logic would be the same, you drain the queue, and convert the list to an immutable one, and you need to use javadsl, and some more verbose duration settings.)
val printer = Flow[Data].map { x =>
println(x)
x
}
Super easy test flow:
val future: concurrent.Future[Done] =
source
.via(printer)
.runWith(Sink.ignore)
With these two, the flow works, so the problem is with the ask!
Another thing: read the errors: Cannot cast java.lang.String to scala.runtime.Nothing$
Where the hell we cast String to Nothing?
With these infos you can reach the answer faster: .ask[Data](actor)
For the record, it was super slow for me to find that missing type param too :P
Another common trick to click into the functions you use and read the docs :D