加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用类型类建模生产者 – 消费者语义?

发布时间:2020-12-16 18:33:42 所属栏目:安全 来源:网络整理
导读:如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些“正交关注点”外化到生产者和消费者类型类中是否有意义? 我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是
如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些“正交关注点”外化到生产者和消费者类型类中是否有意义?

我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是会对Scala视角和示例感兴趣,因为我看不到很多.

解决方法

你应该看看Matt Might的 this article.

它为您提供了Producer,Consumer,Transducer(您提到的haskell库中的Pipe)的简单实现,以及如何使用它们来创建Web服务器的示例.

基本上每个Producer都扩展了Runnable,并有一个私有缓冲区来输出元素.缓冲区是一个java ArrayBlockingQueue,它是线程安全的.

每个Consumer也是一个Runnable,并且具有使用类似架构的输入缓冲区.

将Consumer连接到Producer时,您将创建另一个Runnable.
在启动时,它将启动Producer和Consumer(它们是Runnable)并将在它们之间传输数据.

将传感器链接到Producer时,它会创建一个新的Producer.

因此,如果您遵循他的实现,您应该能够以haskell的方式编写:

listen ==> connect ==> process ==> reply

以下是从上面的链接复制和改进的一些代码:

import java.util.concurrent.ArrayBlockingQueue

trait Coroutine extends Runnable {
    def start() {
        val myThread = new Thread(this)
        myThread.start()
    }
}

trait Producer[O] extends Coroutine {
     private val outputs = new ArrayBlockingQueue[O](1024)
     protected def put(output: O): Unit = outputs.put(output)
     def next(): O = outputs.take()

     def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
         val that = this
         new Coroutine {
             def run() {
                 while (true) { val o = that.next(); consumer.accept(o) }
             }

             override def start() {
                 that.start()
                 consumer.start()
                 super.start()
             }
         }
     }
}

trait Consumer[I] extends Coroutine {
    private val inputs = new ArrayBlockingQueue[I] (1024)
    def accept(input : I): Unit = inputs.put(input)
    protected def get(): I = inputs.take()
}

以下是如何使用它:

case class IntProducer(zero: Int) extends Producer[Int]{
    def run(): Unit = {
         var i = zero
         while(true) { put(i); i += 1?}
    }
}

object Printer extends Consumer[Any]{
    def run(): Unit = {
         while(true) { println(get()) }
    }
}

val pip = IntProducer(0) ==> Printer
pip.start()

要查看更多示例以及如何处理`Transducer,请查看my Gist.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读