I didn’t dig into what the Monix API does there, but I can’t see how using unfoldResourceAsync would be a problem, except for the extra allocation of a Future per element in the stream, which could be a reason to aim for unfoldResource instead. It already runs on a separate thread pool, so blocking in open and close could be OK there; not sure.
Important to note: in general a source can be materialized any number of times, so if that code leads to a single shared atomic, that could be a problem.
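To make the materialization point concrete, here is a minimal sketch (assuming Akka 2.6; all names are mine, not from the code above). State created inside the `create` thunk is per-materialization; anything captured from the enclosing scope is shared by every run:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("demo")
import system.dispatcher

// Captured from the enclosing scope: shared by ALL materializations.
val shared = new AtomicInteger(0)

val counting = Source.unfoldResourceAsync[Int, AtomicInteger](
  // Safe: a fresh counter is created for each materialization.
  create = () => Future(new AtomicInteger(0)),
  read = perRun =>
    Future {
      shared.incrementAndGet() // accumulates across every run
      val n = perRun.incrementAndGet() // restarts at 1 for each run
      if (n <= 3) Some(n) else None
    },
  close = _ => Future.successful(Done)
)

// Each run of `counting` emits 1, 2, 3 from its own counter,
// while `shared` keeps growing across runs.
```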
Unfortunately what I’m thinking of doesn’t work. Here’s a dirty code sample:
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic._
// Note the Atomics here are just for boxing values, nothing more
val source = Source.unfoldResourceAsync[AtomicInteger, (AtomicInteger, AtomicBoolean)](
  create = () => Future((new AtomicInteger(1), new AtomicBoolean(true))),
  read = {
    case (ref, isActive) =>
      Future {
        if (isActive.compareAndSet(true, false))
          Some(ref)
        else
          None
      }
  },
  close = {
    case (ref, _) => Future { ref.set(0); Done }
  }
)
implicit val as = ActorSystem("test")
val result = source.flatMapConcat { number =>
  println(s"Received (1): ${number.get()}")
  Source.lazyFuture(() =>
    Future {
      Thread.sleep(2000)
      number.get()
    }
  )
}.runFold(-1)((_, num) => num)
val r = Await.result(result, 10.seconds)
println(s"Received (2): $r")
Await.result(as.terminate(), 10.seconds)
And the output:
Received (1): 1
Received (2): 0
(these values received should have been equal)
As I suspected, close isn’t back-pressured, so the resource is closed sooner than I’d like. That behavior makes some sense, given that there can be buffering and the Reactive Streams protocol doesn’t back-pressure the final complete, but I don’t think we can leak resources from unfoldResourceAsync like that.
Any other way I could make this work?
UPDATE: modified sample to use just the standard library and akka
Damn, I just realized that what I’m trying to do is incompatible with org.reactivestreams.Publisher.
If the resource is streamed and the Publisher must be in charge of closing it, the problem is that there’s no way to force the Subscriber to request just one element at a time via request(1). This means the Publisher has no way of knowing when the Subscriber is done processing the streamed resource, if that processing is asynchronous, because the only back-pressure mechanism is request(n).
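To illustrate why demand is entirely in the Subscriber’s hands, here is a sketch (hypothetical class name, assuming org.reactivestreams on the classpath) of a subscriber that happens to request one element at a time. Nothing in the spec lets the Publisher impose this; a different subscriber can just as well call request(16) up front:

```scala
import org.reactivestreams.{ Subscriber, Subscription }

// Requests elements one at a time, by choice of the Subscriber.
// A Publisher must honor whatever demand it is given; it cannot
// force this pattern on its subscribers.
final class OneAtATime[T](onElement: T => Unit) extends Subscriber[T] {
  private var subscription: Subscription = _

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1) // the Subscriber decides the demand, not the Publisher
  }

  override def onNext(t: T): Unit = {
    onElement(t)
    // "Done processing" is only known here if processing is synchronous;
    // for asynchronous processing this request would have to be deferred,
    // which is exactly the gap described above.
    subscription.request(1)
  }

  override def onError(t: Throwable): Unit = ()
  override def onComplete(): Unit = ()
}
```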
The other problem, this time with Akka Streams, seems to be that flatMapConcat does not back-pressure, doing request(16) on my source. Or maybe that comes from Source.fromPublisher.
How could I force a request(1) from that Source.fromPublisher(source).flatMapConcat?
Yes, you are correct: if you expect to be able to force a single element in flight in a stream at a given moment, that is not really possible, since there can be explicit buffers, implicit in-stage buffers, fan-out, cycles, etc. downstream.
In general this is not a problem, because you use Akka Streams to pass immutable elements, or, if they are mutable, you at least emit each individual instance only once. Resources are managed and consumed in a single stage/operator, and the result of interacting with them is sent downstream (unfoldResource/unfoldResourceAsync, for example).
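As a sketch of that pattern (hypothetical file name, assuming Akka 2.6): the resource never leaves the operator, only immutable values read from it do, so closing it when the stream completes cannot invalidate elements already in flight downstream:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import java.io.{ BufferedReader, FileReader }

// The BufferedReader stays inside the unfoldResource operator;
// downstream only ever sees immutable String lines.
val lines: Source[String, NotUsed] =
  Source.unfoldResource[String, BufferedReader](
    create = () => new BufferedReader(new FileReader("data.txt")),
    read = reader => Option(reader.readLine()), // None at EOF completes the stream
    close = reader => reader.close()
  )
```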
If you want your source not to emit another element before the previous one has reached some other point in the stream, you cannot do this through the built-in back-pressure; you will need to create some custom channel back from that point to your source and have the source deal with both the regular back-pressure and the additional custom one-at-a-time signal. (Not saying this is a great idea, though.)
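One way such a custom channel could be sketched (all names hypothetical, assuming Akka 2.6): pair each element with a Promise that the downstream stage completes when it is truly finished, and have the source wait on the previous element’s Promise before emitting the next. Note this stalls forever if an ack is never completed (e.g. on failure), so a real version would need failure handling:

```scala
import akka.Done
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ ExecutionContext, Future, Promise }

// Element paired with an ack the consumer completes when done with it.
final case class Acked[T](value: T, ack: Promise[Done])

// Emits the next element only after the previous element's ack completes.
// State: (remaining elements, ack of the previously emitted element).
def oneAtATime[T](elements: Iterator[T])(
    implicit ec: ExecutionContext): Source[Acked[T], NotUsed] =
  Source.unfoldAsync((elements, Future.successful(Done: Done))) {
    case (rest, prevAck) =>
      prevAck.map { _ =>
        if (rest.hasNext) {
          val acked = Acked(rest.next(), Promise[Done]())
          Some(((rest, acked.ack.future), acked))
        } else None
      }
  }

// Usage sketch: the downstream stage completes the ack once it is done:
//   oneAtATime(Iterator(1, 2, 3)).mapAsync(1) { a =>
//     process(a.value).andThen { case _ => a.ack.success(Done) }
//   }
```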