I’ve read the docs, but I can’t figure out what a slice actually is. Is a ‘tag’ a static partitioning and a ‘slice’ a dynamic one? Nowhere have I seen the principle by which identifiers are divided into ‘slices’, only how to specify the number of slices. Can anyone describe the essence of this abstraction in more detail, and how exactly it is used in queries?
I’ve read Tagging Events in EventSourcedBehavior, and that part is clear: “When consuming from Akka Persistence Journal, the events must be sliced by tagging them”. I have a tags vector for the entity type and a tag for each entity, calculated from its entityId:
val i = math.abs(entityContext.entityId.hashCode % tags.size)
val selectedTag = tags(i)
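For context, a minimal sketch of how that tag ends up on the events, roughly following the tagging example in the Akka docs (the entity, command, event and state types and the tag count are hypothetical placeholders):

import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }

object ShoppingCart {
  sealed trait Command
  sealed trait Event
  final case class State()

  // One tag per partition; the number of tags is fixed up front.
  val tags: Vector[String] = Vector.tabulate(5)(i => s"carts-$i")

  def apply(entityContext: EntityContext[Command]): EventSourcedBehavior[Command, Event, State] = {
    // entityId -> tag, the same hashing as quoted above
    val i = math.abs(entityContext.entityId.hashCode % tags.size)
    val selectedTag = tags(i)

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
      emptyState = State(),
      commandHandler = (_, _) => Effect.none,
      eventHandler = (state, _) => state)
      .withTagger(_ => Set(selectedTag)) // every persisted event carries the entity's tag
  }
}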
From the R2DBC plugin docs: “it’s important that each Projection instance consumes a subset of the stream of envelopes.”, and eventsBySlices is used in the example. That part is also clear. But I cannot understand “A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices”. How is that achieved? There is no (entityId → slice) code shown, the way there is for (entityId → tag).
I saw the schema, and slice is a column in the table (slice INT NOT NULL), so it is calculated when an event is written, but the slice ranges are only calculated later, when the projection starts.
I am completely confused about how this works before the projection is launched.
Updated:
class Persistence(val system: ExtendedActorSystem) extends Extension {

  /**
   * A slice is deterministically defined based on the persistence id.
   * `numberOfSlices` is not configurable because changing the value would result in
   * different slice for a persistence id than what was used before, which would
   * result in invalid eventsBySlices.
   *
   * `numberOfSlices` is 1024
   */
  final def numberOfSlices: Int = 1024

  /**
   * A slice is deterministically defined based on the persistence id. The purpose is to
   * evenly distribute all persistence ids over the slices and be able to query the
   * events for a range of slices.
   */
  final def sliceForPersistenceId(persistenceId: String): Int =
    math.abs(persistenceId.hashCode % numberOfSlices)

  /**
   * Scala API: Split the total number of slices into ranges by the given `numberOfRanges`.
   *
   * For example, `numberOfSlices` is 1024 and given 4 `numberOfRanges` this method will
   * return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023).
   */
  final def sliceRanges(numberOfRanges: Int): immutable.IndexedSeq[Range] = {
    val rangeSize = numberOfSlices / numberOfRanges
    require(
      numberOfRanges * rangeSize == numberOfSlices,
      s"numberOfRanges [$numberOfRanges] must be a whole number divisor of numberOfSlices [$numberOfSlices].")
    (0 until numberOfRanges).map { i =>
      (i * rangeSize until i * rangeSize + rangeSize)
    }.toVector
  }
}
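To make it concrete, here is a small self-contained sketch (plain Scala that mirrors the excerpt above rather than calling the Akka extension itself; the persistence id is made up):

object SliceDemo extends App {
  val numberOfSlices = 1024

  // Same hashing as sliceForPersistenceId: fixed per persistence id, at write time.
  def sliceForPersistenceId(persistenceId: String): Int =
    math.abs(persistenceId.hashCode % numberOfSlices)

  // Same splitting as sliceRanges: decided at read time, per projection deployment.
  def sliceRanges(numberOfRanges: Int): Vector[Range] = {
    val rangeSize = numberOfSlices / numberOfRanges
    (0 until numberOfRanges).map(i => i * rangeSize until i * rangeSize + rangeSize).toVector
  }

  println(sliceForPersistenceId("ShoppingCart|cart-42")) // some fixed value in 0..1023
  println(sliceRanges(4)) // Vector(Range 0 until 256, Range 256 until 512, ...)
}

So the slice value stored in the slice column is fixed when the event is written, while grouping slices into ranges is purely a read-side decision made when the projection instances start.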
So, simplifying: the slices are essentially a tag vector of fixed (non-configurable) length 1024, and each projection instance consumes a range of “tags” from that vector.
The documentation is misleading: “For example if using 1024 slices and running 4 Projection instances” suggests a choice, but we cannot use anything other than 1024 slices. And the number of Projection instances can only be a power of 2 (up to 1024), since it must divide 1024 evenly.
The big difference from tags is the possibility to partition the set of slices into ranges that can efficiently be queried from the database.
There is currently no dynamic scaling of the number of projection consumers, but that is something we had in mind when introducing slices as a new way to partition events instead of the old events-by-tag scheme.
You are right that the number of slices is hard coded and that the docs are incorrect about that. For now we made the assumption that a max of 1024 parallel consumers would “be enough for anyone” rather than making it configurable. This also avoids anyone shooting themselves in the foot by changing the number when there is an existing dataset.
We expect that in most applications a few up to something in the order of tens of consumers (so 4, 8, 16, 32, 64) will be a good balance between resource overhead and possible parallelisation.
I am sure this is a big step forward from static tags.
When I read the documentation I could not understand the magic at all, how this was achieved; it sounded like any number of slices, any number of projections, and no restrictions. Now everything is clear.
Thank you for the additional clarifications, and especially for developing and implementing these ideas.
This has already been answered, but it’s an important question and I would like to add some more details (and maybe repeat some of what has already been said).
The main purpose of the eventsBySlices query is to consume all events for an entity type in Akka Projections. That was the practical usage of eventsByTag as well. You want to split up the total number of events into “partitions”, here named slices, so that you can have several consumers processing the events in parallel (possibly on different nodes in an Akka Cluster). The ordering guarantee that we must honor is that events from a specific entity (persistenceId) must be delivered in the same order as they were emitted by the entity.
The slice that an entity belongs to is deterministically derived from the persistenceId.
math.abs(persistenceId.hashCode % 1024)
When you start the Projection instances you may first decide that 2 projection instances are enough. Projection instance 1 will consume events for slices 0 to 511. Projection instance 2 will consume events for slices 512 to 1023. What is important is that the offsets of the consumed events are stored for each slice.
Later you have a higher load of events and decide to change to 4 projection instances. Then the Projection instances will be responsible for slice ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023). That split is possible because we have stored the offset for each slice, and the new projection instances can continue from the previous offsets. You can change to fewer Projection instances as well.
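A small sketch of why that re-split works: the ranges for 4 instances nest exactly inside the ranges for 2 instances, so a new instance only needs per-slice offsets that were already stored (plain Scala, no Akka APIs):

object RescaleDemo extends App {
  val numberOfSlices = 1024

  def sliceRanges(numberOfRanges: Int): Vector[Range] = {
    val rangeSize = numberOfSlices / numberOfRanges
    (0 until numberOfRanges).map(i => i * rangeSize until i * rangeSize + rangeSize).toVector
  }

  val twoInstances = sliceRanges(2)  // Vector(0 until 512, 512 until 1024)
  val fourInstances = sliceRanges(4) // Vector(0 until 256, 256 until 512, ...)

  // Every new, smaller range is fully contained in exactly one old range,
  // so a new projection instance can resume from the stored offsets of its slices.
  val nested = fourInstances.forall(small => twoInstances.exists(big => small.forall(big.contains)))
  println(nested) // true
}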
Changing the number of Projection instances like this is not possible with tags. You decide the number of tags when you write the events, and with eventsByTag you must have the same number of Projection instances as the number of tags you decided up front. That caused the recommendation to use as many tags as you might possibly need in the future, and therefore run “too many” Projection instances.
A nice UX benefit is that you don’t have to care about tagging in the EventSourcedBehavior, since the slice is always included. You can even decide to add Projections later and still be able to consume events that were stored before you added the Projections.
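For reference, a sketch of how projection instances could be wired up over slice ranges with the R2DBC plugin. This is only a sketch under the assumption of the Akka Projection R2DBC APIs (EventSourcedProvider.eventsBySlices, EventSourcedProvider.sliceRanges, R2dbcProjection); the entity type, handler and projection names are hypothetical, so check the plugin docs for the exact signatures:

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.{ Handler, SourceProvider }

import scala.concurrent.Future

object CartProjections {
  sealed trait Event // hypothetical event type of the entity

  // Hypothetical handler; a real one would update a read model.
  class CartHandler extends Handler[EventEnvelope[Event]] {
    override def process(envelope: EventEnvelope[Event]): Future[Done] =
      Future.successful(Done)
  }

  def createProjections(system: ActorSystem[_], entityType: String, numberOfInstances: Int) = {
    // e.g. 4 instances -> slice ranges (0-255), (256-511), (512-767), (768-1023)
    val ranges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfInstances)

    ranges.map { range =>
      val sourceProvider: SourceProvider[Offset, EventEnvelope[Event]] =
        EventSourcedProvider.eventsBySlices[Event](
          system,
          R2dbcReadJournal.Identifier,
          entityType,
          range.min,
          range.max)

      val projectionId = ProjectionId("carts", s"carts-${range.min}-${range.max}")

      // Each of these would normally be run via ProjectionBehavior, e.g. with ShardedDaemonProcess.
      R2dbcProjection.atLeastOnceAsync(projectionId, None, sourceProvider, () => new CartHandler)(system)
    }
  }
}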
I haven’t figured out all the details yet.
So it turns out that we always have 1024 stored offsets. When the projection is started, a stream of events is produced for each slice from its stored offset, and these streams are combined according to the slice range. Are these thoughts in the right direction?
I’d say those are implementation details. Conceptually you can think of it as an offset being stored for each slice, and a projection instance being started from the lowest offset in the slice range that it is responsible for, with deduplication of events that have already been processed.
The Akka R2DBC Projection implementation is actually storing offsets for each persistenceId, but now we are getting deep into implementation details.
I’ll note that it is possible to implement something similar (including the ability to add arbitrarily many projections) via tags, though it has to be implemented in domain terms (when I did this, I called it “domain sharding” to distinguish from cluster sharding). The basic idea is to assign a persistence ID to a domain shard as part of its genesis event: the persistent behavior then tags events to its shard as needed. When you need more shards, double the number of domain shards (doubling the projection instances) which roughly halves the rate at which new persistence IDs are assigned to any particular shard. Then each projection conceptually has only a single offset for the single shard it’s handling (and conceivably different tags might be sharded differently).
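A rough sketch of that assignment idea (plain Scala, not tied to any Akka API; names such as DomainShardAssigned and numDomainShards are made up for illustration):

object DomainSharding extends App {
  // Hypothetical genesis event that pins an entity to a domain shard forever.
  final case class DomainShardAssigned(persistenceId: String, shard: Int)

  // The shard is chosen once, at genesis, based on the current number of domain shards.
  def assignShard(persistenceId: String, numDomainShards: Int): DomainShardAssigned =
    DomainShardAssigned(persistenceId, math.abs(persistenceId.hashCode % numDomainShards))

  // All later events of that entity are tagged with its genesis shard, e.g. domain-shard-3,
  // so one projection instance per shard consumes a single tag with a single offset.
  def tagFor(assigned: DomainShardAssigned): String = s"domain-shard-${assigned.shard}"

  // Doubling the shard count only affects new persistence ids; existing entities keep
  // the shard recorded in their genesis event, so their events never move between tags.
  val before = assignShard("cart-42", numDomainShards = 8)
  val after = assignShard("cart-43", numDomainShards = 16) // created after scaling up
  println((tagFor(before), tagFor(after)))
}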