
scala – How do I send a message to an ActorRef when an Akka-Stream 2.0 flow starts?

Published: 2020-12-16 08:44:57 | Category: Security | Source: compiled from the web
The goal is to send a WSConnectEvent once the client has connected and the stream has started. With akka-streams 1.0 I was able to do this as follows:

Flow(Source.actorRef[WSResponseEvent](65535,OverflowStrategy.fail)) {
  implicit builder =>
    sdpSource =>

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].collect {
        case TextMessage.Strict(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID,userUUID,event.id,event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(),Duration.Inf);
      val callActorSink = Sink.actorRef[CallControlEvent](callActorRef,WSDisconnectEvent(callUUID,userUUID))

      // Join events,also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID,actor))

      fromWebsocket ~> merge.in(0)
      actorAsSource ~> merge.in(1)

      merge ~> callActorSink
      sdpSource ~> toWebsocket
      (fromWebsocket.inlet,toWebsocket.outlet)
}

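When upgrading this to Akka-Streams 2.0.1 I changed the code to the following, but I no longer receive the WSConnectEvent message. I am not sure whether this is because my source is set up incorrectly or because I have not wired up the ActorRef correctly.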

val sdpSource = Source.actorRef[WSResponseEvent](65535,OverflowStrategy.fail)

Flow.fromGraph(
  GraphDSL.create() { implicit builder =>

    // Incoming SDP offer flow
    val fromWebsocket = builder.add(Flow[Message].collect {
      case TextMessage.Strict(txt) => {
        val event = txt.parseJson.convertTo[WSResponseEvent]
        WSMessageEvent(callUUID,userUUID,event.id,event.data)
      }
    })

    // Outgoing SDP answer flow
    val toWebsocket = builder.add(Flow[WSResponseEvent].map {
      case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
    })

    val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
    val callActorRef = Await.result(callActorSelection.resolveOne(),Duration.Inf);
    val callActorSink = Sink.actorRef[CallControlEvent](callActorRef,WSDisconnectEvent(callUUID,userUUID))

    // Join events,also sends actor for sending stuff
    val merge = builder.add(Merge[CallControlEvent](2))
    val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID,_))

    fromWebsocket ~> merge.in(0)
    actorAsSource ~> merge.in(1)

    merge ~> callActorSink
    sdpSource ~> toWebsocket
    FlowShape(fromWebsocket.in,toWebsocket.out)
  }
)

Solution

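Thanks to help from johanandren: mapMaterializedValue is not the right approach. Instead, I needed to construct a flow that produces the WSConnectEvent and connect the output of builder.materializedValue through it to an in port of 'merge', like so: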

// Join events,also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID,_))

builder.materializedValue ~> actorConnected ~> merge.in(1)
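
To see why the mapMaterializedValue attempt delivers nothing to the call actor, it may help to look at what that operator actually changes. The sketch below is a minimal illustration and not the code from the question: `Connected` is a hypothetical stand-in for WSConnectEvent, the buffer size of 16 is arbitrary, and it assumes the classic `akka.stream.scaladsl` API with an implicit `Materializer` supplied by the caller.

```scala
import akka.actor.ActorRef
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

object MapMaterializedValueSketch {
  // Hypothetical wrapper standing in for WSConnectEvent.
  final case class Connected(out: ActorRef)

  // The element type is still String; only the materialized value
  // has changed from ActorRef to Connected.
  val wrapped: Source[String, Connected] =
    Source.actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue(Connected(_))

  // Runs the source, collects the first element, and pushes one message in.
  def run()(implicit mat: Materializer): (Connected, Future[Vector[String]]) = {
    val (connected, collected) = wrapped
      .take(1)
      .toMat(Sink.fold[Vector[String], String](Vector.empty)(_ :+ _))(Keep.both)
      .run()
    // The wrapper is handed to the caller of run(), not sent downstream.
    connected.out ! "hello"
    (connected, collected)
  }
}
```

The only element that reaches the sink is the String sent through the actor; the `Connected` value is returned to whoever materializes the stream and never travels through it. It also seems likely that using sdpSource at two places in the graph, as the question does, materializes two independent actors rather than one.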

Complete working example:

val sdpSource: Source[WSResponseEvent,ActorRef] = Source.actorRef[WSResponseEvent](65535,OverflowStrategy.fail)

Flow.fromGraph(GraphDSL.create(sdpSource) {
  implicit builder =>
    { (responseSource) =>
      import GraphDSL.Implicits._

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
        case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }).collect {
        case Some(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
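          WSMessageEvent(callUUID,userUUID,event.id,event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(),2.minutes);
      val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef,WSDisconnectEvent(callUUID,userUUID)))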

      // Join events,also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID,_))

      fromWebsocket ~> merge.in(0)
      builder.materializedValue ~> actorConnected ~> merge.in(1)

      merge ~> toCallActor
      responseSource ~> toWebsocket

      FlowShape.of(fromWebsocket.in,toWebsocket.out)
    }
})
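
The same wiring can be tried in isolation, without the websocket and call-actor pieces. The following is a stripped-down sketch under stated assumptions, not the code above: `Event`, `Connected` and `Incoming` are hypothetical stand-ins for CallControlEvent, WSConnectEvent and WSMessageEvent, the graph is shaped as a Source rather than a Flow so the merged events can be collected, and the implicit `Materializer` is expected from the caller.

```scala
import akka.actor.ActorRef
import akka.stream.{Materializer, OverflowStrategy, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Merge, Sink, Source}
import scala.concurrent.Future

object MaterializedValueSketch {
  // Hypothetical stand-ins for the event types used in the question.
  sealed trait Event
  final case class Connected(out: ActorRef) extends Event
  final case class Incoming(text: String) extends Event

  // A source that emits its own materialized ActorRef (wrapped in Connected)
  // in addition to the messages later sent to that ActorRef.
  val events: Source[Event, ActorRef] = Source.fromGraph(
    GraphDSL.create(Source.actorRef[String](16, OverflowStrategy.fail)) {
      implicit builder => messages =>
        import GraphDSL.Implicits._

        val merge = builder.add(Merge[Event](2))
        val toIncoming = Flow[String].map(text => Incoming(text): Event)
        val toConnected = Flow[ActorRef].map(ref => Connected(ref): Event)

        messages ~> toIncoming ~> merge.in(0)
        // The materialized ActorRef enters the stream as an element here.
        builder.materializedValue ~> toConnected ~> merge.in(1)

        SourceShape(merge.out)
    })

  // Runs the graph, sends one message through the actor and
  // collects the first two events that come out of the merge.
  def run()(implicit mat: Materializer): (ActorRef, Future[Vector[Event]]) = {
    val (ref, collected) = events
      .take(2)
      .toMat(Sink.fold[Vector[Event], Event](Vector.empty)(_ :+ _))(Keep.both)
      .run()
    ref ! "hello"
    (ref, collected)
  }
}
```

Calling `MaterializedValueSketch.run()` with an implicit materializer in scope should yield a future holding both a `Connected` carrying the same ActorRef that `run()` returns and the `Incoming("hello")` event. The sketch makes no claim about the relative order of the two.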

(Editor: 李大同)
