加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何动态添加元素到源?

发布时间:2020-12-16 09:19:54 所属栏目:安全 来源:网络整理
导读:我有示例代码生成一个未绑定的源并使用它: 对象Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val source: Source[String] =
我有示例代码生成一个未绑定的源并使用它:

对象Main {

def main(args : Array[String]): Unit = {

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => {
     Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
    })

  source.runForeach((item:String) => { println(item) })
  .onComplete{ _ => system.shutdown() }
 }

}

我想创建类实现:

trait MySources {
    def addToSource(item: String)
    def getSource() : Source[String]
}

我需要使用多个线程,例如:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will be infinite loop
        mySources.addToSource(i.toString)
    }
  }
}

并预期全部代码:

object Main {
  def main(args : Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item:String) => { println(item) })
    .onComplete{ _ => system.shutdown() }
  }
}

如何实现MySources?

解决方法

拥有非有限源的一种方法是使用特殊类型的actor作为源,它们混合在ActorPublisher特征中.如果您创建这些类型的演员之一,然后通过调用ActorPublisher.apply进行包装,则最终会出现一个Reactive Streams Publisher实例,并且可以使用Source从中生成源代码.之后,您只需要确保您的ActorPublisher类正确处理用于向下游发送元素的“活动流”协议,并且您很好.一个非常简单的例子如下:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App{

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  for(i <- 1 until 20){
    actorRef ! i.toString
    Thread.sleep(1000)
  }

}

class ActorBasedSource extends Actor with ActorPublisher[Int]{
  import ActorPublisherMessage._
  var items:List[Int] = List.empty

  def receive = {
    case s:String =>
      if (totalDemand == 0) 
        items = items :+ s.toInt
      else
        onNext(s.toInt)    

    case Request(demand) =>  
      if (demand > items.size){
        items foreach (onNext)
        items = List.empty
      }
      else{
        val (send,keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }


}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读