Hello,
I am using RabbitMQ 3.4.4 and akka-stream-alpakka-amqp" in version “0.18”.
I am experimenting and exception when using the AMQP connector.
Here is the code I use to test the connector:
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl._
import scala.util._
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
import scala.concurrent.ExecutionContext.Implicits.global
val sourceInt = Source(1 to 100).map(s => ByteString(s))
val connProvider = AmqpUriConnectionProvider("amqp://pipeline:abc123g@my-host:5672")
val queueName = "pipe.parser.input"
val queueDeclaration = QueueDeclaration(queueName,
durable = true,
arguments = Map("x-max-length" -> "100000"))
val amqpSink = AmqpSink.simple(
AmqpSinkSettings(connProvider)
.withRoutingKey(queueName)
.withDeclarations(queueDeclaration))
val fg = sourceInt.runWith(amqpSink)
fg.onComplete {
case Failure(e) => e.printStackTrace()
case Success(r) => println(r)
}
And I receive this exception:
scala> java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:947)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at akka.stream.alpakka.amqp.AmqpConnectorLogic.$anonfun$preStart$2(AmqpConnector.scala:47)
at scala.collection.immutable.List.foreach(List.scala:389)
at akka.stream.alpakka.amqp.AmqpConnectorLogic.preStart(AmqpConnector.scala:40)
at akka.stream.alpakka.amqp.AmqpConnectorLogic.preStart$(AmqpConnector.scala:24)
at akka.stream.alpakka.amqp.AmqpSinkStage$$anon$1.preStart(AmqpSinkStage.scala:58)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:295)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:554)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:676)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:724)
at akka.actor.Actor.aroundPreStart(Actor.scala:528)
at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:667)
at akka.actor.ActorCell.create(ActorCell.scala:654)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:525)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'pipe.parser.input' in vhost '/': received '100000' but current is '100000', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
... 22 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'pipe.parser.input' in vhost '/': received '100000' but current is '100000', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:504)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
... 1 more
As you can see in the exception I am giving the exact same value as the configuration is waiting for.
When not giving the parameter of the configuration x-max-length it complains it needs it as a long.
The problem is that the parameter map of the QueueDeclaration is Map[String,AnyRef] wich makes it difficult to give it that long it request.
Thanks for helping if there is any solution (outside recreating the queue without the x-max-length parameter wich I check would work but is not accecptable in my context).
Note: this was originally an issue (https://github.com/akka/alpakka/issues/894) but I have been advised to put it as a question here.
Many thanks for helping.