I’m trying to integrate UnixDomainSocket with akka-http by customizing ClientTransport with UninDomainSocket#outgoingConnection.
final class UnixDomainSocketTransport(file: File) extends ClientTransport {
override def connectTo(
host: String,
port: Int,
settings: ClientConnectionSettings
)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
implicit val ex = system.dispatcher
val address = InetSocketAddress.createUnresolved(host, port)
UnixDomainSocket()
.outgoingConnection(file)
.mapMaterializedValue(_.map { c =>
system.log.info(s"materialized $c")
Http.OutgoingConnection(address, address)
})
}
}
Unfortunately i could not get it works because of the error “Substream Source cannot be materialized more than once”. Here is the code to reproduce the error.
akka.version = 2.5.19
alpakka.version = 1.0-M2
The error was expected if Souce was operated prefixAndTail, which i found out in akka/FlowPrefixAndTailSpec.scala. While the Flow returned by UninDomainSocket#outgoingConnection was nested with a prefixAndTail operated Source, it could be found https://github.com/akka/alpakka/blob/master/unix-domain-socket/src/main/scala/akka/stream/alpakka/unixdomainsocket/impl/UnixDomainSocketImpl.scala#L239.
I’m not sure this is a bug or a wrong way to use. Hope any one could help me to work around with it, thanks!
Thanks for reply! I already knew that connect settings could avoid the problem, that why i had commented withMaxConnections(1).
But, i want to use UnixDomainSocket for watching docker daemon events, then refresh the containers or services if it gets some notifications from docker daemon. So, i need at lease two connections of UnixDomainSocket, one for long pulling (watching) events, and the other for short querying.
I realized that the semantics of Future[OutgoingConnection] in Flow[ByteString, ByteString, Future[OutgoingConnection]] is different between Tcp#OutgoingConnection (a new connection) and UnixDomainSocket#OutgoingConnection (the same connection).
Since UnixDomainSocket#OutgoingConnection followed the declaration of Tcp#OutgoingConnection , it would be better that UnixDomainSocket keep the same semantics!
Ahh, yes. The semantics should be the same, or at least there should be a possibility to tweak configuration to get the same semantics. Especially since the Unix Domain Socket connector API was modeled after the TCP Stream API.
Would you be up for creating a PR which would make the UnixDomainSocket#OutgoingConnection return a new connection by default?