scala – 链中的Akka-http-client请求
我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.
如何在akka-http中构建这样的流? 解决方法
如果您正在制作网络爬虫,请查看
this post.此答案解决了一个更简单的情况,例如下载分页资源,其中指向下一页的链接位于当前页面响应的标题中.
您可以使用Source.unfoldAsync方法创建链接源 – 其中一个项目指向下一个项目.这需要一个函数,它接受一个元素S并返回Future [Option [(S,E)]]来确定该流是否应该继续发出E类型的元素,将状态传递给下一个调用. 在你的情况下,这有点像: >采取初始HttpRequest 但是,有一个皱纹,即如果它不包含指向下一个请求的指针,它将不会从流中发出响应. 为了解决这个问题,你可以将函数传递给unfoldAsync返回Future [Option [(Option [HttpRequest],HttpResponse)]].这允许您处理以下情况: >当前的响应是错误的 接下来是一些带注释的代码,它概述了这种方法,但首先是一个初步的: 当将HTTP请求流式传输到Akka流中的响应时,您需要确保消耗响应主体,否则会发生坏事(死锁等).如果您不需要主体,则可以忽略它,但在这里我们使用函数将HttpEntity从(潜在)流转换为严格实体: import scala.concurrent.duration._ def convertToStrict(r: HttpResponse): Future[HttpResponse] = r.entity.toStrict(10.minutes).map(e => r.withEntity(e)) 接下来,从HttpResponse创建Option [HttpRequest]的几个函数.此示例使用类似Github的分页链接的方案,其中Links标头包含,例如:< https://api.github.com/...u0026gt; REL = “下一步”: def nextUri(r: HttpResponse): Seq[Uri] = for { linkHeader <- r.header[Link].toSeq value <- linkHeader.values params <- value.params if params.key == "rel" && params.value() == "next" } yield value.uri def getNextRequest(r: HttpResponse): Option[HttpRequest] = nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET,next)) 接下来,我们将传递给unfoldAsync的实际函数.它使用Akka HTTP Http().singleRequest()API来获取HttpRequest并生成Future [HttpResponse]: def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest],HttpResponse)]] = reqOption match { case Some(req) => Http().singleRequest(req).flatMap { response => // handle the error case. Here we just return the errored response // with no next item. if (response.status.isFailure()) Future.successful(Some(None -> response)) // Otherwise,convert the response to a strict response by // taking up the body and looking for a next request. else convertToStrict(response).map { strictResponse => getNextRequest(strictResponse) match { // If we have no next request,return Some containing an // empty state,but the current value case None => Some(None -> strictResponse) // Otherwise,pass on the request... case next => Some(next -> strictResponse) } } } // Finally,there's no next request,end the stream by // returning none as the state. case None => Future.successful(None) } 请注意,如果我们收到错误响应,则流将不会继续,因为我们在下一个状态中返回None. 您可以调用此方法来获取HttpResponse对象流,如下所示: val initialRequest = HttpRequest(HttpMethods.GET,"http://www.my-url.com") Source.unfoldAsync[Option[HttpRequest],HttpResponse]( Some(initialRequest)(chainRequests) 至于返回最后一个(或错误的)响应的值,您只需要使用 def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest],HttpResponse]( Some(initialRequest))(chainRequests) .map(_.status) .runWith(Sink.last) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |