加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala,将回调模式转换为函数式内部迭代器

发布时间:2020-12-16 19:25:13 所属栏目:安全 来源:网络整理
导读:假设给出了这个API,我们无法更改它: object ProviderAPI { trait Receiver[T] { def receive(entry: T) def close() } def run(r: Receiver[Int]) { new Thread() { override def run() { (0 to 9).foreach { i = r.receive(i) Thread.sleep(100) } r.close
假设给出了这个API,我们无法更改它:

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  def run(r: Receiver[Int]) {
    new Thread() {
      override def run() {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

在此示例中,ProviderAPI.run接收Receiver,调用receive(i)10次然后关闭.通常,ProviderAPI.run会根据可能无限的集合调用receive(i).

此API旨在以命令式样式使用,如外部迭代器.如果我们的应用程序需要过滤,映射和打印此输入,我们需要实现一个混合所有这些操作的Receiver:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int) {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close() {}
  }

  ProviderAPI.run(new MyReceiver())
}

现在,问题是如何在函数式,内部迭代器中使用ProviderAPI(不改变ProviderAPI的实现,这是给我们的).请注意,ProviderAPI也可以无限次地调用receive(i),因此不能选择收集列表中的所有内容(同样,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理它) .

我问如何实现这样的ReceiverToIterator,以便我们可以在功能样式中使用ProviderAPI:

object Main extends App {
  val iterator = new ReceiverToIterator[Int]  // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

更新

这有四个解决方案:

> IteratorWithSemaphorSolution:我提议的解决方案解决方案首先附在问题上
> QueueIteratorSolution:根据nadavwr的建议使用BlockingQueue [Option [T]].
它允许生产者在被消费者阻止之前继续产生最多queueCapacity.
> PublishSubjectSolution:非常简单的解决方案,使用Netflix RxJava-Scala API中的PublishSubject.
> SameThreadReceiverToTraversable:通过放宽问题的约束,非常简单的解决方案

解决方法

更新:BlockingQueue的1个条目

你在这里实现的主要是Java的BlockingQueue,队列大小为1.

主要特点:超级阻塞.缓慢的消费者会扼杀你的制作人的表现.

更新:@ gzm0提到BlockingQueue不包括EOF.你必须使用BlockingQueue [Option [T]].

更新:这是一个代码片段.它可以与您的接收器配合使用.
其中一些灵感来自Iterator.buffered.请注意,peek是一个误导性的名称,因为它可能会阻止 – 所以hasNext也是如此.

// fairness enabled -- you probably want to preserve order...
// alternatively,disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1,true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically,it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}

替代方案:Iteratees

基于迭代器的解决方案通常会涉及更多阻塞.从概念上讲,你可以在执行迭代的线程上使用continuation来避免阻塞线程,但是延续会破坏Scala的for-comprehensions,所以在这条道路上没有快乐.

或者,您可以考虑基于迭代的解决方案.迭代器与迭代器的不同之处在于,消费者不负责推进迭代 – 生产者是.使用迭代,消费者基本上折叠生产者推动的条目随着时间的推移.折叠每个下一个条目可以在线程池中进行折叠,因为在每次折叠完成后放弃该线程.

你不会为迭代的语法得到好处,学习曲线有点挑战,但如果你对使用foldLeft有信心,你最终会得到一个看起来合理的非阻塞解决方案.

要阅读有关iteratees的更多信息,我建议您查看PlayFramework 2.X’s iteratee reference.该文档描述了他们的独立iteratee库,它在Play的上下文之外100%可用. Scalaz 7还有一个全面的iteratee库.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读