Akka Streams: merging data from multiple replicas of a system

Hello everyone,

I’m encountering an issue while working with Akka Stream and I’m hoping someone can assist me in resolving it.

Here’s the context: I have two systems, let’s refer to them as SystemA and SystemB, both deployed on a Kubernetes (k8s) cluster. SystemA is deployed with two replicas, namely SystemA-1 and SystemA-2, while SystemB has only one instance.

SystemA exposes two APIs: API-1, which receives data from other systems, and API-2, which returns a source to other systems. When SystemB starts up, it randomly selects one of the SystemA replicas and consumes API-2. Essentially, API-2 acts as a pipeline between SystemA and SystemB.

Currently, SystemA receives data from API-1, and I’ve implemented a Source.queue in SystemA to offer the incoming data from API-1 to API-2. Consequently, SystemB receives data from SystemA, but the issue arises because SystemA has multiple replicas. Due to the load balancing strategy, different replicas receive different data (e.g., data 1, 2, 3, 4). Data 1 and 3 are sent to SystemA-1, while data 2 and 4 are sent to SystemA-2.

However, since SystemB only connects to a randomly chosen replica, it only receives data from either SystemA-1 (data 1, 3) or SystemA-2 (data 2, 4). What I aim for is to enable SystemB to receive all data (1, 2, 3, 4).

I suspect the issue lies in my usage of Source.queue for handling data delivery. After researching, I came across MergeHub as a potential solution to this problem. However, I’m unsure how to implement the code. If anyone has a solution or suggestions, I would greatly appreciate your assistance. Thank you.

Hello,

You’re right, your current approach with Source.queue only delivers data from the chosen replica to SystemB. To achieve your goal of SystemB receiving data from all SystemA replicas, you can leverage Akka Streams’ merging capabilities. Here’s how to tackle this with MergeHub:

Replace Source.queue with Separate Sources: In SystemA, instead of using a single Source.queue, create a separate source for each replica. This ensures data from each replica is streamed independently.

Utilize MergeHub: Use MergeHub.source to create a new source that merges the data streams from all SystemA replicas. MergeHub acts as a central point where all individual streams converge.

Connect to API-2: Connect the merged source from MergeHub to your API-2 endpoint. SystemB will now receive data from all replicas through this single source.

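// In SystemA (each replica)
import akka.stream.scaladsl.{Keep, MergeHub, Source}

// Placeholder: replace ??? with the source of incoming API-1 data on this replica
val sourcePerReplica: Source[YourDataType, _] = ???

// Keep.right keeps the MergeHub's materialized value: a Sink that other producers can attach to
val mergedSource = sourcePerReplica.mergeMat(MergeHub.source[YourDataType])(Keep.right)

// Connect mergedSource to your API-2 endpoint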

We define sourcePerReplica to handle data received by each replica from API-1. This could involve reading from a queue or message broker specific to your implementation.

MergeHub.source[YourDataType] creates a new source specifically designed to merge streams of type YourDataType (replace with your actual data type).

mergeMat combines the replica's source with the MergeHub source. Keep.right keeps the MergeHub's materialized value, which is a Sink that additional producers can attach to once the stream is running.

Finally, connect the mergedSource to your API-2 endpoint, making it available to SystemB.
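For reference, here is a minimal, self-contained sketch of how a MergeHub behaves inside one process, assuming Akka 2.6 or later (where an implicit ActorSystem provides the materializer). The object name, the Int element type, and the two in-memory producers standing in for API-1 traffic are illustrative assumptions, not part of the original setup. Keep in mind that a MergeHub only merges producers running in the same JVM, so on its own it does not join streams across separate replicas.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names are for illustration only.
object MergeHubSketch {

  def collectFromTwoProducers(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("merge-hub-sketch")
    try {
      // Materializing MergeHub.source yields a Sink. Any number of producers
      // can attach to that Sink, and their elements all flow downstream.
      val (hubSink, collected) =
        MergeHub.source[Int](perProducerBufferSize = 16)
          .take(4) // only so the sketch terminates; a real hub stays open
          .toMat(Sink.seq[Int])(Keep.both)
          .run()

      // Two independent producers, standing in for separate API-1 callers.
      Source(List(1, 3)).runWith(hubSink)
      Source(List(2, 4)).runWith(hubSink)

      // Arrival order across producers is not guaranteed, so sort for display.
      Await.result(collected, 5.seconds).sorted
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(collectFromTwoProducers())
}
```

The hub never completes on its own when its producers finish, which is why the sketch caps the stream with take(4).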

I hope this solution helps.

Hi everyone,

To ensure SystemB receives all data from both replicas of SystemA, you can use Akka Streams’ MergeHub. This will combine the streams from all replicas into a single stream that SystemB can consume.

Here’s a quick guide:

Set up MergeHub in SystemA:

// Materializes to ((Sink, KillSwitch), Source); needs an implicit ActorSystem in scope
val ((mergeHubSink, killSwitch), mergeHubSource) =
  MergeHub.source[String]
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(BroadcastHub.sink(256))(Keep.both)
    .run()

Send data to MergeHub:

def offerToMergeHub(data: String): Unit = {
  // Attach a one-element producer to the hub's Sink
  Source.single(data).runWith(mergeHubSink)
}

Expose the merged stream via API-2:

// Needs: import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
path("api-2") {
  get {
    complete {
      mergeHubSource.map(data => ServerSentEvent(data))
    }
  }
}

SystemB consumes the stream:

// Needs: import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
// plus an implicit ActorSystem and ExecutionContext in scope
val responseFuture = Http().singleRequest(HttpRequest(uri = "http://systemA-instance/api-2"))

responseFuture.flatMap { response =>
  Unmarshal(response).to[Source[ServerSentEvent, NotUsed]]
}.foreach { source =>
  source.runWith(Sink.foreach(event => println(s"Received event: ${event.data}")))
}

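This way, SystemB receives everything that passes through the MergeHub on the replica it connects to. Because a MergeHub lives inside a single JVM, SystemB still has to consume API-2 from each SystemA replica (or the replicas have to forward their data to one another) in order to see all the data.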
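Because each replica runs its own MergeHub, one way for SystemB to see every element is to open API-2 on each replica and merge the resulting streams on its side. The sketch below assumes Akka HTTP with Server-Sent Events on Akka 2.6 or later, and assumes each replica is individually addressable (for example through a headless Kubernetes service). The host names, port, and helper names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
import akka.stream.scaladsl.Source

// Hypothetical helper for SystemB; names are for illustration only.
object ConsumeAllReplicas {

  // Assumed per-replica addresses; replace with however your cluster
  // exposes the individual SystemA pods.
  val replicaUris: List[String] = List(
    "http://systema-0.systema:8080/api-2",
    "http://systema-1.systema:8080/api-2"
  )

  // Opens API-2 on one replica and exposes its events as a Source.
  def eventsFrom(uri: String)(implicit system: ActorSystem): Source[ServerSentEvent, NotUsed] =
    Source
      .futureSource(
        Http()
          .singleRequest(HttpRequest(uri = uri))
          .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])(system.dispatcher)
      )
      .mapMaterializedValue(_ => NotUsed)

  // Merges the event streams of all given replicas into a single Source.
  def allEvents(uris: List[String])(implicit system: ActorSystem): Source[ServerSentEvent, NotUsed] =
    uris
      .map(uri => eventsFrom(uri))
      .foldLeft(Source.empty[ServerSentEvent])((merged, next) => merged.merge(next))
}
```

With an implicit ActorSystem in scope, SystemB could then run ConsumeAllReplicas.allEvents(ConsumeAllReplicas.replicaUris).runForeach(event => println(event.data)). The sketch does not reconnect when a replica restarts; that would need something like RestartSource around eventsFrom.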

Hope this helps!

Thank you
gregbowers