I’m using this sample class to test streaming over SFTP. Ultimately I want to stream from a source (SFTP server A) to a sink
(SFTP server B), with my app in the middle. For now, though, I’m testing throughput with `Sink.ignore`,
and the performance is much worse than a plain SFTP client writing the file locally. Is this a configuration problem, or am I just “doing it wrong”?
The timings below are for a 10 MB file pulled from a VM in AWS to my laptop.
import akka.actor.ActorSystem
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.alpakka.ftp.{FtpCredentials, SftpSettings}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.workday.scala.logging.Slogger
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.method.AuthPassword
import net.schmizz.sshj.userauth.password.{PasswordFinder, Resource}
import net.schmizz.sshj.{DefaultConfig, SSHClient}
import java.net.InetAddress
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
/**
 * Throughput experiment comparing an Alpakka SFTP stream with a plain SSHJ
 * `SFTPClient.get` download of the same remote file.
 *
 * The eventual goal is to stream from one SFTP server (source) to another
 * (destination). For now the stream drains into `Sink.ignore`, so only the
 * read side is measured.
 *
 * NOTE(review): the gap between the two timings is probably read-ahead.
 * `SFTPClient.get` keeps several read requests in flight. The Alpakka source
 * appears to issue one read at a time, so every chunk pays a full network
 * round trip to the AWS VM. Newer Alpakka releases expose
 * `SftpSettings.withMaxUnconfirmedReads(n)` to enable read-ahead; confirm that
 * it exists in the Alpakka version in use before relying on it.
 *
 * Call [[shutdown]] when finished so the actor system is released.
 */
class StreamingSftpTransport {
  implicit val system: ActorSystem = ActorSystem("my-service")

  private val PORT = 22
  private val USER = "conor.griffin"
  private val CREDENTIALS = FtpCredentials.create(USER, "password!")
  private val BASE_PATH = s"/Users/$USER"
  private val FILE_NAME = "10mfile"
  // Maximum number of bytes the Alpakka source requests per stream element.
  private val CHUNK_SIZE = 131072

  // Set up the source system connection.
  private val SOURCE_HOSTNAME = "suv1"
  private val SOURCE_USER = "testsftp"
  private val SOURCE_PASSWORD = "t3st123"
  private val SOURCE_PATH = s"/home/$SOURCE_USER/$FILE_NAME"
  // Host keys are not verified here; acceptable only for a throwaway test setup.
  private val sourceSettings = SftpSettings(host = InetAddress.getByName(SOURCE_HOSTNAME))
    .withCredentials(FtpCredentials.create(SOURCE_USER, SOURCE_PASSWORD))
    .withPort(PORT)
    .withStrictHostKeyChecking(false)
  private val sourceClient = new SSHClient(new DefaultConfig)
  private val configuredSourceClient = Sftp(sourceClient)

  // Set up the destination system connection (used by the commented-out sink below).
  private val DEST_HOSTNAME = "localhost"
  private val destSettings = SftpSettings(host = InetAddress.getByName(DEST_HOSTNAME))
    .withCredentials(CREDENTIALS)
    .withPort(PORT)
    .withStrictHostKeyChecking(false)
  private val destClient = new SSHClient(new DefaultConfig)
  private val configuredDestClient = Sftp(destClient)

  // Log and stop on any error. Resuming would silently drop the failed element.
  // For a file copy that means a corrupted output file; for this benchmark it
  // means a meaningless timing.
  private val decider: Supervision.Decider = { e =>
    println(s"Stream failure: $e")
    Supervision.stop
  }

  implicit val materializer: ActorMaterializer = ActorMaterializer(
    ActorMaterializerSettings(system).withSupervisionStrategy(decider))

  /**
   * Streams the source file through Alpakka into `Sink.ignore` and prints the
   * elapsed wall-clock time in milliseconds.
   *
   * The measured time includes connecting and authenticating to the source
   * server, because the connection is opened once the stream is materialized.
   * Fails (via `Await.result`) if the stream fails or takes longer than 180
   * seconds.
   */
  def doTransfer(): Unit = {
    println("Streaming")
    val source = configuredSourceClient.fromPath(SOURCE_PATH, sourceSettings, CHUNK_SIZE)
    // val sink = configuredDestClient.toPath(s"$BASE_PATH/$FILE_NAME.out", destSettings)
    println("Streaming: Starting")
    // Start the clock before runWith: materializing the stream starts the transfer.
    val start = System.currentTimeMillis()
    val done = source.runWith(Sink.ignore)
    Await.result(done, 180.seconds)
    val end = System.currentTimeMillis()
    println(s"Streaming: ${end - start}")
  }

  /**
   * Downloads the same source file with SSHJ's `SFTPClient.get` to a local
   * path and prints the elapsed time in milliseconds.
   *
   * Unlike [[doTransfer]], the measured time excludes TCP connect and
   * authentication, which happen before the clock starts. Keep that in mind
   * when comparing the two numbers. Both the SFTP client and the SSH
   * connection are closed afterwards, even on failure.
   */
  def doSftpTransfer(): Unit = {
    println("SFTP")
    val ssh = new SSHClient(new DefaultConfig)
    try {
      // Accepts any host key; test-only.
      ssh.addHostKeyVerifier(new PromiscuousVerifier)
      ssh.connect(SOURCE_HOSTNAME, PORT)
      val passwordAuth: AuthPassword = new AuthPassword(new PasswordFinder {
        def reqPassword(resource: Resource[_]): Array[Char] = SOURCE_PASSWORD.toCharArray
        def shouldRetry(resource: Resource[_]): Boolean = false
      })
      ssh.auth(SOURCE_USER, passwordAuth)

      println("SFTP: Starting")
      val start = System.currentTimeMillis()
      // Opening the SFTP subsystem is deliberately inside the timed section.
      val sftp = ssh.newSFTPClient()
      val end =
        try {
          sftp.get(SOURCE_PATH, s"$BASE_PATH/Downloads/$FILE_NAME.sftp")
          System.currentTimeMillis()
        } finally sftp.close()
      println(s"SFTP: ${end - start}")
    } finally ssh.close()
  }

  /** Terminates the actor system (and with it the materializer), waiting up to 30 seconds. */
  def shutdown(): Unit = {
    Await.result(system.terminate(), 30.seconds)
  }
}
Output
[18-Dec-2020 19:41:00.994 UTC] INFO <Slf4jLogger> Slf4jLogger started
[18-Dec-2020 19:41:02.672 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom.
[18-Dec-2020 19:41:02.732 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom.
Streaming
Streaming: Starting
[18-Dec-2020 19:41:03.060 UTC] INFO <TransportImpl> Client identity string: SSH-2.0-SSHJ_0.27.0
[18-Dec-2020 19:41:03.434 UTC] INFO <TransportImpl> Server identity string: SSH-2.0-OpenSSH_7.4
[18-Dec-2020 19:41:05.117 UTC] INFO <SessionChannel> Will request `sftp` subsystem
[18-Dec-2020 19:41:57.819 UTC] INFO <TransportImpl> Disconnected - BY_APPLICATION
Streaming: 55019
SFTP
[18-Dec-2020 19:41:57.826 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom.
[18-Dec-2020 19:41:58.032 UTC] INFO <TransportImpl> Client identity string: SSH-2.0-SSHJ_0.27.0
[18-Dec-2020 19:41:58.247 UTC] INFO <TransportImpl> Server identity string: SSH-2.0-OpenSSH_7.4
SFTP: Starting
[18-Dec-2020 19:41:59.869 UTC] INFO <SessionChannel> Will request `sftp` subsystem
SFTP: 12987