加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 从服务器关闭akka-http websocket连接

发布时间:2020-12-16 18:54:26 所属栏目:安全 来源:网络整理
导读:在我的场景中,客户端发送“再见”websocket消息,我需要在服务器端关闭先前建立的连接. 来自akka-http docs: Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a
在我的场景中,客户端发送“再见”websocket消息,我需要在服务器端关闭先前建立的连接.

来自akka-http docs:

Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server’s socket by cancelling the IncomingConnection source connections.

但是我不清楚如何做到这一点,考虑到在协商新连接时设置一次Sink和Source:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ?
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink,source))
    case None ?
      reject(ExpectedWebsocketRequestRejection)
  }
}

解决方法

提示:此答案基于akka-stream-experimental版本2.0-M2. API在其他版本中可能略有不同.

关闭连接的简单方法是使用PushStage:

import akka.stream.stage._

val closeClient = new PushStage[String,String] {
  override def onPush(elem: String,ctx: Context[String]) = elem match {
    case "goodbye" ?
      // println("Connection closed")
      ctx.finish()
    case msg ?
      ctx.push(msg)
  }
}

在客户端或服务器端(通常是流经Flow的每个元素)接收的每个元素都通过这样的Stage组件.在Akka中,完全抽象称为GraphStage,更多信息可以在official documentation中找到.

使用PushStage,我们可以查看具体的传入元素的值,并相应地转换上下文.在上面的示例中,一旦收到再见消息,我们就完成了上下文,否则我们只是通过push方法转发值.

现在,我们可以通过transform方法将closeClient组件连接到任意流:

val connection = Tcp().outgoingConnection(address,port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("n"),maximumFrameLength = 256,allowTruncation = true))
  .map(_.utf8String)
  .transform(() ? closeClient)
  .map(_ ? StdIn.readLine("> "))
  .map(_ + "n")
  .map(ByteString(_))

connection.join(flow).run()

上面的流接收ByteString并返回ByteString,这意味着它可以通过join方法连接到连接.在流程内部,我们首先将字节转换为字符串,然后再将它们发送到closeClient.如果PushStage没有完成流,则元素在流中被转发,在那里它被丢弃并被来自stdin的一些输入替换,然后通过线路发回.如果流完成,则将删除阶段组件之后的所有其他流处理步骤 – 该流现在已关闭.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读