We are struggling a bit with the recoverWithRetries and the events coming from a kafka subscriber.
The recoverWithRetries expects the original Source as input for retrying.
Except we seem not to get this from the subscriber, which return a CompletableStage.
We use the event subscriber for instance to hook up to an external system through http.
So it occurs that you have connectivity or other exceptions which restarts the client.
But what we actually want is to retry for 3 times and then persist the errornous event elsewhere.
Do you have an example somewhere on how to resolve this properly and gracefully handle this scenario?
Other solution could be to handle retries, per message, in message handler, by chaining invoke CompletableStages.
Persist would then be done in final stage of the chain by sending a command to entity or by publishing to other kafka topic (using alpakka kafka). Just a note, if any of this stages fail stream will be restarted and complete message handle will be repeated.
Would it be possible to use the recoverWithRetries functionality in combination with subscribers?
The below pseudo code will illustrate the issue:
Flow flow = Flow.fromFunction(i -> {
throw new RuntimeException(String.format("%d could not be handled", i));
}).recoverWithRetries(3, ???);
topic.subscribe().atLeastOnce(flow);
The subscribe.atLeastOnce(...) returns a CompletionStage<Done>, but the flow earlier needs to have this information in order to retry it again. Does that mean that this concept of retrying doesn’t come out-of-the-box for subscribers and need to be custom build (e.g. with a dedicated error queue)?
CompletionStage<Done> is used to commit offset to kafka when message is consumed successfuly. If CompletionStage fails subscriber is restarted and message is consumed again.
I did not try it but would assume that needs to be custom build.