golang中job队列实现方法
go语言通过使用goroutine和channel,可以非常方便的执行异步处理操作。 THE “NO-JOB-QUEUE” JOB QUEUE如果仅仅只需要执行一个异步操作,而不需要job queue,那么可以用下面代码 go process(job)
但是如果我们需要控制同时工作的job的数量或者对生产者的生产进行限制,就需要使用job queue. THE SIMPLEST JOB QUEUE下面是一个简单的job queue实现,worker从job queue中获取一个job来处理。 func worker(jobChan <-chan Job) {
for job := range jobChan {
process(job)
}
}
// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)
// start the worker
go worker(jobChan)
// enqueue a job
jobChan <- job
上面代码,创建一个Job对象的channel用来存放和传递job,该channel在创建时设置容量为100.然后启动一个worker goroutine从channel中抽取一个job进行处理,worker一次处理一个job。通过<- job往channel中加入新的job。channerl中数据的in和out是线程安全的,开发人员无需担心互斥。 PRODUCER THROTTLINGjobChan创建时拥有100的容量,那么如果这个channel已经有了100个job,再执行 // enqueue a job
jobChan <- job
操作时,这个操作就会阻塞。这个模式可以帮助我们限制生产者生产数据的数量,避免生产的数据过多。 ENQUEUEING WITHOUT BLOCKING实现非阻塞的生产者模式,我们可以使用select // TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful,and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job,jobChan <-chan Job) bool {
select {
case jobChan <- job:
return true
default:
return false
}
}
当通道jobchan已经满的时候,jobChan <- job: 阻塞,程序跳到default中执行。 STOPPING THE WORKER如果job已经完了,如果优雅的告知worker停止,而不是阻塞的等待呢?我们可以使用close去关闭channel。 close(jobChan)
然后woker的代码就可以变为 for job := range jobChan {...}
在channerl关闭之前进入到jobChan的job会被woker读取出来进行处理,最后这个循环会自动退出。 WAITING FOR THE WORKER使用close函数,是主动让worker去停止,但是如果想要等待woker处理完,我们就要使用sync.WaitGroup // use a WaitGroup
var wg sync.WaitGroup
func worker(jobChan <-chan Job) {
defer wg.Done()
for job := range jobChan {
process(job)
}
}
// increment the WaitGroup before starting the worker
wg.Add(1)
go worker(jobChan)
// to stop the worker,first close the job channel
close(jobChan)
// then wait using the WaitGroup
wg.Wait()
wg.Add(1)让WaitGroup增加1,wg.Done()让WaitGroup减1,wg.Wait()会一直阻塞除非变为0 。 WAITING WITH A TIMEOUT如果不想要WaitGroup一直等,而是有个超时时间,我们可以用select实现 // WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
// timeout. Returns true if the wait completed without timing out,false
// otherwise.
func WaitTimeout(wg *sync.WaitGroup,timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}
// now use the WaitTimeout instead of wg.Wait()
WaitTimeout(&wg, 5 * time.Second)
如果WaitGroup先返回,那么close(ch)执行后,case<- ch:有效就会执行,否则当timeout到达后,case <-time.After(timeout): CANCELLING WORKERS如果想要worker立刻停止当前工作,而不是之前那样worker还会处理剩下的job,我们可以利用context // create a context that can be cancelled
ctx,cancel := context.WithCancel(context.Background())
// start the goroutine passing it the context
go worker(ctx,jobChan)
func worker(ctx context.Context,jobChan <-chan Job) {
for {
select {
case <-ctx.Done():
return
case job := <-jobChan:
process(job)
}
}
}
// Invoke cancel when the worker needs to be stopped. This *does not* wait
// for the worker to exit.
cancel()
首先 ctx,cancel := context.WithCancel(context.Background()) 创建一个context对象以及相关的cancel,当cancel被调用后,ctx.Done()变为可读,worker就会返回 CANCELLING WORKERS WITHOUT CONTEXT// create a cancel channel
cancelChan := make(chan struct{})
// start the goroutine passing it the cancel channel
go worker(jobChan,cancelChan)
func worker(jobChan <-chan Job,cancelChan <-chan struct{}) {
for {
select {
case <-cancelChan:
return
case job := <-jobChan:
process(job)
}
}
}
// to cancel the worker,close the cancel channel
close(cancelChan)
A POOL OF WORKERS使用多个worker可以提高程序的并发行,最简单的做法如下 for i:=0; i<workerCount; i++ {
go worker(jobChan)
}
然后多个worker会尝试从同一个channel中获取job,这个操作是安全的,作为开发者可以放心。 for i:=0; i<workerCount; i++ {
wg.Add(1)
go worker(jobChan)
}
// wait for all workers to exit
wg.Wait()
如果要cancel这些worker,可以再单独使用一个channel,用来给这些worker发通知,类似CANCELLING WORKERS WITHOUT CONTEXT那样 // create cancel channel
cancelChan := make(chan struct{})
// pass the channel to the workers,let them wait on it
for i:=0; i<workerCount; i++ {
go worker(jobChan,cancelChan)
}
// close the channel to signal the workers
close(cancelChan) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |