Hi, I have deployed a Lagom system to production, and everything works fine. I created about 100 items, but the Cassandra event processor stored only 10 of them. So I created a new event processor with a different offset and a different table. I expected all created items to be replayed so that the new table would fill up, but I get an exception instead!
It is a very unclear exception:
[error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-15, akkaTimestamp=14:13:23.316UTC, akkaSource=akka://application/system/sharding/MainStorageItemEventProcessor/de.****.storageManagement.impl.storageItem.event.ItemEvent1/de.****.storageManagement.impl.storageItem.event.ItemEvent1, sourceActorSystem=application] - Ask timed out on [Actor[akka://application/user/readSideGlobalPrepare-MainStorageItemEventProcessor-singletonProxy#591393528]] after [40000 ms]. Message of type [com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/readSideGlobalPrepare-MainStorageItemEventProcessor-singletonProxy#591393528]] after [40000 ms]. Message of type [com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:866)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:864)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:748)
I do not understand this error. I used the online-auction example as a template for this.
// Registers a new MainStorageItemEventProcessor (built from cassandraSession and cassandraReadSide) as a read-side processor for ItemEvent.
readSide.register[ItemEvent](new MainStorageItemEventProcessor(cassandraSession,cassandraReadSide))
This is called in the ServiceImpl.
Here is my read-side processor:
/**
 * Read-side processor that projects [[ItemEvent]]s into the Cassandra table named by
 * `MainStorageItemEventProcessor.TABLE_NAME`, keyed by entity id.
 *
 * The table and its secondary indexes are created in the global-prepare step, the CQL
 * statements are prepared in the per-tag prepare step, and each supported event type is
 * mapped to exactly one bound statement.
 *
 * @param session  Cassandra session used to create the schema and prepare statements
 * @param readSide Cassandra read-side support used to build the handler
 * @param ec       execution context on which the future transformations run
 */
class MainStorageItemEventProcessor(session: CassandraSession,
                                    readSide: CassandraReadSide)(implicit ec: ExecutionContext)
  extends ReadSideProcessor[ItemEvent] {

  val logger = Logger(this.getClass)

  // Every prepared statement is published through a promise that `prepareStatements`
  // completes; the event handlers map over the corresponding future.
  private val insertItemPromise = Promise[PreparedStatement]()
  private def insertItem: Future[PreparedStatement] = insertItemPromise.future

  private val itemEmbeddedPromise = Promise[PreparedStatement]()
  private def itemEmbedded = itemEmbeddedPromise.future

  private val externalReferenceSetPromise = Promise[PreparedStatement]()
  private def externalReferenceSet = externalReferenceSetPromise.future

  private val itemCommittedPromise = Promise[PreparedStatement]()
  private def itemCommitted = itemCommittedPromise.future

  private val newTransferTargetSetPromise = Promise[PreparedStatement]()
  private def newTransferTargetSet = newTransferTargetSetPromise.future

  private val disposabilityStatusChangedPromise = Promise[PreparedStatement]()
  private def disposabilityStatusChanged = disposabilityStatusChangedPromise.future

  private val transitStatusChangedPromise = Promise[PreparedStatement]()
  private def transitStatusChanged = transitStatusChangedPromise.future

  private val storageStatusChangedPromise = Promise[PreparedStatement]()
  private def storageStatusChanged = storageStatusChangedPromise.future

  /** Binds the INSERT for a newly created item; columns the event does not carry are set to null. */
  private def processInsertItem(event: ItemCreated, entityId: String) = insertItem.map { ps =>
    // Convert the storage instant once; year and month are both derived from it (UTC).
    val storedAt = LocalDateTime.ofInstant(event.storageDate, ZoneOffset.UTC)
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    bind.setString(MainStorageItemEventProcessor.externalReference, event.externalReference.orNull)
    bind.setString(MainStorageItemEventProcessor.manufactor, event.manufactor.orNull)
    bind.setString(MainStorageItemEventProcessor.incomingOrderId, event.orderId.orNull)
    bind.setString(MainStorageItemEventProcessor.currentPosition, event.currentPosition)
    bind.setInt(MainStorageItemEventProcessor.storageYear, storedAt.getYear)
    bind.setInt(MainStorageItemEventProcessor.storageMonthOfYear, storedAt.getMonthValue)
    bind.setToNull(MainStorageItemEventProcessor.storageStatus)
    bind.setToNull(MainStorageItemEventProcessor.transitStatus)
    bind.setToNull(MainStorageItemEventProcessor.disposabilityStatus)
    bind.setToNull(MainStorageItemEventProcessor.holder)
    bind.setToNull(MainStorageItemEventProcessor.weight)
    bind.setToNull(MainStorageItemEventProcessor.length)
    bind.setToNull(MainStorageItemEventProcessor.height)
    bind.setToNull(MainStorageItemEventProcessor.width)
    bind.setToNull(MainStorageItemEventProcessor.newTargetKind)
    bind.setToNull(MainStorageItemEventProcessor.newTarget)
    List(bind)
  }

  /** Binds the UPDATE that stores the new storage status of the entity. */
  private def processStorageStatusChanged(x: StorageStatusChanged, entityId: String) = storageStatusChanged.map { ps =>
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.storageStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that stores the new disposability status of the entity. */
  private def processDisposabilityStatusChanged(x: DisposabilityStatusChanged, entityId: String) = disposabilityStatusChanged.map { ps =>
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.disposabilityStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that stores the new transit status of the entity. */
  private def processTransitStatusChanged(x: TransitStatusChanged, entityId: String) = transitStatusChanged.map { ps =>
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.transitStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that records the receiving operator's id as the holder. */
  private def processItemCommitted(x: ItemCommitted, entityId: String) = itemCommitted.map { ps =>
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.holder, x.receivingOperator.id)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that clears holder and pending target and stores the new storage position. */
  private def processItemEmbedded(x: ItemEmbedded, entityId: String) = itemEmbedded.map { ps =>
    val bind = ps.bind()
    bind.setToNull(MainStorageItemEventProcessor.holder)
    bind.setToNull(MainStorageItemEventProcessor.newTarget)
    bind.setToNull(MainStorageItemEventProcessor.newTargetKind)
    bind.setString(MainStorageItemEventProcessor.currentPosition, x.storagePosition)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that stores the new transfer target and its kind. */
  private def processNewTransferTargetSet(x: NewTransferTargetSet, entityId: String) = newTransferTargetSet.map { ps =>
    val bind = ps.bind()
    // A missing operator is written as null instead of calling `.get` on the empty Option,
    // which failed the handler with NoSuchElementException.
    // NOTE(review): confirm whether another field of `x.target` should be the fallback.
    bind.setString(MainStorageItemEventProcessor.newTarget, x.target.operator.map(_.id).orNull)
    bind.setString(MainStorageItemEventProcessor.newTargetKind, x.targetKind.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Binds the UPDATE that stores the new external reference. */
  private def processExternalReferenceSet(x: ExternalReferenceSet, entityId: String) = externalReferenceSet.map { ps =>
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.externalReference, x.externalReference)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  /** Starts preparing `cql`, wires the result into `promise` and returns the pending statement. */
  private def prepareInto(promise: Promise[PreparedStatement], cql: String): Future[PreparedStatement] = {
    val prepared = session.prepare(cql)
    promise.completeWith(prepared)
    prepared
  }

  /**
   * Prepares all CQL statements used by the event handlers and completes the matching promises.
   *
   * @return a future that completes with `Done` once every statement is prepared; any failure
   *         is rethrown wrapped in a `RuntimeException`
   */
  private def prepareStatements(): Future[Done] = {
    // All prepares are started up front so they run concurrently; they are only awaited below.
    val insertItemFuture = prepareInto(insertItemPromise,
      s"""
         |INSERT INTO ${MainStorageItemEventProcessor.TABLE_NAME}
         |(${MainStorageItemEventProcessor.entityId},${MainStorageItemEventProcessor.externalReference},${MainStorageItemEventProcessor.manufactor},${MainStorageItemEventProcessor.incomingOrderId},${MainStorageItemEventProcessor.currentPosition},${MainStorageItemEventProcessor.storageYear},${MainStorageItemEventProcessor.storageMonthOfYear},${MainStorageItemEventProcessor.storageStatus},${MainStorageItemEventProcessor.transitStatus},${MainStorageItemEventProcessor.disposabilityStatus},${MainStorageItemEventProcessor.holder},${MainStorageItemEventProcessor.weight},${MainStorageItemEventProcessor.length},${MainStorageItemEventProcessor.height},${MainStorageItemEventProcessor.width},${MainStorageItemEventProcessor.newTarget},${MainStorageItemEventProcessor.newTargetKind})
         |VALUES
         |(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
      """.stripMargin
    )
    val itemEmbeddedFuture = prepareInto(itemEmbeddedPromise,
      s"""
         |UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET
         |${MainStorageItemEventProcessor.holder} = ?,
         |${MainStorageItemEventProcessor.currentPosition} = ?,
         |${MainStorageItemEventProcessor.newTarget} = ?,
         |${MainStorageItemEventProcessor.newTargetKind} = ?
         |WHERE
         |${MainStorageItemEventProcessor.entityId} = ?
      """.stripMargin
    )
    val externalReferenceSetFuture = prepareInto(externalReferenceSetPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.externalReference} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    val itemCommittedFuture = prepareInto(itemCommittedPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.holder} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    val newTransferTargetSetFuture = prepareInto(newTransferTargetSetPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.newTarget} = ?, ${MainStorageItemEventProcessor.newTargetKind} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    val disposabilityStatusChangedFuture = prepareInto(disposabilityStatusChangedPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.disposabilityStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    val transitStatusChangedFuture = prepareInto(transitStatusChangedPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.transitStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    val storageStatusChangedFuture = prepareInto(storageStatusChangedPromise,
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.storageStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )

    (for {
      _ <- insertItemFuture
      _ <- itemEmbeddedFuture
      _ <- externalReferenceSetFuture
      _ <- itemCommittedFuture
      _ <- newTransferTargetSetFuture
      _ <- disposabilityStatusChangedFuture
      _ <- transitStatusChangedFuture
      _ <- storageStatusChangedFuture
    } yield Done).recover {
      case t => throw new RuntimeException(s"HILFE: $t", t)
    }
  }

  /**
   * Creates the read-side table and one secondary index per non-key column.
   *
   * @return a future that completes with `Done` only after the table and every index
   *         statement have completed, and fails with the first statement that fails
   */
  private def createTable(): Future[Done] = {
    val tableCreated: Future[Done] = session.executeCreateTable(
      s"""
         |CREATE TABLE IF NOT EXISTS ${MainStorageItemEventProcessor.TABLE_NAME} (
         |${MainStorageItemEventProcessor.entityId} TEXT PRIMARY KEY,
         |${MainStorageItemEventProcessor.manufactor} TEXT,
         |${MainStorageItemEventProcessor.externalReference} TEXT,
         |${MainStorageItemEventProcessor.incomingOrderId} TEXT,
         |${MainStorageItemEventProcessor.storageStatus} TEXT,
         |${MainStorageItemEventProcessor.transitStatus} TEXT,
         |${MainStorageItemEventProcessor.disposabilityStatus} TEXT,
         |${MainStorageItemEventProcessor.currentPosition} TEXT,
         |${MainStorageItemEventProcessor.holder} TEXT,
         |${MainStorageItemEventProcessor.weight} INT,
         |${MainStorageItemEventProcessor.length} INT,
         |${MainStorageItemEventProcessor.height} INT,
         |${MainStorageItemEventProcessor.width} INT,
         |${MainStorageItemEventProcessor.newTarget} TEXT,
         |${MainStorageItemEventProcessor.newTargetKind} TEXT,
         |${MainStorageItemEventProcessor.storageYear} INT,
         |${MainStorageItemEventProcessor.storageMonthOfYear} INT
         |)
      """.stripMargin)

    val indexedColumns = List(
      MainStorageItemEventProcessor.manufactor,
      MainStorageItemEventProcessor.externalReference,
      MainStorageItemEventProcessor.incomingOrderId,
      MainStorageItemEventProcessor.storageStatus,
      MainStorageItemEventProcessor.transitStatus,
      MainStorageItemEventProcessor.disposabilityStatus,
      MainStorageItemEventProcessor.currentPosition,
      MainStorageItemEventProcessor.holder,
      MainStorageItemEventProcessor.weight,
      MainStorageItemEventProcessor.length,
      MainStorageItemEventProcessor.height,
      MainStorageItemEventProcessor.width,
      MainStorageItemEventProcessor.newTarget,
      MainStorageItemEventProcessor.newTargetKind,
      MainStorageItemEventProcessor.storageYear,
      MainStorageItemEventProcessor.storageMonthOfYear
    )

    // Each index statement is chained onto the previous one, so every failure surfaces and the
    // returned future completes only after the last index statement has finished.
    // NOTE(review): issuing many schema changes at once may have contributed to the
    // global-prepare ask timeout - confirm against the Cassandra logs.
    indexedColumns.foldLeft(tableCreated) { (previous, column) =>
      previous.flatMap { _ =>
        session.executeCreateTable(
          s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} ($column)")
      }
    }
  }

  /** Builds the handler: schema creation as global prepare, statement preparation per tag, one handler per event type. */
  override def buildHandler(): ReadSideProcessor.ReadSideHandler[ItemEvent] = {
    readSide.builder[ItemEvent](MainStorageItemEventProcessor.OFFSET_TABLE)
      .setGlobalPrepare(() => createTable())
      .setPrepare(_ => prepareStatements())
      .setEventHandler[ItemCreated](a => processInsertItem(a.event, a.entityId))
      .setEventHandler[StorageStatusChanged](a => processStorageStatusChanged(a.event, a.entityId))
      .setEventHandler[DisposabilityStatusChanged](a => processDisposabilityStatusChanged(a.event, a.entityId))
      .setEventHandler[TransitStatusChanged](a => processTransitStatusChanged(a.event, a.entityId))
      .setEventHandler[ItemCommitted](a => processItemCommitted(a.event, a.entityId))
      .setEventHandler[ItemEmbedded](a => processItemEmbedded(a.event, a.entityId))
      .setEventHandler[NewTransferTargetSet](a => processNewTransferTargetSet(a.event, a.entityId))
      .setEventHandler[ExternalReferenceSet](a => processExternalReferenceSet(a.event, a.entityId))
      .build()
  }

  /** All event tags this processor consumes, taken from `ItemEvent.Tag.allTags`. */
  override def aggregateTags: Set[AggregateEventTag[ItemEvent]] = ItemEvent.Tag.allTags
}
Is there someone who can help me?