Go语言并发目录遍历
发布时间:2020-12-16 09:36:25 所属栏目:大数据 来源:网络整理
导读:在本节中,我们将构建一个程序,根据命令行指定的输入,报告一个或多个目录的磁盘使用情况,类似于 UNIX 的 du 命令。该程序大多数工作是由下面的 walkDir 函数完成,它使用 dirents 辅助函数来枚举目录中的条目,如下所示: // wakjDir 递归地遍历以 dir 为
在本节中,我们将构建一个程序,根据命令行指定的输入,报告一个或多个目录的磁盘使用情况,类似于 UNIX 的du 命令。该程序大多数工作是由下面的 walkDir 函数完成,它使用 dirents 辅助函数来枚举目录中的条目,如下所示:// wakjDir 递归地遍历以 dir 为根目录的整个文件树,并在 filesizes 上发送每个已找到文件的大小 func walkDir(dir string,fileSizes chan<- int64) { for _,entry := range dirents(dir) { if entry.IsDir() { subdir := filepath.Join(dir,entry.Name()) walkDir(subdir,fileSizes) } else { fileSizes <- entry.Size() } } } // dirents 返回 dir 目录中的条目 func dirents(dir string) []os.FileInfo { entries,err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr,"du1: %vn",err) return nil } return entries }ioutil.ReadDir 函数返回一个 os.FileInfo 类型的 slice,针对单个文件同样的信息可以通过调用 os.Stat 函数来返回。对每一个子目录,walkDir 递归调用它自己,对于每一个文件,walkDir 发送一条消息到 fileSizes 通道,消息的内容为文件所占用的字节数。 程序的完整代码如下所示,代码中 main 函数使用两个 goroutine,后台 goroutine 调用 walkDir 遍历命令行上指定的每一个目录,最后关闭 fileSizes 通道;主 goroutine 计算从通道中接收的文件的大小的和,最后输出总数。 package main import ( ??? "flag" ??? "fmt" ??? "io/ioutil" ??? "os" ??? "path/filepath" ) func main() { ??? // 确定初始目录 ??? flag.Parse() ??? roots := flag.Args() ??? if len(roots) == 0 { ??????? roots = []string{"."} ??? } ??? // 遍历文件树 ??? fileSizes := make(chan int64) ??? go func() { ??????? for _,root := range roots { ??????????? walkDir(root,fileSizes) ??????? } ??????? close(fileSizes) ??? }() ??? // 输出结果 ??? var nfiles,nbytes int64 ??? for size := range fileSizes { ??????? nfiles++ ??????? nbytes += size ??? } ??? printDiskUsage(nfiles,nbytes) } func printDiskUsage(nfiles,nbytes int64) { ??? fmt.Printf("%d files? %.1f GBn",nfiles,float64(nbytes)/1e9) } // wakjDir 递归地遍历以 dir 为根目录的整个文件树,并在 filesizes 上发送每个已找到的文件的大小 func walkDir(dir string,fileSizes chan<- int64) { ??? for _,entry := range dirents(dir) { ??????? if entry.IsDir() { ??????????? subdir := filepath.Join(dir,entry.Name()) ??????????? walkDir(subdir,fileSizes) ??????? } else { ??????????? fileSizes <- entry.Size() ??????? } ??? } } // dirents 返回 dir 目录中的条目 func dirents(dir string) []os.FileInfo { ??? entries,err := ioutil.ReadDir(dir) ??? if err != nil { ??????? fmt.Fprintf(os.Stderr,err) ??????? return nil ??? } ??? return entries }在输出结果前,程序等待较长时间:
go run main.go D:/code -v 标识的时候周期性的输出当前目录的总和,如果只想看到最终的结果省略-v 即可。package main import ( ??? "flag" ??? "fmt" ??? "io/ioutil" ??? "os" ??? "path/filepath" ??? "time" ) var verbose = flag.Bool("v",false,"显示详细进度") func main() { ??? // ...启动后台 goroutine... ??? // 确定初始目录 ??? flag.Parse() ??? roots := flag.Args() ??? if len(roots) == 0 { ??????? roots = []string{"."} ??? } ??? // 遍历文件树 ??? fileSizes := make(chan int64) ??? go func() { ??????? for _,fileSizes) ??????? } ??????? close(fileSizes) ??? }() ??? // 定期打印结果 ??? var tick <-chan time.Time ??? if *verbose { ??????? tick = time.Tick(500 * time.Millisecond) ??? } ??? var nfiles,nbytes int64 loop: ??? for { ??????? select { ??????? case size,ok := <-fileSizes: ??????????? if !ok { ??????????????? break loop // fileSizes 关闭 ??????????? } ??????????? nfiles++ ??????????? nbytes += size ??????? case <-tick: ??????????? printDiskUsage(nfiles,nbytes) ??????? } ??? } ??? printDiskUsage(nfiles,nbytes) // 最终总数 } func printDiskUsage(nfiles,err) ??????? return nil ??? } ??? return entries }因为这个程序没有使用 range 循环,所以第一个 select 情况必须显式判断 fileSizes 通道是否已经关闭,使用两个返回值的形式进行接收操作。如果通道已经关闭,程序退出循环。标签化的 break 语句将跳出 select 和 for 循环的逻辑。没有标签的 break 只能跳出 select 的逻辑,导致循环的下一次迭代。 运行结果如下所示:
go run main.go -v D: 所以,下面为每一个 walkDir 的调用创建一个新的 goroutine。它使用 sync.WaitGroup 来为当前存活的 walkDir 调用计数,一个 goroutine 在计数器减为 0 的时候关闭 fileSizes 通道。 package main import ( ??? "flag" ??? "fmt" ??? "io/ioutil" ??? "os" ??? "path/filepath" ??? "sync" ??? "time" ) var verbose = flag.Bool("v","显示详细进度") func main() { ??? // ...确定根目录... ??? flag.Parse() ??? // 确定初始目录 ??? roots := flag.Args() ??? if len(roots) == 0 { ??????? roots = []string{"."} ??? } ??? // 并行遍历每一个文件树 ??? fileSizes := make(chan int64) ??? var n sync.WaitGroup ??? for _,root := range roots { ??????? n.Add(1) ??????? go walkDir(root,&n,fileSizes) ??? } ??? go func() { ??????? n.Wait() ??????? close(fileSizes) ??? }() ??? // 定期打印结果 ??? var tick <-chan time.Time ??? if *verbose { ??????? tick = time.Tick(500 * time.Millisecond) ??? } ??? var nfiles,float64(nbytes)/1e9) } func walkDir(dir string,n *sync.WaitGroup,fileSizes chan<- int64) { ??? defer n.Done() ??? for _,entry := range dirents(dir) { ??????? if entry.IsDir() { ??????????? n.Add(1) ??????????? subdir := filepath.Join(dir,entry.Name()) ??????????? go walkDir(subdir,n,fileSizes) ??????? } else { ??????????? fileSizes <- entry.Size() ??????? } ??? } } // sema是一个用于限制目录并发数的计数信号量 var sema = make(chan struct{},20) // dirents返回directory目录中的条目 func dirents(dir string) []os.FileInfo { ??? sema <- struct{}{}??????? // 获取令牌 ??? defer func() { <-sema }() // 释放令牌 ??? entries,"du: %vn",err) ??????? return nil ??? } ??? return entries }尽管系统与系统之间有很多的不同,但是这个版本的速度比前一个版本快几倍。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |