So I have a Source of Committable[A] elements:
case class Committable[A](commit: () => Future[Unit], value: A)
val source: Source[Committable[A], NotUsed] = ...
and I want to transform it into a Source that runs commit only after value has been successfully processed by downstream.
The naive implementation (which works with fs2, btw) is:
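def autocommit[A](in: Source[Committable[A], NotUsed]): Source[A, NotUsed] =
  in.flatMapConcat { c =>
    // Runs the commit and emits nothing; mapAsync requires an explicit parallelism argument
    val commit = Source.single(())
      .mapAsync(1)(_ => c.commit())
      .flatMapConcat(_ => Source.empty[A])
    // Emit the value first, then run the commit source
    Source.single(c.value).concat(commit)
  }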
The problem is that c.commit() is called regardless of how c.value is processed by downstream. So the following code would still commit:
autocommit(source).mapAsync(1)(_ => Future.failed(new RuntimeException))
As I understand from the docs, this is kind of expected behaviour due to materialization and buffering.
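For reference, here is a minimal self-contained sketch for checking this behaviour. It assumes Akka Streams 2.6+ (where an implicit ActorSystem provides the materializer). The names AutocommitRepro, run and the commits counter are made up for illustration, and the commit function only increments a counter instead of committing anything real.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical reproduction harness, not part of any library.
object AutocommitRepro {
  final case class Committable[A](commit: () => Future[Unit], value: A)

  // Same naive implementation as in the question.
  def autocommit[A](in: Source[Committable[A], NotUsed]): Source[A, NotUsed] =
    in.flatMapConcat { c =>
      val commit = Source.single(())
        .mapAsync(1)(_ => c.commit())
        .flatMapConcat(_ => Source.empty[A])
      Source.single(c.value).concat(commit)
    }

  /** Runs a pipeline whose downstream always fails; returns the number of commits observed. */
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("autocommit-repro")
    val commits = new AtomicInteger(0)

    // Three elements whose "commit" only bumps the counter.
    val source: Source[Committable[Int], NotUsed] =
      Source(1 to 3).map { i =>
        Committable(() => Future.successful { commits.incrementAndGet(); () }, i)
      }

    // Downstream processing that fails for every element.
    val done = autocommit(source)
      .mapAsync(1)(_ => Future.failed[Int](new RuntimeException("processing failed")))
      .runWith(Sink.ignore)

    try {
      // Await.ready does not rethrow the stream failure, it only waits for completion.
      Await.ready(done, 5.seconds)
    } finally {
      system.terminate()
    }
    commits.get
  }

  def main(args: Array[String]): Unit =
    println(s"commits observed: ${run()}")
}
```

Any value above zero means a commit ran even though downstream failed. The exact count may depend on the Akka version and on timing, so I would not rely on a particular number.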
So is it possible to achieve what I want?