I have a flow with a mapAsync stage. This stage should emit its elements in the same order they were pushed to it. I verified (by logging) that the events are pushed in the desired order. However, my mapAsync stage doesn’t always emit these elements in that order. I think the function I pass to it causes this issue, but I can’t see how.

My flow (the issue is at the second mapAsync):
    Flow[EventFlowInfo]
      .mapAsync[EventOrError](EventVerificationParallelism) { eventFlowInfo: EventFlowInfo =>
        // TODO: add verification rules
        Future(Right(eventFlowInfo))
      }
      .mapAsync[EventOrError](5) { eventOrError =>
        runStage("event storage", eventOrError, persistToEventStorage)
      }
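To check the ordering guarantee in isolation from my storage code, here is a minimal sketch. It is not part of my flow: the object name MapAsyncOrderCheck, the element range, and the artificial sleep are made up for illustration. It assumes Akka 2.5.x on the classpath, and it records the order in which the futures complete separately from the order in which the stage emits.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical stand-alone check, not part of the flow above.
object MapAsyncOrderCheck {

  // Returns (order in which the futures completed, order in which the stage emitted).
  def run(): (List[Int], List[Int]) = {
    implicit val system: ActorSystem = ActorSystem("order-check")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Thread-safe record of completion order, filled from inside the futures.
    val completed = new ConcurrentLinkedQueue[Int]()

    try {
      val emitted = Source(1 to 10)
        .mapAsync(5) { i =>
          Future {
            // Earlier elements sleep longer, so later ones tend to finish first.
            Thread.sleep((10 - i) * 20L)
            completed.add(i)
            i
          }
        }
        .runWith(Sink.seq)

      val result = Await.result(emitted, 10.seconds).toList
      (completed.asScala.toList, result)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (completionOrder, emissionOrder) = run()
    println(s"completion order: $completionOrder")
    println(s"emission order:   $emissionOrder")
  }
}
```

If I read the Akka documentation correctly, mapAsync emits results in upstream order even when the futures complete out of order. That would mean log lines written inside the futures (like the ones in persistToEventStorage) can interleave while the emitted sequence stays ordered, so logging downstream of the stage may be the more reliable check.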
runStage method:
    private def runStage(stageName: String,
                         eventFlowInfoEither: EventOrError,
                         stage: (EventFlowInfo) => Future[EventOrError]): Future[EventOrError] = {
      eventFlowInfoEither match {
        case Right(eventFlowInfo: EventFlowInfo) =>
          logger.info(s"running stage for $eventFlowInfo")
          stage(eventFlowInfo)
        case error@Left(_) =>
          logger.error(s"failed event will not be persisted to $stageName")
          Future(error)
      }
    }
persistToEventStorage method:
    private[flow] def persistToEventStorage(eventFlowInfo: EventFlowInfo): Future[EventOrError] = {
      eventFlowInfo.event match {
        case Some(event: Event) =>
          logger.info(s"persisting event of doc [${event.docId}] from sender [${eventFlowInfo.sender}] to event storage")
          create(Event.toEventRow(event))
            .map[EventOrError] { eventId =>
              logger.info(s"event for doc [${event.docId}] from sender [${eventFlowInfo.sender}] was created successfully with id [$eventId]")
              val updatedEvent: Event = event.copy(id = eventId)
              Right(eventFlowInfo.copy(event = Some(updatedEvent)))
            }
            .recover { case ex: Throwable =>
              logger.error(s"was not able to persist event for doc [${event.docId}] from sender [${eventFlowInfo.sender}]", ex)
              Left(EventStorageException(ex, eventFlowInfo))
            }
        case None =>
          Future.successful(Right(eventFlowInfo))
      }
    }
It’s important to say that I verified that there is no issue with the create method; however, I do think that something is wrong in the persistToEventStorage method. I’m using the same ExecutionContext.
Scala version: 2.12.7
Akka version: 2.5.17