Thanks @seglo
Unfortunately your hints don’t match my use case. Maybe some more context will give you a better understanding:
We would like to provide a component that lets our web applications publish notifications to Kafka. Each HTTP request to the web server, or each message consumed from RabbitMQ/Kafka, can result in a notification that should be published.
As we have some experience with Akka and think that Akka Streams, with its built-in backpressure, could be beneficial here, we decided to give it a try and use it as the library underneath this Kafka publisher component.
All applications are JVM based but use different technologies (RxJava, Reactor, Akka HTTP, Vert.x), so communication between the publisher component and the other parts of the applications must happen via a common interface - CompletableFuture in this case.
As you can see, I don’t have a fixed number of messages that can be added to the SourceQueue on start: elements are added dynamically, and there is no way to re-populate the SourceQueue in case of stream failure.
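For context, here is a minimal sketch of the component boundary we have in mind (KafkaPublisher, the wrapper class internals and the KafkaProducerResult enum are our own illustrative names, not existing API): each caller hands over a payload and gets back a CompletableFuture that the stream completes after the publication attempt, and which the RxJava/Reactor/Vert.x parts can adapt to their own types.

import java.util.concurrent.CompletableFuture;

import akka.stream.QueueOfferResult;
import akka.stream.javadsl.SourceQueueWithComplete;

class MessageToPublishWithCompletableFuture {
    private final String messageToPublish;
    private final CompletableFuture<KafkaProducerResult> publicationResultFuture = new CompletableFuture<>();

    MessageToPublishWithCompletableFuture(String messageToPublish) {
        this.messageToPublish = messageToPublish;
    }

    public String getMessageToPublish() {
        return messageToPublish;
    }

    public CompletableFuture<KafkaProducerResult> getPublicationResultFuture() {
        return publicationResultFuture;
    }
}

public class KafkaPublisher {
    private final SourceQueueWithComplete<MessageToPublishWithCompletableFuture> sourceQueue;

    public KafkaPublisher(SourceQueueWithComplete<MessageToPublishWithCompletableFuture> sourceQueue) {
        this.sourceQueue = sourceQueue;
    }

    // Offers the message to the running stream; the returned future is completed by the
    // stream once the publication attempt has succeeded or failed.
    public CompletableFuture<KafkaProducerResult> publish(String payload) {
        MessageToPublishWithCompletableFuture msg = new MessageToPublishWithCompletableFuture(payload);
        sourceQueue.offer(msg).whenComplete((offerResult, failure) -> {
            // Enqueueing can fail outright (stream dead) or be rejected (buffer full) -> fail fast.
            if (failure != null || offerResult != QueueOfferResult.enqueued()) {
                msg.getPublicationResultFuture().complete(KafkaProducerResult.FAILED);
            }
        });
        return msg.getPublicationResultFuture();
    }
}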
Pseudocode of what we would like to achieve:
sourceQueue
    .map(msgToPublish -> convertToProducerMsg(topicName, msgToPublish))
    .via(Producer.flexiFlow(kafkaProducerSettings))
    .map(result -> {
        if (result.isSuccess()) {
            return result.passThrough().getPublicationResultFuture().complete(KafkaProducerResult.PUBLISHED);
        } else {
            LOG.error("Error during publication of {} to Kafka. Exception: {}", result.originalMsg(), result.failureCause());
            return result.passThrough().getPublicationResultFuture().complete(KafkaProducerResult.FAILED);
        }
    })
    .to(Sink.ignore())
    .run(system);
Unfortunately, Producer.flexiFlow has no way to produce an element like:

ProducerMessage.PublicationFailedResult(ex: Throwable, failedElement: Envelope[K, V, PassThrough]) extends Results[K, V, PassThrough]

which would let us react not only to publication successes but also to failures.
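For comparison, the failure-aware behaviour we are after is easy to get by bypassing Producer.flexiFlow and driving a plain KafkaProducer from mapAsync, since the send callback reports both success and failure. A sketch only (kafkaProducer could e.g. come from kafkaProducerSettings.createKafkaProducer(); parallelism is whatever bound fits), not something we actually run:

sourceQueue
    .mapAsync(parallelism, msg -> {
        CompletableFuture<Done> sendCompleted = new CompletableFuture<>();
        ProducerRecord<String, String> record =
            new ProducerRecord<>(topicName, msg.getMessageToPublish());
        kafkaProducer.send(record, (metadata, exception) -> {
            // The callback fires on both outcomes, so the caller's future is always
            // completed and the stream itself never fails because of a rejected record.
            if (exception == null) {
                msg.getPublicationResultFuture().complete(KafkaProducerResult.PUBLISHED);
            } else {
                LOG.error("Error during publication of {} to Kafka", msg, exception);
                msg.getPublicationResultFuture().complete(KafkaProducerResult.FAILED);
            }
            sendCompleted.complete(Done.getInstance());
        });
        return sendCompleted;
    })
    .to(Sink.ignore())
    .run(system);

This gives up the Alpakka producer stage entirely, though, which is why we kept looking for a solution on top of Producer.flexiFlow.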
Since the stream preserves order, we figured out the following hacky solution:
sourceQueue
    .mapConcat(msgToPublishWithCompletableFuture -> Arrays.asList(
        ProducerMessage.single(
            new ProducerRecord<>(topicName, msgToPublishWithCompletableFuture.getMessageToPublish()),
            msgToPublishWithCompletableFuture),
        ProducerMessage.passThrough(msgToPublishWithCompletableFuture)))
    .via(Producer.flexiFlow(kafkaProducerSettings))
    .fold(Optional.empty(), (state, element) -> {
        // Publication succeeded and state is empty -> store the successful publication's
        // pass-through value as state and wait for the associated ProducerMessage.PassThroughResult.
        if (element instanceof ProducerMessage.Result && state.isEmpty()) {
            return Optional.of(element.passThrough());
        }
        // Received the pass-through element associated with the state value -> publication succeeded.
        else if (element instanceof ProducerMessage.PassThroughResult
            && state.isPresent()
            && state.get().equals(element.passThrough())) {
            element.passThrough().getPublicationResultFuture().complete(KafkaProducerResult.PUBLISHED);
            return Optional.empty();
        }
        // Received a pass-through element while state is empty -> publication of the associated message failed.
        else if (element instanceof ProducerMessage.PassThroughResult && state.isEmpty()) {
            LOG.warn("[PUBLICATION FAILED] Received {} when state is empty - the publication associated with it failed",
                ((ProducerMessage.PassThroughResult) element).passThrough());
            element.passThrough().getPublicationResultFuture().complete(KafkaProducerResult.FAILED);
            return Optional.empty();
        }
        // As we first emit a ProducerMessage.Message and then a ProducerMessage.PassThroughMessage,
        // and the stream preserves order, other cases are not possible.
        else {
            LOG.error("[CRITICAL] Illegal state of stream: received unexpected {} element when state was {}", element, state);
            return Optional.empty();
        }
    })
    .to(Sink.ignore())
    .run(system);
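For what it’s worth, the same pairing can be expressed a bit more directly with statefulMapConcat instead of fold, which avoids threading the Optional through an accumulator. A sketch under the same assumptions as above:

sourceQueue
    .mapConcat(msg -> Arrays.asList( // same Message + PassThroughMessage pairing as above
        ProducerMessage.single(new ProducerRecord<>(topicName, msg.getMessageToPublish()), msg),
        ProducerMessage.passThrough(msg)))
    .via(Producer.flexiFlow(kafkaProducerSettings))
    .statefulMapConcat(() -> {
        // One mutable slot per materialization: the message whose ProducerMessage.Result we have
        // already seen and whose ProducerMessage.PassThroughResult we are still waiting for.
        AtomicReference<MessageToPublishWithCompletableFuture> awaiting = new AtomicReference<>();
        return element -> {
            if (element instanceof ProducerMessage.Result) {
                awaiting.set(element.passThrough());
            } else if (element instanceof ProducerMessage.PassThroughResult) {
                MessageToPublishWithCompletableFuture msg = element.passThrough();
                // If no matching Result preceded this pass-through, the publication failed.
                boolean published = msg.equals(awaiting.getAndSet(null));
                msg.getPublicationResultFuture()
                    .complete(published ? KafkaProducerResult.PUBLISHED : KafkaProducerResult.FAILED);
            }
            return Collections.emptyList();
        };
    })
    .to(Sink.ignore())
    .run(system);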
Either way, as I wrote above, it looks a bit hacky to me, so maybe it would be worth considering adding a Producer.flexiFlow variant that produces an element both on publication success and on publication failure (at least for Kafka producer related exceptions).