I am unable to get events by tags, I am using akka 2.5.23
, akka-persistence-cassandra 0.98
, and Cassandra 3.6
This is part of my config file (I removed persistence snapshot config for brevity):
cassandra {
host = "localhost"
host = ${?CASSANDRA_HOST}
port = 9042
port = ${?CASSANDRA_PORT}
number-of-retries = 500
}
cassandra-journal {
contact-points = [${cassandra.host}]
port = ${cassandra.port}
tags = {
account = 1
accountcreated = 2
}
event-adapters{
proto = "MyProtobufAdapter"
}
event-adapter-bindings{
"com.google.protobuf.Message" = proto
}
}
akka {
actor {
serializers {
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"com.google.protobuf.Message" = proto
}
}
persistence {
journal.plugin = "cassandra-journal"
}
}
The events are successfully persisted in Cassandra:
cqlsh:akka> select persistence_id, tags from messages;
persistence_id | tags
--------------------------------------+-------------------------------
b34f6158-b693-44af-a01d-be4f9e0463d9 | {'account', 'accountcreated'}
I can successfully read all persistenceIds
:
import akka.persistence.query.{EventEnvelope, PersistenceQuery, Sequence}
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
val journal: CassandraReadJournal = PersistenceQuery(system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
journal.persistenceIds.runWith(Sink.foreach(println(_)))
But when I try to get a live stream of events by tags, it is empty, but as you can see from the cqlsh
output, I have tagged messages.
journal.eventsByTag("account", Sequence(0L)).runWith(Sink.foreach(envelope => println(envelope.persistenceId)))
I am unable to find the problem, please help.