Golang模仿七牛图片处理API
分析需求先看一下qiniu的接口是如何处理图片的,例如先截取视频第一秒的图片,再把图片缩略,最后存储到一个新的key,命令可以这么写 上面的操作算作一个 { "id": "xxxxx","pipeline": "xxx","code": 0,"desc": "The fop was completed successfully","reqid": "xTsAAFnxUbR5J10U","inputBucket": "xxx","inputKey": "xxxxx","items": [ { "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=","hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv","key": "xx","returnOld": 0 },{ "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==","hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3","key": "xxx","returnOld": 0 } ] } 分解需求这个程序大致需要这么几个部分:
可以把 1 和 2,3 分开来看,1 比较独立,之前写过一个worker的模型,参考的是这篇文章 Handling 1 Million Requests per Minute with Go,比较详细,是用 go channel 作为queue的,我加了一个 beanstalk 作为 queue的 providor。还有一点改进是,文章中只提供了worker数量的设置,我再加了一个参数,设定每个worker可以并行执行的协程数。所以下面主要讲讲3, 2的解决办法 Pipe可以参考这个库 pipe,用法如下: p := pipe.Line( pipe.ReadFile("test.png"),resize(300,300),blur(0.5),) output,err := pipe.CombinedOutput(p) if err != nil { fmt.Printf("%vn",err) } buf := bytes.NewBuffer(output) img,_ := imaging.Decode(buf) imaging.Save(img,"test_a.png") 还是比较方便的,建一个 type Cmd struct { cmd string saveas string ops []Op err error } type Op interface { getPipe() pipe.Pipe } type ResizeOp struct { width,height int } func (c ResizeOp) getPipe() pipe.Pipe { return resize(c.width,c.height) } //使用方法 cmdStr := `file/test.png|thumbnail/x300|blur/20x8` cmd := Cmd{cmdStr,"test_b.png",nil,nil} cmd.parse() cmd.doOps() sync.WaitGroup单个cmd处理解决后,就是多个cmd的并行问题,没啥好想的,直接用 func main() { cmds := []string{} for i := 0; i < 10000; i++ { cmds = append(cmds,fmt.Sprintf("cmd-%d",i)) } results := handleCmds(cmds) fmt.Println(len(results)) // 10000 } func doCmd(cmd string) string { return fmt.Sprintf("cmd=%s",cmd) } func handleCmds(cmds []string) (results []string) { fmt.Println(len(cmds)) //10000 var count uint64 group := sync.WaitGroup{} lock := sync.Mutex{} for _,item := range cmds { // 计数加一 group.Add(1) go func(cmd string) { result := doCmd(cmd) atomic.AddUint64(&count,1) lock.Lock() results = append(results,result) lock.Unlock() // 计数减一 group.Done() }(item) } // 阻塞 group.Wait() fmt.Printf("count=%d n",count) // 10000 return } group本质大概是一个计数器,计数 > 0时, 我们建一个 type BenchCmd struct { cmds []Cmd waitGroup sync.WaitGroup errs []error lock sync.Mutex } func (b *BenchCmd) doCmds() { for _,item := range b.cmds { b.waitGroup.Add(1) go func(cmd Cmd) { cmd.parse() err := cmd.doOps() b.lock.Lock() b.errs = append(b.errs,err) b.lock.Unlock() b.waitGroup.Done() }(item) } b.waitGroup.Wait() } 最后的调用就像这样: var cmds []Cmd cmd_a := Cmd{`file/test.png|thumbnail/x300|blur/20x8`,"test_a.png",nil} cmd_b := Cmd{`file/test.png|thumbnail/500x1000|blur/20x108`,nil} cmd_c := Cmd{`file/test.png|thumbnail/300x300`,"test_c.png",nil} cmds = append(cmds,cmd_a) cmds = append(cmds,cmd_b) cmds = append(cmds,cmd_c) bench := BenchCmd{ cmds: cmds,waitGroup: sync.WaitGroup{},lock: sync.Mutex{},} bench.doCmds() fmt.Println(bench.errs) 这只是一个初级的实验,思考还不够全面,并且只是模仿API,qiniu应该不是这么做的,耦合更低,可能各个Cmd都有各自处理的集群,那 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |