Sure, I’ll just have to make some of the names generic and cut out some of the fields, but here’s the Behavior:
class Behavior private constructor(
private val id: String,
private val tag: String,
private val customCommandHandlerBuilder: CustomCommandHandlerBuilder,
) :
EventSourcedBehavior<Command, Event, State>(
PersistenceId.of(ENTITY_KEY.name(), id),
SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)
) {
...
override fun commandHandler(): CommandHandler<Command, Event, State> {
return customCommandHandlerBuilder
.withCommandHandlerBuilderBuilder { newCommandHandlerBuilder() }
.withEffect { Effect() }
.withId(id)
.build()
}
...
}
It’s initialized via a Spring Component:
@Component
class Initialize(...) : AkkaCluster.InitializeBehavior {
override fun initBehavior(actorSystem: ActorSystem<*>) {
val customCommandHandlerBuilder = CustomCommandHandlerBuilder()
.withClock(Clock.systemUTC())
.withClusterSharding(ClusterSharding.get(actorSystem))
...
ClusterSharding.get(actorSystem).init(
Entity.of(ENTITY_KEY) {
create(it.entityId, customCommandHandlerBuilder)
})
}
fun create(
entityId: String,
customCommandHandlerBuilder: CustomCommandHandlerBuilder,
): Behavior<Command> {
val index = abs(entityId.hashCode().rem(TAGS.size))
val tag = TAGS[index]
return Behavior(entityId, tag, customCommandHandlerBuilder)
}
}
As far as I can tell, initBehavior
here is only executed once at startup, but let me know if you want to see more details about how it’s called.
The CustomCommandHandlerBuilder
where the thenRun
is called:
class CustomCommandHandlerBuilder {
private lateinit var id: String
private lateinit var effect: () -> EffectFactories<Event, State>
private lateinit var clusterSharding: ClusterSharding
private lateinit var commandHandlerBuilder: () -> CommandHandlerBuilder<Command, Event, State>
fun withCommandHandlerBuilderBuilder(commandHandlerBuilder: () -> CommandHandlerBuilder<Command, Event, State>): CustomCommandHandlerBuilder {
this.commandHandlerBuilder = commandHandlerBuilder
return this
}
fun withClusterSharding(clusterSharding: ClusterSharding): CustomCommandHandlerBuilder {
this.clusterSharding = clusterSharding
return this
}
fun withEffect(effectFactory: () -> EffectFactories<Event, State>): CustomCommandHandlerBuilder {
this.effect = effectFactory
return this
}
fun withId(id: String): CustomCommandHandlerBuilder {
this.id = id
return this
}
fun build(): CommandHandler<Command, Event, State> {
return notInitializedCommandHandlerBuilder()
.orElse(anotherCommandHandler())
...
.build()
private fun notInitializedCommandHandlerBuilder(): CommandHandlerBuilderByState<Command, Event, NotInitialized, State> =
commandHandlerBuilder()
.forStateType(NotInitialized::class.java)
.onCommand(Command1::class.java, ::onCommand)
private fun onCommand(cmd: Command1): ReplyEffect<Event, State> {
val events = mutableListOf<Event>(
Event.Event1(createdDate = createdDate, ...),
Event.Event2(attr = "value", ...),
)
return effect().persist(events)
.thenRun {
logger.info("Command received by $id: $cmd. Events: $events")
dao.insert(
History(
id = id,
...
)
)
}
.thenReply(cmd.sender) {
Command1.Succeeded.INSTANCE
}
}