Alpakka MQTT Streaming with credentials

I need to enable username/password based alpakka mqtt server session streaming for clients to connect with credentials.

As a client;

val connect: Connect =
  appConf.mqtt.username.flatMap(username =>
    appConf.mqtt.password.map(password =>
      Connect(clientName, ConnectFlags.CleanSession, username, password)
    )
  ).getOrElse(Connect(clientName, ConnectFlags.CleanSession))

commands.offer(Command(connect))

You simply add the creds to the Connect.

As a server you need to build a more complicated logic.

val bindSource =
    Tcp()
      .bind("0.0.0.0", port)
      .mapAsyncUnordered(maxConnections){ connection =>
         hanlerGraph(connection).run()
       }
      .runWith(Sink.ignore)

This is pretty straightforward, mostly glue function.

def hanlerGraph(connection: Tcp.IncomingConnection) = {
  val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
    Mqtt
      .serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))
      .join(connection.flow)
  mqttFlow.map(handler).watchTermination()(Keep.right).join(Flow[Command[Nothing]])
}  

This is not so interesting either.

private def handler(s: Either[MqttCodec.DecodeError, Event[Nothing]]): Command[Nothing] = {
    s match {
      case Right(Event(c: Connect, _)) =>
        if(c.userName.nonEmpty && c.password.contains("supersecret"))
          Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted))
        else
           Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionRefusedBadUsernameOrPassword))
      case Right(Event(cp: Subscribe, _)) =>
        Command(SubAck(cp.packetId, cp.topicFilters.map(_._2)), None, None)
      case Right(Event(publish @ Publish(flags, _, Some(packetId), _), _))
          if flags.contains(ControlPacketFlags.RETAIN) =>
        Command(PubAck(packetId))
      case _ =>
        ???
    }
  }

This is a function where you should handle the messages. At the connection you could match/extract the u/p from the Connection message, an match it with your db, and return errors.
This function will probably need to return with a Future[List[Command[Nothing]]] and you need to modify the handlerGraph too.