Hi,
I am trying to implement a synchronous RPC, following the sample on this page:
I am using Java, RabbitMQ, and Alpakka (artifact akka-stream-alpakka-amqp_2.12).
I am facing some problems:
- Firstly, in the method publishAndConsumeRpc(), the API OutgoingMessage.create() doesn't exist. So I used the code below instead, but it doesn't work either: the expectNextUnordered assertion fails. Am I reading the response from the wrong replyTo queue?
    String res = null;
    // #run-rpc-flow
    try {
        // Wait up to 3 seconds for the first component of result to complete.
        res = result.first().toCompletableFuture().get(3, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // On any failure res stays null, so routingKey below is null as well.
        e.printStackTrace();
    }
    final String routingKey = res;

    Sink<OutgoingMessage, CompletionStage<Done>> amqpSink =
        AmqpSink.createReplyTo(AmqpReplyToSinkSettings.create(connectionProvider));

    // Append "a" to each incoming message and send it back through the reply-to sink.
    amqpSource
        .map(b ->
            OutgoingMessage.apply(
                b.bytes().concat(ByteString.fromString("a")),
                false,
                false,
                Option.apply(b.properties()),
                Option.apply(routingKey)))
        .runWith(amqpSink, materializer);
- Secondly, instead of using TestSink, can I use a regular sink? Basically, I want to read the message returned on the "replyTo" queue, but I am unable to find the right API to read the response.
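To make clear what behaviour I am after, here is a minimal sketch of the reply-to pattern using only JDK classes. It is an in-memory stand-in, not Alpakka or RabbitMQ API: the names ReplyToRpcSketch, Message, serveOne and call are made up for illustration, and the 3-second timeout just mirrors the one in my code above. The server replies to the queue named in the request's replyTo property, and the client reads the reply as a plain value instead of through a test probe.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a broker. Hypothetical names; NOT RabbitMQ or Alpakka API. */
class ReplyToRpcSketch {

    /** Hypothetical message type: a body plus the name of the queue to reply to. */
    static final class Message {
        final String body;
        final String replyTo;

        Message(String body, String replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Queue name -> queue; plays the role of routing by queue name.
    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Server side: take one request, append "a", publish to the request's replyTo queue. */
    static void serveOne(String requestQueue) {
        try {
            Message request = queue(requestQueue).poll(3, TimeUnit.SECONDS);
            if (request != null) {
                queue(request.replyTo).put(new Message(request.body + "a", null));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Client side: publish a request, then block on a private reply queue. Returns null on timeout. */
    static String call(String requestQueue, String body) {
        String replyQueue = "reply-" + UUID.randomUUID();
        try {
            queue(requestQueue).put(new Message(body, replyQueue));
            Message reply = queue(replyQueue).poll(3, TimeUnit.SECONDS);
            return reply == null ? null : reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            QUEUES.remove(replyQueue); // the reply queue is only needed for this one call
        }
    }

    public static void main(String[] args) {
        Thread server = new Thread(() -> serveOne("rpc-queue"));
        server.start();
        System.out.println(call("rpc-queue", "one")); // prints "onea"
    }
}
```

In this sketch the client never chooses a routing key for the reply; it only names its reply queue in the request, and the server uses that name. What I am looking for is the Alpakka equivalent of call(): something that hands me the reply so I can use it outside a test.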
I would really appreciate any help.