scala - How do I send a message to an ActorRef at the start of a flow in Akka-Stream 2.0?
Published: 2020-12-16 08:44:57 | Category: Security | Source: compiled from the web
The goal is to send a WSConnectEvent once the client has connected and the stream has started. With akka-streams 1.0 I was able to do this as follows:

    Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) { implicit builder => sdpSource =>
      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].collect {
        case TextMessage.Strict(txt) =>
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf)
      val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, actor))

      fromWebsocket ~> merge.in(0)
      actorAsSource ~> merge.in(1)
      merge ~> callActorSink
      sdpSource ~> toWebsocket

      (fromWebsocket.inlet, toWebsocket.outlet)
    }

While trying to upgrade this to work with Akka-Streams 2.0.1, I changed it to the following code, but I am no longer receiving the WSConnectEvent message. I am not sure whether that is because I am not setting up my source correctly, or because I am not materializing the ActorRef correctly.

    val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

    Flow.fromGraph(
      GraphDSL.create() { implicit builder =>
        // Incoming SDP offer flow
        val fromWebsocket = builder.add(Flow[Message].collect {
          case TextMessage.Strict(txt) =>
            val event = txt.parseJson.convertTo[WSResponseEvent]
            WSMessageEvent(callUUID, userUUID, event.id, event.data)
        })

        // Outgoing SDP answer flow
        val toWebsocket = builder.add(Flow[WSResponseEvent].map {
          case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
        })

        val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
        val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf)
        val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

        // Join events, also sends actor for sending stuff
        val merge = builder.add(Merge[CallControlEvent](2))
        val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, _))

        fromWebsocket ~> merge.in(0)
        actorAsSource ~> merge.in(1)
        merge ~> callActorSink
        sdpSource ~> toWebsocket

        FlowShape(fromWebsocket.in, toWebsocket.out)
      }
    )

Solution
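Thanks to johanandren for the help. mapMaterializedValue is not the right approach here: it only transforms the value returned when the graph is materialized and does not emit anything into the stream. Instead, I needed to construct a flow that produces the WSConnectEvent and connect the output of builder.materializedValue through it to the merge's input port, like this: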

    // Join events, also sends actor for sending stuff
    val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, _))
    builder.materializedValue ~> actorConnected ~> merge.in(1)

Complete working example:

    val sdpSource: Source[WSResponseEvent, ActorRef] =
      Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

    // Passing sdpSource to create() makes its materialized ActorRef
    // available inside the graph through builder.materializedValue
    Flow.fromGraph(GraphDSL.create(sdpSource) { implicit builder => responseSource =>
      import GraphDSL.Implicits._

      // Incoming SDP offer flow: collect streamed text frames, drain binary frames
      val fromWebsocket = builder.add(Flow[Message].mapAsync(1) {
        case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }.collect {
        case Some(txt) =>
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes)
      val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, _))

      fromWebsocket ~> merge.in(0)
      builder.materializedValue ~> actorConnected ~> merge.in(1)
      merge ~> toCallActor
      responseSource ~> toWebsocket

      FlowShape.of(fromWebsocket.in, toWebsocket.out)
    })

(Editor: 李大同)
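The same technique can be tried in isolation, without the WebSocket and call-actor plumbing. The following is only a minimal sketch, assuming Akka 2.4 or 2.5 with akka-stream on the classpath (later releases deprecate ActorMaterializer and the two-argument Source.actorRef). The names MaterializedValueDemo, Event, Connected, Payload and runOnce are hypothetical stand-ins and do not come from the original code; Connected plays the role of WSConnectEvent.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializedValueDemo {
  // Hypothetical stand-ins for WSConnectEvent / WSMessageEvent
  sealed trait Event
  final case class Connected(ref: ActorRef) extends Event
  final case class Payload(text: String) extends Event

  /** Runs the graph once and returns the materialized ActorRef plus the first three events. */
  def runOnce(): (ActorRef, Seq[Event]) = {
    implicit val system: ActorSystem = ActorSystem("demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val actorSource: Source[String, ActorRef] =
        Source.actorRef[String](16, OverflowStrategy.fail)

      // Importing actorSource via create(...) exposes its materialized
      // ActorRef inside the graph as builder.materializedValue
      val events: Source[Event, ActorRef] = Source.fromGraph(
        GraphDSL.create(actorSource) { implicit builder => src =>
          import GraphDSL.Implicits._

          val merge = builder.add(Merge[Event](2))
          val toPayload = Flow[String].map(s => Payload(s): Event)
          val toConnected = Flow[ActorRef].map(ref => Connected(ref): Event)

          src ~> toPayload ~> merge.in(0)
          // Emits the ActorRef as a single stream element once the graph is materialized
          builder.materializedValue ~> toConnected ~> merge.in(1)

          SourceShape(merge.out)
        })

      val (ref, result) = events.take(3).toMat(Sink.seq[Event])(Keep.both).run()
      ref ! "a"
      ref ! "b"
      (ref, Await.result(result, 5.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (ref, events) = runOnce()
    println(s"materialized ref: $ref")
    events.foreach(println)
  }
}
```

The collected events should include a Connected element carrying the same ActorRef that run() returned, alongside the two payloads. The sketch makes no assumption about the relative order of the merged elements, since Merge does not guarantee one.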