Hi, I have a project that is supposed to publish all events written into a Cassandra table (events
). Currently, the service that’s supposed to write the events is not up and running yet. So I’m trying to test my publisher by writing fake events into the database and trying to get Akka Persistence Query (Cassandra) to pick them up and publish them.
The journal read looks like this:
val readJournal: CassandraReadJournal with EventsByTagQuery = PersistenceQuery(system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
readJournal
.eventsByTag(settings.publisher.eventTag, startFromOffset)
.map { envelope: EventEnvelope =>
publisher ! PublishEvent(envelope)
}
.runWith(Sink.ignore)
To my understanding, Akka Persistence Query polls my keyspace.tag_views
table and should publish whatever records it finds in there. But, when I manually insert rows into tag_views
, they are not showing up in the readJournal.eventsByTag
stream, and are not getting published.
Can I get some advice on what conditions I need to fulfill to ‘trick’ the persistence query journal into finding and picking up my event? For reference, the tag_views
table looks like this:
cassandra@cqlsh> describe table keyspace.tag_views;
CREATE TABLE keyspace.tag_views (
tag_name text,
timebucket bigint,
timestamp timeuuid,
persistence_id text,
tag_pid_sequence_nr bigint,
event blob,
event_manifest text,
meta blob,
meta_ser_id int,
meta_ser_manifest text,
sequence_nr bigint,
ser_id int,
ser_manifest text,
writer_uuid text,
PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr)
) WITH CLUSTERING ORDER BY (timestamp ASC, persistence_id ASC, tag_pid_sequence_nr ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'bucket_high': '1.5', 'bucket_low': '0.5', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'enabled': 'true', 'max_threshold': '32', 'min_sstable_size': '50', 'min_threshold': '4', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'unchecked_tombstone_compaction': 'false'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
So far, I’ve just inserted a couple of records into this table, making sure the tag_name
field is set to the same value as settings.publisher.eventTag
in the code above.