Hello,
I’m developing an Akka application using Event Sourcing with Cluster Sharding and Projections over gRPC.
I started the project from Akka’s sample projects on GitHub.
This is how the project looks:
Event-sourced Workspace setup:
def apply(workspaceId: String): Behavior[Command] =
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, State](
      persistenceId = PersistenceId(EntityKey.name, workspaceId),
      emptyState = State.empty,
      commandHandler = (state, command) => handleCommand(state, command),
      eventHandler = (state, event) => handleEvent(state, event)
    )
    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100))
    .withTaggerForState { case (state, _) => Set("workspace") }
    .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
Event producer setup:
val eventProducerSource = EventProducer.EventProducerSource(
  entityType = "Workspace",
  streamId = "workspace",
  transformation = transformation,
  settings = EventProducerSettings(system)
)
Projection setup:
def init(system: ActorSystem[_]): Unit = {
  implicit val sys: ActorSystem[_] = system
  val numberOfProjectionInstances = 4
  val projectionName: String = "workspace-events"
  val sliceRanges =
    Persistence(system).sliceRanges(numberOfProjectionInstances)

  val eventsBySlicesQuery = GrpcReadJournal(List(WorkspaceEventsProto.javaDescriptor))

  ShardedDaemonProcess(system).init(
    projectionName,
    numberOfProjectionInstances,
    { idx =>
      val sliceRange = sliceRanges(idx)
      val projectionKey =
        s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
      val projectionId = ProjectionId.of(projectionName, projectionKey)

      val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
        system = system,
        eventsBySlicesQuery = eventsBySlicesQuery,
        entityType = eventsBySlicesQuery.streamId,
        minSlice = sliceRange.min,
        maxSlice = sliceRange.max
      )

      ProjectionBehavior(
        R2dbcProjection.exactlyOnce(
          projectionId = projectionId,
          settings = None,
          sourceProvider = sourceProvider,
          handler = () => new RdbcHandler(system)
        )
      )
    }
  )
}
The part of the configuration that I think is relevant:
akka.projection.grpc {
  producer {
    query-plugin-id = "akka.persistence.r2dbc.query"
  }
  consumer {
    client {
      host = "127.0.0.1"
      port = 8101
      use-tls = false
    }
    stream-id = "workspace"
  }
}
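Since `akka.projection.grpc.consumer` only holds a single `stream-id`, my understanding (an assumption, based on `GrpcQuerySettings` also accepting a `Config`) is that each additional `GrpcReadJournal` would need its own consumer config section. Something like the following, where the `consumer-project` section name and the `"project"` stream id are hypothetical:

```hocon
akka.projection.grpc.consumer {
  client {
    host = "127.0.0.1"
    port = 8101
    use-tls = false
  }
  stream-id = "workspace"
}

# Hypothetical second consumer section for another entity's stream,
# inheriting the client settings from the default section via HOCON merge:
my-app.projection.grpc.consumer-project = ${akka.projection.grpc.consumer} {
  stream-id = "project"
}
```

I haven’t confirmed whether this is the intended way to configure several consumers, so corrections are welcome.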
Issue: This configuration works as expected, but when I add more event-sourced entities, each with its own event producer and projection, event publishing stops working for them. I tried using different stream IDs, using a wildcard ("*"), and changing the streamId in akka.conf, but nothing made the app work for the other entities. I always get dead-letter warnings.
If I remove the stream ID from the akka config, I get:
[info] java.lang.IllegalArgumentException: requirement failed: Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.
I also tried changing the events-by-slices query in the projection to use a different entity type, but it didn’t fix the issue:
val eventsBySlicesQuery =
  GrpcReadJournal(
    GrpcQuerySettings("Project"),
    List(ProjectEventsProto.javaDescriptor)
  )
I think the issue is somewhere between the event-sourced entity and the event producer, but I’m not sure where to look.
Thanks