Looking at the implementation and running some simple tests, SourceQueueWithComplete.complete() does not “complete the stream normally” as stated in the ScalaDoc. Calling SourceQueueWithComplete.offer() after calling SourceQueueWithComplete.complete() can still succeed, i.e. result in a QueueOfferResult.Enqueued. All you need to do is use bufferSize > 1 and call SourceQueueWithComplete.offer() continuously, i.e. prevent the buffer from becoming empty. As long as the buffer is not empty, the stream is not completed.
Is this really the desired behavior? For my use case I would like the stream to “really” complete, i.e. calling offer() after complete() should always result in QueueOfferResult.Dropped.
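
For reference, here is a rough sketch of the kind of test I mean. It is not my exact code. It assumes Akka 2.6 or later (where the implicit ActorSystem supplies the materializer); the object name, the buffer size of 16, OverflowStrategy.dropNew, and the throttle that keeps the consumer slow are all picked just for illustration. It only prints whatever offer() returns after complete(), so you can check the outcome on your own Akka version.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical reproduction; names and numbers are illustrative only.
object CompleteThenOffer extends App {
  // Assumption: Akka 2.6+, where an implicit ActorSystem provides the Materializer.
  implicit val system: ActorSystem = ActorSystem("repro")

  // bufferSize > 1, plus a slow consumer so the buffer stays non-empty.
  val queue = Source
    .queue[Int](16, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.foreach(n => println(s"consumed $n")))
    .run()

  // Pre-fill part of the buffer, then ask the queue to complete.
  (1 to 8).foreach(n => queue.offer(n))
  queue.complete()

  // Offer once more after complete() and print whatever comes back.
  // Try captures a failed Future or a timeout instead of crashing the app.
  val afterComplete = Try(Await.result(queue.offer(99), 3.seconds))
  println(s"offer() after complete(): $afterComplete")

  system.terminate()
}
```

If the last line prints a Success wrapping Enqueued, that is the behavior described above. queue.watchCompletion() can also be used to see when (or whether) the stream actually completes.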