We are trying to build a kind of batch sink, after which other operators can still be used in the stream, using the code below. The buffer accepts a fixed number of inputs; when it is full, it performs some operation on the data and returns the result as a list for further processing. When the source finishes, we want to process whatever is left in the buffer (using the Concat operator). However, Concat doesn't seem to work as expected with flatMapConcat: the source passed to Concat is executed first instead of last, which causes the remaining inputs to be lost. Is this a bug, or is there a workaround to make the source in concat produce its elements last?

IntStream intStream = IntStream.range(0, 55);
Buffer buffer = new Buffer();
Source.fromIterator(intStream::iterator)
    // Emits a full batch of 10 once the buffer fills up, otherwise an empty list
    .flatMapConcat(val -> Source.from(buffer.write(val)))
    // Intended to emit whatever is left in the buffer after the upstream completes
    .concat(Source.lazySource(() -> Source.from(buffer.flush())))
    .map(item -> {
        System.out.println(item);
        return item;
    })
    .toMat(Sink.foreach(item -> {
    }), Keep.left())
    .run(_actorSystem.getMaterializer());

public static class Buffer {
    List<Integer> _buffer = new ArrayList<>();

    public List<Integer> write(int i) {
        _buffer.add(i);
        if (_buffer.size() == 10) {
            return flush();
        }
        return List.of();
    }

    public List<Integer> flush() {
        List<Integer> buffer = _buffer;
        _buffer = new ArrayList<>();
        return buffer;
    }
}
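
The missing inputs would be consistent with `buffer.flush()` being called before the upstream elements have been written. The following is a minimal, Akka-free sketch of that hypothesis only; it does not reproduce how Akka Streams schedules `concat`. The class `FlushOrderSketch` and the methods `run`, `flushedEarly`, and `flushedLate` are hypothetical names introduced for illustration, and the inner `Buffer` is assumed to behave like the one above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.IntStream;

// Hypothetical, Akka-free model of the flush-ordering problem described above.
class FlushOrderSketch {

    // Assumed to mirror the question's Buffer: emits a batch once 10 items are held.
    static class Buffer {
        private List<Integer> items = new ArrayList<>();

        List<Integer> write(int i) {
            items.add(i);
            return items.size() == 10 ? flush() : List.of();
        }

        List<Integer> flush() {
            List<Integer> out = items;
            items = new ArrayList<>();
            return out;
        }
    }

    // Writes 0..54 into the buffer, then appends whatever the tail supplier returns.
    static List<Integer> run(Buffer buffer, Supplier<List<Integer>> tail) {
        List<Integer> emitted = new ArrayList<>();
        IntStream.range(0, 55).forEach(i -> emitted.addAll(buffer.write(i)));
        emitted.addAll(tail.get());
        return emitted;
    }

    // flush() is evaluated before any write, so the tail is always empty.
    static List<Integer> flushedEarly() {
        Buffer buffer = new Buffer();
        List<Integer> tail = buffer.flush();
        return run(buffer, () -> tail);
    }

    // flush() is evaluated only after the last write, so the tail holds the remainder.
    static List<Integer> flushedLate() {
        Buffer buffer = new Buffer();
        return run(buffer, buffer::flush);
    }

    public static void main(String[] args) {
        System.out.println(flushedEarly().size()); // prints 50: items 50..54 are lost
        System.out.println(flushedLate().size());  // prints 55: remainder is emitted
    }
}
```

If early evaluation is indeed the cause, the built-in `grouped(10)` operator may be worth trying instead of a hand-written buffer, since it emits the final partial group when the upstream completes. Depending on the Akka version in use, a `concatLazy` variant of `concat` may also be available; that should be checked against the documentation for that version.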