f# – 并行流水线
(fileNameToCharStream "bigfile" |>> fuse [length; splitBy (fun x -> x = ' ' || x = 'n') removeEmpty |>> length; splitBy (fun x -> x = 'n') keepEmpty |>> length; ]) (*fuse "fuses" the three functions to run concurrently*) |> run 2 (*forces to run in parallel on two threads*) |> (fun [num_chars; num_words; num_lines] -> printfn "%d %d %d" num_chars num_words,num_lines)) 我想以下列方式使这段代码工作: 我该如何编程指定的功能和操作符|>>?
看起来你的要求相当多.我将由你来决定字符串操作,但我将向你展示如何定义一个并行执行一系列操作的操作符.
第1步:编写保险丝功能 您的保险丝功能似乎使用多个功能映射单个输入,这很容易编写如下: //val fuse : seq<('a -> 'b)> -> 'a -> 'b list let fuse functionList input = [ for f in functionList -> f input] 请注意,所有映射函数都需要具有相同的类型. 第2步:定义运算符以并行执行函数 标准并行映射函数可以写成如下: //val pmap : ('a -> 'b) -> seq<'a> -> 'b array let pmap f l = seq [for a in l -> async { return f a } ] |> Async.Parallel |> Async.RunSynchronously 据我所知,Async.Parallel将并行执行异步操作,其中在任何给定时间执行的并行任务的数量等于机器上的核心数量(如果我错了,有人可以纠正我).因此,在双核机器上,调用此函数时,我的机器上最多应运行2个线程.这是一件好事,因为我们不期望通过每个核心运行多个线程来实现任何加速(实际上额外的上下文切换可能会减慢速度). 我们可以定义一个运算符|>>在pmap和fuse方面: //val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array let (|>>) input functionList = pmap (fuse functionList) input 所以|>> operator接受一堆输入并使用大量不同的输出映射它们.到目前为止,如果我们把所有这些放在一起,我们得到以下(在fsi中): > let countOccurrences compareChar source = source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) let length (s : string) = s.Length let testData = "Juliet is awesome|Someone should give her a medal".Split('|') let testOutput = testData |>> [length; countOccurrences 'J'; countOccurrences 'o'];; val countOccurrences : 'a -> seq<'a> -> int val length : string -> int val testData : string [] = [|"Juliet is awesome"; "Someone should give her a medal"|] val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] testOutput包含两个元素,两个元素都是并行计算的. 第3步:将元素聚合为单个输出 好的,现在我们有数组中每个元素表示的部分结果,我们希望将我们的部分结果合并为一个聚合.我假设数组中的每个元素都应该合并相同的函数,因为输入中的每个元素都具有相同的数据类型. 这是我为这份工作写的一个非常难看的功能: > let reduceMany f input = input |> Seq.reduce (fun acc x -> [for (a,b) in Seq.zip acc x -> f a b ]);; val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list > reduceMany (+) testOutput;; val it : int list = [48; 1; 4] reduceMany采用n长度序列序列,并返回n长度数组作为输出.如果你能想出一个更好的方法来编写这个函数,请成为我的客人:) 要解码上面的输出: > 48 =我的两个输入字符串的长度之和.注意原始字符串是49个字符,但是将它拆分为“|”每个“|”吃一个字符. 第4步:把所有东西放在一起 let pmap f l = seq [for a in l -> async { return f a } ] |> Async.Parallel |> Async.RunSynchronously let fuse functionList input = [ for f in functionList -> f input] let (|>>) input functionList = pmap (fuse functionList) input let reduceMany f input = input |> Seq.reduce (fun acc x -> [for (a,b) in Seq.zip acc x -> f a b ]) let countOccurrences compareChar source = source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) let length (s : string) = s.Length let testData = "Juliet is awesome|Someone should give her a medal".Split('|') let testOutput = testData |>> [length; countOccurrences 'J'; countOccurrences 'o'] |> reduceMany (+) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |