FlowWithContext - Not printing the Sink response

Hi,

I built a stream that uses RabbitMQ as both source and sink, with FlowWithContext carrying the context for each element. One message is in sourceQ; the stream processes it and posts it to destinationQ. The flow is working, but when I try to get the result, nothing is printed in the result part. Please help.

 val myamqpSource: SourceWithContext[Notification, CommittableReadResult, NotUsed] = {
    AmqpSource.committableSource(
      NamedQueueSourceSettings(connectionProvider, sourceQueueName)
        .withDeclaration(queueDeclaration)
        .withAckRequired(true),
      bufferSize = 10
    ).asSourceWithContext[CommittableReadResult]((x: CommittableReadResult) => x)
      .map(readResult => read[Notification](readResult.message.bytes.utf8String))
  }

val myamqpFlow: FlowWithContext[myIn, myCtxIn, myOut, myCtxOut, NotUsed] =
    FlowWithContext[myIn, myCtxIn]
      .map(notificationObj => {
        println(" 1 Notification Object" + notificationObj)
        val meCol: UserRepository = new UserRepository(Mongo.userCollection)
        val meObj = meCol.findById(notificationObj.instanceName)
        println(" 2 User Object" + meObj)
        WriteMessage(bytes = ByteString("abc"),
          immediate = false,
          mandatory = false)
      })
      .mapContext(myCtxIn => {myCtxIn.message})

val myamqpFlow2: FlowWithContext[myIn1, myCtxIn1, WriteResult, CtxOut, Future[Done]] =
    AmqpFlowWithContext.withConfirm(alarmSettings)
      .map(abc => abc)
      .mapContext(myCtxIn => {
        myCtxIn.bytes
      })

val mysink2 = Sink.seq[(WriteResult, CtxOut)]

val grap = myamqpSource.via(myamqpFlow).via(myamqpFlow2).toMat(mysink2)(Keep.right)
val res = grap.run()

res.onComplete {
  case Success(posts) => println(" Results Tuplet are  -->" + res)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}

Why is the res block not printing anything? Is it because the stream is unbounded?

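Yes, it looks like your stream is unbounded, so it stays alive indefinitely, waiting for new elements from AMQP. The Future materialized by Sink.seq completes only when the stream itself completes, which is why onComplete never fires. If you want the stream to end, you will have to introduce an operator that stops it (for example take, takeWhile or idleTimeout). Note that idleTimeout ends the stream by failing it with a timeout exception, so in that case the Failure branch runs rather than Success.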
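
Here is a minimal sketch of the take option, under some assumptions: Source.tick stands in for the unbounded AMQP source (no RabbitMQ is involved), and the names BoundedStreamSketch, collectThree and the "ctx-" context strings are made up for illustration. As far as I know, take is not among the operators offered directly on SourceWithContext, so the sketch converts back to a plain Source of (element, context) tuples with asSource before bounding the stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BoundedStreamSketch {

  // Bounds an otherwise endless stream with take(3) so that the Future
  // materialized by Sink.seq can complete.
  def collectThree(): Seq[(Int, String)] = {
    // In Akka 2.6+ the implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("bounded-stream-sketch")
    try {
      // Stand-in for the AMQP source: emits 1 every 10 ms and never completes by itself.
      val unbounded = Source
        .tick(0.millis, 10.millis, 1)
        .asSourceWithContext(n => s"ctx-$n") // hypothetical context, one per element

      val result = unbounded
        .map(_ + 1)  // placeholder for the real processing steps
        .asSource    // back to a plain Source of (element, context) tuples
        .take(3)     // completes the stream after three elements
        .toMat(Sink.seq)(Keep.right)
        .run()

      // Blocking is only for the demo; in the original code res.onComplete would now fire.
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(collectThree())
}
```

Without the take(3) line the Await would time out, which is the same situation as the onComplete callback that never runs. For a consumer that is meant to run forever, it may be more natural to drop Sink.seq and handle each result as it passes through (for example with Sink.foreach) instead of waiting for a final collection.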