Hi @Adri,
There is a race condition between thread execution and termination. Try your code with 2000 elements instead of 20 and you'll see that the termination does happen.
The use of Thread.sleep doesn't help either, because it is a blocking API that holds on to the thread that invokes it. You could try the following instead:
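import java.util.concurrent.CountDownLatch

import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StreamKillSwitch extends App {
  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext
  private val cs: CoordinatedShutdown = CoordinatedShutdown(system)

  // Released once 8 elements have passed through the map stage.
  val latch = new CountDownLatch(8)

  val (killStream, done) =
    Source(1 to 2000)
      .viaMat(KillSwitches.single)(Keep.right)
      .throttle(1, 1.second) // non-blocking pacing, instead of Thread.sleep
      .map(i => {
        system.log.info(s"Start task $i")
        latch.countDown()
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // First stop taking new elements...
  cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") { () =>
    Future(killStream.shutdown()).map(_ => Done)
  }

  // ...then wait for the stream itself to complete.
  cs.addTask(
    CoordinatedShutdown.PhaseServiceRequestsDone,
    "wait-processing-complete"
  ) { () =>
    done
  }

  // Block the main thread until 8 elements went through, then shut down.
  latch.await()
  private val completed = cs.run(CoordinatedShutdown.JvmExitReason, None)
  Await.ready(completed, 5.seconds)
}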
See how I replaced the sleep with throttle on the stream, and replaced the sleep(720) outside the stream with a countdown latch. This implementation may still let more than 8 elements through the stream, since there is still a race between buffered messages and the completion triggered by the kill switch. On my machine I get something like:
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
[2020-11-30 11:06:51,961] [INFO] [akka.event.slf4j.Slf4jLogger] [sks-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
1
[2020-11-30 11:06:52,042] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-3] [] - Start task 1
[2020-11-30 11:06:52,042] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-3] [] - End task 1
2
[2020-11-30 11:06:53,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 2
[2020-11-30 11:06:53,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 2
3
[2020-11-30 11:06:54,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 3
[2020-11-30 11:06:54,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 3
4
[2020-11-30 11:06:55,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 4
[2020-11-30 11:06:55,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 4
5
[2020-11-30 11:06:56,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 5
[2020-11-30 11:06:56,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 5
6
[2020-11-30 11:06:57,060] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 6
[2020-11-30 11:06:57,060] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 6
7
[2020-11-30 11:06:58,061] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 7
[2020-11-30 11:06:58,061] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 7
8
[2020-11-30 11:06:59,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 8
[2020-11-30 11:06:59,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 8
9
[2020-11-30 11:07:00,063] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 9
[2020-11-30 11:07:00,063] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 9
[success] Total time: 9 s, completed Nov 30, 2020 11:07:00 AM
That is one element too many (9 instead of 8) because of that race.
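If it helps to see that race in isolation, here is a minimal sketch using only the JDK, no Akka involved. It is an analogy, not how Akka Streams works internally: the names LatchRaceSketch, run, stop and processed are made up for illustration, the AtomicBoolean stands in for the kill switch, and the short sleep on a dedicated thread stands in for throttle. The point is only that the latch opening and the stop signal arriving are two separate events, so the worker can start another element in between.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object LatchRaceSketch {
  // Starts a worker that "processes" elements until it is told to stop,
  // and returns how many elements it processed in total.
  def run(expected: Int): Int = {
    val latch     = new CountDownLatch(expected)
    val stop      = new AtomicBoolean(false) // hypothetical stand-in for the kill switch
    val processed = new AtomicInteger(0)

    val worker = new Thread(() => {
      while (!stop.get()) {
        processed.incrementAndGet()
        latch.countDown() // a no-op once the count has reached zero
        Thread.sleep(1)   // pacing only; this thread is dedicated to the sketch
      }
    })
    worker.start()

    latch.await()  // returns as soon as `expected` elements were processed
    stop.set(true) // the worker may already be busy with the next element
    worker.join()
    processed.get()
  }

  def main(args: Array[String]): Unit = {
    val n = run(8)
    println(s"processed $n elements")
  }
}
```

The only guarantee is that run(8) returns at least 8. Whether it returns exactly 8 or a bit more depends on scheduling, which is the same kind of overshoot as the ninth element in the log above.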
Cheers,