scala – Akka Stream在Flow中使用HttpResponse
我想利用一个简单的Flow从http服务中收集一些额外的数据,并用结果增强我的数据对象.以下说明了这个想法:
val httpClient = Http().superPool[User]() val cityRequest = Flow[User].map { user=> (HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))),User) } val cityResponse = Flow[(Try[HttpResponse],User)].map { case (Failure(ex),user) => user case (Success(resp),user) => { // << What to do here to get the value >> // val responseData = processResponseSomehowToGetAValue? val enhancedUser = new EnhancedUser(user.data,responseData) enhancedUser } } val processEnhancedUser = Flow[EnhancedUser].map { // e.g.: Asynchronously save user to a database } val useEnhancementGraph = userSource .via(getRequest) .via(httpClient) .via(getResponse) .via(processEnhancedUser) .to(Sink.foreach(println)) 我有一个问题要理解它们之间的机制和区别 以下想法没有向我解释: > http://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html 如何从响应中获取值到新用户对象中, 感谢帮助. 更新: 我正在使用远程akka http服务器评估代码,使用下面的代码在解析之前立即和10秒之间回答请求. 我在某个时间将.async添加到cityResponse解析器的末尾,结果输出花费的时间更长,但是正确. 这种行为的原因是什么?它与接受的答案如何契合? val cityResponse = Flow[(Try[HttpResponse],member) => member case (Success(response),member) => { Unmarshal(response.entity).to[String] onComplete { case Success(s) => member.city = Some(s) case Failure(ex) => member.city = None } } member }.async // <<-- This changed the behavior to be correct,why? 解决方法
您可以使用两种不同的策略,具体取决于您从“cityRequestEndpoint”获得的实体的性质:
基于流 处理这种情况的典型方法是始终假设来自源端点的实体可以包含N个数据,其中N是事先未知的.这通常是要遵循的模式,因为它是现实世界中最通用的,因此是“最安全的”. 第一步是将来自端点的HttpResponse转换为数据源: val convertResponseToByteStrSource : (Try[HttpResponse],User) => Source[(Option[ByteString],User),_] = (response,user) => response match { case Failure(_) => Source single (None -> user) case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user) } 上面的代码是我们不假设N的大小,r.entity.dataBytes可能是0 ByteString值的源,或者可能是无限数值.但我们的逻辑并不关心! 现在我们需要组合来自Source的数据.这是Flow.flatMapConcat的一个很好的用例,它采用源流并将其转换为值的流(类似于Iterables的flatMap): val cityByteStrFlow : Flow[(Try[HttpResponse],(Option[ByteString],_] = Flow[(Try[HttpResponse],User)] flatMapConcat convertResponseToByteStrSource 剩下要做的就是将(ByteString,User)的元组转换为EnhancedUser.注意:我假设下面的用户是EnhancedUser的子类,它是从问题逻辑中推断出来的: val convertByteStringToUser : (Option[ByteString],User) => EnhancedUser = (byteStr,user) => byteStr .map(s => EnhancedUser(user.data,s)) .getOrElse(user) val cityUserFlow : Flow[(Option[ByteString],EnhancedUser,_] = Flow[(ByteString,User)] map convertByteStringToUser 现在可以组合这些组件: val useEnhancementGraph = userSource .via(cityRequest) .via(httpClient) .via(cityByteStrFlow) .via(cityUserFlow) .via(processEnhancedUser) .to(Sink foreach println) 未来基础 我们可以使用Futures来解决问题,类似于您在原始问题中引用的堆栈问题.我不推荐这种方法有两个原因: >它假设只有1个ByteString来自端点.如果端点将多个值作为ByteStrings发送,则它们会连接在一起,并且在创建EnhancedUser时可能会出错. 要使用基于Future的方法,对原始代码的唯一重大更改是使用 val parallelism = 10 val timeout : FiniteDuration = ??? //you need to specify the timeout limit val convertResponseToFutureByteStr : (Try[HttpResponse],User) => Future[EnhancedUser] = _ match { case (Failure(ex),user) => Future successful user case (Success(resp),user) => resp .entity .toStrict(timeout) .map(byteStr => new EnhancedUser(user.data,byteStr)) } val cityResponse : Flow[(Try[HttpResponse],_] = Flow[(Try[HttpResponse],User)].mapAsync(parallelism)(convertResponseToFutureByteStr) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |