Go并发模式:管道和取消
Go并发模式:管道和取消原地址:http://air.googol.im/2014/03/15/go-concurrency-patterns-pipelines-and-cancellation.html 译自http://blog.golang.org/pipelines。 这是Go官方blog的一篇文章,介绍了如何使用Go来编写并发程序,并按照程序的演化顺序,介绍了不同模式遇到的问题以及解决的问题。主要解释了用管道模式链接不同的线程,以及如何在某个线程取消工作时,保证所有线程以及管道资源的正常回收。 Go并发模式:管道和取消 作者:Sameer Ajmani,blog.golang.org,写于2014年3月13日。 介绍Go本身提供的并发特性,可以轻松构建用于处理流数据的管道,从而高效利用I/O和多核CPU。这篇文章就展示了这种管道的例子,并关注当操作失败时要处理的一些细节,并介绍了如何干净的处理错误的技巧。 什么是管道?Go语言里没有明确定义管道,而只是把管道当作一类并发程序。简单来说,管道是一系列由channel联通的状态(stage),而每个状态是一组运行相同函数的Goroutine。每个状态上,Goroutine
每个语态都会有任意个流入或者流出channel,除了第一个状态(只有流出channel)和最后一个状态(只有流入channel)。第一个状态有时被称作源或者生产者;最后一个状态有时被称作槽(sink)或者消费者。 我们先从一个简单的管道例子开始解释这些想法和技术。之后,我们再来看一些更真实的例子。 求平方数考虑一个管道和三个状态。 第一个状态, funcgen(nums...int)<-chanint{ out:=make(chanint)gofunc(){for_,n:=rangenums{ out<-n }close(out) }()returnout } 第二个状态, funcsq(in<-chanint)<-chanint{ out:=make(chanint)gofunc(){forn:=rangein{ out<-n*n }close(out) }()returnout } 主函数建立起管道,并执行最终的状态:从第二个状态接收所有的数值并打印,直到channel被关闭: funcmain(){//建立管道 c:=gen(2,3) out:=sq(c)//产生输出 fmt.Println(<-out)//4 fmt.Println(<-out)//9} 因为 funcmain(){//建立管道并产生输出 forn:=rangesq(sq(gen(2,3))){ fmt.Println(n)//16和81 } } 扇出,扇入多个函数可以同时从一个channel接收数据,直到channel关闭,这种情况被称作扇出。这是一种将工作分布给一组工作者的方法,目的是并行使用CPU和I/O。 一个函数同时接收并处理多个channel输入并转化为一个输出channel,直到所有的输入channel都关闭后,关闭输出channel。这种情况称作扇入。 我们可以将我们的管道改为同时执行两个 funcmain(){ in:=gen(2,3)//在两个从in里读取数据的Goroutine间分配sq的工作 c1:=sq(in) c2:=sq(in)//输出从c1和c2合并的数据 forn:=rangemerge(c1,c2){ fmt.Println(n)//4和9,或者9和4 } }
往一个已经关闭的channel输出会产生异常(panic),所以一定要保证所有数据发送完成后再执行关闭。 funcmerge(cs...<-chanint)<-chanint{varwgsync.WaitGroup out:=make(chanint)//为cs中每个输入channel启动输出Goroutine。output从c中复制数值,直到c被关闭 //之后调用wg.Done output:=func(c<-chanint){forn:=rangec{ out<-n } wg.Done() } wg.Add(len(cs))for_,c:=rangecs{gooutput(c) }//启动一个Goroutine,当所有outputGoroutine都工作完后(wg.Done),关闭out, //保证只关闭一次。这个Goroutine必须在wg.Add之后启动 gofunc(){ wg.Wait()close(out) }()returnout } 突然关闭我们的管道函数里有个模式:
这个模式使得每个接收状态可以写为一个 但是实际的管道,状态不能总是接收所有的流入数值。有时这是设计决定的:接收者可能只需要一部分数值做进一步处理。更常见的情况是,一个状态会由于从早先的状态流入的数值有误而退出。不管哪种情况,接收者都不应该继续等待剩下的数值,而且我们希望早先的状态可以停止生产后续状态不需要的数据。 在我们的管道例子里,如果一个状态无法处理所有的流入数值,试图发送那些数值的Goroutine会被永远阻塞住: //处理输出的第一个数值 out:=merge(c1,c2) fmt.Println(<-out)//4或者9 return //由于我们不再接收从out输出的第二个数值,其中一个输出Goroutine会由于试图发送数值而挂起} 这是资源泄漏:Goroutine会占用内存和运行时资源,而且Goroutine栈里的堆引用会一直持有数据,这些数据无法被垃圾回收。Goroutine本身也无法被垃圾回收,它们必须靠自己退出(而不是被其他人杀死)。 即便下游的状态无法接收所有的流入数值,我们依然需要让管道里的上游状态正常退出。一种方法是修改流出channel,使其含有缓冲区。缓冲区可以持有固定数量的数值,当缓冲区有空间时,发送操作会立刻完成(不会产生阻塞)。 在创建channel时,如果已经知道要发送数值的数量,缓冲区可以简化代码。比如,我们可以让 funcgen(nums...int)<-chanint{ out:=make(chanint,len(nums))for_,n:=rangenums{ out<-n }close(out)returnout } 回到我们管道的阻塞问题上来,我们可以考虑给 funcmerge(cs...<-chanint)<-chanint{varwgsync.WaitGroup out:=make(chanint,1)//1个空间足够应付未读的输入 //...其余未变... 这个改动当然修正了程序中阻塞Goroutine的问题,但这不是好的代码。缓冲区的大小为1,依赖于我们已经知道我们将要 不使用缓冲区的话,我们需要提供一种方法,让下游状态通知发送者,下游状态将停止接收输入。 明确的取消当 funcmain(){ in:=gen(2,3)//发布sq的工作到两个都从in里读取数据的Goroutine c1:=sq(in) c2:=sq(in)//处理来自output的第一个数值 done:=make(chanstruct{},2) out:=merge(done,c1,c2) fmt.Println(<-out)//4或者9 //通知其他发送者,该退出了 done<-struct{}{} done<-struct{}{} } 发送Goroutine将发送操作替换为一个 funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwgsync.WaitGroup out:=make(chanint)//为每个cs中的输入channel启动一个outputGoroutine。outpu从c里复制数值直到c被关闭 //或者从done里接收到数值,之后output调用wg.Done output:=func(c<-chanint){forn:=rangec{select{caSEOut<-n:case<-done: } } wg.Done() }//...其余的不变... 但是这种方法有个问题:下游的接收者需要知道潜在会被阻塞的上游发送者的数量。追踪这些数量不仅枯燥,还容易出错。 我们需要一种方法,让不知道也不限制数量的Goroutine,停止往它们下游发送数据的行为。在Go里,我们可以通过关闭channel来实现这个工作,因为channel被关闭时,接收工作会立刻执行,并产生一个符合类型的0值。 这就是说, funcmain(){//构建donechannel,整个管道里分享done,并在管道退出时关闭这个channel //以此通知所有Goroutine该推出了。 done:=make(chanstruct{})deferclose(done) in:=gen(done,2,3)//发布sq的工作到两个都从in里读取数据的Goroutine c1:=sq(done,in) c2:=sq(done,in)//处理来自output的第一个数值 out:=merge(done,c2) fmt.Println(<-out)//4或者9 //done会通过defer调用而关闭} 管道里的每个状态现在都可以随意的提早退出了: funcsq(done<-chanstruct{},in<-chanint)<-chanint{ out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caSEOut<-n*n:case<-done:return } } }()returnout } 下面列出了构建管道的指南:
管道要么保证足够能存下所有发送数据的缓冲区,要么接收来自接收者明确的要放弃channel的信号,来保证释放发送者。 对目录做摘要来考虑一个更现实的管道。 MD5是一个摘要算法,经常在对文件的校验的时候使用。命令行上使用 我们的程序类似 我们的主函数包含一个 funcmain(){//计算指定目录下所有文件的MD5值,之后按照目录名排序并打印结果 m,err:=MD5All(os.Args[1])iferr!=nil{ fmt.Println(err)return }varpaths[]string forpath:=rangem{ paths=append(paths,path) } sort.Strings(paths)for_,path:=rangepaths{ fmt.Printf("%x%sn",m[path],path) } }
//MD5All读取文件目录root下所有文件,并返回从文件路径到文件内容MD5值的映射。如果扫描目录//出错或者任何操作失败,MD5All返回失败。funcMD5All(rootstring)(map[string][md5.Size]byte,error){ m:=make(map[string][md5.Size]byte) err:=filepath.Walk(root,func(pathstring,infoos.FileInfo,errerror)error{iferr!=nil{returnerr }ifinfo.IsDir(){returnnil } data,err:=ioutil.ReadFile(path)iferr!=nil{returnerr } m[path]=md5.Sum(data)returnnil })iferr!=nil{returnnil,err }returnm,nil} 并行摘要在 typeresultstruct{ pathstring sum[md5.Size]byte errerror }
funcsumFiles(done<-chanstruct{},rootstring)(<-chanresult,<-chanerror){//对每个常规文件,启动一个Goroutine计算文件内容并发送结果到c。发送walk的结果到errc c:=make(chanresult) errc:=make(chanerror,1)gofunc(){varwgsync.WaitGroup err:=filepath.Walk(root,errerror)error{iferr!=nil{returnerr }ifinfo.IsDir(){returnnil } wg.Add(1)gofunc(){ data,err:=ioutil.ReadFile(path)select{casec<-result{path,md5.Sum(data),err}:case<-done: } wg.Done() }()//如果done被关闭了,停止walk select{case<-done:returnerrors.New("walkcanceled")default:returnnil } })//walk已经返回,所有wg.Add的工作都做完了。开启新进程,在所有发送完成后 //关闭c。 gofunc(){ wg.Wait()close(c) }()//因为errc有缓冲区,所以这里不需要select。 errc<-err }()returnc,errc }
funcMD5All(rootstring)(map[string][md5.Size]byte,error){//MD5All在返回时关闭donechannel;这个可能在从c和errc收到所有的值之前被调用 done:=make(chanstruct{})deferclose(done) c,errc:=sumFiles(done,root) m:=make(map[string][md5.Size]byte)forr:=rangec{ifr.err!=nil{returnnil,r.err } m[r.path]=r.sum }iferr:=<-errc;err!=nil{returnnil,nil} 受限的并发在 我们可以通过控制并行读取的文件数量来限制内存的申请。在 第一个状态, funcwalkFiles(done<-chanstruct{},rootstring)(<-chanstring,<-chanerror){ paths:=make(chanstring) errc:=make(chanerror,1)gofunc(){//在Walk之后关闭pathschannel deferclose(paths)//因为errc有缓冲区,所以这里不需要select。 errc<-filepath.Walk(root,errerror)error{iferr!=nil{returnerr }ifinfo.IsDir(){returnnil }select{casepaths<-path:case<-done:returnerrors.New("walkcanceled") }returnnil }) }()returnpaths,errc } 中间的状态启动固定数量的 funcdigester(done<-chanstruct{},paths<-chanstring,cchan<-result){forpath:=rangepaths{ data,err}:case<-done:return } } } 不象之前的例子, //启动固定数量的Goroutine来读取并对文件做摘要。 c:=make(chanresult)varwgsync.WaitGroupconstnumDigesters=20 wg.Add(numDigesters)fori:=0;i<numDigesters;i++{gofunc(){ digester(done,paths,c) wg.Done() }() }gofunc(){ wg.Wait()close(c) }() 我们也可以让每个 最终从 m:=make(map[string][md5.Size]byte)forr:=rangec{ifr.err!=nil{returnnil,r.err } m[r.path]=r.sum }//检查Walk是否失败 iferr:=<-errc;err!=nil{returnnil,nil} 结论这篇文章展示了使用Go构建流数据管道的技术。要慎重处理这种管道产生的错误,因为管道里的每个状态都可能因为向下游发送数值而阻塞,而下游的状态却不再关心输入的数据。我们展示了如何将关闭channel作为“完成”信号广播给所有由管道启动的Goroutine,并且定义了正确构建管道的指南。 进一步阅读: Go并发模式(视频)展示了Go的并发特性的基础知识,并演示了应用这些知识的方法。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |