scala – 监控关闭的图形Akka Stream
发布时间:2020-12-16 19:11:45 所属栏目:安全 来源:网络整理
导读:如果我在Akka Stream中创建了RunningGraph,我怎么知道(从外面) 当所有节点因完成而取消时? 当所有节点因错误而停止时? 解决方法 我不认为有任何方法可以为任意图形执行此操作,但如果您控制了图形,则只需将监视接收器连接到每个节点的输出,这些输出可能会失
如果我在Akka Stream中创建了RunningGraph,我怎么知道(从外面)
>当所有节点因完成而取消时? 解决方法
我不认为有任何方法可以为任意图形执行此操作,但如果您控制了图形,则只需将监视接收器连接到每个节点的输出,这些输出可能会失败或完成(这些节点是至少一个输出),例如:
import akka.actor.Status // obtain graph parts (this can be done inside the graph building as well) val source: Source[Int,NotUsed] = ... val flow: Flow[Int,String,NotUsed] = ... val sink: Sink[String,NotUsed] = ... // create monitoring actors val aggregate = actorSystem.actorOf(Props[Aggregate]) val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source",aggregate))) val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow",aggregate))) // create the graph val graph = GraphDSL.create() { implicit b => import GraphDSL._ val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor,Status.Success(()))),val flowMonitor = b.add(Sink.actorRef(flowMonitorActor,Status.Success(()))) val bc1 = b.add(Broadcast[Int](2)) val bc2 = b.add(Broadcast[String](2)) // main flow source ~> bc1 ~> flow ~> bc2 ~> sink // monitoring branches bc1 ~> sourceMonitor bc2 ~> flowMonitor ClosedShape } // run the graph RunnableGraph.fromGraph(graph).run() class Monitor(name: String,aggregate: ActorRef) extends Actor { override def receive: Receive = { case Status.Success(_) => aggregate ! s"$name completed successfully" case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}" case _ => } } class Aggregate extends Actor { override def receive: Receive = { case s: String => println(s) } } 也可以只创建一个监视actor并在所有监视接收器中使用它,但在这种情况下,您将无法在失败的流之间轻松区分. 并且在源和流上还有 import akka.Done import akka.actor.Status import akka.pattern.pipe val monitor = actorSystem.actorOf(Props[Monitor]) source .watchTermination()((f,_) => f pipeTo monitor) .via(flow).watchTermination((f,_) => f pipeTo monitor) .to(sink) .run() class Monitor extends Actor { override def receive: Receive = { case Done => println("stream completed") case Status.Failure(e) => println(s"stream failed: ${e.getMessage}") } } 您可以在将其值传递给actor之前转换未来,以区分流. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |