scala – Akka-Http Websockets:如何向消费者发送相同的数据流
发布时间:2020-12-16 18:18:52 所属栏目:安全 来源:网络整理
导读:我有一个客户端可以连接的WebSocket我也有一个使用akka-streams的数据流.如何使所有客户端获得相同的数据.目前他们似乎正在争夺数据. 谢谢 解决方法 你可以做的一种方法是让一个演员扩展ActorPublisher并让它订阅 一些消息. class MyPublisher extends Actor
我有一个客户端可以连接的WebSocket我也有一个使用akka-streams的数据流.如何使所有客户端获得相同的数据.目前他们似乎正在争夺数据.
谢谢 解决方法
你可以做的一种方法是让一个演员扩展ActorPublisher并让它订阅
一些消息. class MyPublisher extends ActorPublisher[MyData]{ override def preStart = { context.system.eventStream.subscribe(self,classOf[MyData]) } override def receive: Receive = { case msg: MyData ? if (isActive && totalDemand > 0) { // Pushes the message onto the stream onNext(msg) } } } object MyPublisher { def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher()) } case class MyData(data:String) 然后,您可以使用该actor作为流的源: val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext)) 然后,您可以从该数据源创建流并应用转换以将数据转换为websocket消息 val myFlow = Flow.fromSinkAndSource(Sink.ignore,dataSource map {d => TextMessage.Strict(d.data)}) 然后,您可以在路径处理中使用该流程. path("readings") { handleWebsocketMessages(myFlow) } 然后,您可以从原始流的处理中将数据发布到事件流,并且该actor的任何实例都会将其拾取并放入正在为其提供websocket的流中. val actorSystem = ActorSystem("foo") val otherSource = Source.fromIterator(() => List(MyData("a"),MyData("b")).iterator) otherSource.runForeach { msg ? actorSystem.eventStream.publish(MyData("data"))} 然后,每个套接字都有自己的actor实例,为它提供所有来自单个源的数据. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |