加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – akka-http:从http路由发送元素到akka接收器

发布时间:2020-12-16 18:09:23 所属栏目:安全 来源:网络整理
导读:如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应. 我想这需要一个流分支/结.正常的HTTP路由来自HttpRequest – HttpResponse对象.我想添加一个分支/结点,以便HttpRequests可以触发事件到我的独立接收器以及生成正常的
如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应.

我想这需要一个流分支/结.正常的HTTP路由来自HttpRequest – > HttpResponse对象.我想添加一个分支/结点,以便HttpRequests可以触发事件到我的独立接收器以及生成正常的HttpResponse.

下面是一个非常简单的单一路线akka-http应用程序.为简单起见,我使用的是一个简单的println接收器.我的生产用例,显然会涉及一个不那么微不足道的下沉.

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)

  val route: akka.http.scaladsl.server.Route =
    path("hello" / Segment) { p =>
      get {
        // I'd like to send a message to an Akka Sink as well as return an HTTP response.
        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }

  val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
  val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route),"localhost",8080)

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(),Duration.Inf))(executor) // and shutdown when done
}

编辑:或者在使用低级别的akka??-http API时,如何从特定的路由处理程序向特定的消息发送特定的消息?

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET,Uri.Path("/"),_,_) =>
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>"))

    case HttpRequest(GET,Uri.Path("/ping"),_) =>
      HttpResponse(entity = "PONG!")

    case HttpRequest(GET,Uri.Path("/crash"),_) =>
      sys.error("BOOM!")

    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404,entity = "Unknown resource!")
  }

  val serverSource = Http().bind(interface = "localhost",port = 8080)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      println("Accepted new connection from " + connection.remoteAddress)

      connection handleWithSyncHandler requestHandler
      // this is equivalent to
      // connection handleWith { Flow[HttpRequest] map requestHandler }
    }).run()

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(),Duration.Inf))(executor) // and shutdown when done
}

解决方法

如果您想将整个HttpRequest发送到您的接收器,我会说最简单的方法是使用alsoTo组合器.结果将是符合的

val mySink: Sink[HttpRequest,NotUsed] = ???

val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))

val bindingFuture = Http().bindAndHandle(handlerFlow,8080)

仅供参考:事实上它也隐藏了广播舞台.

相反,您需要有选择地从特定子路由向Sink发送消息,除了为每个传入请求实现新流之外别无选择.见下面的例子

val sink: Sink[Any,Future[Done]] = Sink.foreach(println)

val route: akka.http.scaladsl.server.Route =
  path("hello" / Segment) { p =>
    get {

      (extract(_.request) & extractMaterializer) { (req,mat) ?
        Source.single(req).runWith(sink)(mat)

        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }
  }

此外,请记住,您始终可以完全抛弃高级DSL,并使用lower-level streams DSL为整个路径建模.这将导致更详细的代码 – 但会让您完全控制流的实现.

编辑:下面的例子

val sink: Sink[Any,Future[Done]] = Sink.foreach(println)

val handlerFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val partition = b.add(Partition[HttpRequest](2,{
      case HttpRequest(GET,_) ? 0
      case _                                        ? 1
    }))
    val merge = b.add(Merge[HttpResponse](2))

    val happyPath = Flow[HttpRequest].map{ req ?
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>"))
    }        

    val unhappyPath = Flow[HttpRequest].map{
      case HttpRequest(GET,_) =>
      HttpResponse(entity = "PONG!")

      case HttpRequest(GET,_) =>
      sys.error("BOOM!")

      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404,entity = "Unknown resource!")
    }

    partition.out(0).alsoTo(sink) ~> happyPath   ~> merge
    partition.out(1)              ~> unhappyPath ~> merge

    FlowShape(partition.in,merge.out)
  })

val bindingFuture = Http().bindAndHandle(handlerFlow,8080)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读