Hello!
I want to send data from my Akka service to another service via RSocket, which supports backpressure.
I have a handler where I receive a SourceRef from each actor, merge them, and send the result to RSocket via asFlow.
The problem is that the consumer is too slow, so I need to batch (conflate) messages.
In each actor, I create the source like this:
val source = Source
    .queue<EventState>(1, OverflowStrategy.backpressure())
    // keep only the newest state when downstream is slow
    .conflate { _, latest -> latest }
    .preMaterialize(context.system)
On each update message, the actor offers the new state to the queue:
source!!.first().offer(state).toCompletableFuture().thenAccept {
    context.log.info("EventActor: ${persistenceId.entityId()}. Send eventState: $state to queue, result: ${it.isEnqueued}")
}
The problem is that the source only starts conflating messages after it has received 53 of them, even though, as I mentioned, the consumer is very slow.
How can I create a source with a buffer of 1 that merges messages when the buffer is full?
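To make the expected behaviour concrete, here is a minimal plain-Kotlin sketch of what I mean by "buffer 1 with merging". It is not Akka code: LatestSlot is a made-up name, and EventState here is a simplified stand-in for my real class. It only illustrates the semantics I am hoping to get from the stream.

```kotlin
// Hypothetical stand-in for the real EventState class.
data class EventState(val version: Int)

// Plain-Kotlin model of a one-element buffer that conflates on overflow.
// This is not an Akka API; LatestSlot is a made-up name for illustration.
class LatestSlot<T : Any>(private val merge: (T, T) -> T) {
    private var pending: T? = null

    // Producer side: never blocks. If an element is already waiting,
    // the new one is merged into it instead of being queued behind it.
    @Synchronized
    fun offer(value: T) {
        val current = pending
        pending = if (current == null) value else merge(current, value)
    }

    // Consumer side: takes the waiting element (or null) and empties the slot.
    @Synchronized
    fun poll(): T? {
        val current = pending
        pending = null
        return current
    }
}

fun main() {
    // Same merge function as in my conflate call: keep the newest state.
    val slot = LatestSlot<EventState> { _, latest -> latest }

    // The producer emits three updates before the slow consumer polls once.
    slot.offer(EventState(1))
    slot.offer(EventState(2))
    slot.offer(EventState(3))

    println(slot.poll()) // EventState(version=3)
    println(slot.poll()) // null
}
```

So with a slow consumer I would expect it to see only the latest state from the very first overflow, not after dozens of messages have been buffered somewhere in between.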
Thanks!