加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

GoLang之Concurrency顺序管道模式

发布时间:2020-12-16 18:57:50 所属栏目:大数据 来源:网络整理
导读:2013-12-14 wcdj 本文介绍go利用管道如何进行并发计算,需要注意go的管道是双向的,而UNIX管道是单向的。 PS: 在测试时自己建立了一个后缀为_test.go的文件,build后会提示如下错误: 一句话解释:在go中文件名后缀为_test.go的都是单元测试文件。 具体可参

2013-12-14 wcdj


本文介绍go利用管道如何进行并发计算,需要注意go的管道是双向的,而UNIX管道是单向的。

PS: 在测试时自己建立了一个后缀为_test.go的文件,build后会提示如下错误:

一句话解释:在go中文件名后缀为_test.go的都是单元测试文件。

具体可参考:http://segmentfault.com/q/1010000000159135


本例参考《go语言程序设计》第七章的一个例子,并添加了一些注释。

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"strings"
)

func main() {

	// Use all the machine's cores
	runtime.GOMAXPROCS(runtime.NumCPU())
	log.SetFlags(0)

	// 处理命令行参数
	algorithm,minSize,maxSize,suffixes,files := handleCommandLine()

	// 开始计算操作
	if algorithm == 1 {
		// 算法1是并行计算,通过创建各个的goroutine

		// step1: 先通过source函数处理文件列表,并把处理结果返回到管道里
		// step2: 将符合后缀的文件放到管道里
		// step3: 将符合大小的文件放到管道里
		// step4: 从管道获取结果数据
		sink(filterSize(minSize,filterSuffixes(suffixes,source(files))))
	} else {

		// 算法2是串行计算
		channel1 := source(files)
		channel2 := filterSuffixes(suffixes,channel1)
		channel3 := filterSize(minSize,channel2)
		sink(channel3)
	}
}

// 命令行参数解析操作
func handleCommandLine() (algorithm int,maxSize int64,files []string) {

	// 将命令行参数绑定到对应的变量中
	// algorithm默认为1
	flag.IntVar(&algorithm,"algorithm",1,"1 or 2")
	// minSize和maxSize默认为-1,表示没有限制
	flag.Int64Var(&minSize,"min",-1,"minimum file size (-1 means no minimum)")
	flag.Int64Var(&maxSize,"max","maximum file size (-1 means no maximum)")
	// suffixes后缀列表默认为空
	var suffixesOpt *string = flag.String("suffixes","","comma-separated list of file suffixes")

	// 命令行预处理
	flag.Parse()

	if algorithm != 1 && algorithm != 2 {
		algorithm = 1
	}
	if minSize > maxSize && maxSize != -1 {
		// Fatalln is equivalent to Println() followed by a call to os.Exit(1)
		log.Fatalln("minimum size must be < maximum size")
	}

	// 将后缀列表用逗号分隔,返回suffixes后缀切片
	suffixes = []string{}
	if *suffixesOpt != "" {
		suffixes = strings.Split(*suffixesOpt,",")
	}

	// Args returns the non-flag command-line arguments
	// 认为非命令选项的参数全为文件参数
	files = flag.Args()
	return algorithm,files
}

// 创建管道,处理文件列表并把结果返回到管道里
func source(files []string) <-chan string {
	out := make(chan string,1000)
	go func() {
		for _,filename := range files {
			out <- filename
		}
		close(out)
	}()
	return out
}

// 将符合后缀的文件放到管道里
// 根据后缀切片处理管道里的文件,同样再把结果返回到管道里
// make the buffer the same size as for files to maximize throughput
func filterSuffixes(suffixes []string,in <-chan string) <-chan string {
	out := make(chan string,cap(in))
	go func() {
		for filename := range in {

			// 没有限制后缀的话,则直接将文件塞到管道里
			if len(suffixes) == 0 {
				out <- filename
				continue
			}

			// 获取文件列表的后缀,且全部转换为小写
			// Ext returns the file name extension used by path. The extension is the suffix beginning at the final dot in the final element of path; it is empty if there is no dot
			ext := strings.ToLower(filepath.Ext(filename))
			for _,suffix := range suffixes {
				if ext == suffix {
					out <- filename
					break
				}
			}
		}
		close(out)
	}()
	return out
}

// 将符合文件大小的文件放到管道里
// make the buffer the same size as for files to maximize throughput
func filterSize(minimum,maximum int64,cap(in))
	go func() {
		for filename := range in {

			// 对文件大小没有限制,直接将文件塞到管道里
			if minimum == -1 && maximum == -1 {
				out <- filename // don't do a stat call it not needed
				continue
			}

			// 使用操作系统的接口获取文件大小等信息
			// Stat returns a FileInfo describing the named file. If there is an error,it will be of type *PathError
			/*
							type FileInfo interface {
				        Name() string       // base name of the file
				        Size() int64        // length in bytes for regular files; system-dependent for others
				        Mode() FileMode     // file mode bits
				        ModTime() time.Time // modification time
				        IsDir() bool        // abbreviation for Mode().IsDir()
				        Sys() interface{}   // underlying data source (can return nil)
				}
			*/
			finfo,err := os.Stat(filename)
			if err != nil {
				continue // ignore files we can't process
			}
			size := finfo.Size()
			if (minimum == -1 || minimum > -1 && minimum <= size) &&
				(maximum == -1 || maximum > -1 && maximum >= size) {
				out <- filename
			}
		}
		close(out)
	}()
	return out
}

// 从管道获取结果数据
func sink(in <-chan string) {
	for filename := range in {
		fmt.Println(filename)
	}
}

/*
output:
mba:go gerryyang$ ./filter_t -min 1 -suffixes ".cpp" ../c++11/range_for.cpp ../c++11/test ../c++11/test.cpp routines.go
../c++11/range_for.cpp
../c++11/test.cpp
mba:go gerryyang$ ./filter_t -min 1 -max -2 -suffixes ".cpp" ../c++11/range_for.cpp ../c++11/test ../c++11/test.cpp routines.go jjjj 
minimum size must be < maximum size
*/


在网上发现一个go的学习站点,有空可以逛下:http://blog.studygolang.com/

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读