Why doesn't the EventSourcedBehaviorTestKit change state?

  1. I declare the traits Command, Event and Aggregate. All of them extend the CborSerializable. The Aggragate carries the state of entity.
trait CborSerializable

trait Command extends CborSerializable

trait Event extends CborSerializable {
  def stamp: LocalDateTime
}

trait Aggregate extends CborSerializable {
  type State <: Aggregate

  def id: String

  def onCommand(cmd: Command): ReplyEffect[Event, State]

  def onEvent(event: Event): State
}
  1. I declare the object Project, and define the trait Project which extends Aggregate and carries the state of the entity.
object Project {
  private val EntityKey = EntityTypeKey[Command](name = "Project")

  def init(system: ActorSystem[_]): Unit = {
    ClusterSharding(system).init(Entity(EntityKey) { context => Project(context.entityId) })
  }

  def apply(id: String): Behavior[Command] = {
    val starter = Starter(id)

    EventSourcedBehavior
      .withEnforcedReplies[Command, Event, Project](
        persistenceId = PersistenceId(entityTypeHint = EntityKey.name, entityId = id),
        emptyState = starter,
        commandHandler = (state, cmd) => starter.onCommand(cmd),
        eventHandler = (state, event) => starter.onEvent(event)
        )
      .withTagger((event => Set(id, event.getClass.toString)))
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.1))
  }

  //region Commands
  type ReplyTo = ActorRef[StatusReply[String]]

  final case class Open(name: String, staff: AssignedContact, replyTo: ReplyTo) extends Command

  final case class AttachDetails(details: Details, replyTo: ReplyTo) extends Command
  //endregion

  //region Events
  type Stamp = LocalDateTime

  final case class Opened(id: String, name: String, staff: AssignedContact, stamp: Stamp) extends Event

  final case class DetailsAttached(details: Details, stamp: Stamp) extends Event
  //endregion

  //region States
  trait Project extends Aggregate {
    type State = Project
    def name: String
    def details: Historic[Details]
  }

  final case class Starter(id: String, name: String = "", details: Historic[Details] = null) extends Project {
    override def onCommand(cmd: Command): ReplyEffect[Event, Project] = {
      cmd match {
        case Open(name, staff, replyTo) =>
          val stamp = LocalDateTime.now()
          Effect.persist(Opened(id, name, staff, stamp))
                .thenReply(replyTo)(p => StatusReply.Success("New project opened."))
        case _                          =>
          Effect.unhandled.thenNoReply()
      }
    }

    override def onEvent(event: Event): Project = {
      event match {
        case Opened(id, name, staff, stamp) => OpenedProject(id, name, Confirmed(current = Details(staff = staff)))
        case e                              => throw UnexpectedEvent(e)
      }
    }
  }

  final case class OpenedProject(id: String, name: String, details: Historic[Details]) extends Project {
    override def onCommand(cmd: Command): ReplyEffect[Event, Project] = {
      cmd match {
        case AttachDetails(details, replyTo)  =>
          Effect.persist(DetailsAttached(details, LocalDateTime.now()))
                .thenReply(replyTo)(p => StatusReply.Success("Project details cached."))
      }

    override def onEvent(event: Event): Project = {
      event match {
        case DetailsAttached(d, _) =>
          val c = this.details.current
          this.copy(details = Pending(current = c, pending = d))
      }
    }
  }
  //endregion
  1. All the value objects are implementated with case class, and extend CborSerializable.
  //region Value Objects
  case class Details(catalog: String = "",
                     address: String = "",
                     period: ProjectPeriod = null,
                     totalRevenue: BigDecimal = 0.0,
                     currentRevenue: BigDecimal = 0.0,
                     background: String = "",
                     summary: String = "",
                     performance: String = "",
                     highlighted: Boolean = false,
                     upper: String = "",
                     newly: Boolean = true,
                     memo: String = "",
                     commencementDate: LocalDate = null,
                     country: AssignedContact = null,
                     duty: AssignedContact = null,
                     proprietor: AssignedContact = null,
                     director: AssignedContact = null,
                     inspector: AssignedContact = null,
                     aidedLeader: AssignedContact = null,
                     staff: AssignedContact = null
                    )

  case class AssignedContact(id: String = TransientId, name: String, mobile: Mobile) extends CborSerializable 
  
  case class Mobile(number: String) extends CborSerializable

  final case class ProjectPeriod(begin: LocalDate, end: LocalDate) extends CborSerializable {
    require(end.isAfter(begin))

    val period: Period = Period.between(begin, end)

    def parting: Period = Period.between(begin, LocalDate.now())

    def remaining: Period = Period.between(LocalDate.now(), end)
  }
  //endregion
}
  1. Even the Historic is abstract, so I append @JsonTypeInfo and @JsonSubTypes to it.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
  Array(
    new JsonSubTypes.Type(value = classOf[Pending[_]], name = "pending"),
    new JsonSubTypes.Type(value = classOf[Confirmed[_]], name = "confirmed")
    ))
trait Historic[+T] extends CborSerializable {
  def current: T
}

case class Pending[T](current: T, pending: T) extends Historic[T]

case class Confirmed[T](current: T) extends Historic[T]
  1. In the test specification, I define the configuration.
object ProjectSpec {
	val config = ConfigFactory
		.parseString(
			"""
			akka.actor.serialization-bindings {
				"priv.abbey.domain.CborSerializable" = jackson-cbor
			}

	    akka.serialization.jackson.serialization-features {
				WRITE_DATES_AS_TIMESTAMPS = on
				WRITE_DURATIONS_AS_TIMESTAMPS = on
			}
			""")
		.withFallback(EventSourcedBehaviorTestKit.config)
}
  1. The first command test works.
class ProjectSpec extends ScalaTestWithActorTestKit(ProjectSpec.config) with AnyFeatureSpecLike with GivenWhenThen {
	val id = "2023-001"

	val project = EventSourcedBehaviorTestKit[Command, Event, Project](system, Project(id))

	Feature("test fixtures of Project") {
		Scenario("open a new project.") {
			When("send a command Open.")
			val jack   = AssignedContact(name = "Jack", mobile = Mobile("13900010001"))
			val result = project.runCommand[StatusReply[String]](
				replyTo => Open("Construction", jack, replyTo))

			Then("a new project opened, the staff is assigned.")
			result.reply should ===(StatusReply.Success("New project opened."))
			result.stateOfType[OpenedProject].name shouldBe "Construction"
			result.stateOfType[OpenedProject].details should ===(Confirmed(current = Details(staff = jack)))
		}
	}
}
  1. But the second command failed.
val jack   = AssignedContact(name = "Jack", mobile = Mobile("13900010001"))
val result = project.runCommand[StatusReply[String]](
			replyTo => Open("Construction", jack, replyTo))

Then("a new project opened, the staff is assigned.")
result.reply should ===(StatusReply.Success("New project opened."))
result.stateOfType[OpenedProject].name shouldBe "Construction"
result.stateOfType[OpenedProject].details should ===(Confirmed(current = Details(staff = jack)))

When("attach a new details.")
val tom     = AssignedContact(name = "Tom", mobile = Mobile("19901020304"))
val result2 = project.runCommand[StatusReply[String]](
			replyTo => AttachDetails(details = Details(catalog = "Good", staff = tom), replyTo))

Then("the attached details is cached.")
result2.reply should ===(StatusReply.Success("Project details cached."))
result2.stateOfType[OpenedProject].details should ===(Pending(current = Details(catalog = "Good", staff = jack), pending = Details(catalog = "Good", staff = tom)))

====

The logs shows the command Open handled, event Opened persisted, but command AttachDetails can’t be handled.

19:39:34.159 [ScalaTest-run] DEBUG ProjectSpec - Starting ActorTestKit
19:39:34.542 [ScalaTest-run] DEBUG ProjectSpec - Starting ActorTestKit
19:39:34.672 [ScalaTest-run] DEBUG akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal - Using in memory storage [akka.persistence.testkit.journal] for test kit read journal
19:39:35.113 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Initializing snapshot recovery: Recovery(SnapshotSelectionCriteria(9223372036854775807,9223372036854775807,0,0),9223372036854775807,9223372036854775807)
19:39:35.164 [ProjectSpec-akka.actor.default-dispatcher-3] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Snapshot recovered from 0 Map() VersionVector()
19:39:35.178 [ProjectSpec-akka.actor.default-dispatcher-3] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Replaying events: from: 1, to: 9223372036854775807
19:39:35.236 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Recovery successful, recovered until sequenceNr: [0]
19:39:35.236 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Returning recovery permit, reason: replay completed successfully

19:39:35.283 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Recovery for persistenceId [PersistenceId(Project|2023-001)] took 73.99 ms

When send a command Open.

Then a new project opened, the staff is assigned.

When attach a new details.

19:39:37.182 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Handled command [priv.abbey.domain.Project$Open], resulting effect: [Persist(priv.abbey.domain.Project$Opened)], side effects: [1]
19:39:37.228 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Received Journal response: WriteMessagesSuccessful after: 31021100 nanos
19:39:37.232 [ProjectSpec-akka.actor.default-dispatcher-7] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Received Journal response: WriteMessageSuccess(PersistentRepr(Project|2023-001,1,e5b44ade-ef18-4b28-a7d9-576a5b82d345,0,None),1) after: 35388300 nanos
19:39:37.722 [ProjectSpec-akka.actor.default-dispatcher-8] DEBUG akka.persistence.typed.internal.EventSourcedBehaviorImpl - Handled command [priv.abbey.domain.Project$AttachDetails], resulting effect: [Unhandled], side effects: [1]
19:39:37.744 [ProjectSpec-akka.actor.default-dispatcher-8] INFO akka.actor.LocalActorRef - Message [priv.abbey.domain.Project$AttachDetails] to Actor[akka://ProjectSpec/system/test/$a#-766595102] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

The Exception shows:

Missing expected reply for command [AttachDetails(Details(Good,,null,0.0,0.0,,,,false,,true,,null,null,null,null,null,null,null,AssignedContact(Transient,Tom,Mobile(19901020304))),Actor[akka://ProjectSpec/system/testProbe-4#762760000])].

========

The test shows the result has gotten state OpenedProject, but why the entity can’t handle the command AttachDetails later?

So I tried to run command Open again. It works!

I’m suprised that the state didn’t seem to switch to OpenedProject yet.

I don’t know why and how to do then. Please give me more guides, thanks.

Looks like a small oversight, the event handler used is starter.onEvent rather than state.onEvent which is the current state of the entity (and the same is true for the command handler).

Oh, it’s too ashamed. :joy:

I remember it now. I tried to implement the state with Option[Project], but I forgot to change it while refactoring the match statement.

Well, it works now. Thanks.