Hi al
I have the following test:
final class DetectorSpec extends BddSpec {
private val sap = Container.sap()
private val kafka = Container.kafka()
sap.start()
kafka.start()
override def afterAll(): Unit = {
sap.stop()
kafka.stop()
}
private def withKafkaAndSapOnline(testCode: TestProbe[ServerEvent] => Unit)
: Unit = {
val config = ConfigFactory.parseString(
s"""akka.actor.default-dispatcher = {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
akka.actor.testkit.typed.single-expect-default = 0s
akka.loglevel = DEBUG
kafka {
servers = "${kafka.getBootstrapServers}"
zookeeper = "${kafka.getMappedPort(2181)}"
}
sap {
server = "ws://${sap.getContainerIpAddress}:${sap.getMappedPort(8080)}"
}""")
val testKit = ActorTestKit("DetectorSystem1", config)
testKit.spawn(DetectorSupervisor.create(), "DetectorSupervisor")
val inbox = testKit.createTestProbe[ServerEvent]("Receiver")
testKit.system.receptionist ! Receptionist.Register(ServerStateKey, inbox.ref)
testCode(inbox)
testKit.shutdownTestKit()
}
private def withKafkaAndSapOffline(testCode: (TestProbe[ServerEvent], TestProbe[ServerEvent]) => Unit)
: Unit = {
val config = ConfigFactory.parseString(
s"""akka.actor.default-dispatcher = {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
akka.actor.testkit.typed.single-expect-default = 0s
akka.loglevel = DEBUG
kafka {
servers = "PLAINTEXT://localhost:9092"
zookeeper = "2181"
}
sap {
server = "ws://127.0.0.1"
}""")
val testKit = ActorTestKit("DetectorSystem2", config)
testKit.spawn(DetectorSupervisor.create(), "DetectorSupervisor")
val inbox1 = testKit.createTestProbe[ServerEvent]("Receiver1")
val inbox2 = testKit.createTestProbe[ServerEvent]("Receiver2")
testKit.system.receptionist ! Receptionist.Register(ServerStateKey, inbox1.ref)
testKit.system.receptionist ! Receptionist.Register(ServerStateKey, inbox2.ref)
testCode(inbox1, inbox2)
testKit.shutdownTestKit()
}
private def withKafkaOfflineSapOnline(testCode: TestProbe[ServerEvent] => Unit)
: Unit = {
val config = ConfigFactory.parseString(
s"""akka.actor.default-dispatcher = {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
akka.actor.testkit.typed.single-expect-default = 0s
akka.loglevel = DEBUG
kafka {
servers = "PLAINTEXT://localhost:9092"
zookeeper = "2181"
}
sap {
server = "ws://${sap.getContainerIpAddress}:${sap.getMappedPort(8080)}"
}""")
val testKit = ActorTestKit("DetectorSystem3", config)
val inbox = testKit.createTestProbe[ServerEvent]("Receiver")
testKit.spawn(DetectorSupervisor.create(), "DetectorSupervisor")
testKit.system.receptionist ! Receptionist.Register(ServerStateKey, inbox.ref)
testCode(inbox)
testKit.shutdownTestKit()
}
private def withKafkaOnlineSapOffline(testCode: TestProbe[ServerEvent] => Unit)
: Unit = {
val config = ConfigFactory.parseString(
s"""akka.actor.default-dispatcher = {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
akka.actor.testkit.typed.single-expect-default = 0s
akka.loglevel = DEBUG
kafka {
servers = "${kafka.getBootstrapServers}"
zookeeper = "${kafka.getMappedPort(2181)}"
}
sap {
server = "ws://127.0.0.1:8080"
}""")
val testKit = ActorTestKit("DetectorSystem4", config)
testKit.spawn(DetectorSupervisor.create(), "DetectorSupervisor")
val inbox = testKit.createTestProbe[ServerEvent]("Receiver")
testKit.system.receptionist ! Receptionist.Register(ServerStateKey, inbox.ref)
testCode(inbox)
testKit.shutdownTestKit()
}
feature("Detect Kafka and SAP availability") {
info("As a technical user, I want to be notified in real time, if Kafka and SAP is up and running or not.")
scenario("SAP and Kafka are available") {
withKafkaAndSapOnline { inbox =>
Given("I am waiting for the current state message")
When("I am receive the state message")
Then("it should contain `SAP and Kafka are online`")
inbox.fishForMessage(5.second){
case ServerOfflineApproved =>
FishingOutcomes.continue
case ServerOnlineApproved =>
FishingOutcomes.complete
case _ =>
FishingOutcomes.fail("Unexpected message")
}
}
}
scenario("SAP is online and Kafka is offline") {
withKafkaOfflineSapOnline { inbox =>
Given("I am waiting for the current state message")
When("I am receive the state message")
Then("it should contain `Kafka is offline`")
inbox.fishForMessage(5.second){
case ServerOfflineApproved =>
FishingOutcomes.complete
case _ =>
FishingOutcomes.fail("Unexpected message")
}
}
}
scenario("SAP is offline and Kafka is online") {
withKafkaOnlineSapOffline { inbox =>
Given("I am waiting for the current state message")
When("I am receive the state message")
Then("it should contain `SAP is offline`")
inbox.fishForMessage(5.second){
case ServerOfflineApproved =>
FishingOutcomes.complete
case _ =>
FishingOutcomes.fail("Unexpected message")
}
}
}
scenario("SAP and Kafka are offline") {
withKafkaAndSapOffline { (inbox1, inbox2) =>
Given("I am registering two listeners")
When("I am receive the state message")
Then("it should contain `Kafka and SAP are offline`")
inbox1.fishForMessage(5.second){
case ServerOfflineApproved =>
FishingOutcomes.complete
case _ =>
FishingOutcomes.fail("Unexpected message")
}
inbox2.fishForMessage(5.second){
case ServerOfflineApproved =>
FishingOutcomes.complete
case _ =>
FishingOutcomes.fail("Unexpected message")
}
}
}
}
}
As you can see after every test, it will shutdown the ActorTestKit
. But sometimes it freezes, that means, I’ve got the following message:
[DEBUG] [07/19/2019 20:37:57.788] [DetectorSystem3-akka.actor.default-blocking-io-dispatcher-11] [akka://DetectorSystem3/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:32846]
[DEBUG] [07/19/2019 20:37:57.789] [DetectorSystem3-akka.io.pinned-dispatcher-2] [akka://DetectorSystem3/system/IO-TCP/selectors/$a/0] Connection established to [localhost:32846]
[INFO] [07/19/2019 20:37:58.807] [ScalaTest-run-running-DetectorSpec] [akka://DetectorSystem3/user/DetectorSupervisor/KafkaActor/KafkaStreamer] !!!!!!!!!!!!!!!!!!!!! Shutdown KafkaDetectorActor !!!!!!!!!!!!!!!!!!!!!
[INFO] [07/19/2019 20:37:58.810] [ScalaTest-run-running-DetectorSpec] [akka://DetectorSystem3/user/DetectorSupervisor/SapActor/SapStreamer] !!!!!!!!!!!!!!!!!!!!! Shutdown SapDetectorActor !!!!!!!!!!!!!!!!!!!!!
and the ActorTestKit
never shuts down.
Here is the evidence:
It is just running and running… How to stop it?
The source code is hosted on https://gitlab.com/bifunctor/plugger
Thanks