scala – akka-http:从http路由发送元素到akka接收器
发布时间:2020-12-16 18:09:23 所属栏目:安全 来源:网络整理
导读:如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应. 我想这需要一个流分支/结.正常的HTTP路由来自HttpRequest – HttpResponse对象.我想添加一个分支/结点,以便HttpRequests可以触发事件到我的独立接收器以及生成正常的
如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应.
我想这需要一个流分支/结.正常的HTTP路由来自HttpRequest – > HttpResponse对象.我想添加一个分支/结点,以便HttpRequests可以触发事件到我的独立接收器以及生成正常的HttpResponse. 下面是一个非常简单的单一路线akka-http应用程序.为简单起见,我使用的是一个简单的println接收器.我的生产用例,显然会涉及一个不那么微不足道的下沉. def main(args: Array[String]): Unit = { implicit val actorSystem = ActorSystem("my-akka-http-test") val executor = actorSystem.dispatcher implicit val materializer = ActorMaterializer()(actorSystem) // I would like to send elements to this sink in response to HTTP GET operations. val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val route: akka.http.scaladsl.server.Route = path("hello" / Segment) { p => get { // I'd like to send a message to an Akka Sink as well as return an HTTP response. complete { s"<h1>Say hello to akka-http. p=$p</h1>" } } } val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem) val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route),"localhost",8080) println("Server online at http://localhost:8080/") println("Press RETURN to stop...") scala.io.StdIn.readLine() bindingFuture .flatMap(_.unbind())(executor) // trigger unbinding from the port .onComplete(_ => Await.result(actorSystem.terminate(),Duration.Inf))(executor) // and shutdown when done } 编辑:或者在使用低级别的akka??-http API时,如何从特定的路由处理程序向特定的消息发送特定的消息? def main(args: Array[String]): Unit = { implicit val actorSystem = ActorSystem("my-akka-http-test") val executor = actorSystem.dispatcher implicit val materializer = ActorMaterializer()(actorSystem) // I would like to send elements to this sink in response to HTTP GET operations. val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val requestHandler: HttpRequest => HttpResponse = { case HttpRequest(GET,Uri.Path("/"),_,_) => HttpResponse(entity = HttpEntity( ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>")) case HttpRequest(GET,Uri.Path("/ping"),_) => HttpResponse(entity = "PONG!") case HttpRequest(GET,Uri.Path("/crash"),_) => sys.error("BOOM!") case r: HttpRequest => r.discardEntityBytes() // important to drain incoming HTTP Entity stream HttpResponse(404,entity = "Unknown resource!") } val serverSource = Http().bind(interface = "localhost",port = 8080) val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection => println("Accepted new connection from " + connection.remoteAddress) connection handleWithSyncHandler requestHandler // this is equivalent to // connection handleWith { Flow[HttpRequest] map requestHandler } }).run() println("Server online at http://localhost:8080/") println("Press RETURN to stop...") scala.io.StdIn.readLine() bindingFuture .flatMap(_.unbind())(executor) // trigger unbinding from the port .onComplete(_ => Await.result(actorSystem.terminate(),Duration.Inf))(executor) // and shutdown when done } 解决方法
如果您想将整个HttpRequest发送到您的接收器,我会说最简单的方法是使用alsoTo组合器.结果将是符合的
val mySink: Sink[HttpRequest,NotUsed] = ??? val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route)) val bindingFuture = Http().bindAndHandle(handlerFlow,8080) 仅供参考:事实上它也隐藏了广播舞台. 相反,您需要有选择地从特定子路由向Sink发送消息,除了为每个传入请求实现新流之外别无选择.见下面的例子 val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val route: akka.http.scaladsl.server.Route = path("hello" / Segment) { p => get { (extract(_.request) & extractMaterializer) { (req,mat) ? Source.single(req).runWith(sink)(mat) complete { s"<h1>Say hello to akka-http. p=$p</h1>" } } } } 此外,请记住,您始终可以完全抛弃高级DSL,并使用lower-level streams DSL为整个路径建模.这将导致更详细的代码 – 但会让您完全控制流的实现. 编辑:下面的例子 val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val handlerFlow = Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val partition = b.add(Partition[HttpRequest](2,{ case HttpRequest(GET,_) ? 0 case _ ? 1 })) val merge = b.add(Merge[HttpResponse](2)) val happyPath = Flow[HttpRequest].map{ req ? HttpResponse(entity = HttpEntity( ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>")) } val unhappyPath = Flow[HttpRequest].map{ case HttpRequest(GET,_) => HttpResponse(entity = "PONG!") case HttpRequest(GET,_) => sys.error("BOOM!") case r: HttpRequest => r.discardEntityBytes() // important to drain incoming HTTP Entity stream HttpResponse(404,entity = "Unknown resource!") } partition.out(0).alsoTo(sink) ~> happyPath ~> merge partition.out(1) ~> unhappyPath ~> merge FlowShape(partition.in,merge.out) }) val bindingFuture = Http().bindAndHandle(handlerFlow,8080) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |