加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

haskell – 流媒体库中的惯用语预取

发布时间:2020-12-15 04:38:51 所属栏目:Java 来源:网络整理
导读:我正在使用 streaming库但是会接受使用管道或管道的答案. 说我有 import Streaming (Stream,Of)import qualified Streaming.Prelude as SstreamChunks :: Int - Stream (Of Thing) IO ()streamChunks lastID = do flip fix 0 $go thingID - unless (thingID
我正在使用 streaming库但是会接受使用管道或管道的答案.

说我有

import Streaming (Stream,Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
  flip fix 0 $go thingID ->
    unless (thingID > lastID) $do
      thing <- highLatencyGet thingID
      S.yield thing
      go (thingID+1)

为了减少延迟,我想分叉highLatencyGet以在处理消费者中的前一个Thing的同时检索下一个Thing.

显然,我可以将我的函数转换为创建一个新的MVar并在调用yield等之前分配下一批.

但我想知道是否有一种惯用的(可组合的)方法来做到这一点,这样它就可以打包在一个库中,并且可以在任意IO Streams上使用.理想情况下,我们也可以配置预取值,例如:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()

解决方法

此解决方案使用管道,但它可以很容易地适应使用流媒体.确切地说,它需要管道,管道 – 并发和异步包.

它不能以“直接”的方式工作.它不仅仅是简单地转换生产者,还需要一个消耗生产者的“折叠功能”.这种延续传递方式对于设置和拆除并发机制是必要的.

import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox,inbox,seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))

原始生产者的输出被重定向到有界队列.同时,我们将producer-folding函数应用于从队列中读取的生产者.

每当并发操作完成时,我们会小心地立即关闭通道以避免另一侧悬空.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读