MQTT only publishers


My use-case only uses IoT publishers. So I started thinkering, and ended up with the decision that I will embed the MQTT-server to the backend, using the MQTT streaming connector from alpakka.

Bcs I don’t need any subscriber handling, I’m curious how bad is this idea?

class MqttServer(
    implicit actorSystem: ActorSystem,
    ec: ExecutionContext
) {
  val port              = 1883
  val maxConnections    = 500
  val commandBufferSize = 3

  val settings = MqttSessionSettings()
  val session  = ActorMqttServerSession(settings)

  private def handler(s: Either[MqttCodec.DecodeError, Event[Nothing]]): (List[Command[Nothing]], List[Publish]) = {
    s match {
      case Right(Event(c: Connect, _)) =>
        (Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)) :: Nil, Nil)
      case Right(Event(cp: Subscribe, _)) =>
        (Command(SubAck(cp.packetId,, None, None) :: Nil, Nil)
      case Right(Event(publish@Publish(flags, _, Some(packetId), _), _))
        if flags.contains(ControlPacketFlags.RETAIN) =>
        //println(s"pub1: ${publish.topicName} - ${publish.payload.utf8String} ")
        (Command(PubAck(packetId)) :: Nil, publish :: Nil)
      case Right(Event(publish@Publish(flags, _, _, _), _)) =>
        //println(s"pub2: ${publish.topicName} - ${publish.payload.utf8String} ")
        (Nil, publish :: Nil)
      case _ =>
        (Nil, Nil)

  private val bindSource: Source[Publish, Future[Tcp.ServerBinding]] =
      .bind("", port)
        maxConnections, { connection =>
                 |-> mqttFlow ---> handler ----> pub
                 |                  |
                 |------- Command --

          GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
            import GraphDSL.Implicits._

            val f = builder.add{
              val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
                  .serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))

            val unzip = builder.add(Unzip[List[Command[Nothing]], List[Publish]]())
            val mcc0 = builder.add(Flow[List[Command[Nothing]]].mapConcat(identity))
            val mcc1 = builder.add(Flow[List[Publish]].mapConcat(identity))
            val buffer = builder.add(Flow[Command[Nothing]].buffer(commandBufferSize, OverflowStrategy.dropHead))

            f.out ~>
            unzip.out0 ~> mcc0 ~> buffer ~> f
            unzip.out1 ~> mcc1


  def start() = {
      .to(Sink.ignore) //todo published-msg processing sink

Also; is there any authentication and TLS examples? I saw that the connect can contain a user/pass, and I probably can join a TLS bidi to the connection.flow, but tested examples would be nice in the docs too. (My next step will be checking the alpakka tests ofc, but still would be nice to offer some examples at least like links to the tests.)