I owe you a concrete example of the problem.
I think that the case for a mapVia
style scenario is where you don’t have convenient control over the types with the Flow as it is in another library. A good example of this is with Alpakka’s MQTT flow:
...
.map {
case (payload, span) =>
(MqttMessage(mqttEventTopic, payload), span)
}
.via(MqttFlow(settings, 8, MqttQoS.atLeastOnce))
In this example, I’d like to pass through the span
. I can’t as the library doesn’t accommodate me. This is a variant on the problem I originally described i.e. just moving the types around, but I feel that it is a useful scenario to illustrate the problem I’m having.
As a potential solution, let’s imagine that via
can implement my mapVia
by taking a partial function and returning a Source (warning - untested code):
...
.via {
case (payload, span) =>
Source
.single(MqttMessage(mqttEventTopic, payload))
.via(
MqttFlow(settings, 8, MqttQoS.atLeastOnce)
.map { mqttMessage =>
(mqttMessage, span)
}
)
}
Hmmm. Maybe all that I need here is a flatMapConcat
…
MQTT is just one example. Here’s another:
.map {
case ((someOtherData, decryptedData), span) =>
((getSecret("path-to-secret"), ByteString(decryptedData)), (someOtherData, span))
}
.via(IdentityStreams.encrypter)
In this one, the encrypter
flow requires a secret and unencrypted data so that it may encrypt. Its signature is:
Flow[((GetSecret, ByteString), A), (ByteString, A), NotUsed]
So, because I control the encrypter, I’ve made it accommodate the ability to carry through elements. However, I must still reorder the types as the encrypter is general-purpose and part of another library.
I’ve come across a number of these scenarios. As stated, this problem I’m finding is having an additional map when I have no convenient control over the `Flow’s types.