I need to enable username/password based alpakka mqtt server session streaming for clients to connect with credentials.
As a client;
val connect: Connect =
appConf.mqtt.username.flatMap(username =>
appConf.mqtt.password.map(password =>
Connect(clientName, ConnectFlags.CleanSession, username, password)
)
).getOrElse(Connect(clientName, ConnectFlags.CleanSession))
commands.offer(Command(connect))
You simply add the creds to the Connect.
As a server you need to build a more complicated logic.
val bindSource =
Tcp()
.bind("0.0.0.0", port)
.mapAsyncUnordered(maxConnections){ connection =>
hanlerGraph(connection).run()
}
.runWith(Sink.ignore)
This is pretty straightforward, mostly glue function.
def hanlerGraph(connection: Tcp.IncomingConnection) = {
val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
Mqtt
.serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))
.join(connection.flow)
mqttFlow.map(handler).watchTermination()(Keep.right).join(Flow[Command[Nothing]])
}
This is not so interesting either.
private def handler(s: Either[MqttCodec.DecodeError, Event[Nothing]]): Command[Nothing] = {
s match {
case Right(Event(c: Connect, _)) =>
if(c.userName.nonEmpty && c.password.contains("supersecret"))
Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted))
else
Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionRefusedBadUsernameOrPassword))
case Right(Event(cp: Subscribe, _)) =>
Command(SubAck(cp.packetId, cp.topicFilters.map(_._2)), None, None)
case Right(Event(publish @ Publish(flags, _, Some(packetId), _), _))
if flags.contains(ControlPacketFlags.RETAIN) =>
Command(PubAck(packetId))
case _ =>
???
}
}
This is a function where you should handle the messages. At the connection you could match/extract the u/p from the Connection
message, an match it with your db, and return errors.
This function will probably need to return with a Future[List[Command[Nothing]]]
and you need to modify the handlerGraph
too.