Hi,
I built streaming using source and sink as Rabbitmq.Using FlowWithContext to carry the context for each element. One message is in sourceQ, the stream processing and post to destinationQ. The flow is working, but when i try to get the result, its not printing anything on the result part. Please help
val myamqpSource: SourceWithContext[Notification, CommittableReadResult, NotUsed] = {
AmqpSource.committableSource(
NamedQueueSourceSettings(connectionProvider, sourceQueueName)
.withDeclaration(queueDeclaration)
.withAckRequired(true),
bufferSize = 10
).asSourceWithContext[CommittableReadResult]((x: CommittableReadResult) => x)
.map(readResult => read[Notification](readResult.message.bytes.utf8String))
}
val myamqpFlow: FlowWithContext[myIn, myCtxIn, myOut, myCtxOut, NotUsed] =
FlowWithContext[myIn, myCtxIn]
.map(notificationObj => {
println(" 1 Notification Object" + notificationObj)
val meCol: UserRepository = new UserRepository(Mongo.userCollection)
val meObj = meCol.findById(notificationObj.instanceName)
println(" 2 User Object" + meObj)
WriteMessage(bytes = ByteString("abc"),
immediate = false,
mandatory = false)
})
.mapContext(myCtxIn => {myCtxIn.message})
val myamqpFlow2: FlowWithContext[myIn1, myCtxIn1, WriteResult, CtxOut, Future[Done]] =
AmqpFlowWithContext.withConfirm(alarmSettings)
.map(abc => abc)
.mapContext(myCtxIn => {
myCtxIn.bytes
})
val mysink2 = Sink.seq[(WriteResult, CtxOut)]
val grap = myamqpSource.via(myamqpFlow).via(myamqpFlow2).toMat(mysink2)(Keep.right)
val res = grap.run()
**res.onComplete {**
** case Success(posts) => println(" Results Tuplet are -->" + res)**
** case Failure(t) => println("An error has occurred: " + t.getMessage)**
** }**
Why the res block is not printing anything. Is it due to unbounded stream