I have a Source[T, ActorRef[T]] that was created by calling ActorSource.actorRef[T], as described on the ActorSource.actorRef page of the Akka documentation. When I send the message matched by the completion matcher, the stream does not complete as expected. Is there some other action I need to take to complete the stream?
Here is the code:
sealed trait ActorProtocol
case class Response(r: HttpResponse) extends ActorProtocol
case object Complete extends ActorProtocol
// ... in a function that creates the stream
val (actor, httpSource) = ActorSource.actorRef[ActorProtocol](
  completionMatcher = { case Complete => println("Complete message sent!") },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropNew
).preMaterialize()
// ... a while later, want to end the stream
actor ! Complete
// in console, see "Complete message sent!"
The console output shows that the actor materialized by the stream received the message, so I'd expect the stream to complete, but it does not.
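To make the expectation concrete, here is a minimal, self-contained sketch of the behavior I'd expect. It assumes Akka 2.6 with akka-stream-typed on the classpath; the names CompletionCheck, Protocol, and Element are hypothetical stand-ins for my real code, and the 5-second timeout is arbitrary.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink
import akka.stream.typed.scaladsl.ActorSource

import scala.concurrent.Await
import scala.concurrent.duration._

object CompletionCheck extends App {
  // Hypothetical protocol, standing in for ActorProtocol above
  sealed trait Protocol
  final case class Element(value: String) extends Protocol
  case object Complete extends Protocol

  // The typed ActorSystem also provides the implicit Materializer
  implicit val system: ActorSystem[Nothing] =
    ActorSystem[Nothing](Behaviors.empty[Nothing], "completion-check")

  // Same construction as in my code: an actor-backed source, pre-materialized
  val (ref, source) = ActorSource
    .actorRef[Protocol](
      completionMatcher = { case Complete => () },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropNew
    )
    .preMaterialize()

  // Run the pre-materialized source; `done` completes when the stream terminates
  val done = source
    .collect { case Element(v) => v }
    .runWith(Sink.foreach(println))

  ref ! Element("hello")
  ref ! Complete

  try {
    // Throws a TimeoutException if the stream has not terminated within 5 seconds
    Await.result(done, 5.seconds)
  } finally {
    system.terminate()
  }
}
```

If the completion message works the way I think it should, the Await returns normally; a timeout here would reproduce the problem in isolation.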
Here is some more code:
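// a class to make completing the stream more ergonomic
sealed abstract class Completable[T](ref: ActorRef[T], completeMessage: T) extends Cancellable {
  // stop the scheduled task, then send the completion message to the source actor
  def complete: Unit = {
    cancel()
    ref ! completeMessage
  }
}
// in companion object
def fromCancellable[T](cancellable: Cancellable)(actor: ActorRef[T], completeMessage: T): Completable[T] =
  new Completable(actor, completeMessage) {
    override def cancel(): Boolean = cancellable.cancel()
    override def isCancelled: Boolean = cancellable.isCancelled
  }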
// body of function that creates the stream - yields Source[HttpResponse, Completable]
val (actor, httpSource) = ActorSource.actorRef[ActorProtocol](
  completionMatcher = { case Complete => println("Complete message sent!") },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropNew
).preMaterialize()
// periodically make the request and forward each successful response to the source actor
val runnable = new Runnable {
  override def run(): Unit = httpClient.makeRequest(request) onComplete {
    case Failure(exception) => throw exception
    case Success(res) =>
      actor ! Response(res)
  }
}
val completable = fromCancellable(akkaScheduler.scheduleAtFixedRate(Duration.Zero, dur)(runnable))(actor, Complete)
httpSource
  // unwrap the HttpResponse from each Response message
  .collect { case Response(r) => r }
  .mapMaterializedValue(_ => completable)
// how it gets called
val (completable, stream) = codeToCreateStream
// ... some time later
completable.complete
// see complete message printed, expect stream to complete, but it does not :(