加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

在Play Scala中使用Iteratees和Enumerators将数据流式传输到S3

发布时间:2020-12-16 09:55:03 所属栏目:安全 来源:网络整理
导读:我正在 Scala中构建一个Play Framework应用程序,我想将一个字节数组流式传输到S3.我正在使用 Play-S3库来执行此操作.文档部分的“Multipart文件上传”与此处相关: // Retrieve an upload ticketval result:Future[BucketFileUploadTicket] = bucket initiat
我正在 Scala中构建一个Play Framework应用程序,我想将一个字节数组流式传输到S3.我正在使用 Play-S3库来执行此操作.文档部分的“Multipart文件上传”与此处相关:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName,mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket,BucketFilePart(partNumber,content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
  bucket completeMultipartUpload (uploadTicket,partUploadTickets)

我试图在我的应用程序中执行相同的操作,但使用Iteratees和Enumerators.

流和异步性使事情变得有点复杂,但这是我到目前为止(注意uploadTicket在代码的前面定义):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee = Iteratee.fold[Array[Byte],Future[Vector[BucketFilePartUploadTicket]]](Future.successful(Vector.empty[BucketFilePartUploadTicket])) { (partUploadTickets,bytes) =>
  bucket.uploadPart(uploadTicket,BucketFilePart(partNumberStream.next(),bytes)).flatMap(partUploadTicket => partUploadTickets.map( _ :+ partUploadTicket))
}
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket,partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}

一切都编译和运行没有事故.实际上,“成功”会被打印出来,但S3上没有任何文件显示出来.

解决方法

您的代码可能存在多个问题.由map方法调用引起的有点难以理解.您的未来构图可能有问题.另一个问题可能是由于所有块(除了最后一块)至少应为5MB.

下面的代码尚未经过测试,但显示了不同的方法. iteratee方法是一种可以创建小构建块并将它们组合成一个操作管道的方法.

为了使代码编译,我添加了一个特征和一些方法

trait BucketFilePartUploadTicket
val uploadPart: (Int,Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???

这里我们创建几个部分

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter,used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter,_),bytes) => (counter + 1) -> bytes
}

// Map the (Int,Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int,Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose,it is more intuitive because of 
// it's arrow like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete = 
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

只需连接部件即可完成运行

// This is the result,a Future[Unit]
val result = body through pipe run consumeAndComplete

请注意,我没有测试任何代码,可能在我的方法中犯了一些错误.然而,这显示了处理问题的不同方式,并且应该可以帮助您找到一个好的解决方案.

请注意,此方法在进入下一部分之前等待一部分完成上载.如果从服务器到亚马逊的连接比从浏览器到服务器的连接慢,则此机制将减慢输入.

你可以采取另一种方法,你不要等待零件上传的未来完成.这将导致在使用Future.sequence上传期货序列转化为包含结果的顺序一个未来的另一步.结果将是一旦有足够的数据就将一个部件发送到亚马逊的机制.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读