Lack of mapAsyncUnordered for SourceWithContext

Hello,

I’m wondering why SourceWithContext doesn’t support mapAsyncUnordered. Would it be hard to add, given that it already supports mapAsync?
At first glance, it looks like it would just need to capture the context element before executing the desired function and then map the result to re-attach the context. In pseudocode:

(streamElement, context)(asyncFunction) => asyncFunction.apply(streamElement).map((_, context))

It would be really helpful, as automatic context propagation reduces boilerplate a lot, and ordered async processing is not required everywhere.

Thanks in advance for any response!

Regards

The primary and motivating use case for SourceWithContext is propagating a consumer offset (e.g. Kafka). In that use case, reordering is fatal: it effectively means that instead of at-most-once or at-least-once, the guarantee is now at-least-zero-times, which is the single most useless guarantee you can have. Given that motivation, one can see why the operators on SourceWithContext preserve order.
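To make the failure mode concrete, here is a small plain-Scala simulation (no Akka or Kafka involved). It assumes Kafka-style commit semantics, where committing offset n implies that everything up to n is done, and that each completed element commits its own offset. The names ReorderedCommits, resumeOffset and lostAfterCrash are made up for this illustration.

```scala
object ReorderedCommits {
  // Assumption: committing offset n marks every offset <= n as done,
  // so after a restart the consumer resumes at n + 1.
  def resumeOffset(lastCommitted: Option[Long]): Long =
    lastCommitted.fold(0L)(_ + 1)

  // Offsets that will never be processed if the application crashes after
  // `completedBeforeCrash` elements have completed (and committed) in the
  // given completion order.
  def lostAfterCrash(completionOrder: Seq[Long], completedBeforeCrash: Int): Set[Long] = {
    val completed = completionOrder.take(completedBeforeCrash)
    val resume = resumeOffset(completed.lastOption) // last commit wins
    (0L until resume).toSet -- completed.toSet
  }
}
```

With in-order completion (0, 1, 2) and a crash after the first completion, nothing is lost. With a reordered completion (2, 0, 1) and the same crash point, offset 2 is committed first, the consumer resumes at 3, and offsets 0 and 1 are skipped.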

That said, there is a set of operators for which context propagation is straightforward but ordering can’t be guaranteed, like mapAsyncUnordered. It might be reasonable to have an opt-in reordering SourceWithContext, perhaps with an API like (in Scala):

sourceWithContext
  .reordering
  .mapAsyncUnordered(parallelism)(asyncFunction)
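As a rough sketch of how that syntax could be approximated in user code today: the extension below is hypothetical (ReorderingSyntax, Reordering and ReorderingOps are not part of Akka Streams) and is built only on the existing SourceWithContext.via and Flow.mapAsyncUnordered operators. It assumes that carrying the context along by hand is acceptable and that downstream stages tolerate reordering.

```scala
import akka.stream.scaladsl.{Flow, SourceWithContext}
import scala.concurrent.{ExecutionContext, Future}

object ReorderingSyntax {
  // Hypothetical wrapper that marks the stream as "reordering allowed".
  final class Reordering[Out, Ctx, Mat](underlying: SourceWithContext[Out, Ctx, Mat]) {
    def mapAsyncUnordered[Out2](parallelism: Int)(f: Out => Future[Out2])(
        implicit ec: ExecutionContext): SourceWithContext[Out2, Ctx, Mat] =
      underlying.via(
        // Work on (element, context) tuples and re-attach the context
        // to whatever the async function returns.
        Flow[(Out, Ctx)].mapAsyncUnordered(parallelism) {
          case (element, context) => f(element).map(result => (result, context))
        })
  }

  // Hypothetical extension method providing `.reordering`.
  implicit final class ReorderingOps[Out, Ctx, Mat](underlying: SourceWithContext[Out, Ctx, Mat]) {
    def reordering: Reordering[Out, Ctx, Mat] = new Reordering(underlying)
  }
}
```

With `import ReorderingSyntax._` and an implicit ExecutionContext in scope, the call chain above compiles as written; the result is an ordinary SourceWithContext, so nothing in the types records that reordering happened.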

Note that this restriction (no reordering of a SourceWithContext-based stream) is documented under Context Propagation (for the exact reason Levi mentions). It’s not so much that it’s not implemented as that it’s actively prevented.

In fact, that documentation does provide the “escape hatch” mechanism that you can use if you explicitly want to reorder. (It is via, and the documentation explicitly warns that if you use it to reorder, your downstream should expect that.)

Obviously this is not as elegant as the syntax Levi proposes. But if you just want to pass around some context without the safeguards provided by SourceWithContext, it might be just as easy to pass around a tuple or other simple wrapper.
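A minimal sketch of that escape hatch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer). The asyncFunction and the trace-id context are made up for illustration; only via, asSourceWithContext and mapAsyncUnordered are existing operators.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnorderedWithContext {
  def run(): Seq[(String, String)] = {
    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher // ExecutionContext for Future and Future.map

    // Hypothetical async operation standing in for a real call.
    def asyncFunction(element: Int): Future[String] =
      Future(s"processed-$element")

    // Hypothetical context: a trace id attached to each element.
    val sourceWithContext =
      Source(1 to 10).asSourceWithContext(i => s"trace-$i")

    // The flow passed to `via` sees (element, context) tuples, so the
    // context has to be carried through by hand.
    val unordered = Flow[(Int, String)].mapAsyncUnordered(parallelism = 4) {
      case (element, context) =>
        asyncFunction(element).map(result => (result, context))
    }

    val done = sourceWithContext
      .via(unordered) // downstream must tolerate reordering from here on
      .runWith(Sink.seq)

    try Await.result(done, 5.seconds)
    finally system.terminate()
  }
}
```

Each result stays paired with the context of the element it came from, but the order in which the pairs arrive is not guaranteed.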

Thanks for the answers.
I totally understand why it is important for the Kafka consumer use case, but I believe it is not the only one. For me, things like context propagation (e.g. a traceId) or metrics-related values (e.g. to be able to measure how long it took to process a given message) also fit well with the context-aware akka-streams components.
What seems odd to me is that design decisions such as the API of these components in a general-purpose library are limited by a single use case, the Kafka consumer. @leviramsey .reordering sounds good IMO, as I understand that for backward compatibility it is not possible to change the default behaviour.