Issue with EventProducerSource in app with many event sourced entities

Hello,

I’m developing an Akka application using Event Sourcing, Cluster, and Projections.
I started the project from Akka’s examples on GitHub.

This is how the project looks:

Event sourced Workspace setup:

  def apply(workspaceId: String): Behavior[Command] =
    EventSourcedBehavior
      .withEnforcedReplies[Command, Event, State](
        persistenceId = PersistenceId(EntityKey.name, workspaceId),
        emptyState = State.empty,
        commandHandler = (state, command) => handleCommand(state, command),
        eventHandler = (state, event) => handleEvent(state, event)
      )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100))
      .withTaggerForState { case (state, _) => Set("workspace") }
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))

Event producer setup:

    val eventProducerSource = EventProducer.EventProducerSource(
      entityType = "Workspace",
      streamId = "workspace",
      transformation = transformation,
      settings = EventProducerSettings(system)
    )

Projection setup:

  def init(system: ActorSystem[_]): Unit = {
    implicit val sys: ActorSystem[_] = system
    val numberOfProjectionInstances  = 4
    val projectionName: String       = "workspace-events"
    val sliceRanges =
      Persistence(system).sliceRanges(numberOfProjectionInstances)

    val eventsBySlicesQuery = GrpcReadJournal(List(WorkspaceEventsProto.javaDescriptor))

    ShardedDaemonProcess(system).init(
      projectionName,
      numberOfProjectionInstances,
      { idx =>
        val sliceRange = sliceRanges(idx)
        val projectionKey =
          s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
        val projectionId = ProjectionId.of(projectionName, projectionKey)

        val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
          system = system,
          eventsBySlicesQuery = eventsBySlicesQuery,
          entityType = eventsBySlicesQuery.streamId,
          minSlice = sliceRange.min,
          maxSlice = sliceRange.max
        )

        ProjectionBehavior(
          R2dbcProjection.exactlyOnce(
            projectionId = projectionId,
            settings = None,
            sourceProvider = sourceProvider,
            handler = () => new RdbcHandler(system)
          )
        )
      }
    )
  }

Part of conf which I think is relevant:

akka.projection.grpc {
  producer {
    query-plugin-id = "akka.persistence.r2dbc.query"
  }
  consumer {
    client {
      host = "127.0.0.1"
      port = 8101
      use-tls = false
    }
    stream-id = "workspace"
  }
}

Issue: This configuration works as expected. But when I create multiple event-sourced entities, each with its own event producer and projections, event publishing doesn’t work. I tried using different stream IDs, using a wildcard (‘*’), and changing stream-id in the Akka config, but nothing made the app work for the other entities. I always get a dead letter warning.

If I remove stream-id from the Akka config, I get:

[info] java.lang.IllegalArgumentException: requirement failed: Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.

I also tried changing the events-by-slices query in the projection to use a different entity type, but it didn’t fix the issue:

    val eventsBySlicesQuery =
      GrpcReadJournal(
        GrpcQuerySettings("Project"),
        List(ProjectEventsProto.javaDescriptor))

I think the issue is somewhere between event sourced entity and the event producer but I’m not sure where to look.

Thanks

The stream id is a “public” version of the entity type; each gRPC projection only handles events for a single entity type.

You’ll need a config block for each of the entity types, each with its own stream-id and its own EventProducerSource. Start the producer gRPC endpoint with a list of those sources, and then start a separate consumer for each of the types/stream ids (given that the consuming service is consuming all of them, of course).

Is there an example of how to set up a config block per entity?
I looked through the akka-projection examples and the docs, but I didn’t find one.

I’m afraid we don’t have any ready examples for that.

Actually, I mixed it up with replication where we have a predefined config format.

To create an EventProducerSource for each entity on the producing side, you use the apply method, which takes the entity type, the stream id, etc.:

val settings = EventProducerSettings(system)
val eventProducerSources = Set(
    EventProducerSource("entityType1", "streamId1", transformation, settings),
    EventProducerSource("entityType2", "streamId2", transformation, settings),
    EventProducerSource("entityType3", "streamId3", transformation, settings))

val handler = EventProducer.grpcServiceHandler(eventProducerSources)

... bind handler to server...

On the consuming side, you create a GrpcReadJournal for each entity using the apply method that takes a GrpcQuerySettings, which in turn can be created with the stream id as a parameter.

val projectionName: String = "stream1-consumer"
val numberOfProjectionInstances = 4
val sliceRanges =
  Persistence(system).sliceRanges(numberOfProjectionInstances)

ShardedDaemonProcess(system).init(
  projectionName,
  numberOfProjectionInstances,
  { idx =>
    // one GrpcReadJournal per stream id
    val eventsBySlicesQuery = GrpcReadJournal(
      GrpcQuerySettings("streamId1"),
      List(protoDescriptorForThatEntityType))

    val sliceRange = sliceRanges(idx)

    // unique id per projection instance: stream id plus slice range
    val projectionKey =
      s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
    val projectionId = ProjectionId(projectionName, projectionKey)

    val sourceProvider =
      EventSourcedProvider
        .eventsBySlices[String](
          system,
          eventsBySlicesQuery,
          eventsBySlicesQuery.streamId,
          sliceRange.min,
          sliceRange.max)

    ProjectionBehavior(
      // or whichever type of projection you are running
      R2dbcProjection.atLeastOnceAsync(
        projectionId,
        None,
        sourceProvider,
        () =>
          new EventHandler(
            projectionId,
            eventsBySlicesQuery.streamId,
            system)))
  })

This is then needed for each entity type/stream id. You might also want to scale them separately rather than use a hardcoded 4 partitions like here, since some entities may produce a lot of events while others produce few.
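
To make the per-stream scaling concrete, here is a minimal pure-Scala sketch (no Akka dependency) of how slice ranges and projection keys could be laid out for each stream id. It assumes the default of 1024 slices and an even split similar to what Persistence(system).sliceRanges produces. ConsumerSpec, SliceRange and ConsumerPlan are hypothetical names used only for illustration, and the "project" stream with 2 instances is a made-up example.

```scala
// Hypothetical helper types for illustration; not part of Akka.
final case class ConsumerSpec(streamId: String, instances: Int)
final case class SliceRange(min: Int, max: Int)

object ConsumerPlan {
  // Assumption: the default number of slices in Akka Persistence.
  val NumberOfSlices = 1024

  // Split the slices evenly over the given number of projection instances.
  def sliceRanges(instances: Int): Vector[SliceRange] = {
    require(
      instances > 0 && NumberOfSlices % instances == 0,
      s"instances must evenly divide $NumberOfSlices")
    val size = NumberOfSlices / instances
    Vector.tabulate(instances)(i => SliceRange(i * size, (i + 1) * size - 1))
  }

  // One projection key per instance, in the "<streamId>-<min>-<max>" format
  // used in the question's projection setup.
  def projectionKeys(spec: ConsumerSpec): Vector[String] =
    sliceRanges(spec.instances).map(r => s"${spec.streamId}-${r.min}-${r.max}")
}

// Each stream id gets its own instance count, so it can be scaled separately.
val specs = List(ConsumerSpec("workspace", 4), ConsumerSpec("project", 2))
specs.foreach(spec => println(ConsumerPlan.projectionKeys(spec).mkString(", ")))
```

In a real service, each ConsumerSpec would correspond to one ShardedDaemonProcess(system).init call with its own projection name and GrpcReadJournal.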

I’ve tried that, but an event producer hasn’t been called.

I think the issue might be somewhere in event-sourced behavior.

  val EntityKey: EntityTypeKey[Command] =
    EntityTypeKey[Command]("Workspace")

  def init(system: ActorSystem[_]): Unit =
    ClusterSharding(system).init(Entity(EntityKey)(entityContext => Workspace(entityContext.entityId)))

  def apply(workspaceId: String): Behavior[Command] =
    EventSourcedBehavior
      .withEnforcedReplies[Command, Event, State](
        persistenceId = PersistenceId(EntityKey.name, workspaceId),
        emptyState = State.empty,
        commandHandler = (state, command) => handleCommand(state, command),
        eventHandler = (state, event) => handleEvent(state, event)
      )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))

Is there a way to set up the stream ID here?

The stream id is only used between an Akka Projection gRPC producer and consumer. It identifies which of the entity types served by the producer a specific consumer wants to consume, without tying that to the service-internal entity type name.

Not sure what you mean by “an event producer has not been called”. Note that Akka Projection gRPC is not a push-based but a pull-based way to consume events: nothing will happen unless a consumer connects to the producer and requests it to stream events.
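
As a rough illustration of those two points (the stream id as a public alias for the entity type, and pull-based consumption), here is a toy model in plain Scala. ToyProducer and its members are hypothetical and have nothing to do with the actual Akka implementation. The sketch only shows that an unknown stream id is rejected and that no event is read until a consumer actually pulls.

```scala
// Toy model only: these names are hypothetical and not part of Akka.
final class ToyProducer(
    journal: Map[String, Vector[String]],    // entity type -> stored events
    streamToEntityType: Map[String, String]  // public stream id -> entity type
) {
  // Counts how many events have actually been read from the journal.
  var reads: Int = 0

  // Resolve the public stream id to the internal entity type and return
  // a lazy iterator: events are only read when the consumer pulls them.
  def stream(streamId: String): Iterator[String] = {
    val entityType = streamToEntityType.getOrElse(
      streamId,
      throw new IllegalArgumentException(s"Unknown stream id [$streamId]"))
    journal.getOrElse(entityType, Vector.empty).iterator.map { event =>
      reads += 1
      event
    }
  }
}

val producer = new ToyProducer(
  journal = Map("Workspace" -> Vector("created", "renamed")),
  streamToEntityType = Map("workspace" -> "Workspace"))

val events = producer.stream("workspace") // consumer connects, nothing read yet
println(producer.reads)                   // still 0 at this point
println(events.toList)                    // pulling drives the reads
println(producer.reads)                   // now 2
```

The practical consequence is that a producer with no connected consumer stays idle, so a silent producer is not by itself a sign that the event-sourced entity is misconfigured.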