- I declare the traits
Command
,Event
andAggregate
. All of them extend the CborSerializable. TheAggragate
carries the state of entity.
trait CborSerializable
trait Command extends CborSerializable
trait Event extends CborSerializable {
def stamp: LocalDateTime
}
trait Aggregate extends CborSerializable {
type State <: Aggregate
def id: String
def onCommand(cmd: Command): ReplyEffect[Event, State]
def onEvent(event: Event): State
}
- I declare the object Project, and define the
trait Project
which extends Aggregate and carries the state of the entity.
object Project {
private val EntityKey = EntityTypeKey[Command](name = "Project")
def init(system: ActorSystem[_]): Unit = {
ClusterSharding(system).init(Entity(EntityKey) { context => Project(context.entityId) })
}
def apply(id: String): Behavior[Command] = {
val starter = Starter(id)
EventSourcedBehavior
.withEnforcedReplies[Command, Event, Project](
persistenceId = PersistenceId(entityTypeHint = EntityKey.name, entityId = id),
emptyState = starter,
commandHandler = (state, cmd) => starter.onCommand(cmd),
eventHandler = (state, event) => starter.onEvent(event)
)
.withTagger((event => Set(id, event.getClass.toString)))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
.onPersistFailure(SupervisorStrategy.restartWithBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.1))
}
//region Commands
type ReplyTo = ActorRef[StatusReply[String]]
final case class Open(name: String, staff: AssignedContact, replyTo: ReplyTo) extends Command
final case class AttachDetails(details: Details, replyTo: ReplyTo) extends Command
//endregion
//region Events
type Stamp = LocalDateTime
final case class Opened(id: String, name: String, staff: AssignedContact, stamp: Stamp) extends Event
final case class DetailsAttached(details: Details, stamp: Stamp) extends Event
//endregion
//region States
trait Project extends Aggregate {
type State = Project
def name: String
def details: Historic[Details]
}
final case class Starter(id: String, name: String = "", details: Historic[Details] = null) extends Project {
override def onCommand(cmd: Command): ReplyEffect[Event, Project] = {
cmd match {
case Open(name, staff, replyTo) =>
val stamp = LocalDateTime.now()
Effect.persist(Opened(id, name, staff, stamp))
.thenReply(replyTo)(p => StatusReply.Success("New project opened."))
case _ =>
Effect.unhandled.thenNoReply()
}
}
override def onEvent(event: Event): Project = {
event match {
case Opened(id, name, staff, stamp) => OpenedProject(id, name, Confirmed(current = Details(staff = staff)))
case e => throw UnexpectedEvent(e)
}
}
}
final case class OpenedProject(id: String, name: String, details: Historic[Details]) extends Project {
override def onCommand(cmd: Command): ReplyEffect[Event, Project] = {
cmd match {
case AttachDetails(details, replyTo) =>
Effect.persist(DetailsAttached(details, LocalDateTime.now()))
.thenReply(replyTo)(p => StatusReply.Success("Project details cached."))
}
override def onEvent(event: Event): Project = {
event match {
case DetailsAttached(d, _) =>
val c = this.details.current
this.copy(details = Pending(current = c, pending = d))
}
}
}
//endregion
- All the value objects are implementated with case class, and extend
CborSerializable
.
//region Value Objects
case class Details(catalog: String = "",
address: String = "",
period: ProjectPeriod = null,
totalRevenue: BigDecimal = 0.0,
currentRevenue: BigDecimal = 0.0,
background: String = "",
summary: String = "",
performance: String = "",
highlighted: Boolean = false,
upper: String = "",
newly: Boolean = true,
memo: String = "",
commencementDate: LocalDate = null,
country: AssignedContact = null,
duty: AssignedContact = null,
proprietor: AssignedContact = null,
director: AssignedContact = null,
inspector: AssignedContact = null,
aidedLeader: AssignedContact = null,
staff: AssignedContact = null
)
case class AssignedContact(id: String = TransientId, name: String, mobile: Mobile) extends CborSerializable
case class Mobile(number: String) extends CborSerializable
final case class ProjectPeriod(begin: LocalDate, end: LocalDate) extends CborSerializable {
require(end.isAfter(begin))
val period: Period = Period.between(begin, end)
def parting: Period = Period.between(begin, LocalDate.now())
def remaining: Period = Period.between(LocalDate.now(), end)
}
//endregion
}
- Even the
Historic
is abstract, so I append@JsonTypeInfo
and@JsonSubTypes
to it.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[Pending[_]], name = "pending"),
new JsonSubTypes.Type(value = classOf[Confirmed[_]], name = "confirmed")
))
trait Historic[+T] extends CborSerializable {
def current: T
}
case class Pending[T](current: T, pending: T) extends Historic[T]
case class Confirmed[T](current: T) extends Historic[T]
- In the test specification, I define the configuration.
object ProjectSpec {
val config = ConfigFactory
.parseString(
"""
akka.actor.serialization-bindings {
"priv.abbey.domain.CborSerializable" = jackson-cbor
}
akka.serialization.jackson.serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
WRITE_DURATIONS_AS_TIMESTAMPS = on
}
""")
.withFallback(EventSourcedBehaviorTestKit.config)
}
- The first command test works.
class ProjectSpec extends ScalaTestWithActorTestKit(ProjectSpec.config) with AnyFeatureSpecLike with GivenWhenThen {
val id = "2023-001"
val project = EventSourcedBehaviorTestKit[Command, Event, Project](system, Project(id))
Feature("test fixtures of Project") {
Scenario("open a new project.") {
When("send a command Open.")
val jack = AssignedContact(name = "Jack", mobile = Mobile("13900010001"))
val result = project.runCommand[StatusReply[String]](
replyTo => Open("Construction", jack, replyTo))
Then("a new project opened, the staff is assigned.")
result.reply should ===(StatusReply.Success("New project opened."))
result.stateOfType[OpenedProject].name shouldBe "Construction"
result.stateOfType[OpenedProject].details should ===(Confirmed(current = Details(staff = jack)))
}
}
}
- But the second command failed.
val jack = AssignedContact(name = "Jack", mobile = Mobile("13900010001"))
val result = project.runCommand[StatusReply[String]](
replyTo => Open("Construction", jack, replyTo))
Then("a new project opened, the staff is assigned.")
result.reply should ===(StatusReply.Success("New project opened."))
result.stateOfType[OpenedProject].name shouldBe "Construction"
result.stateOfType[OpenedProject].details should ===(Confirmed(current = Details(staff = jack)))
When("attach a new details.")
val tom = AssignedContact(name = "Tom", mobile = Mobile("19901020304"))
val result2 = project.runCommand[StatusReply[String]](
replyTo => AttachDetails(details = Details(catalog = "Good", staff = tom), replyTo))
Then("the attached details is cached.")
result2.reply should ===(StatusReply.Success("Project details cached."))
result2.stateOfType[OpenedProject].details should ===(Pending(current = Details(catalog = "Good", staff = jack), pending = Details(catalog = "Good", staff = tom)))
====
The logs shows the command Open
handled, event Opened
persisted, but command AttachDetails
can’t be handled.
19:39:34.159 [ScalaTest-run] DEBUG ProjectSpec - Starting ActorTestKit
19:39:34.542 [ScalaTest-run] DEBUG ProjectSpec - Starting ActorTestKit
19:39:34.672 [ScalaTest-run] DEBUG akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal - Using in memory storage [akka.persistence.testkit.journal] for test kit read journal
19:39:35.113 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Initializing snapshot recovery: Recovery(SnapshotSelectionCriteria(9223372036854775807,9223372036854775807,0,0),9223372036854775807,9223372036854775807)
19:39:35.164 [ProjectSpec-akka.actor.default-dispatcher-3] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Snapshot recovered from 0 Map() VersionVector()
19:39:35.178 [ProjectSpec-akka.actor.default-dispatcher-3] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Replaying events: from: 1, to: 9223372036854775807
19:39:35.236 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Recovery successful, recovered until sequenceNr: [0]
19:39:35.236 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Returning recovery permit, reason: replay completed successfully
19:39:35.283 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Recovery for persistenceId [PersistenceId(Project|2023-001)] took 73.99 ms
When send a command Open.
Then a new project opened, the staff is assigned.
When attach a new details.
19:39:37.182 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Handled command [priv.abbey.domain.Project$Open], resulting effect: [Persist(priv.abbey.domain.Project$Opened)], side effects: [1]
19:39:37.228 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Received Journal response: WriteMessagesSuccessful after: 31021100 nanos
19:39:37.232 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Received Journal response: WriteMessageSuccess(PersistentRepr(Project|2023-001,1,e5b44ade-ef18-4b28-a7d9-576a5b82d345,0,None),1) after: 35388300 nanos
19:39:37.722 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Handled command [priv.abbey.domain.Project$AttachDetails], resulting effect: [Unhandled], side effects: [1]
19:39:37.744 [ProjectSpec-akka.actor.default-dispatcher-8] INFO akka.actor.LocalActorRef - Message [priv.abbey.domain.Project$AttachDetails] to Actor[akka://ProjectSpec/system/test/$a#-766595102] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
The Exception shows:
Missing expected reply for command [AttachDetails(Details(Good,,null,0.0,0.0,,,,false,,true,,null,null,null,null,null,null,null,AssignedContact(Transient,Tom,Mobile(19901020304))),Actor[akka://ProjectSpec/system/testProbe-4#762760000])].
========
The test shows the result
has gotten state OpenedProject
, but why the entity can’t handle the command AttachDetails
later?
So I tried to run command Open
again. It works!
I’m suprised that the state didn’t seem to switch to OpenedProject yet.
I don’t know why and how to do then. Please give me more guides, thanks.