Dear board members,
With the release of the new Kafka integration, I decided to try a simple example that does the following:
- Consume events from the Akka EventStream.
- Publish those events to Kafka.
- Accomplish the above with Akka Streams and alpakka-kafka, using the Java API.
I am using the test framework supplied with the module, but I can't seem to crack it: whatever I publish to the EventStream is never consumed by the stream, so nothing ends up in Kafka. What am I missing in the code below?
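For context, sys and mat below come from my test setup, and createTopicName/producerDefaults are provided by the supplied test base class. Roughly, the scaffolding is equivalent to this (a simplified sketch, not the exact testkit wiring):

// Simplified stand-ins for the testkit-provided system and materializer
ActorSystem sys = ActorSystem.create("test");
Materializer mat = ActorMaterializer.create(sys);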
import akka.Done;
import akka.actor.ActorRef;
import akka.kafka.javadsl.Producer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import java.util.concurrent.CompletionStage;

@Test
public void test() {
    var topicName = createTopicName(1);

    Source<String, ActorRef> sourceActor = Source.actorRef(1000, OverflowStrategy.fail());

    // Materialize the source; the materialized actor is subscribed to the
    // EventStream and its elements are drained into Sink.ignore().
    sourceActor.mapMaterializedValue(actor -> {
        sys.getEventStream().subscribe(actor, String.class);
        return "ok";
    }).runWith(Sink.ignore(), mat);

    // Kafka producer sink built from the testkit's default producer settings.
    Sink<ProducerRecord<String, String>, CompletionStage<Done>> producerSink =
        Producer.plainSink(producerDefaults());

    // Map each event from the bus to a ProducerRecord and stream it to Kafka.
    sourceActor.map(value -> {
        System.out.println("Consumed record from event bus. Publishing to Kafka");
        return new ProducerRecord<String, String>(topicName, value);
    }).runWith(producerSink, mat);

    sys.getEventStream().publish("test event");

    // Give the stream some time to run before the test method returns.
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
Any pointers would be highly appreciated. I know this sounds like a very basic question, and I feel ashamed in advance for asking it :)