Hi all
I have the following scenario:
As you can see in the image above, Actor1 sends messages to Actor2.
In Actor1, a stream is produced and sent to Actor2 as follows:
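import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer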
object WsConnector {
  case object Ack
  case object Initialized
  case object Completed
  final case class Msg(value: String)
  final case class Failed(ex: Throwable)

  val createActorRefWithAck: ActorRef => Sink[Msg, NotUsed] = receiver =>
    Sink.actorRefWithAck(
      receiver,
      onInitMessage = Initialized,
      ackMessage = Ack,
      onCompleteMessage = Completed,
      onFailureMessage = Failed
    )
}
object ConsumerActor {
  def props(wsSink: Sink[Msg, NotUsed]): Props = Props(new ConsumerActor(wsSink))
}

final class ConsumerActor(wsSink: Sink[Msg, NotUsed]) extends Actor with ActorLogging {
  log.info("Consumer actor started.")

  private implicit val materializer = ActorMaterializer()
  private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
  private val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("SAP-SENDER-GROUP")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  val (consumerControl, streamComplete) = Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics("SAP-EVENT-BUS"))
    .map(cr => Msg(cr.value()))
    .toMat(wsSink)(Keep.both)
    .run()

  override def receive: Receive = {
    case _ => log.info("Not identified message.")
  }
}
val wsSink = WsConnector.createActorRefWithAck(wsActor)
context.actorOf(ConsumerActor.props(wsSink), "consumer-actor")
In Actor2, I would like to pack the incoming messages into a stream, for example with Source.single(msg),
but I do not know whether that is the right way to do it.
In the end, Actor2 should send the Ack message to Actor1 to confirm that the message
has been received by Actor2 and that Actor2 is ready to consume further messages.
What I’ve done so far:
override def receive: Receive = {
  case Initialized =>
    log.info("Initialization to receive messages via stream.")
    sender() ! Ack
  case Completed =>
    log.info("Streams completed.")
    sender() ! Ack
  case Msg(value) =>
    log.info(value)
    sender() ! Ack
  case Failed(ex) =>
    log.info(s"Stream failed with ${ex.getMessage}.")
}
This part:
  case Msg(value) =>
    log.info(value)
    sender() ! Ack
Here I would like to wrap the value into a stream, while still sending the Ack
message only once the value has actually been handed over.
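To make the idea concrete, here is a rough sketch of what I have in mind. It is untested against my real setup and rests on several assumptions. Instead of Source.single(msg) per message, it swaps in one long-lived Source.queue with OverflowStrategy.backpressure, and replies with Ack only after the queue has accepted the element. The class name WsActor, the buffer size of 16 and the println sink are placeholders. The protocol objects are repeated from WsConnector only so that the snippet compiles on its own, and it uses the same Akka 2.5-style API as the code above.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}

// Protocol repeated from the post so that this sketch is self-contained.
object WsConnector {
  case object Ack
  case object Initialized
  case object Completed
  final case class Msg(value: String)
  final case class Failed(ex: Throwable)
}

import WsConnector._

// Hypothetical name for Actor2.
final class WsActor extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext for the Future callback

  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  // One stream for the lifetime of the actor. The buffer size (16) and the
  // println sink are placeholders for the real downstream processing.
  private val queue = Source
    .queue[Msg](16, OverflowStrategy.backpressure)
    .to(Sink.foreach(msg => println(msg.value)))
    .run()

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack

    case msg: Msg =>
      // Capture the sender now; sender() must not be called inside the callback.
      val replyTo = sender()
      queue.offer(msg).foreach {
        case QueueOfferResult.Enqueued => replyTo ! Ack
        case _                         => () // no Ack, so upstream stays backpressured
      }

    case Completed =>
      log.info("Stream completed.")
      queue.complete()

    case Failed(ex) =>
      log.error(ex, "Stream failed.")
      queue.fail(ex)
  }
}
```

Because the upstream sink only sends the next Msg after it has received an Ack, there should be at most one pending offer at a time, which is what the backpressure strategy of Source.queue expects. A failed offer Future is not handled in this sketch.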
Thanks