Alpakka MQTT Streaming: dead letters

So I’m just trying to follow the MQTT Streaming documentation ( https://doc.akka.io/docs/alpakka/current/mqtt-streaming.html ) with akka-stream-alpakka-mqtt-streaming version 1.1.0 to create a connection to the public MQTT broker iot.eclipse.org.

object Test2 extends App {
  implicit val system = ActorSystem("test2")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val topic = "/test2/"
  val clientId = UUID.randomUUID().toString
  val settings = MqttSessionSettings()
  val session = ActorMqttClientSession(settings)

  val connection = Tcp().outgoingConnection("iot.eclipse.org", 1883)

  val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
    Mqtt
      .clientSessionFlow(session, ByteString("1"))
      .join(connection)


  val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
    Source
      .queue(2, OverflowStrategy.fail)
      .via(mqttFlow)
      .collect {
        case Right(Event(p: Publish, _)) => p
      }
      .toMat(Sink.head)(Keep.both)
      .run()

  commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
  commands.offer(Command(Subscribe(topic)))
  session ! Command(
    Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
  )

  // for shutting down properly
  commands.complete()
  commands.watchCompletion().foreach(_ => session.shutdown())

}

I’ve enabled log level “DEBUG” and got the following output:

[DEBUG] [07/25/2019 15:35:30.440] [main] [EventStream] StandardOutLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.834] [main] [EventStream(akka://test2)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:30.838] [main] [EventStream(akka://test2)] Default Loggers started
[DEBUG] [07/25/2019 15:35:31.359] [test2-akka.actor.default-dispatcher-4] [akka://test2/system/IO-TCP/selectors/$a/0] Resolving iot.eclipse.org before connecting
[DEBUG] [07/25/2019 15:35:31.385] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-DNS] Resolution request for iot.eclipse.org from Actor[akka://test2/system/IO-TCP/selectors/$a/0#-1318290068]
[DEBUG] [07/25/2019 15:35:31.403] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Attempting connection to [iot.eclipse.org/198.41.30.241:1883]
[DEBUG] [07/25/2019 15:35:31.678] [test2-akka.actor.default-dispatcher-2] [akka://test2/system/IO-TCP/selectors/$a/0] Connection established to [iot.eclipse.org:1883]
[DEBUG] [07/25/2019 15:35:31.755] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Start timer [receive-connack] with generation [1]
[DEBUG] [07/25/2019 15:35:31.757] [test2-akka.actor.default-dispatcher-3] [akka://test2/user/client-connector-0] Cancel all timers
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$SubscribeReceivedLocally] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$QueueOfferCompleted] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2019 15:35:31.760] [test2-akka.actor.default-dispatcher-4] [akka://test2/user/client-connector-0] Message [akka.stream.alpakka.mqtt.streaming.impl.ClientConnector$ConnectionLost] without sender to Actor[akka://test2/user/client-connector-0#-439865709] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test2/user/client-connector-0#-439865709]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/25/2019 15:35:31.765] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-events] Upstream failed, cause: WatchedActorTerminatedException: Actor watched by [Watch] has terminated! Was: Actor[akka://test2/user/client-connector-0#-439865709]
[DEBUG] [07/25/2019 15:35:31.766] [test2-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://test2/system/StreamSupervisor-0)] [client-commandFlow] Downstream finished.

what aim I missing?

Solved the issue by adding a Thread.sleep(10000) before commands.complete().

You should avoid using thread sleeps. They will bite you one day.

I suspect the problem here is that you’re not handling acknowledgements i.e. you’re not waiting for the server to acknowledge the receipt of your commands before continuing. This doc PR may help: https://github.com/akka/alpakka/pull/1908