haskell – 流媒体库中的惯用语预取
发布时间:2020-12-15 04:38:51 所属栏目:Java 来源:网络整理
导读:我正在使用 streaming库但是会接受使用管道或管道的答案. 说我有 import Streaming (Stream,Of)import qualified Streaming.Prelude as SstreamChunks :: Int - Stream (Of Thing) IO ()streamChunks lastID = do flip fix 0 $go thingID - unless (thingID
我正在使用
streaming库但是会接受使用管道或管道的答案.
说我有 import Streaming (Stream,Of) import qualified Streaming.Prelude as S streamChunks :: Int -> Stream (Of Thing) IO () streamChunks lastID = do flip fix 0 $go thingID -> unless (thingID > lastID) $do thing <- highLatencyGet thingID S.yield thing go (thingID+1) 为了减少延迟,我想分叉highLatencyGet以在处理消费者中的前一个Thing的同时检索下一个Thing. 显然,我可以将我的函数转换为创建一个新的MVar并在调用yield等之前分配下一批. 但我想知道是否有一种惯用的(可组合的)方法来做到这一点,这样它就可以打包在一个库中,并且可以在任意IO Streams上使用.理想情况下,我们也可以配置预取值,例如: prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO () 解决方法
此解决方案使用管道,但它可以很容易地适应使用流媒体.确切地说,它需要管道,管道 – 并发和异步包.
它不能以“直接”的方式工作.它不仅仅是简单地转换生产者,还需要一个消耗生产者的“折叠功能”.这种延续传递方式对于设置和拆除并发机制是必要的. import Pipes import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically) import Control.Concurrent.Async (Concurrently(..)) import Control.Exception (finally) prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r prefetching bufsize source foldfunc = do (outbox,inbox,seal) <- spawn' (bounded bufsize) let cutcord effect = effect `finally` atomically seal runConcurrently $ Concurrently (cutcord (runEffect (source >-> toOutput outbox))) *> Concurrently (cutcord (foldfunc (fromInput inbox))) 原始生产者的输出被重定向到有界队列.同时,我们将producer-folding函数应用于从队列中读取的生产者. 每当并发操作完成时,我们会小心地立即关闭通道以避免另一侧悬空. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |