Hi, I have a code sample that reproduces a problem I see when trying to integrate a third-party API and get backpressure to block that library.
One option I explored was to wrap the third-party library in a custom Source, but I can't see an easy way to turn it into an Akka Source, as the data is returned by an object that is created at runtime by the API, and the API is huge.
What I would like to happen is that the external library blocks when it maxes out the throttle. However, that doesn't appear to happen: CPU is at 100% on all cores and memory keeps growing.
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
object QueueDemo extends App {
  implicit val system = ActorSystem("QueueStream")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val sink: Sink[Int, Future[Done]] = Sink.foreach(x ⇒ println(x))

  val res: (SourceQueueWithComplete[Int], Future[Done]) = Source
    .queue[Int](100, OverflowStrategy.backpressure)
    .throttle(100, 1.second, 1, ThrottleMode.shaping)
    .toMat(sink)(Keep.both)
    .run()

  val queue = res._1
  val done = res._2

  val n = 4000 * 4000
  println(n)

  // simulate a process that reads data and whose API is not modifiable
  (1 to n).map(value =>
    queue.offer(value).map {
      case QueueOfferResult.Enqueued    ⇒ println(s"queued $value")
      case QueueOfferResult.Dropped     ⇒ println(s"dropped $value")
      case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed ${ex.getMessage}")
      case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
    }(ec))

  done.onComplete(_ => { println("Done"); system.terminate() })
}
That's not how Source.queue works: offer gives you back a Future which it completes successfully if the element got queued, or fails if the element couldn't be put in the buffer because it was full, etc. That means that in this sample you will create futures as fast as your CPU cores can muster, of which the first 100 will end up in the buffer, and then most of the rest will fail.
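For comparison, here is a minimal sketch (reusing the queue, n and implicit ec from the sample above, with a hypothetical helper offerAll that is not part of the original code) of how such a queue with OverflowStrategy.backpressure can be driven without overrunning it: only offer the next element once the previous offer's Future has completed. Note that this still does not block the producing thread per element, which is the limitation discussed next.

// Hypothetical helper: sequence the offers so the producer is paced by the
// stream instead of piling up futures.
def offerAll(values: Iterator[Int]): Future[Done] =
  if (values.hasNext)
    queue.offer(values.next()).flatMap {
      case QueueOfferResult.Enqueued => offerAll(values)
      case other => Future.failed(new RuntimeException(s"offer failed: $other"))
    }
  else
    Future.successful(Done)

offerAll((1 to n).iterator).onComplete(_ => queue.complete())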
If the only option is to have someone call you, as with a blocking API pushing data, and you need to block the calling thread so the caller doesn't do anything more, I don't think you can achieve that in a reasonable way with Source.queue, and I can't think of a streams operator that does it out of the box.
The closest is StreamConverters.asOutputStream, which materializes into an OutputStream that blocks if there is no demand, but that only works for bytes. For other element types, the easiest option will likely be to write your own source GraphStage that manages a thread to block on and safely passes elements between that thread and the running stream.
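For the bytes-only case, a minimal sketch of what that looks like, assuming the same ActorSystem, materializer and imports as the sample above (the small element count, the printing sink and the write timeout are just illustrative choices):

// The materialized OutputStream blocks in write(...) while the stream has no demand,
// so a blocking caller is naturally slowed down, but only ByteString elements flow downstream.
val os: java.io.OutputStream = StreamConverters.asOutputStream(1.minute) // generous write timeout
  .throttle(100, 1.second, 1, ThrottleMode.shaping)
  .to(Sink.foreach[ByteString](bytes => println(bytes.utf8String)))
  .run()

(1 to 1000).foreach { value =>
  os.write(ByteString(s"$value\n").toArray) // blocks here while downstream is not ready
}
os.close()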
On the other hand, what you have here is pulling data, which is more common in blocking circumstances (InputStream, JDBC etc.). You can do that with Source.unfoldResource, which will call you back and let you pull new elements only when there is demand, i.e. no backpressure.
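A minimal sketch of that pull-based shape, assuming the blocking resource is something like a BufferedReader over a file (the file name is just a placeholder):

import java.io.{ BufferedReader, FileReader }

// create / read / close: the read function is only invoked when there is demand
// downstream, so the blocking call never runs ahead of the stream.
val lines: Source[String, NotUsed] = Source.unfoldResource[String, BufferedReader](
  () => new BufferedReader(new FileReader("data.txt")), // create the blocking resource
  reader => Option(reader.readLine()),                  // pull one element; None (from null) completes the stream
  reader => reader.close())                             // release it when the stream stops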
From the docs it's not clear what happens when the queue fills from the producer side …
Source.queue
Source.queue can be used for emitting elements to a stream from an actor (or from anything running outside the stream). The elements will be buffered until the stream can process them. You can offer elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Would it be possible to clarify that?
Also, would it make sense to have a put method as well as offer on SourceQueueWithComplete?
Here is my attempt at the solution.
On a related note, how do I pass the queue only to the GraphStageLogic instead of it being state of the GraphStage?
It seems to work as I expected: memory is stable and CPU is at 50%. Is what I have implemented OK from a performance point of view?
Feel free to use the code for whatever …
Thanks
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ArrayBlockingQueue
object QueueDemo extends App {
  implicit val system = ActorSystem("QueueStream")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val sink: Sink[Int, Future[Done]] = Sink.foreach(x ⇒ println(x))
  val queue: BlockingQueue[Int] = new ArrayBlockingQueue[Int](10)

  val graph = new BlockingQueueSource(queue)
  val res = Source.fromGraph(graph)
    .toMat(sink)(Keep.both)
    .run()
  val done = res._2

  val n = 4000 * 4000
  println(n)

  // simulate a process that pushes data and whose API is not modifiable
  (1 to n).map(value => queue.put(value))

  done.onComplete(_ => { println("Done"); system.terminate() })
}

class BlockingQueueSource[A](queue: BlockingQueue[A]) extends GraphStage[SourceShape[A]] {
  val out: Outlet[A] = Outlet("BlockingQueueSource")
  override val shape: SourceShape[A] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        // TODO get queue from an attribute
        // TODO val queue = inheritedAttributes.get(classOf[BlockingQueue[A]])
        override def onPull(): Unit = {
          val data = queue.take() // blocks the stream's thread until an element arrives
          push(out, data)
        }
      })
    }
}
You must not block in onPull like that; it can potentially block the entire stream from executing. While you do want to block on the pushing side, in the stage you should rather poll(), and if there was nothing in the queue, schedule a check for new elements a little while later, and keep doing that until there is an element to emit.
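A rough sketch of what that could look like, using a TimerGraphStageLogic so the stage re-checks the queue after a short delay instead of parking the stream's thread (the stage name, the 10 millisecond poll interval and the fact that the stream never completes are arbitrary choices for illustration):

import scala.concurrent.duration._
import java.util.concurrent.BlockingQueue
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic }

class PollingQueueSource[A](queue: BlockingQueue[A], pollInterval: FiniteDuration = 10.millis)
    extends GraphStage[SourceShape[A]] {

  val out: Outlet[A] = Outlet("PollingQueueSource")
  override val shape: SourceShape[A] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {

      // Try to emit one element without blocking; if the queue is empty,
      // schedule another attempt instead of blocking the stream's thread.
      private def tryEmit(): Unit = {
        val element = queue.poll() // returns null when the queue is currently empty
        if (element == null) scheduleOnce("poll", pollInterval)
        else push(out, element)
      }

      override def onPull(): Unit = tryEmit()

      override protected def onTimer(timerKey: Any): Unit = tryEmit()

      setHandler(out, this)
    }
}

It could then be used in place of the BlockingQueueSource above, e.g. Source.fromGraph(new PollingQueueSource(queue)).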
Ok, thanks for taking the time to explain things. I only started using Akka Streams a few weeks ago, so thanks for your patience.
Just to recap then, is this the correct way to do it:
object QueueDemo extends App {
  implicit val system = ActorSystem("QueueStream")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val sink: Sink[Int, Future[Done]] = Sink.foreach(x ⇒ println(x))
  val queue: BlockingQueue[Int] = new ArrayBlockingQueue[Int](10)

  val source = Source.unfoldResource[Int, BlockingQueue[Int]](
    () => queue,                               // create
    (q: BlockingQueue[Int]) => Some(q.take()), // read: blocks until an element is available
    (q: BlockingQueue[Int]) => {})             // close: nothing to release

  val res = source
    .toMat(sink)(Keep.both)
    .run()
  val done = res._2

  val n = 4000 * 4000

  // simulate a process that pushes data and whose API is not modifiable
  (1 to n).map(value => queue.put(value))

  done.onComplete(_ => { println("Done"); system.terminate() })
}
Looks about right. unfoldResource already runs on a separate dispatcher meant for blocking operations by default, so blocking in the read function is OK.
I'm not sure about the semantics when/if the stream stops or fails though; that will likely leave your submitting end hanging with this solution.
A custom graph stage could achieve the same without blocking a thread when there is nothing in the queue, plus provide clearer semantics for multiple materializations of the stream and for completion/failure of the stream, but would on the other hand be a little more work to get right.
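One possible mitigation for the hanging concern, purely as a sketch (the 5 second timeout is an arbitrary choice): have the submitting side use the timed offer of java.util.concurrent.BlockingQueue instead of put, so it notices when the stream has stopped draining the queue.

import java.util.concurrent.TimeUnit

// Instead of (1 to n).map(value => queue.put(value)), which can block forever
// if the stream has failed, give up after a timeout:
(1 to n).foreach { value =>
  val accepted = queue.offer(value, 5, TimeUnit.SECONDS)
  if (!accepted)
    sys.error(s"stream appears to have stopped, could not enqueue $value")
}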