I’m following the MQTT Streaming documentation ( https://doc.akka.io/docs/alpakka/current/mqtt-streaming.html ) with akka-stream-alpakka-mqtt-streaming version 1.1.0 to create a connection to the public MQTT broker iot.eclipse.org. This is the code I’m running:
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.alpakka.mqtt.streaming._
import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, Mqtt}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete, Tcp}
import akka.util.ByteString

import scala.concurrent.Future

object Test2 extends App {
  implicit val system = ActorSystem("test2")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val topic = "/test2/"
  val clientId = UUID.randomUUID().toString

  val settings = MqttSessionSettings()
  val session = ActorMqttClientSession(settings)

  // plain TCP connection to the public broker
  val connection = Tcp().outgoingConnection("iot.eclipse.org", 1883)

  val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
    Mqtt
      .clientSessionFlow(session, ByteString("1"))
      .join(connection)

  // commands go in through the queue; the first received Publish completes the future
  val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
    Source
      .queue(2, OverflowStrategy.fail)
      .via(mqttFlow)
      .collect {
        case Right(Event(p: Publish, _)) => p
      }
      .toMat(Sink.head)(Keep.both)
      .run()

  commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
  commands.offer(Command(Subscribe(topic)))
  session ! Command(
    Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
  )

  // for shutting down properly
  commands.complete()
  commands.watchCompletion().foreach(_ => session.shutdown())
}
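For context, DEBUG logging like the output further down can be switched on either in application.conf or programmatically. The following is only a minimal sketch of the programmatic variant, assuming the standard `akka.loglevel` setting. The object name `DebugLoggingSketch` is hypothetical, and the exact setup used for this post is not shown here.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of the test program above.
object DebugLoggingSketch extends App {
  // Override only the log level; everything else falls back to the default config.
  val config = ConfigFactory
    .parseString("akka.loglevel = DEBUG")
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("test2", config)
  system.log.debug("DEBUG logging is enabled")

  // Shut the actor system down so the JVM can exit.
  Await.result(system.terminate(), 10.seconds)
}
```

The same effect can be had by putting `akka.loglevel = "DEBUG"` into application.conf and creating the actor system without an explicit config.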
I’ve enabled log level “DEBUG” and got the following output:
[DEBUG] [07/25/2019 15:35:30.440] [main] [EventStream] StandardOutLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:31.359] [test2-akka.actor.default-dispatcher-4] [akka://test2/system/IO-TCP/selectors/$a/0] Resolving iot.eclipse.org before connecting
[DEBUG] [07/25/2019 15:35:31.385] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-DNS] Resolution request for iot.eclipse.org from Actor[akka://test2/system/IO-TCP/selectors/$a/0#-1318290068]
[DEBUG] [07/25/2019 15:35:31.403] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Attempting connection to [iot.eclipse.org/198.41.30.241:1883]
[DEBUG] [07/25/2019 15:35:31.678] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Connection established to [iot.eclipse.org:1883]
[DEBUG] [07/25/2019 15:35:31.755] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Start timer [receive-connack] with generation [1]
[DEBUG] [07/25/2019 15:35:31.757] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Cancel all timers
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$SubscribeReceivedLocally] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$QueueOfferCompleted] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$ConnectionLost] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/25/2019 15:35:31.765] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-events] Upstream failed, cause: WatchedActorTerminatedException: Actor watched by [Watch] has terminated! Was: Actor[akka://test2/user/client-connector-0#-439865709]
[DEBUG] [07/25/2019 15:35:31.766] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-commandFlow] Downstream finished.
What am I missing?