java.lang.IllegalStateException Sink.asPublisher(fanout = false) only supports one subscriber

We sometimes encounter the error randomly under load.
With our Play Framework REST API, we have mainly blocking db IO.
The majority of the requests come through a custom body parser

private val streamParser: BodyParser[AkkaStreams.BinaryStream] =
  BodyParser { _ =>
    Accumulator.source[ByteString].map(Right.apply)
  }

We tried a custom dispatcher as advised in the case of highly blocking IO such as

  actor {
    default-dispatcher {
      executor = "thread-pool-executor"
      throughput = 1
      thread-pool-executor {
        fixed-pool-size = 62 # db conn pool (50) + number of cores (11) + housekeeping (1)
      }
    }
  }

Not sure on how to estimate the db connection pool atm but such a high value still leads to the same random error.
Would you have any idea on how to avoid / size it?

after installing akka-diagnostics and using a custom well sized dispatcher, the thread starvation warnings disappeared but the sink.asPublisher error still remains.
Could it be related to the cluster architecture activated even with only a single node?

The error means that something creates a Sink using asPublisher(fanout = false) and then tries to subscribe more than once to a publisher returned by materializing that sink, so I’d rather expect a logic error than starvation causing it. Maybe there is a retry somewhere in your logic trying to re-use the publisher or something along those lines.

Edit: or somewhere inside Play framework

1 Like

@johanandren starvation can cause that (see, for instance, FanoutPublisherSink fails with unhelpful error message when default subscription timeout triggers · Issue #28926 · akka/akka · GitHub): a subscriber which immediately cancels is automatically attached after the subscription timeout, so a subscription which is waiting on something else would hit this.

@BillOTei note that all that’s required (even in the absence of thread starvation) is semantic blocking: not subscribing until something else happens (e.g. a DB query returns). If that something happens later than the subscription timeout (e.g. because of DB load), this is inevitable. In that case, you’re either looking at not delaying the subscription until then or accepting it and perhaps modifying the akka.stream.materializer.subscription-timeout.timeout setting (default is 5 seconds) to something longer.

I’ll leave commentary on what Play might or might not be doing for others: that may or may not make not delaying the subscription impractical (if it’s buried deep enough in Play).

1 Like

yes, its Accumulator.source[ByteString].map(Right.apply) creating the Source on each request with a body I believe but shouldn’t that be freed at the end?

thanks. The thing is that the only piece of code related to the Sink.asPublisher API is the custom Play body parser I wrote. Besides parsing all the requests there’s not much happening or accessable I believe.