I’m trying to use alpakka-mqtt to subscribe to a mqtt topic, poll a lagom service, convert event and post to another mqtt topic. My problem is that alpakka is only connecting to the mqtt broker but not subscribing to the defined topic. I hope someone can drop me a hint what i’m doing wrong.
My Code is: (it’s a lagom service)
private FileRequestHandler(Config cfg, ActorSystem sys) {
final Materializer materializer = ActorMaterializer.create(sys);
final RecordSerializer recSer = new RecordSerializer();
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new ParameterNamesModule());
MqttConnectionSettings connectionSettings =
MqttConnectionSettings.create(
"tcp://localhost:1883",
"", //clientId is set in sink/source individually
new MemoryPersistence()
);
Sink<MqttMessage, CompletionStage<Done>> mqttSink =
MqttSink.create(connectionSettings.withClientId("m2m-drv-sta2-FileReq-sink"), MqttQoS.atLeastOnce());
Source<MqttMessage, CompletionStage<Done>> mqttSource =
MqttSource.atLeastOnce(
connectionSettings
.withClientId("m2m-drv-sta2-FileReq-source")
.withCleanSession(false),
MqttSubscriptions.create("sta2/p/+/evt/RequestFileInfo/#", MqttQoS.atLeastOnce()),
BUFFER_SIZE).map(mqttIn -> {
MqttMessage msg = mqttIn.message();
ByteBuffer buf = msg.payload().asByteBuffer();
log.info("We got the message from mqtt: Topic="+msg.topic()+" Payload="+buf.array().length);
RequestFileInfo nfo = (RequestFileInfo)recSer.fromBinary(buf.array(), "RequestFileInfo");
return new RequestFileInfoContainer(msg,nfo);
}).map(cont -> {
//TODO: poll lagom service and convert event
log.info("pushing back to mqtt");
byte[] payload = mapper.writeValueAsBytes(""/*TODO: insert converted value*/);
return MqttMessage.apply("out/FileMetadata/avro", ByteString.fromArray(payload));
});
mqttSource.runWith(mqttSink, materializer);
}
yes. I subscribed & posted to the same topic with mqtt.fx (mqtt client) and i got the message i published in the subscription, but nothing in the service. In vernemq-trace tool i can see that both source and sink are connecting to the broker, but no subscription is performed.
The implementation for consuming messages in Flow and in Source is the same. So it does not really matter which one you choose. The difference is only in the shape of the stream.
Since it is unclear why the subscription does not get through, you should try enabling debug logging. Maybe that will give some clue. Take a look at mqtt test logging configuration here and here to see how to enable verbose debug logging to a file.
I changed the clientid => now its working as expected.
Last question: If i’m using atLeastOnce i need to ack every message after it has been posted to the other topic - where do have to call msg.ack() to make sure no message is lost?
Yes i already saw this method - but where can i call this in the stream to ensure a transaction?
I need to call it either AFTER the the new message has been written to the sink OR the processing of an event is filtered out.
Currently i observe that new messages are delivered to the stream immediately as i post them and i do not call msg.ack()… don’t know why that is…
You can use MqttFlow.atLeastOnce which will give you a flow. The messages you send to the flow will be sent to the broker and then emitted by the flow where you will be able to continue the stream and call ack() on those messages.