When I disconnected the network and continued to send messages, I found that the stream returned the result of the successful transmission, but the message was not sent to the rabbitmq service.
code:
private def initQueue() = {
val exchangeDeclaration = ExchangeDeclaration(
mqParam.attribute.exchange,
mqParam.attribute.eType.toString,
mqParam.attribute.durable,
mqParam.attribute.autoDelete,
mqParam.attribute.internal,
mqParam.attribute.props
)
val connectSettings: AmqpDetailsConnectionProvider = {
AmqpDetailsConnectionProvider(mqParam.hostAndPorts).withCredentials(
AmqpCredentials.create(mqParam.credentials.account, mqParam.credentials.password)
).withVirtualHost(mqParam.vhost)
}
val sinkSetting = AmqpSinkSettings(connectSettings).withDeclarations(exchangeDeclaration).withExchange(mqParam.attribute.exchange)
val sink = AmqpSink.simple(sinkSetting)
val newQueue = Source.queue[ByteString](mqParam.bufferSize, mqParam.overflowStrategy).toMat(sink)(Keep.left).run()
newQueue.watchCompletion() onComplete { _ => self ! QueueCompletion}
newQueue
}
private def offerData(data: ByteString, client: ActorRef, queue: SourceQueueWithComplete[ByteString]) = {
queue.offer(data) flatMap {
case QueueOfferResult.Enqueued => Future.successful(true)
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed"))
} pipeTo client
}