Substream Source has not been materialized in ... with splitWhen

Hi,

I’ve been seeing this error akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in when (I think) using splitWhen in a pretty heavily loaded machine. I haven’t been able to reproduce it in a test. This makes me think that we aren’t leaking substreams, but they are just not being created in time.

The splitWhen implementation doesn’t honour the StreamSubscriptionTimeoutTerminationMode setting - only the timeout, so we can’t just turn the error into a warning.

To reproduce, I’ve tried placing sleeps in the stages of the substream that I control - e.g. the via and map steps. This hasn’t caused the error to occur.

The bit of code looks like:

  private def fileProcessing(): Flow[Path, LoaderOutcome, NotUsed] = Flow[Path]
    .via(startUnitOfWork)
    .flatMapConcat { file =>
      val documents = ArchiveStreamSource(() => new ArchiveInputStream(new FileInputStream(file.toFile)))
        .filter(data => data.entry.name.lastOption.exists(name => name.endsWith(".xml")))
        .splitWhen(_.isNewEntry)
        .map(_.data).alsoTo(ping(100))
        .via(documentsSplitter)
        .via(documentProcessing).alsoTo(pingAndLog())
        .concatSubstreams
        .alsoTo(archiveHandler(file))
        .recover {
          case NonFatal(t) =>
            Errored
        }

      documents
    }

Does the timeout occur when the first element is not pulled through the split stage within the timeout? I’m hoping that there is an issue with this stream and we don’t need to just increase the timeout to a totally safe value.

Thanks,
-Dan

The scenario where it fails is when the predicate passed to splitWhen returned true, so that a new Source is created and sent downstream, but it never gets materialized run downstream, for example if you forgot to run the source, it got stuck in a buffer or if there is some logic accidentally blocking and starving the stream/dispatcher so that it never gets to run.

I’m not sure if it would make sense to adhere to the TerminationMode here, maybe. But it would also make it quite easy to not detect when you have a leak. Please open up an issue if you feel confident that it would be a good idea.

It should adhere to the akka.stream.materializer.subscription-timeout.timeout though, default is 5 seconds, maybe you could tune that upwards in your system if you can’t find anything that seems fishy in the stream logic?