scala – 使用Play 2.6和akka流的Websocket代理
发布时间:2020-12-16 09:57:02 所属栏目:安全 来源:网络整理
导读:我正在尝试使用Play和akka流创建一个简单的Websocket连接代理. 交通流量是这样的: (Client) request - - request (Server) Proxy (Client) response - - response (Server) 我按照一些例子后提出了以下代码: def socket = WebSocket.accept[String,String]
我正在尝试使用Play和akka流创建一个简单的Websocket连接代理.
交通流量是这样的: (Client) request -> -> request (Server) Proxy (Client) response <- <- response (Server) 我按照一些例子后提出了以下代码: def socket = WebSocket.accept[String,String] { request => val uuid = UUID.randomUUID().toString // wsOut - actor that deals with incoming websocket frame from the Client // wsIn - publisher of the frame for the Server val (wsOut: ActorRef,wsIn: Publisher[String]) = { val source: Source[String,ActorRef] = Source.actorRef[String](10,OverflowStrategy.dropTail) val sink: Sink[String,Publisher[String]] = Sink.asPublisher(fanout = false) source.toMat(sink)(Keep.both).run() } // sink that deals with the incoming messages from the Server val serverIncoming: Sink[Message,Future[Done]] = Sink.foreach[Message] { case message: TextMessage.Strict => println("The server has sent: " + message.text) } // source for sending a message over the WebSocket val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_)) // flow to use (note: not re-usable!) val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000")) // the materialized value is a tuple with // upgradeResponse is a Future[WebSocketUpgradeResponse] that // completes or fails when the connection succeeds or fails // and closed is a Future[Done] with the stream completion from the incoming sink val (upgradeResponse,closed) = serverOutgoing .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] .toMat(serverIncoming)(Keep.both) // also keep the Future[Done] .run() // just like a regular http request we can access response status which is available via upgrade.response.status // status code 101 (Switching Protocols) indicates that server support WebSockets val connected = upgradeResponse.flatMap { upgrade => if (upgrade.response.status == StatusCodes.SwitchingProtocols) { Future.successful(Done) } else { throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") } } // in a real application you would not side effect here connected.onComplete(println) closed.foreach(_ => println("closed")) val actor = system.actorOf(WebSocketProxyActor.props(wsOut,uuid)) val finalFlow = { val sink = Sink.actorRef(actor,akka.actor.Status.Success(())) val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ??? Flow.fromSinkAndSource(sink,source) } finalFlow 使用此代码,流量从客户端到代理服务器再到服务器,返回到代理服务器就是这样.它没有进一步向客户提供.我怎样才能解决这个问题 ? 或者我对这种方法完全错了?使用Bidiflow或Graph更好吗?我是akka流的新手,仍在努力解决问题. 解决方法
以下似乎有效.注意:我在同一个控制器中实现了服务器套接字和代理套接字,但您可以拆分它们或在不同的实例上部署相同的控制器.在两种情况下都需要更新“上”服务的ws url.
package controllers import javax.inject._ import akka.actor.{Actor,ActorRef,ActorSystem,Props} import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws.{Message,TextMessage,WebSocketRequest,WebSocketUpgradeResponse} import akka.stream.Materializer import akka.stream.scaladsl.Flow import play.api.libs.streams.ActorFlow import play.api.mvc._ import scala.concurrent.{ExecutionContext,Future} import scala.language.postfixOps @Singleton class SomeController @Inject()(implicit exec: ExecutionContext,actorSystem: ActorSystem,materializer: Materializer) extends Controller { /*--- proxy ---*/ def websocketFlow: Flow[Message,Message,Future[WebSocketUpgradeResponse]] = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket")) def proxySocket: WebSocket = WebSocket.accept[String,String] { _ => Flow[String].map(s => TextMessage(s)) .via(websocketFlow) .map(_.asTextMessage.getStrictText) } /*--- server ---*/ class UpperService(socket: ActorRef) extends Actor { override def receive: Receive = { case s: String => socket ! s.toUpperCase() case _ => } } object UpperService { def props(socket: ActorRef): Props = Props(new UpperService(socket)) } def upperSocket: WebSocket = WebSocket.accept[String,String] { _ => ActorFlow.actorRef(out => UpperService.props(out)) } } 您将需要像这样设置路线: GET /upper-socket controllers.SomeController.upperSocket GET /proxy-socket controllers.SomeController.proxySocket 您可以通过向ws:// localhost:9000 / proxy-socket发送字符串来进行测试.答案是大写的字符串. 不活动1分钟后会有超时: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000],no bytes passed in the last 1 minute 但请参阅:http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html如何配置此项. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |