I have written a GraphDSL Java stream that uses an Alpakka Kafka plain source with auto-commit. The aim is to shut down the source and the stream gracefully without losing records that have already been consumed.
Problem:
While using the GraphDSL I am not able to materialize a Consumer.DrainingControl, so I cannot call drainAndShutdown.
I am only able to obtain a Consumer.Control, which has stop and shutdown methods.
I have used the materialized Consumer.Control to try to stop the source gracefully:
- Invoke Consumer.Control.stop()
- Then invoke Consumer.Control.shutdown()
This approach drops records at the source itself: from the logs I can see that only the elements already consumed by the flow after the source are processed.
I have tried different ways to get a Consumer.DrainingControl from the stream, but so far without success.
The pseudocode I am running is below for reference:
final Source<ConsumerRecord<String, String>, Consumer.Control> kafkaMessages =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(kafkaTopic1, kafkaTopic2))
        .via(killSwitch.flow());
RunnableGraph<Consumer.Control> result = RunnableGraph.fromGraph(
    GraphDSL.create(kafkaMessages,
        (builder, out) -> {
            UniformFanOutShape<Pair, Pair> partitions =
                builder.add(
                    Partition.create(concurrency, element -> getPartitionNumber(concurrency, streamType)));
            UniformFanInShape<Pair, Pair> output =
                builder.add(MergeSequence.create(concurrency, Pair::second));
            builder
                .from(out)
                .via(builder.add(CustomBusinessLogic))
                .viaFanOut(partitions);
            for (int i = 0; i < concurrency; i++) {
                builder
                    .from(partitions.out(i))
                    .via(builder.add(BusinessLogic()))
                    .viaFanIn(output);
            }
            builder
                .from(output.out())
                .to(
                    builder.add(
                        Flow.<Pair>create()
                            .filter(element -> element.first() != null)
                            .map(element -> ProducerMessage.multi((List<ProducerRecord<String, String>>) element.first()))
                            .via(Producer.flexiFlow(producerSettings))
                            .to(Sink.last())
                    ));
            return ClosedShape.getInstance();
        }));
Consumer.Control control = result.run(materializer);
// To shut down, I call the following in this order
control.stop();
control.shutdown();
killSwitch.shutdown();
I observe that the CompletionStage returned by stop() completes almost instantly, while the one returned by shutdown() sometimes does not complete at all.
Please provide any suggestions, or pointers to documentation, on how to achieve a proper shutdown without losing the source records.
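For reference, the ordering I am trying to achieve is, as far as I understand it, what drainAndShutdown is meant to do: stop the source, wait for the rest of the stream to complete, and only then shut the consumer down. The sketch below models that ordering with plain CompletionStage. It makes several assumptions: the Control interface is a hypothetical stand-in with the same two method names and is not the Alpakka type, streamCompletion stands for whatever the sink would materialize, and failure handling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class DrainOrderSketch {

    /** Hypothetical stand-in for the two Consumer.Control methods used above. */
    interface Control {
        CompletionStage<Void> stop();
        CompletionStage<Void> shutdown();
    }

    /**
     * Stops the source, waits until the rest of the stream has completed,
     * and only then shuts the consumer down. The stream result is passed through.
     */
    static <T> CompletionStage<T> drainAndShutdown(Control control,
                                                   CompletionStage<T> streamCompletion) {
        return control.stop()
            .thenCompose(ignored -> streamCompletion)
            .thenCompose(result -> control.shutdown().thenApply(ignored -> result));
    }

    /** Runs the sequence against a fake control that records the call order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> streamCompletion = new CompletableFuture<>();
        Control control = new Control() {
            @Override
            public CompletionStage<Void> stop() {
                events.add("stop");
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public CompletionStage<Void> shutdown() {
                events.add("shutdown");
                return CompletableFuture.completedFuture(null);
            }
        };

        CompletionStage<String> done = drainAndShutdown(control, streamCompletion);
        // stop() has already completed here, but shutdown() has not been called yet.
        events.add("in-flight records still being processed");
        // The sink finishes once the in-flight records have passed through.
        streamCompletion.complete("last");
        events.add("result=" + done.toCompletableFuture().join());
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model stop() completing immediately is expected, since it only signals the source to stop emitting. What matters is that shutdown() is deferred until the stream completion stage is done, which my current stop-then-shutdown sequence does not wait for.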