gRPC Stream Cancellation, under what conditions?

Actually this seems to solve the issue. I am still using the MergeHub here but I am no longer getting the “MergeHub shutting down” errors either.

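    // Complete the reply only once the inbound stream has terminated,
    // instead of replying right away.
    (in.watchTermination() {
      case (_, terminationFuture) =>
        terminationFuture.map(_ => Received())
    })
      .toMat(sink)(Keep.left) // keep the Future[Received] from watchTermination
      .run()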

Thank you for pointing that out, we’ve been stumped by this issue for several months! (Yes, since last year.) The lesson learned here is that sending the gRPC reply right away effectively shuts down the inbound stream before the MergeHub can drain it, so the reply has to wait until the stream has terminated.
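For anyone who wants to try the pattern outside of a gRPC service, here is a minimal standalone sketch of it. It only assumes akka-stream on the classpath. The `Received` case class, the `handle` method and the `String` element type are hypothetical stand-ins for the generated gRPC types and service method, not the actual code from my project.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object DeferredReplySketch {
  // Hypothetical reply type standing in for the generated gRPC response message.
  final case class Received()

  def run(): Received = {
    implicit val system: ActorSystem = ActorSystem("deferred-reply-sketch")
    import system.dispatcher // ExecutionContext used by Future.map below

    // One shared hub; every call attaches its inbound stream as a new producer.
    val hubSink: Sink[String, NotUsed] =
      MergeHub.source[String](perProducerBufferSize = 16)
        .to(Sink.foreach[String](msg => println(s"hub received: $msg")))
        .run()

    // Hypothetical client-streaming handler: the returned Future is the reply,
    // and it completes only after the inbound stream has terminated.
    def handle(in: Source[String, NotUsed]): Future[Received] =
      in.watchTermination() { (_, terminated) => terminated.map(_ => Received()) }
        .toMat(hubSink)(Keep.left)
        .run()

    try Await.result(handle(Source(List("a", "b", "c"))), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The important part is `Keep.left`: the materialized value of the whole graph is the future produced by `watchTermination`, so the caller gets its reply only once the inbound source is done. Note that the sketch shuts the actor system down as soon as the reply arrives, so the hub's consumer may not have printed every element by then.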
