We have the following stream source:
val (actorRef, stream) = Source.actorRef[(Event, Promise[Done])](1024, OverflowStrategy.dropHead).preMaterialize()
As shown above, the actor's message buffer size is 1024 and the overflow strategy is dropHead.
So when a 1025th Event arrives while the buffer is full, an element is dropped (with dropHead, the oldest buffered one), but the Promise associated with the dropped Event is left orphaned: nothing downstream ever sees it, so it is never completed and its Future never completes. We need a way to get a handle on the promises of dropped elements so that we can complete them with an appropriate failure.
A potential solution is to introduce a method on Source, similar to preMaterialize(), e.g. droppedMessages(), which returns a tuple of the original stream, Source[(Event, Promise[Done]), NotUsed], and a new stream of the same type, Source[(Event, Promise[Done]), NotUsed], that emits the dropped elements. For example:
val (stream, droppedEvents) = Source.actorRef[(Event, Promise[Done])](1024, OverflowStrategy.dropHead).droppedMessages()
Please note that our use case is to perform an action (i.e. completing the Promise) on a message just before it gets dropped; we are totally fine with the message being discarded after that action.
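To make the requested behaviour concrete, here is a minimal sketch in plain Scala, with no Akka involved. DropHeadBuffer, its onDrop hook, and the stand-in Event and Done types are all hypothetical names invented for illustration; this is not how Source.actorRef is implemented, only a model of "run an action on an element just before drop-head evicts it".

```scala
import scala.collection.mutable
import scala.concurrent.Promise

// Hypothetical stand-ins for the types used in the issue (not akka.Done).
final case class Event(id: Int)
sealed trait Done
case object Done extends Done

// Hypothetical bounded buffer with drop-head overflow and a drop hook.
final class DropHeadBuffer[A](capacity: Int)(onDrop: A => Unit) {
  private val queue = mutable.Queue.empty[A]

  def offer(elem: A): Unit = {
    // When full, evict the oldest element and hand it to the hook.
    if (queue.size >= capacity) onDrop(queue.dequeue())
    queue.enqueue(elem)
  }

  def poll(): Option[A] = if (queue.isEmpty) None else Some(queue.dequeue())
}

object DropHeadDemo {
  def main(args: Array[String]): Unit = {
    // Fail the promise of any element that is evicted on overflow.
    val buffer = new DropHeadBuffer[(Event, Promise[Done])](2)({
      case (event, promise) =>
        promise.tryFailure(
          new IllegalStateException(s"Event ${event.id} dropped on overflow"))
        ()
    })

    // Offer three events into a buffer of capacity two.
    val promises = (1 to 3).map { i =>
      val p = Promise[Done]()
      buffer.offer((Event(i), p))
      p
    }

    // The first event was evicted, so its promise is already failed;
    // the other two are still buffered and their promises are pending.
    assert(promises(0).future.value.exists(_.isFailure))
    assert(!promises(1).isCompleted)
    assert(!promises(2).isCompleted)
  }
}
```

With a hook like this, the caller waiting on the dropped element's Future sees a failure instead of waiting forever, and the element itself can still be discarded afterwards.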