Keeping an object's context/metadata after passing through CsvParsing.lineScanner

Hi,

I am relatively new to Akka Streams and Alpakka and would like some advice on how to handle a problem I’m facing:

I have an object that contains a CSV String and some other metadata. I’d like to pass the String in this Java object through the Alpakka CsvParsing flow and then use the output CSV Map to populate other fields in the object. This is what the Flow currently looks like:

Flow.of(MyObject.class)
.map(MyObject::getCSVString)
.map(ByteString::fromString) // CsvParsing.lineScanner consumes ByteString, not String
.via(CsvParsing.lineScanner(.....))
.via(CsvToMap.toMapAsStrings(.....))
.via(....) // other operations for which I'd like to keep the metadata in the original MyObject element

The problem is that once I map my object to its CSV string, I lose the other data in the MyObject element. Is there a way to keep that data while passing the string through the CsvParsing flow?

I’ve looked into SourceWithContext and FlowWithContext, which seem helpful, but I’m not fully sure how to use them in this situation.
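
For example, here is roughly what I understand so far (sketched in Scala, since most examples I’ve found are Scala; Meta is just a hypothetical stand-in for my metadata type). FlowWithContext can carry the metadata alongside each element, but CsvParsing.lineScanner is a plain Flow over ByteString, so I don’t see how to thread the context through it:

import akka.NotUsed
import akka.stream.scaladsl.FlowWithContext

case class Meta(id: String) // hypothetical stand-in for my metadata

class MyObject(val csvString: String, val meta: Meta) {
  def getCSVString: String = csvString
}

// This carries a Meta alongside each element...
val extract: FlowWithContext[MyObject, Meta, String, Meta, NotUsed] =
  FlowWithContext[MyObject, Meta].map(_.getCSVString)

// ...but CsvParsing.lineScanner is a Flow[ByteString, List[ByteString], NotUsed]
// with no notion of a context, so I cannot simply .via(...) it here.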

Please let me know if you have any tips/advice. Thank you!

To be honest I’m not totally sure what you want to achieve, but I’ll try to help :smiley:

If your object has a String field containing a full CSV:
When you map your objects, there will be a 1-n relationship between each object and the CSV lines. That means you can’t really tell which Map belongs to which object, and if you do some decoupling like (object, lines), you end up with a List[(object, line)], which I think is definitely not the way you want to go.
If this is the case, I would start a substream for every element, like:

source.mapAsync(4)(obj => csvParse(obj.getCSVString).runWith(...).map(csvMap => obj.fixupWithCsvMap(csvMap)))

def csvParse(s: String): Source[Map[String, String], NotUsed] =
  Source.single(ByteString(s)).via(CsvParsing.lineScanner(.....)).via(CsvToMap.toMapAsStrings(.....))
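
To make that concrete, here is a fuller runnable sketch of the idea, with default parser settings and a hypothetical MyObject standing in for your Java class (fixupWithCsvMap and the field names are mine, not from any library):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// Hypothetical carrier: a CSV payload plus some metadata.
final case class MyObject(csvString: String,
                          metadata: String,
                          parsed: Seq[Map[String, String]] = Nil) {
  def getCSVString: String = csvString
  def fixupWithCsvMap(maps: Seq[Map[String, String]]): MyObject = copy(parsed = maps)
}

object PerElementSubstream extends App {
  implicit val system: ActorSystem = ActorSystem("csv-demo")
  import system.dispatcher

  // Parse one CSV string into header-keyed row maps (default parser settings).
  def csvParse(s: String): Source[Map[String, String], NotUsed] =
    Source.single(ByteString(s))
      .via(CsvParsing.lineScanner())
      .via(CsvToMap.toMapAsStrings())

  Source.single(MyObject("colA,colB\n1,2\n3,4", "some metadata"))
    .mapAsync(4) { obj =>
      // Run a small inner stream per element; obj stays in scope,
      // so its metadata survives the CSV stages.
      csvParse(obj.getCSVString).runWith(Sink.seq).map(obj.fixupWithCsvMap)
    }
    .runForeach(println)
    .onComplete(_ => system.terminate())
}

mapAsync(4) means up to four of these inner streams run concurrently, while the order of the outer elements is preserved.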

If your object has only one CSV line in it, so the data has a 1-1 nature, you can broadcast and zip using the GraphDSL:

broadcast.out(0)                  ~> zip.in0
broadcast.out(1) ~> csvParserFlow ~> zip.in1
FlowShape(broadcast.in, zip.out)

(The Java code will look different, of course, but I think you can get the core ideas.)
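
In Scala, such a graph could look something like the sketch below. I am assuming exactly one CSV line per object and fixed, known headers (colA/colB here are made up), because CsvToMap.withHeadersAsStrings then emits exactly one Map per line and the zip stays in sync:

import java.nio.charset.StandardCharsets

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}
import akka.util.ByteString

class MyObject(val line: String) { def getCSVString: String = line } // hypothetical

// Pairs every MyObject with the Map parsed from its single CSV line.
val keepContextFlow: Flow[MyObject, (MyObject, Map[String, String]), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[MyObject](2))
    val zip       = b.add(Zip[MyObject, Map[String, String]]())

    // Strictly 1-1: one CSV line in, exactly one Map out, because the
    // headers are supplied up front instead of read from a first line.
    val csvParserFlow = Flow[MyObject]
      .map(obj => ByteString(obj.getCSVString + "\n")) // terminate each line so payloads do not merge
      .via(CsvParsing.lineScanner())
      .via(CsvToMap.withHeadersAsStrings(StandardCharsets.UTF_8, "colA", "colB"))

    broadcast.out(0)                  ~> zip.in0 // the object itself, untouched
    broadcast.out(1) ~> csvParserFlow ~> zip.in1 // its parsed CSV line

    FlowShape(broadcast.in, zip.out)
  })

If the parser could emit zero or several maps per object, the zip pairs would drift out of sync; that is the 1-to-zero-or-many case discussed below.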

EDIT:
BTW, mutating the objects flowing through the stream is not a really good idea… You may be OK with it, but you could also hit some really strange errors, so be careful!

I have a very similar use case, but with XML parsing/filtering. A Broadcast-And-Zip pattern definitely works well for any 1-1 stages. For the more complicated 1-to-ZeroOrMany stages, I was working on a custom Broadcast-And-Zip-like stage with a custom zip, somewhat modeled after ZipLatest but with the flow rate controlled by only one of the inlets (the one fed by the 1-to-ZeroOrMany stage). I got pulled away by other priorities before I completed it, but maybe explore something similar.

Hi Gergő,

Thanks a lot for the advice so far. The real problem I’m facing here is that the Alpakka CSV library offers its CSV parsing as a Flow, so I cannot do something like

source.mapAsync(4)(obj => csvParse(obj.getCSVString))

Instead, it looks like

source.map(obj => obj.getCSVString).via(CsvParsing.lineScanner(.....))

After the CSV Parsing stage, I lose the other data in ‘obj’ (since I only extracted and transformed the string field). Is there any way to avoid losing this data and use it after I get the parsed results?

Hope this makes the problem clearer, please let me know if you have any questions!

Can you also elaborate a bit on why mutating objects in the flow might not be a good idea?

You can do it!
You need to start a new stream for each element.

You will have a big stream doing something like “for each element, fix up the object with the parsed CSV”. In code:

source.mapAsync(4)(obj => csvParserSource(obj.getCSVString).runWith(Sink.seq).map(csvMaps => obj.fixupWithCsvMap(csvMaps)))

The inner part, csvParserSource(obj.getCSVString).runWith(Sink.seq), will produce a Future[Seq[Map[String,String]]] in Scala; it will be something similar with the Java API. If you “go into” the future, you can fix up your object: that is the .map(csvMaps => obj.fixupWithCsvMap(csvMaps)) part of my code, which converts the Future[Seq[Map[String,String]]] into a Future[YourObject]. Finally, mapAsync will “remove” the outer Future.
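
Spelling out the types for a single element (csvParserSource is the small source defined just below, and MyObject / fixupWithCsvMap are the hypothetical names from above):

import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import scala.concurrent.{ExecutionContext, Future}

def fixUpOne(obj: MyObject)(implicit mat: Materializer, ec: ExecutionContext): Future[MyObject] = {
  val rows: Future[Seq[Map[String, String]]] =
    csvParserSource(obj.getCSVString).runWith(Sink.seq)
  rows.map(csvMaps => obj.fixupWithCsvMap(csvMaps)) // Future[MyObject]
}

// source.mapAsync(4)(fixUpOne) then "removes" the outer Future,
// so downstream stages see plain MyObject elements again.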

And you need a smaller csvParserSource which will “parse a csv”; it can be a simple source like:

def csvParserSource(s: String): Source[Map[String, String], NotUsed] =
  Source.single(ByteString(s)).via(CsvParsing.lineScanner(.....)).via(CsvToMap.toMapAsStrings(.....))

Sorry, but my Java is really rusty. I know these concepts are the same in both languages, and I hope that if you look at the Java equivalent of Future (CompletionStage), you can close the gap :smiley:

EDIT:

If you use a broadcast and one sub-flow mutates the data, it mutates the data that the other sub-flows see, which can lead to some really nasty race conditions. (Of course this is just the simplest example, and there is a chance that your data will be totally fine if mutated in your stream, but in general it can lead to a lot of problems…)
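
A minimal illustration of the hazard (hypothetical names, not from the thread above):

import akka.stream.scaladsl.Flow

// A mutable element: both branches of a broadcast receive a reference
// to the SAME instance, so this stage races with its sibling branch.
final class MutableObj(var tag: String)
val risky = Flow[MutableObj].map { o => o.tag = "parsed"; o }

// Safer: keep elements immutable and derive new instances instead.
final case class ImmutableObj(tag: String)
val safe = Flow[ImmutableObj].map(o => o.copy(tag = "parsed"))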

Hi Gergő,

I got it to work using your first inner-stream suggestion. Thanks a lot for the help!