Adapted from this StackOverflow question:
I’m new to Akka Streams. I’m trying to build a simple distributed worker example that performs an arbitrary job. I have created a random job generator:
import java.util.UUID
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

final case class Job(id: UUID)

class RandomJobSource extends GraphStage[SourceShape[Job]] {
  final val out: Outlet[Job] = Outlet.create("RandomJobSource.out")
  final val shape = SourceShape.of(out)
  override type Shape = SourceShape[Job]

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        push(out, Job(id = UUID.randomUUID()))
      }
    })
  }
}
I can create an infinite stream of Jobs with:
val jobs = Source.fromGraph(new RandomJobSource)
Now to work off the stream, I create a balancer and worker:
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
  import GraphDSL.Implicits._
  Flow.fromGraph(GraphDSL.create() { implicit b ⇒
    val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge = b.add(Merge[Out](workerCount))
    for (_ ← 1 to workerCount) {
      // for each worker, add an edge from the balancer to the worker, then wire
      // it to the merge element
      balancer ~> worker.async ~> merge
    }
    FlowShape(balancer.in, merge.out)
  })
}

def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)

private def work(job: Job): Unit = {
  println(s"Doing job ${job.id}...")
  Thread.sleep(5000 + Random.nextInt(1000))
}
Finally, I distribute the work. To put it all together:
import akka.stream._
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

object ExampleMain extends App {
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._
    Flow.fromGraph(GraphDSL.create() { implicit b ⇒
      val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      for (_ ← 1 to workerCount) {
        // for each worker, add an edge from the balancer to the worker, then wire
        // it to the merge element
        balancer ~> worker.async ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

  def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)

  private def work(job: Job): Unit = {
    println(s"Doing job ${job.id}...")
    Thread.sleep(5000 + Random.nextInt(1000))
  }

  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val mat: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  val jobs = Source.fromGraph(new RandomJobSource)

  val res: Future[Done] = jobs
    .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
    // Work serially to avoid under-working priority jobs
    .buffer(1, OverflowStrategy.backpressure)
    .wireTap(job => println(s"Next job: ${job.id}"))
    // Put work in a balancer with 5 workers.
    .via(balancer(worker(work), 5))
    .runWith(Sink.ignore)

  res.onComplete(_ => system.terminate())
}
I expect that because there’s a buffer size of 1, whenever a worker finishes its work, a new message will be pulled off the source (and as a consequence, I’ll see the "Next job: ${job.id}" line).
What I see instead is that my first 5 workers start working and a relatively large number of "Next job: ${job.id}" lines are printed to the console:
Next job: 4f33258a-cd0f-4d54-b782-4ae97c67f125
Next job: 363a2b95-7cfa-4e8a-b683-17d2fccf48c5
Next job: 82a8ca64-1d23-474e-afc4-9b6d69a7f842
Doing job 363a2b95-7cfa-4e8a-b683-17d2fccf48c5...
Next job: d01faedf-65c3-4b88-83c2-fd0ce98607fe
Next job: 67f24856-a843-4df5-a87c-61f46b1f7141
Doing job 82a8ca64-1d23-474e-afc4-9b6d69a7f842...
Doing job 4f33258a-cd0f-4d54-b782-4ae97c67f125...
Doing job d01faedf-65c3-4b88-83c2-fd0ce98607fe...
Next job: 28b62312-a084-428f-ab61-d2426a242e52
Doing job 67f24856-a843-4df5-a87c-61f46b1f7141...
Next job: 80f9cb48-7447-494d-a1d9-e091049e7822
Next job: 38d91ef0-cc90-4bb8-a1f2-4aedfc5f655f
Next job: 546566b7-f7b6-4ed3-84c9-ddc8eb19c65a
Next job: 1a3f8f0c-be5a-4f14-a054-9f7fcd8a356a
Next job: 8f425329-2ac6-4dd0-a7b8-e77345ba125f
Next job: d1d23777-6bf8-4c91-a61d-7bc60f97c725
Next job: 628ddabe-25a1-47b0-9989-f963d6394f64
Next job: 6291e210-5228-4175-9b18-76a35d82dff0
Next job: 8c6344ad-b6f5-43a0-b709-4ddd2d080982
Next job: dd917c8d-ff12-44e2-81a7-98f66f55170c
Next job: 65449eb7-ea9d-4e29-b11f-d37190bb0001
Next job: 5649871d-a4bd-4edc-8c62-167f1da41786
Next job: af3d20cb-2e48-4668-bf42-3fabbd4cffad
Next job: 0f5c9ec2-3b69-4e91-8d87-89b0c1580d8a
Next job: 43f12408-a02c-4ac6-8564-a2ecc96155ac
Next job: 20951e30-1801-4d78-9683-dc0f45dc61e6
Next job: a95c94b9-a9ca-49b6-8ff4-eef81dcb13cd
Next job: 76107909-6f2f-4035-b34f-6c0b10a79d76
Next job: d4551999-f7fb-4bf6-9da9-3e18b8dfc4da
Next job: 2df2ceee-953f-44c8-8169-91152c0a672a
Next job: 7425e65a-89fb-45d1-9514-54b2af3c9791
Next job: f986a2a0-a82c-4fbc-af2c-f5439856bba2
Next job: ac165054-8718-4eb8-9161-40738c8b67b0
Next job: 27ea10d9-34eb-4989-9d5d-c67118eaf41e
Next job: e587fa05-2bc3-4901-8ed0-fe1efa912d3e
Next job: 77ee6cb0-77e1-4f2b-8d81-d57907755832
Next job: 9224f6eb-c81d-4bd5-b7f1-5bb09cc4e97e
...
These jobs do get worked off, in batches, and once they are, the pattern repeats. Why aren’t messages being pulled off the source as each worker finishes, though?
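One possible explanation, offered as an assumption rather than a confirmed diagnosis: every `.async` boundary gets its own input buffer, which defaults to 16 elements (`akka.stream.materializer.max-input-buffer-size`), and the `Attributes.inputBuffer(initial = 1, max = 1)` applied to `jobs` covers only the source, not the worker flows inside the balancer. Five workers with default buffers could then prefetch dozens of jobs before any work completes. The sketch below keeps the same wiring but attaches the input-buffer attribute to each worker's async section; the names `SmallBufferBalancer` and `smallBufferWorker` are hypothetical.

```scala
import akka.NotUsed
import akka.stream.{Attributes, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

object SmallBufferBalancer {
  // Same shape as the balancer in the question. The only change is that each
  // worker's async section is given an input buffer of 1 element
  // (assumption: the default per-boundary buffer is what causes the prefetching).
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      // Attach the attribute to the async section itself, not to the upstream source.
      val smallBufferWorker =
        worker.async.addAttributes(Attributes.inputBuffer(initial = 1, max = 1))
      for (_ <- 1 to workerCount) {
        balance ~> smallBufferWorker ~> merge
      }
      FlowShape(balance.in, merge.out)
    })
  }
}
```

In the program above this would be used as `.via(SmallBufferBalancer.balancer(worker(work), 5))`. A few jobs may still be pulled ahead of the workers, since the explicit `buffer(1, ...)` and each one-element input buffer can each hold an element.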