Beginner question regarding splitWhen/groupBy followed by fold

Hey guys,

I’m new to Akka Streams and I’m encountering unexpected behaviour. My question is probably quite silly - but I really don’t understand what I’m seeing right now…

I’m trying to group a couple of simple data objects by one of their members. A very simple example would be grouping a range of numbers into even and odd numbers.

So for example for range (1 … 10) I’d expect the two lists/collections: [1, 3, 5, 7, 9] and [2, 4, 6, 8, 10].

I’m trying to implement it like this:

Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .groupBy(Integer.MAX_VALUE, i -> i % 2 == 0)
    .fold(new ArrayList<Integer>(), (e, f) -> {e.add(f); return e;})
    .mergeSubstreams()
    .runWith(Sink.seq(), mat);

Instead of the expected two distinct lists, I get two lists that each contain all elements: [2, 1, 3, 4, 5, 6, 7, 8, 9, 10] and [2, 1, 3, 4, 5, 6, 7, 8, 9, 10]

My question at this point is: Why do both lists contain all elements?
Bonus question: Why are the first two elements of the lists out of order?

One more detail: At first I suspected that I misunderstood groupBy or splitWhen. However when I use
reduce( (e, f) -> e + f)
instead of fold, I get the expected result [25, 30], where reduce was run once for the odd numbers and once for the even numbers. Why does fold seemingly behave so differently then?

Any help is greatly appreciated!

Edit: I have found another clue: in fold I create a mutable list as the “zero”. Apparently both SubSources use the same list instance, which explains the odd behaviour. To fix that I would have to pass a factory such as ArrayList::new rather than creating the instance myself. Unfortunately there is no method for that.

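Your observation is correct: two Fold stages are created, one per substream, but they both start from the same zero reference and therefore share the same mutable collection. reduce does not show this because it has no zero, and the Integer sums it produces are immutable.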
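
To make the aliasing visible outside of Akka, here is a minimal plain-Java sketch. It is not how Akka implements fold; the class SharedZeroDemo and the helper foldGroups are hypothetical names, and the sequential loop only mimics two folds that were each handed the same zero reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class SharedZeroDemo {

    // Hypothetical helper: keeps one accumulator per key. Every accumulator
    // starts from the SAME zero reference, just as both substream folds
    // receive the same ArrayList instance in the question.
    public static Map<Boolean, List<Integer>> foldGroups(
            List<Integer> input,
            List<Integer> zero,
            BiFunction<List<Integer>, Integer, List<Integer>> step) {
        Map<Boolean, List<Integer>> accumulators = new LinkedHashMap<>();
        for (Integer i : input) {
            boolean even = i % 2 == 0;
            // First element of a group: start from the shared zero.
            List<Integer> current = accumulators.getOrDefault(even, zero);
            accumulators.put(even, step.apply(current, i));
        }
        return accumulators;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Map<Boolean, List<Integer>> result = foldGroups(
                input,
                new ArrayList<>(),                          // one mutable zero
                (list, i) -> { list.add(i); return list; }); // mutates in place

        // Both keys point at the very same list object.
        System.out.println(result.get(false) == result.get(true));
        System.out.println(result.get(false));
    }
}
```

Both groups end up holding all ten elements because they are literally the same list. The sketch is sequential, so it does not reproduce the element order seen in the stream.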

On the Scala side of things we pretty much always use immutable collections, which is a bit harder in Java, as there are no such collections in the standard library. Even in Scala, a factory-based zero could be useful though.
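
In Java, one way to approximate the immutable-collection approach is to never mutate the accumulator and instead return a fresh copy from every step. The following is a minimal plain-Java sketch of that idea; CopyOnAppendDemo and append are hypothetical names, not part of Akka, and the loop merely stands in for the two substream folds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CopyOnAppendDemo {

    // Returns a new unmodifiable list; the accumulator passed in is untouched.
    public static List<Integer> append(List<Integer> acc, Integer element) {
        List<Integer> next = new ArrayList<>(acc.size() + 1);
        next.addAll(acc);
        next.add(element);
        return Collections.unmodifiableList(next);
    }

    public static void main(String[] args) {
        // A single zero can now be shared safely because nobody mutates it.
        List<Integer> zero = Collections.emptyList();
        List<Integer> odd = zero;
        List<Integer> even = zero;

        for (Integer i : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
            if (i % 2 == 0) {
                even = append(even, i);
            } else {
                odd = append(odd, i);
            }
        }

        System.out.println(odd);  // [1, 3, 5, 7, 9]
        System.out.println(even); // [2, 4, 6, 8, 10]
    }
}
```

Assuming the fold signature used in the question, a step function of this shape should be usable there together with an empty immutable list as the zero. The trade-off is that each step copies the whole list, which is fine for small groups but quadratic for large ones.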

I have created an issue to discuss if we should add support for this or at least clarify docs: https://github.com/akka/akka/issues/24971

Thanks Johan :)

I’m curious how the discussion will go!