Before filing, let me give here a brief of what I’m doing and if there’s something wrong then you can identify. Otherwise will go and file it to the repo.
For the following, which is from MQTT • Alpakka Documentation
<properties>
<akka.version>2.6.14</akka.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-mqtt_${scala.binary.version}</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
I create Actor System something like that
final ActorSystem mqttSubscriberClient1 = ActorSystem.create(“MqttSubscriberClient1”);
Use this system within the following and nothing seems to break
MqttSessionSettings settings = MqttSessionSettings.create();
MqttClientSession session = ActorMqttClientSession.create(settings, system);
Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(host, port);
Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
Mqtt.clientSessionFlow(session, ByteString.fromString(connectionId)).join(connection);
SourceQueueWithComplete<Command<Object>> commands =
Source.<Command<Object>>queue(20, OverflowStrategy.fail(), 20)
.via(mqttFlow)
.collect(
new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
@Override
public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
return (Publish) x.getEvent().get().event();
else throw noMatch();
}
})
.wireTap(event -> {
System.out.println("--------------------------------------------------------------------------------");
System.out.println("Client: " + connectionId + " received payload: " + event.payload().utf8String());
// System.out.println("Client: " + connectionId + " received topic name: " + event.topicName());
// System.out.println("Client: " + connectionId + " received packet id: " + event.packetId().toString());
// System.out.println("Client: " + connectionId + " received flags: " + event.flags());
System.out.println("--------------------------------------------------------------------------------");
})
.toMat(Sink.ignore(), Keep.left())
.run(system);
However, when I upgrade the versions like
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>maxio-mqtt-streaming</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<akka.version>2.6.17</akka.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-mqtt-streaming_${scala.binary.version}</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
</project>
There seems like no compilation error, but running the main class throws error at this same line
final ActorSystem mqttSubscriberClient1 = ActorSystem.create(“MqttSubscriberClient1”);
When I converted the above using typed.ActorSystem, i.e.
final ActorSystem mqttSubscriberClient1 = ActorSystem.create(Behaviors.empty(), "MqttSubscriberClient1");
I still got the same error which is follows.
LF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalStateException: You are using version 2.6.17 of Akka, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 2.6.17 of the [akka-stream-typed] artifacts to your project. Here's a complete collection of detected artifacts: (2.6.14, [akka-stream-typed]), (2.6.17, [akka-actor, akka-actor-typed, akka-protobuf-v3, akka-slf4j, akka-stream]). See also: https://doc.akka.io/docs/akka/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:184)
at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:162)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:1033)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:1022)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:1022)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:1045)
at akka.actor.typed.ActorSystem$.createInternal(ActorSystem.scala:290)
at akka.actor.typed.ActorSystem$.apply(ActorSystem.scala:198)
at akka.actor.typed.ActorSystem$.create(ActorSystem.scala:235)
at akka.actor.typed.ActorSystem.create(ActorSystem.scala)
at SubscribersMain1.main(SubscribersMain1.java:7)
Please let me know if I’m missing anything.
Thanks,
Ahad