scala – 如何动态添加元素到源?
发布时间:2020-12-16 09:19:54 所属栏目:安全 来源:网络整理
导读:我有示例代码生成一个未绑定的源并使用它: 对象Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val source: Source[String] =
我有示例代码生成一个未绑定的源并使用它:
对象Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val source: Source[String] = Source(() => { Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)}) }) source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } } } 我想创建类实现: trait MySources { def addToSource(item: String) def getSource() : Source[String] } 我需要使用多个线程,例如: class MyThread(mySources: MySources) extends Thread { override def run(): Unit = { for(i <- 1 to 1000000) { // here will be infinite loop mySources.addToSource(i.toString) } } } 并预期全部代码: object Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val sources = new MySourcesImplementation() for(i <- 1 to 100) { (new MyThread(sources)).start() } val source = sources.getSource() source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } } } 如何实现MySources? 解决方法
拥有非有限源的一种方法是使用特殊类型的actor作为源,它们混合在ActorPublisher特征中.如果您创建这些类型的演员之一,然后通过调用ActorPublisher.apply进行包装,则最终会出现一个Reactive Streams Publisher实例,并且可以使用Source从中生成源代码.之后,您只需要确保您的ActorPublisher类正确处理用于向下游发送元素的“活动流”协议,并且您很好.一个非常简单的例子如下:
import akka.actor._ import akka.stream.actor._ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl._ object DynamicSourceExample extends App{ implicit val system = ActorSystem("test") implicit val materializer = ActorFlowMaterializer() val actorRef = system.actorOf(Props[ActorBasedSource]) val pub = ActorPublisher[Int](actorRef) Source(pub). map(_ * 2). runWith(Sink.foreach(println)) for(i <- 1 until 20){ actorRef ! i.toString Thread.sleep(1000) } } class ActorBasedSource extends Actor with ActorPublisher[Int]{ import ActorPublisherMessage._ var items:List[Int] = List.empty def receive = { case s:String => if (totalDemand == 0) items = items :+ s.toInt else onNext(s.toInt) case Request(demand) => if (demand > items.size){ items foreach (onNext) items = List.empty } else{ val (send,keep) = items.splitAt(demand.toInt) items = keep send foreach (onNext) } case other => println(s"got other $other") } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |