scala – 从服务器关闭akka-http websocket连接
在我的场景中,客户端发送“再见”websocket消息,我需要在服务器端关闭先前建立的连接.
来自akka-http docs:
但是我不清楚如何做到这一点,考虑到在协商新连接时设置一次Sink和Source: (get & path("ws")) { optionalHeaderValueByType[UpgradeToWebsocket]() { case Some(upgrade) ? val connectionId = UUID() complete(upgrade.handleMessagesWithSinkSource(sink,source)) case None ? reject(ExpectedWebsocketRequestRejection) } } 解决方法
提示:此答案基于akka-stream-experimental版本2.0-M2. API在其他版本中可能略有不同.
关闭连接的简单方法是使用PushStage: import akka.stream.stage._ val closeClient = new PushStage[String,String] { override def onPush(elem: String,ctx: Context[String]) = elem match { case "goodbye" ? // println("Connection closed") ctx.finish() case msg ? ctx.push(msg) } } 在客户端或服务器端(通常是流经Flow的每个元素)接收的每个元素都通过这样的Stage组件.在Akka中,完全抽象称为GraphStage,更多信息可以在official documentation中找到. 使用PushStage,我们可以查看具体的传入元素的值,并相应地转换上下文.在上面的示例中,一旦收到再见消息,我们就完成了上下文,否则我们只是通过push方法转发值. 现在,我们可以通过transform方法将closeClient组件连接到任意流: val connection = Tcp().outgoingConnection(address,port) val flow = Flow[ByteString] .via(Framing.delimiter( ByteString("n"),maximumFrameLength = 256,allowTruncation = true)) .map(_.utf8String) .transform(() ? closeClient) .map(_ ? StdIn.readLine("> ")) .map(_ + "n") .map(ByteString(_)) connection.join(flow).run() 上面的流接收ByteString并返回ByteString,这意味着它可以通过join方法连接到连接.在流程内部,我们首先将字节转换为字符串,然后再将它们发送到closeClient.如果PushStage没有完成流,则元素在流中被转发,在那里它被丢弃并被来自stdin的一些输入替换,然后通过线路发回.如果流完成,则将删除阶段组件之后的所有其他流处理步骤 – 该流现在已关闭. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |