scala – 如何从多个文件写入组装Akka Streams接收器?
发布时间:2020-12-16 08:56:36 所属栏目:安全 来源:网络整理
导读:我正在尝试将基于akka流的流程集成到我的Play 2.5应用程序中.我们的想法是,您可以在照片中流式传输,然后将其作为原始文件,缩略图版本和水印版本写入磁盘. 我设法使用这样的图形来实现这一点: val byteAccumulator = Flow[ByteString].fold(new ByteStringBu
我正在尝试将基于akka流的流程集成到我的Play 2.5应用程序中.我们的想法是,您可以在照片中流式传输,然后将其作为原始文件,缩略图版本和水印版本写入磁盘.
我设法使用这样的图形来实现这一点: val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder,b) => {builder ++= b.toArray}) .map(_.result().toArray) def toByteArray = Flow[ByteString].map(b => b.toArray) val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => import GraphDSL.Implicits._ val streamFan = builder.add(Broadcast[ByteString](3)) val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) val output = builder.add(Flow[ByteString].map(x => Success(Done))) val rawFileSink = FileIO.toFile(file) val thumbnailFileSink = FileIO.toFile(getFile(path,Thumbnail)) val watermarkedFileSink = FileIO.toFile(getFile(path,Watermarked)) streamFan.out(0) ~> rawFileSink streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in streamFan.out(2) ~> output.in byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink FlowShape(streamFan.in,output.out) }) graph } 然后我使用这样的累加器将它连接到我的播放控制器: val sink = Sink.head[Try[Done]] val photoStorageParser = BodyParser { req => Accumulator(sink).through(graph).map(Right.apply) } 问题是我的两个处理过的文件接收器没有完成,我得到的两个处理文件的大小都是零,但不是原始的.我的理论是累加器只等待我的扇出的一个输出,所以当输入流完成并且我的byteAccumulator吐出完整的文件时,到处理完成时,播放已从输出获得物化值. 所以,我的问题是: 解决方法
好的,经过一点帮助(安德烈亚斯在正确的轨道上),我已经到达了这个解决方案,它可以解决问题:
val rawFileSink = FileIO.toFile(file) val thumbnailFileSink = FileIO.toFile(getFile(path,Thumbnail)) val watermarkedFileSink = FileIO.toFile(getFile(path,Watermarked)) val graph = Sink.fromGraph(GraphDSL.create(rawFileSink,thumbnailFileSink,watermarkedFileSink)((_,_,_)) { implicit builder => (rawSink,thumbSink,waterSink) => { val streamFan = builder.add(Broadcast[ByteString](2)) val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) streamFan.out(0) ~> rawSink streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink SinkShape(streamFan.in) } }) graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1,fs._2,fs._3)).map(f => Success(Done))) 之后很容易从Play中调用它: val photoStorageParser = BodyParser { req => Accumulator(theSink).map(Right.apply) } def createImage(path: String) = Action(photoStorageParser) { req => Created } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |