I am receiving frequent updates and I want to forward the latest update each one second (not used updates are thrown away). I wrote test, it works on 50%, because it does not sends latest update and instead of it 1 second old update.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.testkit.{ImplicitSender, TestKit}
import org.joda.time.DateTime
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scala.concurrent.duration._
class ThrottlingTest
extends TestKit(ActorSystem("ThrottlingTest"))
with ImplicitSender
with FunSuiteLike
with BeforeAndAfterAll {
private case class Wrapper(timestamp: DateTime, payload: Int)
private implicit val materializer: ActorMaterializer = ActorMaterializer()
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
private var id = 1
private def create() = {
val res = Wrapper(DateTime.now(), id)
id += 1
res
}
test("throttling test") {
val queue = Source
.queue[Wrapper](1, OverflowStrategy.dropHead)
.throttle(1, 1.second)
.toMat(
Sink
.foreach { o =>
val now = DateTime.now()
val diff = now.getMillis - o.timestamp.getMillis
println(s"[$now] $o diff: $diff ms")
})(Keep.left)
.run()
println(s"start ${DateTime.now()}")
Range.apply(1, 1000).foreach { _ =>
queue.offer(create())
Thread.sleep(100)
}
}
}
Output looks like:
[2019-03-13T12:26:39.988+01:00] Wrapper(2019-03-13T12:26:38.895+01:00,19) diff: 1093 ms
[2019-03-13T12:26:40.989+01:00] Wrapper(2019-03-13T12:26:39.932+01:00,29) diff: 1057 ms
[2019-03-13T12:26:41.977+01:00] Wrapper(2019-03-13T12:26:40.968+01:00,39) diff: 1009 ms
[2019-03-13T12:26:42.988+01:00] Wrapper(2019-03-13T12:26:41.897+01:00,48) diff: 1091 ms
[2019-03-13T12:26:43.988+01:00] Wrapper(2019-03-13T12:26:42.937+01:00,58) diff: 1051 ms
Problem is that my code does not forward/send latest update. What I need is to see diff around 100ms. Any idea. Thank you.