All producers have `ProducerSettings::parallelism`, which caps the number of in-flight objects (these objects may be `MultiMessage`s or `Message`s). The cap is enforced by an internal buffer in the `mapAsync` stage that is part of all producers.
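As a sketch of where that knob lives (assuming the alpakka-kafka Java API; `ProducerSettings.create` and `withParallelism` are my reading of the docs, so double-check against your library version — this fragment is not runnable without the alpakka-kafka dependency and a broker):

```java
// Hedged sketch: raising the in-flight cap on a producer.
// Assumes the alpakka-kafka Java API; names unverified against your version.
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import org.apache.kafka.common.serialization.StringSerializer;

public class ParallelismConfig {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ProducerSettings<String, String> settings =
            ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
                .withBootstrapServers("localhost:9092") // placeholder address
                .withParallelism(1000); // the mapAsync buffer cap discussed above
        system.terminate();
    }
}
```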
Note that it is a `mapAsync` rather than a `mapAsyncUnordered`, which means that the buffer only pulls new elements once the current head is finished (acknowledged by Kafka). This is probably a conscious design decision to support the `committableSink` cases (Kafka-to-Kafka streams): out-of-order production of results would commit offsets in the upstream consumer without guaranteeing that all records in between have been processed.
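The ordering difference can be simulated with plain `CompletableFuture`s (this is a toy model, not alpakka-kafka code — three "sends" whose acks we complete by hand, with the head record's ack arriving last):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of mapAsync vs. mapAsyncUnordered emission order:
// three in-flight "sends" whose acks arrive in reverse order.
public class OrderingDemo {
    static List<List<Integer>> run() {
        List<CompletableFuture<Integer>> acks = new ArrayList<>();
        for (int i = 0; i < 3; i++) acks.add(new CompletableFuture<>());

        // mapAsyncUnordered-style: record results in completion order.
        // (Callbacks registered before completion run on the completing thread.)
        List<Integer> completionOrder = new ArrayList<>();
        for (CompletableFuture<Integer> f : acks) f.thenAccept(completionOrder::add);

        // Acks arrive out of order: record 2 first, head record 0 last.
        acks.get(2).complete(2);
        acks.get(1).complete(1);
        acks.get(0).complete(0);

        // mapAsync-style: emit in upstream order, waiting on the head first.
        List<Integer> upstreamOrder = new ArrayList<>();
        for (CompletableFuture<Integer> f : acks) upstreamOrder.add(f.join());

        return List.of(upstreamOrder, completionOrder);
    }

    public static void main(String[] args) {
        List<List<Integer>> r = run();
        System.out.println("mapAsync-style emission:          " + r.get(0)); // [0, 1, 2]
        System.out.println("mapAsyncUnordered-style emission: " + r.get(1)); // [2, 1, 0]
    }
}
```

With offset commits keyed to emission order, the `[2, 1, 0]` emission would let the commit for record 2 land before records 0 and 1 are confirmed — which is exactly the hazard the ordered stage avoids.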
This means:
- We are still using native Kafka producer batching. But with a default flow using `Message`s (default parallelism is 100), we only allow 100 in-flight records at a time, which may be well below the batch sizes in Kafka, and the ordering guarantee may add latency. In particular, if 100 messages don't hit the `batch.size` in your `KafkaProducer` settings, then you may wait `linger.ms` for every batch of 100 records, since the latency of talking to Kafka may well be much higher than the time it takes to pull new elements into the flow stage.
- Increasing `ProducerSettings::parallelism` should alleviate this problem.
- Using `MultiMessage`s should similarly alleviate the problem.
- At first glance, it seems that the performance of the latter should be slightly better: the former keeps two futures for every record, while the latter keeps one future per record plus one per batch.
- Arguably, though, it might be easier to see at first glance what the maximum number of in-flight records is if we only adjust `parallelism`.
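A back-of-envelope check of the first bullet (all numbers are illustrative assumptions: 16384 bytes is Kafka's default `batch.size`, 5 ms is an assumed `linger.ms` setting, and the record size is made up):

```java
// Rough model: if the in-flight window can never fill a Kafka batch,
// every batch waits the full linger.ms before flushing.
public class BatchLatencyDemo {
    static int extraWaitMs(int parallelism, int avgRecordBytes,
                           int batchSizeBytes, int lingerMs) {
        int inFlightBytes = parallelism * avgRecordBytes;
        return inFlightBytes < batchSizeBytes ? lingerMs : 0;
    }

    public static void main(String[] args) {
        // 100 in-flight Messages of ~100 bytes = 10_000 bytes, below the
        // default batch.size of 16_384, so each batch waits linger.ms.
        System.out.println(extraWaitMs(100, 100, 16_384, 5)); // 5
        // Raising parallelism to 500 fills a batch before linger.ms expires.
        System.out.println(extraWaitMs(500, 100, 16_384, 5)); // 0
    }
}
```

This ignores broker round-trip time and compression, but it shows why bumping `parallelism` (or using `MultiMessage`s) can remove the per-batch `linger.ms` stall.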
@extantpedant, does this help you at all?
For anyone with deeper knowledge of alpakka-kafka:
- Does this sound like a sensible assessment?
- Any comments on `MultiMessage`s vs. increasing `parallelism`?
- Would it be sensible to provide producers that use `mapAsyncUnordered` in the library code?