Hi,
I’ve been seeing this error akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in
when (I think) using splitWhen
in a pretty heavily loaded machine. I haven’t been able to reproduce it in a test. This makes me think that we aren’t leaking substreams, but they are just not being created in time.
The splitWhen
implementation doesn’t honour the StreamSubscriptionTimeoutTerminationMode
setting - only the timeout, so we can’t just turn the error into a warning.
To reproduce, I’ve tried placing sleeps in the stages of the substream that I control - e.g. the via
and map
steps. This hasn’t caused the error to occur.
The bit of code looks like:
private def fileProcessing(): Flow[Path, LoaderOutcome, NotUsed] = Flow[Path]
.via(startUnitOfWork)
.flatMapConcat { file =>
val documents = ArchiveStreamSource(() => new ArchiveInputStream(new FileInputStream(file.toFile)))
.filter(data => data.entry.name.lastOption.exists(name => name.endsWith(".xml")))
.splitWhen(_.isNewEntry)
.map(_.data).alsoTo(ping(100))
.via(documentsSplitter)
.via(documentProcessing).alsoTo(pingAndLog())
.concatSubstreams
.alsoTo(archiveHandler(file))
.recover {
case NonFatal(t) =>
Errored
}
documents
}
Does the timeout occur when the first element is not pulled through the split
stage within the timeout? I’m hoping that there is an issue with this stream and we don’t need to just increase the timeout to a totally safe value.
Thanks,
-Dan