I have a function of type Source[ByteString,_] => Future[Either[Fault,(String,Long)]]] that uses sttp to upload data. I have data as a Source[ByteString,_], and I have a pipeline. I’d like to use this function in my pipeline, but I don’t know how to turn it into a flow or a sink. Any ideas?
edit: to make it a bit more clear, I can easily turn this function into a Flow[Source[ByteString,_],T]. What I want is a Flow[ByteString,T] from that.
The Future returned from queue.offer(...) will be completed only when the element has been successfully put to the queue. You could use that as a backpressure signal and replace fold with foldAsync.
i wanted to come back and say that I have reimplemented it with foldAsync in the following way:
val client = Client(hostURI)
val (q, source) = Source
.queue[ByteString](16, OverflowStrategy.backpressure)
.preMaterialize()
val futureResult = client.uploadData(uuid, source, experimentFile)
val flow: Flow[ByteString, PTry[String], _] = Flow[ByteString]
.foldAsync[PTry[SourceQueueWithComplete[ByteString]]](
PSuccess(q)
) {
case (PSuccess(queue), s) =>
queue
.offer(s)
.flatMap {
case Enqueued => Future.successful(PSuccess(queue))
case QueueClosed =>
futureResult.map {
case PSuccess((_, _)) =>
PFailure(
QueuePrematurelyClosedFault(
"The stream queue was closed before it should've been."
)
)
case PFailure(f) =>
PFailure(
QueuePrematurelyClosedFault(
"The stream queue was closed before it should've been",
Some(f))
)
}
case QueueOfferResult.Failure(cause) =>
queue.complete()
Future.successful(
PFailure(
ExceptionFault(cause)
))
case Dropped =>
queue.complete()
Future.successful(
PFailure(
QueuePrematurelyClosedFault("elements were dropped!!")
))
}
.recover {
case f =>
queue.complete()
PFailure(
ExceptionFault(f)
)
}
case (failure, _) =>
Future.successful(failure)
}
.mapAsync(1) {
case PSuccess(queue) =>
queue.complete()
futureResult
case o =>
val res = futureResult.map(r => o.flatMap(_ => r))
res
}