I am trying to setting up small example to stream MQTT messages but not successful.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError
import akka.stream.alpakka.mqtt.streaming._
import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, Mqtt}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source, Tcp}
import akka.util.ByteString
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Promise}
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("MqttClient")
val host = "localhost"
val port = 1883
val connect = Connect("some-client-id", ConnectFlags.None)
val subscribe = Subscribe("measurements")
val pubAck = PubAck(PacketId(1))
val settings = MqttSessionSettings()
val clientSession = ActorMqttClientSession(settings)
val (client, clientSource) = Source
.queue[Command[Nothing]](2, OverflowStrategy.backpressure)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
val subscribed = Promise[Done]()
Source
.fromGraph(clientSource)
.via(
Mqtt
.clientSessionFlow(clientSession, ByteString("1"))
.join(Tcp().outgoingConnection(host, port))
)
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
case Right(Event(conAck: ConnAck, carry: Option[_])) =>
println(s"--> Client Connected $conAck and $carry")
subscribed.success(Done)
case Right(Event(sub: SubAck, carry: Option[_])) =>
println(s"--> Client Subscribed $sub and carry: $carry")
subscribed.success(Done)
case Right(Event(p: Publish, carry: Option[_])) =>
println(s"--> Client published $p and carry: $carry")
client.offer(Command(pubAck.copy(packetId = p.packetId.get)))
case Right(event: Event[_]) => println(s"unknown event $event")
case Left(error) => println(s"failure $error")
})
.runWith(Sink.ignore)
client.offer(Command(connect))
client.offer(Command(subscribe))
Await.ready(subscribed.future, 3.seconds)
}
}
Response:
--> Client Connected ConnAck(ConnAckFlags(1),ConnAckReturnCode(0)) and None
--> Client Subscribed SubAck(PacketId(1),Vector(ControlPacketFlags(1))) and carry: None
Command:
mosquitto_pub -d -h localhost -p 1883 -u 7f9a441e-11f9-4e87-9b6a-1bfa2ce67329 -t 'measurements' -m '[{"name": "measurement1", "value": 10000}]'
I am unable to receive messages.
Please help me to spot anything I am missing!