Hi there,
I am new to akka and akka http and have been running into a problem where when using a multi-form request, I’m getting the error ‘Substream Source cannot be materialized more than once’ when I execute my ‘upload’ route multiple times (calls after the first cause this error to happen).
After doing some searching online it seems like this is being caused by pulling out the formField “metadata” (which consumes the stream) and then trying to use the same multi-form data in storeUploadFile.
Does anyone know of a workaround or a better way to handle a multi-form request with an upload?
import java.io.File
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives.{complete, path, storeUploadedFile}
import akka.http.scaladsl.server.directives.FileInfo
import akka.stream.ActorMaterializer
import com.google.common.io.Files.{getFileExtension, getNameWithoutExtension}
import com.google.inject.{AbstractModule, Guice}
import com.typesafe.config.{Config, ConfigFactory}
import javax.inject.{Inject, Singleton}
import net.codingwell.scalaguice.InjectorExtensions._
import net.codingwell.scalaguice.ScalaModule
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success}
object FileStorerMain extends App {
val actorSystem = ActorSystem()
val injector = Guice.createInjector(new Module(actorSystem))
val httpRouter = injector.instance[HttpServer]
httpRouter.start()
private def shutdown() = {
httpRouter.stop()
actorSystem.terminate()
}
sys.addShutdownHook(shutdown())
}
@Singleton
class HttpServer @Inject()(config: Config, private implicit val actorSystem: ActorSystem) {
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
private val routes = {
path("upload") {
formField("metadata") { metadata =>
storeUploadedFile("file", createTempFile) {
case (_, file) => complete(OK)
}
}
}
}
private val host = "localhost"
private val port = 9008
private val bindingFuture = Http().bindAndHandle(routes, host, port)
bindingFuture.onComplete {
case Success(serverBinding) => println(s"Server bound to ${serverBinding.localAddress}")
case Failure(_) => println(s"Failed to bind to $host:$port")
}
private def createTempFile(fileInfo: FileInfo): File = {
val filename = getNameWithoutExtension(fileInfo.fileName)
val extension = getFileExtension(fileInfo.fileName)
val file = File.createTempFile(s"$filename-", s".$extension")
file
}
def start(): Unit = {
Await.result(bindingFuture, Duration.Inf)
}
def stop(): Unit = {
Await.result(bindingFuture.flatMap(_.unbind()), Duration.Inf)
}
}
class Module(actorSystem: ActorSystem) extends AbstractModule with ScalaModule {
override def configure(): Unit = {
bind[ActorSystem].toInstance(actorSystem)
bind[Config].toInstance(ConfigFactory.load().resolve())
}
}