Is there any way to use sharding like mechanism with consistent hashing?
I mean I would like to have one ActorRef per Akka node to which I can send a message that will be after forwarded to the appropriate receiver.
I know that there are consistent hashing group and pool, but they are not creating new actors base on incoming messages as Akka sharding does.
I want to use this approach as sometimes shard coordinator singleton is on Akka node under high load, and Akka cluster stops working.
With consistent hashing, there is no need for a coordinator as message are dispatched to actors/nodes base on hash derived from a message.
This is exactly what sharding does. However you have to take into account that the topology of your cluster may change over time - nodes crashing, being removed - re-added etc. which is why sharding needs to have consistent state and rebalancing and cannot permanently tie a hash to a specific node.
Note that the interaction with the coordinator is just to figure out where a shard is, the general, happy case, will not require any coordination but the sending node will already know what node hosts a specific shard .
Looking into implementing your own ShardAllocationStrategy will likely provide some insights on this topic.
Couldn’t you make the rebalancing “formula” deterministic as well, so that each node can update the rebalanced shard-to-node mappings themselves without having to contact the shard coordinator? After all, each node is informed if any node in the cluster leaves / joins. That way you could get rid of the shard coordinator altogether, and each node would know immediately where to send a message. Or are there inevitable inconsistency issues when doing it this way?
This approach would also allow for better scalability and resource-balancing as you could greatly increase the number of shards. I don’t think this is practical, but to make the point, in the most extreme case you might even be able to eliminate the need for shards and just have what’s currently called shard regions (calculate the node on a per-entity level). The latter is only practical if the mappings can be expressed as a fast mathematical formula, even after rebalancings.
A new node joining the cluster would have to be informed about the history of the topology events (i.e. rebalancings) or a snapshot of the current mappings, to be able to calculate the correct mappings in the future. Not much different to what happens in receiveRecover of a persistent actor. The new node could obtain this information from any other node in the cluster as part of the joining-process.
Even with a deterministic function telling you where a specific actor lives you’d have to deal with stopping old actors and starting new ones when the topology changes, and this will include some form of handover to make sure the old and new actor are not alive at the same time, so then you are back to some form of coordination.
Currently the nodes only know the current state from when they joined and then the topology changes after that you’d also need some form of coordination tracking the topology since the first start of the cluster (probably not great for that large long lived clusters).
I’m you can come up with alternatives to how Cluster Sharding is implemented in Akka with different pros and cons but I’m not convinced you can come up with something that requires less coordination unless you loose the requirement to not have the same sharded entity alive concurrently in the cluster.
I’ve seen one alternative sharding implementation based on a consistent hash ring presented by Francesco Di Muccio at TrueCaller but it seems that never ended up being open sourced. It also had its own specific pros and cons though.
Please let us know if you explore this topic further!
Thanks a lot for your reply. I agree that that the current implementation is fine, and practical. Unless the coordinator singleton can ever become a bottleneck, there is no need to change it.
Also, don’t let me waste your time if this is unproductive. But here is what I think:
handover to make sure the old and new actor are not alive
Both the old node and the new node would know when the handover of a shard is completed. Between those two nodes it could be ensured that only one instance of the shard (entities) is active. Now, if another node mistakenly sends a message to the old node (after handover completed) or to the new node (before handover completed), the receiving node could forward that message to the other, correct node, since it knows that the actor lives there. This principle could be universally applied to redirect falsely addressed messages, because each node knows exactly which shards it controls. If it receives a wrongly addressed message, it just has to send it to the node it believes to be correct.
During handover, both nodes would stack incoming messages, and then send them to the correct node once handover is complete.
The mapping function would still take the place of the coordinator, which would have the mentioned scalability benefits.
you’d also need some form of coordination tracking the topology
I thought that functionality is already implemented by Akka, no?
Couldn’t the leader coordinate this? The leader’s job is already to track cluster events, so there would not be any issue in respect to order of events.
Both the old node and the new node would know when the handover of a shard is completed.
I think this was the strategy taken by the consistent hash ring implementation I mentioned, however it is quite a bit trickier than one may expect, because of the possibility of a chain of nodes leaving before the handover ever gets done and message delivery ordering guarantees.
I thought that functionality is already implemented by Akka, no?
Couldn’t the leader coordinate this? The leader’s job is already to track cluster events, so there would not be any issue in respect to order of events.
The only difference between the leader and the the rest of the nodes is the responsibility to take topology decisions that need cluster consistency to be taken safely, which node that is the leader can change at any time (because it is downed) without any form of handover. The historical state of the cluster is not kept internally, only the current view from the point of view of the node.
Ok cool. Thanks again for your answers, was an interesting discussion. Should I ever become so successful using Akka that the coordinator becomes a bottle-neck, I might see to implement the consistent hash ring. Until then I’ll just stick with the current (and good) implementation like everyone else