I am trying to download large files (10 GB or more) from an SFTP server. While the first file is downloading, the error below is thrown, and the stream then does not attempt to download the next file.
20/03/27 19:07:28 INFO net.schmizz.sshj.transport.TransportImpl: Received SSH_MSG_DISCONNECT (reason=BY_APPLICATION, msg=Idle connection)
20/03/27 19:07:28 ERROR net.schmizz.sshj.transport.TransportImpl: Dying because - Idle connection
net.schmizz.sshj.transport.TransportException: [BY_APPLICATION] Idle connection
    at net.schmizz.sshj.transport.TransportImpl.gotDisconnect(TransportImpl.java:548)
    at net.schmizz.sshj.transport.TransportImpl.handle(TransportImpl.java:508)
    at net.schmizz.sshj.transport.Decoder.decode(Decoder.java:102)
    at net.schmizz.sshj.transport.Decoder.received(Decoder.java:170)
    at net.schmizz.sshj.transport.Reader.run(Reader.java:59)
20/03/27 19:07:28 INFO net.schmizz.sshj.transport.TransportImpl: Disconnected - BY_APPLICATION
Below is the code:
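import java.nio.file.Paths
import scala.concurrent.duration._
import akka.stream.Attributes
import akka.stream.alpakka.ftp.{FtpFile, SftpSettings}
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink}
// Download one SFTP file into tempDir
val ftpGraph = (sftpFile: FtpFile, sftpSettings: SftpSettings) =>
  Sftp.fromPath(sftpFile.path, sftpSettings)
    .completionTimeout(3000.seconds)
    .idleTimeout(3000.seconds)
    .log("[FTP-Process]")
    .addAttributes(Attributes.logLevels(onFailure = Attributes.LogLevels.Debug))
    .to(FileIO.toPath(Paths.get(tempDir + "/" + sftpFile.name)))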
val fileFlow =
  Flow[FtpFile].mapAsync(parallelism = 1) { sftpFile =>
    println("Processing SFTP file: " + sftpFile.path)
    for {
      ftpResult <- ftpGraph(sftpFile, sftpSettings).run
      if ftpResult.wasSuccessful
      csv <- unzipGraph(sftpFile, tempDir).run
      csvFile <- gcpUploadGraph(csv, conf.gcpCsvPath + "/" + csv.substring(csv.lastIndexOf("/") + 1)).run
      _ <- cleanupGraph(sftpFile, sftpSettings, csv).run
      // Braces are needed so that wasSuccessful is interpolated, not just ftpResult
    } yield s"Upload from ${sftpFile.path} to ${ftpResult.wasSuccessful} successful"
  }
val fileSink = Sink.foreach[String] { s =>
  println("fileSink: " + s)
}
val finalStep = Sftp.ls("/gets/", sftpSettings)
  .filter(ftpFilter)
  .initialTimeout(300.seconds)
  .log("[FTP-LIST]")
  .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Debug))
  .via(fileFlow)
  .toMat(fileSink)(Keep.right)
  .run()