I have a simple implementation using fs2 where I stream at regular intervals a message that I want to send to my Actor instance, but somehow it seems not to receive the message at all.
Here is my code:
import cats.effect._
import cats.effect.implicits._
import cats.effect.unsafe.implicits._
import fs2._
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
object StreamUtils {
def getATaskAsStream(pingInterval: FiniteDuration)(callback: Int => Unit): Stream[IO, Int] = {
val source = Stream.awakeEvery[IO](pingInterval).map(_ => 0)
val sink = source.evalMap(value => IO(callback(value)))
source.concurrently(sink)
}
}
class HelloActor extends Actor {
val io = StreamUtils.getATaskAsStream(FiniteDuration(500, TimeUnit.MILLISECONDS))(x => {
println(s"In the Akka actor..... Sending message to $self $x")
self ! x
}).compile.toList
io.unsafeRunSync()
def receive = {
case "hello" => println("hello back at you")
case x: Int => println(s"Received from stream $x")
case _ => println("Received some shit....")
}
}
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem("HelloSystem")
// default Actor constructor
val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
helloActor ! "hello"
println("Starting....")
println("Done....")
}
}
Here is a running version in Scastie: Scastie - An interactive playground for Scala.