scala: How do I download an HTTP resource to a file with Akka Streams and HTTP?
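Over the past few days I have been trying to work out the best way to download an HTTP resource to a file using Akka Streams and HTTP.

I started with a Future-based variant, which looks like this:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = {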
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

This works reasonably well, but once I had learned more about pure Akka Streams I wanted to try a Flow-based variant that builds a stream starting from a Source[HttpRequest]. That stumped me completely at first, until I stumbled on the flatMapConcat stream transformation. The result is somewhat more verbose:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSource).
    runWith(FileIO.toFile(file))
}

Then I wanted to get a little fancier and use the Content-Disposition header. Going back to the Future-based variant:

def destinationFile(downloadDir: File, response: HttpResponse): File = {
  val fileName = response.header[ContentDisposition].get.value
  val file = new File(downloadDir,fileName)
  file.createNewFile()
  file
}
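
A side note on the .value call in destinationFile: for a Content-Disposition header, the value is normally the whole header value (for example attachment; filename="report.pdf"), not the bare file name. Below is a minimal sketch of pulling the name out using only the standard library. ContentDispositionFileName and extract are hypothetical names, not akka-http API, and extended filename*= parameters are deliberately ignored.

```scala
import java.io.File

// Hypothetical helper (not part of akka-http): extracts the `filename`
// parameter from a raw Content-Disposition header value.
object ContentDispositionFileName {
  // Matches filename="quoted name" or filename=token; the key is case-insensitive.
  private val FileNameParam =
    """(?i)(?:^|;)\s*filename\s*=\s*(?:"([^"]*)"|([^;\s]+))""".r

  def extract(headerValue: String): Option[String] =
    FileNameParam
      .findFirstMatchIn(headerValue)
      // group(1) is the quoted form, group(2) the unquoted token; unused groups are null
      .flatMap(m => Option(m.group(1)).orElse(Option(m.group(2))))
      // keep only the last path component so the name cannot escape the download directory
      .map(raw => new File(raw).getName)
      .filter(_.nonEmpty)
}
```

Under these assumptions, destinationFile could call extract on the header value and fall back to a default name when it returns None, instead of relying on .get.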
def downloadViaFutures2(uri: Uri,downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir,response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

But now I have no idea how to do this with the Flow-based variant. This is as far as I got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}
def downloadViaFlow2(uri: Uri,downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request,()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
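}

So now I have a Source that emits one or more (ByteString, File) elements for each file (I say "each file" because there is no reason the original Source has to be a single HttpRequest).

Is there any way to take these and route them to a dynamic Sink? I am thinking of something like flatMapConcat, for example:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???

so that I could complete downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {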
  val sink = FileIO.toFile(destination,true)
  Flow[(ByteString,File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
  case (_,file) => destToSink(file)
}

Solution

This solution does not need flatMapConcat. If you do not need any return value from the file writes, you can use Sink.foreach:

def writeFile(downloadDir: File)(httpResponse: HttpResponse): Future[Long] = {
  val file = destinationFile(downloadDir,httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}
def downloadViaFlow2(uri: Uri,downloadDir: File) : Future[Unit] = {
  val request = HttpRequest(uri=uri)
  val source = Source.single((request,()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.via(requestResponseFlow)
        .map(responseOrFail)
        .map(_._1)
        .runWith(Sink.foreach(writeFile(downloadDir)))
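}

Note that Sink.foreach only creates the Futures returned by writeFile and does not wait for them to complete, so there is not much back-pressure. writeFile may be slowed down by the hard drive, but the stream will keep generating Futures. To control this, you can use mapAsyncUnordered:

val parallelism = 10
source.via(requestResponseFlow)
      .map(responseOrFail)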
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.ignore)

If you want to accumulate the Long values into a total count, you need to combine this with Sink.fold:

source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.fold(0L)(_ + _))

The fold keeps a running sum and emits the final value when the source of requests has dried up.
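
To see the bounded-parallelism behaviour in isolation, here is a small sketch that replaces the HTTP call and the file write with a simulated write. It assumes Akka 2.6 (where an implicit ActorSystem is enough to materialize a stream). BoundedWritesSketch, run and fakeWrite are made-up names for illustration, and the 50 ms sleep merely stands in for slow disk I/O.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BoundedWritesSketch {
  // Returns (sum of all "bytes written", peak number of concurrent writes observed).
  def run(parallelism: Int, sizes: List[Long]): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("bounded-writes-sketch")
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    // Stand-in for writeFile: pretends to write `size` bytes and reports the count.
    // Blocking on the default dispatcher is acceptable only because this is a sketch.
    def fakeWrite(size: Long): Future[Long] = Future {
      val now = inFlight.incrementAndGet()
      peak.updateAndGet(m => math.max(m, now))
      Thread.sleep(50)
      inFlight.decrementAndGet()
      size
    }

    try {
      val total: Future[Long] =
        Source(sizes)
          .mapAsyncUnordered(parallelism)(fakeWrite) // at most `parallelism` Futures outstanding
          .runWith(Sink.fold(0L)(_ + _))             // running sum, emitted when the source completes
      (Await.result(total, 10.seconds), peak.get())
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Calling BoundedWritesSketch.run(2, List(10L, 20L, 30L, 40L)) should yield a total of 100 with a peak that never exceeds 2, since the stage only pulls a new element once one of the outstanding Futures has completed.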
