Hi @seglo,
Thank you for your answer.
I’d like to understand why this limitation exists. Is it a fundamental Kafka limitation, or is it due to the way Alpakka’s TransactionalProducerStage is written, in which case it should be possible to write a similar stage without this limitation?
What I am ultimately trying to do is a timestamp-ordered merge of several topics that contain different types of messages coming from different systems. It is impossible to guarantee ordering between those systems, but I know that within each partition the messages are in order, so such a merge is possible. It of course can’t handle every case, but it can restore ordering within a certain configurable time window.
Because of this I can’t use a single Transactional.source with multiple topic/partition subscriptions: the merge may “advance” in one partition while another partition stays at its current message, and I don’t think a plain Transactional.source can do that.
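To illustrate the kind of merge I mean (a simplified sketch on my side, not my actual MergeSortedWithMaxDelay strategy): if each per-partition source is already sorted by its timestamp, two of them can be merged into a single ordered stream with Akka Streams’ built-in mergeSorted, which always emits the smaller head element first. It back-pressures until both inputs have an element available, though, so it doesn’t cover the configurable maximum delay my strategy adds. Using the TransactionalMessageWithTimestamp wrapper from my PoC below:

import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source

// Sketch only: assumes each source is internally ordered by timestamp
implicit val byTimestamp: Ordering[TransactionalMessageWithTimestamp] =
  Ordering.by[TransactionalMessageWithTimestamp, Long](_.timestamp.toInstant.toEpochMilli)

def mergeTwo(
  a: Source[TransactionalMessageWithTimestamp, Consumer.Control],
  b: Source[TransactionalMessageWithTimestamp, Consumer.Control]
): Source[TransactionalMessageWithTimestamp, Consumer.Control] =
  a.mergeSorted(b) // emits the earlier-timestamped element first; waits for both inputs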
I think I have a solution for this, and it seems to work correctly, but I feel like I am missing something: it doesn’t seem very complex, yet Alpakka doesn’t support it out of the box.
Here’s what my PoC code does:
import java.time.ZonedDateTime
import scala.collection.{immutable, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import akka.util.ByteString
import akka.kafka.{ConsumerMessage, ProducerMessage}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition

case class TransactionalMessageWithTimestamp(
  transactionalMessage: ConsumerMessage.TransactionalMessage[ByteString, ByteString],
  timestamp: ZonedDateTime
)

// Each of these sources is created with a call to Transactional.source()
val sources: Seq[Source[TransactionalMessageWithTimestamp, Consumer.Control]] = //....

// producerProperties must include a transactional.id for the transactional API to work
val producer = new KafkaProducer[ByteString, ByteString](producerProperties.asJava)
producer.initTransactions() // required once before the first beginTransaction()

val doneFuture =
  // basically Source.combine
  CombinedSource(sources: _*)(MergeSortedWithMaxDelay.strategy(maxDelay))
    .map { msg =>
      val partition = computePartition(/*......*/)
      ProducerMessage.single(
        new ProducerRecord(
          outputTopic,
          partition,
          msg.transactionalMessage.record.key(),
          msg.transactionalMessage.record.value()
        ),
        msg.transactionalMessage.partitionOffset
      )
    }
    .groupedWithin(50, 1.second) // prepare a batch of messages for one transaction
    .map { batch =>
      producer.beginTransaction()
      val offsets = mutable.Map.empty[TopicPartition, Long]
      // basically go through every message in the batch
      batch.foreach { message =>
        val (records, passThrough) =
          message match {
            case ProducerMessage.Message(record, pt) =>
              (immutable.Seq(record), pt)
            case ProducerMessage.MultiMessage(recs, pt) =>
              (recs, pt)
            case ProducerMessage.PassThroughMessage(pt) =>
              (immutable.Seq.empty, pt)
          }
        // send every record to the producer
        records.foreach(producer.send)
        val topicPartition = passThrough.key.topicPartition
        val offset = passThrough.offset
        // keep track of the highest encountered offset for each source partition in the batch
        offsets(topicPartition) = math.max(offsets.getOrElse(topicPartition, offset), offset)
      }
      producer.sendOffsetsToTransaction(
        offsets.mapValues(offset => new OffsetAndMetadata(offset + 1)).asJava,
        groupId
      )
      producer.commitTransaction()
    }
    .runWith(Sink.ignore)
The code is crude: it buffers messages in memory for the whole duration of the transaction and blocks the flow while the transaction is open, but it seems to work correctly. It also seems it wouldn’t be that hard to rewrite it as a proper sink that keeps track of the highest encountered offset for each partition since the last committed transaction, roughly along the lines of the sketch below.
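For example, the offset bookkeeping could be pulled into a small helper that is reset after each commit. This is only a rough sketch of what I have in mind; the class and method names are mine, not Alpakka’s:

import scala.collection.mutable
import akka.kafka.ConsumerMessage.PartitionOffset
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: remembers the highest offset seen per source partition
// since the last commit and builds the map expected by sendOffsetsToTransaction
final class TransactionOffsetTracker {
  private val highest = mutable.Map.empty[TopicPartition, Long]

  def record(po: PartitionOffset): Unit = {
    val tp = po.key.topicPartition
    highest(tp) = math.max(highest.getOrElse(tp, po.offset), po.offset)
  }

  // Kafka expects the offset of the next message to consume, hence the + 1
  def offsetsToCommit: Map[TopicPartition, OffsetAndMetadata] =
    highest.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }.toMap

  def reset(): Unit = highest.clear()
}

Inside a proper sink this would translate to calling record(...) for every produced message, then sendOffsetsToTransaction(tracker.offsetsToCommit.asJava, groupId), commitTransaction(), and reset() at each commit point.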
Do you think that would work?
Is there a simpler way to achieve this?