Custom SourceProvider for events from outside Akka (Custom SQL query | TimescaleDB)(R2DBC)

I’m attempting to take advantage of Akka Projections (R2DBC) for processing IoT metrics collected in a TimescaleDB hypertable.

I was successful in creating a basic source provider (ticket) that works correctly when using the Sequence OffsetType.

However, this only works if I have a global event sequence across all device events, which I would very much like to avoid. That seems to leave me with two options, neither of which looks viable:

Start a projection for every deviceId
Basically I could use what I have right now, just with a slightly different SQL query. The big downside is that this causes many more DB calls, since I need a new SourceProvider instance for each device (10k+), and it prevents me from easily using ShardedDaemonProcess.

Implement TimestampOffset based SourceProvider
I’ve done a sample implementation: I extended SourceProvider with BySlicesSourceProvider and it works*, but there are a couple of issues:

  • Since my event table does not contain a slice column, I calculate the slice as deviceId % totalNumberOfSlices. This inevitably causes an exception when the projection tries to store the offset; for the purpose of the test I just set the min and max slice in BySlicesSourceProvider to 0 and Int.MaxValue respectively, which circumvents the issue.

  • Every event processed produces a record in akka_projection_timestamp_offset_store. The amount of data stored roughly equals the event itself, so assuming I want to run 10 projections, this would effectively raise my storage requirements by an order of magnitude. (Maybe it’s a consequence of this sketchy approach to slices; I’m not sure if that is how it’s supposed to behave.)
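
For reference, the shape of that workaround is roughly the following sketch. Names and package paths are from memory (verify against your Akka Projection version), and DeviceMetric is a hypothetical event type:

```scala
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.projection.BySlicesSourceProvider
import akka.projection.scaladsl.SourceProvider

// Hypothetical event type for the hypertable rows.
final case class DeviceMetric(deviceId: Long, value: Double)

// A SourceProvider that also implements BySlicesSourceProvider,
// so the R2DBC offset store can work with timestamp offsets.
abstract class DeviceEventsSourceProvider
    extends SourceProvider[Offset, EventEnvelope[DeviceMetric]]
    with BySlicesSourceProvider {

  // The workaround described above: claim the full slice range so the
  // offset store accepts slices computed as deviceId % totalNumberOfSlices.
  override def minSlice: Int = 0
  override def maxSlice: Int = Int.MaxValue
}
```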

So I’m at a loss here and would welcome your suggestions on how you would approach this problem.
Do you think using Akka Projections in this scenario is even valid?

Thanks

You should probably calculate the slice in the same way as Akka Persistence, i.e. a fixed number of 1024 slices.

math.abs(persistenceId.hashCode % numberOfSlices)

You can probably use akka.persistence.Persistence(system).sliceForPersistenceId

It probably works to use deviceId as the persistenceId, but even better if you can prefix it with an entityTypeName and | separator. E.g. MyDevice|device123. See akka.persistence.typed.PersistenceId
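
Putting that together, the slice lookup could look something like this (a sketch; assumes a typed ActorSystem in scope, and the adapter import may vary by Akka version):

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._ // for system.toClassic
import akka.persistence.Persistence
import akka.persistence.typed.PersistenceId

def sliceForDevice(system: ActorSystem[_], deviceId: String): Int = {
  // Entity type name + "|" separator + entity id, e.g. "MyDevice|device123"
  val pid = PersistenceId("MyDevice", deviceId)
  // Same fixed 1024-slice calculation that Akka Persistence uses,
  // i.e. math.abs(pid.id.hashCode % 1024)
  Persistence(system.toClassic).sliceForPersistenceId(pid.id)
}
```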

Regarding the offset storage, that is how it is designed. It will delete old entries when they fall outside a time window. Note that the offset store is also responsible for validating that events arrive in the expected sequence number order. That is a sequence number per persistenceId. Gaps are not allowed, and it will deduplicate already seen sequence numbers.
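
For completeness, that retention window is configurable; a sketch of the relevant setting (key name from memory for akka-projection-r2dbc — check the module’s reference.conf for the exact name and default):

```hocon
akka.projection.r2dbc.offset-store {
  # how long offset records are kept before being deleted
  time-window = 5 minutes
}
```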

Thank you Patrik.

I’ve added new events to the test dataset and the offset store was indeed cleaned up.
So that is all well and good; thank you for the clarification.

Since I need to query by slice to mimic how the work sharing is done, I need to have the slice number available in the DB, in my case computed.
Therefore I think it’s not possible to take advantage of akka.persistence.Persistence(system).sliceForPersistenceId

The current deviceId % totalNumberOfSlices approach is basically free (a query for 1k events takes ~600ms). To match what is set in the envelope (s"device-$deviceId"), I decided to write an SQL procedure that replicates Java’s hashCode. It works, and now I produce exactly the same slice numbers. But, and it’s a big but, it takes about 24 seconds to complete the same query.
(Test done on my desktop, no indexes, no nothing, just a proof of concept)
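
For anyone attempting the same, this is the function the SQL procedure has to replicate, expressed in Scala for clarity (a sketch; sliceFor is my own name, and it assumes the same s"device-$deviceId" persistence id format as above):

```scala
// Java's String.hashCode: s(0)*31^(n-1) + s(1)*31^(n-2) + ... + s(n-1),
// accumulated with 32-bit integer wraparound.
def javaStringHashCode(s: String): Int =
  s.foldLeft(0)((h, c) => 31 * h + c.toInt)

// Akka's slice calculation over a fixed 1024 slices.
def sliceFor(persistenceId: String, numberOfSlices: Int = 1024): Int =
  math.abs(javaStringHashCode(persistenceId) % numberOfSlices)
```

A SQL version has to emulate the same 32-bit wraparound, which is likely part of why a naive procedure is slow; computing the slice once into a column at insert time would avoid the per-query hashing entirely.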

In my opinion, since something like BySlicesSourceProvider exists, it would be beneficial for it to also take care of how the slice is calculated for a given source.

Or, and this struck me as odd at the time: when one uses EventEnvelope there is a slice argument, which I populated according to my formula:

  def apply[Event](
      offset: Offset,
      persistenceId: String,
      sequenceNr: Long,
      event: Event,
      timestamp: Long,
      entityType: String,
      slice: Int): EventEnvelope[Event] =
    apply(offset, persistenceId, sequenceNr, event, timestamp, entityType, slice, filtered = false, source = "")

Unfortunately it seems to be disregarded down the line. The whole issue could be avoided if only that field were respected; then I could stick with my current way of calculating the slice. That change would probably also be compatible with journal events.

What are your thoughts on that?

Thank you.