Hello!
Currently we only have one aggregate in our system (call it aggregate A), and one guardian actor. Everything works there. We want to add another Aggregate (call it aggregate B) and a projection that handles events from AggregateB. Note we are using Akka persistence with CQRS here.
My question is: should we initialize a new ShardedDaemonProcess for the new projection? If so, should the two projections use different processors? As an aside, should my aggregates (and thus my projections) run under different guardian actors and seed nodes? I’ve attached my proposed code for AggregateBProjection below; I’m a little confused, so I’d like to know whether it breaks down anywhere.
Aggregate A Projection
public class AggregateAProjection {
public static void init(ActorSystem<?> system) {
if (Cluster.get(system).selfMember().hasRole("read-model")) {
ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(system);
ShardedDaemonProcessSettings shardedDaemonProcessSettings =
ShardedDaemonProcessSettings.create(system)
.withShardingSettings(shardingSettings.withRole("read-model"));
// Allows us to run N actors, where N = parallelism
ShardedDaemonProcess.get(system).init(
ProjectionBehavior.Command.class,
"AggregateAProcessors",
4,
index -> ProjectionBehavior.create(createProjectionFor(system, index)),
shardedDaemonProcessSettings,
Optional.of(ProjectionBehavior.stopMessage())
);
}
static AtLeastOnceCassandraProjection<EventEnvelope<Message>> createProjectionFor(
ActorSystem<?> system,
int index) {
String tag = "aggregateA-slice-" + index;
SourceProvider<Offset, EventEnvelope<Message>> sourceProvider = EventSourcedProvider
.eventsByTag(
system,
CassandraReadJournal.Identifier(),
tag
);
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
return CassandraProjection.atLeastOnce(
ProjectionId.of("event_projections", tag),
sourceProvider,
new AggregateAHandler(system, tag)
).withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
}
Aggregate B Projection
public class AggregateBProjection {
public static void init(ActorSystem<?> system) {
if (Cluster.get(system).selfMember().hasRole("read-model")) {
ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(system);
ShardedDaemonProcessSettings shardedDaemonProcessSettings =
ShardedDaemonProcessSettings.create(system)
.withShardingSettings(shardingSettings.withRole("read-model"));
// Allows us to run N actors, where N = parallelism
ShardedDaemonProcess.get(system).init(
ProjectionBehavior.Command.class,
"AggregateBProcessors",
4,
index -> ProjectionBehavior.create(createProjectionFor(system, index)),
shardedDaemonProcessSettings,
Optional.of(ProjectionBehavior.stopMessage())
);
}
static AtLeastOnceCassandraProjection<EventEnvelope<Message>> createProjectionFor(
ActorSystem<?> system,
int index) {
String tag = "aggregateB-slice-" + index;
SourceProvider<Offset, EventEnvelope<Message>> sourceProvider = EventSourcedProvider
.eventsByTag(
system,
CassandraReadJournal.Identifier(),
tag
);
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
return CassandraProjection.atLeastOnce(
ProjectionId.of("event_projections", tag),
sourceProvider,
new AggregateBHandler(system, tag)
).withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
}