并发并行与Go并发编程
并发与并行
GreenThread
几个概念
GoroutineGoroutine 其实就是前面 GreenThread 系列解决方案的一种演进和实现。
Goroutine 调度器 这个图一般讲 Goroutine 调度器的地方都会引用,想要仔细了解的可以看看原博客(小编:点击阅读原文获取)。这里只说明几点:
Goroutine 是银弹么? Goroutine 很大程度上降低了并发的开发成本,是不是我们所有需要并发的地方直接 go func 就搞定了呢? Go 通过 Goroutine 的调度解决了 CPU 利用率的问题。但遇到其他的瓶颈资源如何处理?比如带锁的共享资源,比如数据库连接等。互联网在线应用场景下,如果每个请求都扔到一个 Goroutine 里,当资源出现瓶颈的时候,会导致大量的 Goroutine 阻塞,最后用户请求超时。这时候就需要用 Goroutine 池来进行控流,同时问题又来了:池子里设置多少个 Goroutine 合适? 所以这个问题还是没有从更本上解决。 go没有严格的内置的logical processor数量限制,但是go的runtime默认限制了每个program最多使用10,000个线程,可以通过SetMaxThreads修改. 下图展示了Concurrency和Parallelism的区别goroutine使用go块go的用法很简单,如下. 如果没有最外面的括号{}(),会显示go块必须是一个函数调用.没有()只是一个函数的声明,有了()是一个调用(没有参数的) go func() {
for _,n := range nums {
out <- n
}
close(out)
}()
channelchannel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。
默认发送和接收操作是阻塞的,直到发送方和接收方都准备完毕。 func main() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
}
所以你要是这么写:是一辈子都不会执行到print的(会死锁) func main() {
messages := make(chan string)
messages <- "ping"
msg := <-messages
fmt.Println(msg)
}
所以在一个go程中,发送 select {
case messages <- "msg":
fmt.Println("sent message")
default:
fmt.Println("no message sent")
}
range用于channel的range是阻塞的.下面程序会显示deadloc,去掉注释就好了. queue := make(chan string,2)
//queue <- "one"
//queue <- "two"
//close(queue)
for elem := range queue {
fmt.Println(elem)
}
通道缓冲加了缓存之后,就像你向channel发送消息的时候( 但是你要这么写,利用通道缓冲,就可以.无缓冲的意味着只有在对应的接收( func main() {
message := make(chan string,1)
message <- "ping"
msg := <-message
fmt.Print(msg)
}
要是多发一个 func main() {
messages := make(chan string,2)
messages <- "buffered"
messages <- "channel"
fmt.Println(<-messages)
fmt.Println(<-messages)
}
通道同步使用通道同步,如果你把 func worker(done chan bool) {
fmt.Print("working...")
time.Sleep(time.Second) // working
fmt.Println("done")
done <- true
}
func main() {
done := make(chan bool,1)
go worker(done)
<-done //blocking 阻塞在这里,知道worker执行完毕
}
发送方向可以指定这个通道是不是只用来发送或者接收值。这个特性提升了程序的类型安全性。pong 函数允许通道(pings)来接收数据,另一通道(pongs)来发送数据。 func ping(pings chan<- string,msg string) {
pings <- msg
}
func pong(pings <-chan string,pongs chan<- string) {
msg := <-pings
pongs <- msg
}
func main() {
pings := make(chan string,1)
pongs := make(chan string,1)
ping(pings,"passed message")
pong(pings,pongs)
fmt.Println(<-pongs)
}
selectGo 的select 让你可以同时等待多个通道操作。(poll/epoll?) 注意select 要么写个死循环用超时,要不就定好次数.或者加上default让select变成非阻塞的 go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received",msg1)
case msg2 := <-c2:
fmt.Println("received",msg2)
}
}
超时处理其中 select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
非阻塞通道操作default,当监听的channel都没有准备好的时候,默认执行的. select {
case msg := <-messages:
fmt.Println("received message",msg)
default:
fmt.Println("no message received")
}
可以使用 select 语句来检测 chan 是否已经满了 ch := make (chan int,1)
ch <- 1
select {
case ch <- 2:
default:
fmt.Println("channel is full !")
}
通道关闭一个非空的通道也是可以关闭的,但是通道中剩下的值仍然可以被接收到 queue := make(chan string,2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
fmt.Println(elem)
}
定时器在未来某一刻执行一次时使用的 定时器表示在未来某一时刻的独立事件。你告诉定时器需要等待的时间,然后它将提供一个用于通知的通道。可以显示的关闭 timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
打点器当你想要在固定的时间间隔重复执行,定时的执行,直到我们将它停止 func main() {
//打点器和定时器的机制有点相似:一个通道用来发送数据。这里我们在这个通道上使用内置的 range 来迭代值每隔500ms 发送一次的值。
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at",t)
}
}()
//打点器可以和定时器一样被停止。一旦一个打点停止了,将不能再从它的通道中接收到值。我们将在运行后 1600ms停止这个打点器。
time.Sleep(time.Millisecond * 1600)
ticker.Stop()
fmt.Println("Ticker stopped")
}
生成器类似于提供了一个服务,不过只是适用于调用不是很频繁 func rand_generator_2() chan int {
out := make(chan int)
go func() {
for {
out <- rand.Int()
}
}()
return out
}
func main() {
// 生成随机数作为一个服务
rand_service_handler := rand_generator_2()
fmt.Printf("%dn",<-rand_service_handler)
}
多路复用Apache使用处理每个连接都需要一个进程,所以其并发性能不是很好。而Nighx使用多路复用的技术,让一个进程处理多个连接,所以并发性能比较好。 多路复用技术可以用来整合多个通道。提升性能和操作的便捷。 其实就是整合了多个上面的生成器 func rand_generator_3() chan int {
rand_generator_1 := rand_generator_2()
rand_generator_2 := rand_generator_2()
out := make(chan int)
go func() {
for {
//读取生成器1中的数据,整合
out <- <-rand_generator_1
}
}()
go func() {
for {
//读取生成器2中的数据,整合
out <- <-rand_generator_2
}
}()
return out
}
Furture技术可以在不准备好参数的情况下调用函数。函数调用和函数参数准备这两个过程可以完全解耦。可以在调用的时候不关心数据是否准备好,返回值是否计算好的问题。让程序中的组件在准备好数据的时候自动跑起来。 这个最后取得 Furture技术可以和各个其他技术组合起来用。可以通过多路复用技术,监听多个结果Channel,当有结果后,自动返回。也可以和生成器组合使用,生成器不断生产数据,Furture技术逐个处理数据。Furture技术自身还可以首尾相连,形成一个并发的pipe filter。这个pipe filter可以用于读写数据流,操作数据流。 type query struct {
sql chan string
result chan string
}
func execQuery(q query) {
go func() {
sql := <-q.sql
q.result <- "get " + sql
}()
}
func main() {
q := query{make(chan string,1),make(chan string,1)}
execQuery(q)
//准备参数
q.sql <- "select * from table"
fmt.Println(<-q.result)
}
Chain Filter技术程序创建了10个Filter,每个分别过滤一个素数,所以可以输出前10个素数。 func Generate(ch chan<- int) {
for i := 2; ; i++ {
ch <- i
}
}
func Filter(in <-chan int,out chan<- int,prime int) {
for {
i := <-in // Receive value from 'in'.
if i%prime != 0 {
out <- i // Send 'i' to 'out'.
}
}
}
// The prime sieve: Daisy-chain Filter processes.
func main() {
ch := make(chan int) // Create a new channel.
go Generate(ch) // Launch Generate goroutine.
for i := 0; i < 10; i++ {
prime := <-ch
print(prime,"n")
ch1 := make(chan int)
go Filter(ch,ch1,prime)
ch = ch1
}
}
共享变量有些时候使用共享变量可以让代码更加简洁 type sharded_var struct {
reader chan int
writer chan int
}
func sharded_var_whachdog(v sharded_var) {//共享变量维护协程
go func() {
var value int = 0
for { //监听读写通道,完成服务
select {
case value = <-v.writer:
case v.reader <- value:
}
}
}()
}
func main() {
v := sharded_var{make(chan int),make(chan int)} //初始化,并开始维护协程
sharded_var_whachdog(v)
fmt.Println(<-v.reader)
v.writer <- 1
fmt.Println(<-v.reader)
}
Concurrency patterns下面介绍了一些常用的并发模式. Runner当你的程序会运行在后台,可以是cron job或者是Iron.io这样的worker-based云环境.这个程序就可以监控和中断你的程序,如果你的程序运行的太久了. 定义了三个channel来通知任务状态.
tasks是一个函数类型的slice,你可以往里面存放签名为func funcName(id int){}的函数,作为你的任务. New方法里的interrupt channel buffer设置为1,也就是说当用户重复ctrl+c的时候,程序也只会收到一个信号,其他的信号会被丢弃. 在run()方法中,在开始执行任务前( 在Start()方法中,在go块中执行run()方法,任何当前的goroutine会阻塞在select这边,直到收到run()返回的complete channel或者超时返回. // Runner runs a set of tasks within a given timeout and can be shut down on an operating system interrupt.
type Runner struct {
// interrupt channel reports a signal from the operating system.
interrupt chan os.Signal
// complete channel reports that processing is done.
complete chan error
// timeout reports that time has run out.
timeout <-chan time.Time
// tasks holds a set of functions that are executed
// synchronously in index order.
tasks []func(int) } // ErrTimeout is returned when a value is received on the timeout channel. var ErrTimeout = errors.New("received timeout") // ErrInterrupt is returned when an event from the OS is received. var ErrInterrupt = errors.New("received interrupt") // New returns a new ready-to-use Runner. func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal,complete: make(chan error),timeout: time.After(d),}
}
// Add attaches tasks to the Runner. A task is a function that takes an int ID. ...表示可以传入多个参数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks,tasks...)
}
// Start runs all tasks and monitors channel events.
func (r *Runner) Start() error {
// We want to receive all interrupt based signals.
signal.Notify(r.interrupt,os.Interrupt)
// Run the different tasks on a different goroutine.
go func() {
r.complete <- r.run()
}()
select {
// Signaled when processing is done.
case err := <-r.complete:
return err
// Signaled when we run out of time.
case <-r.timeout:
return ErrTimeout
}
}
// run executes each registered task.
func (r *Runner) run() error {
for id,task := range r.tasks {
// Check for an interrupt signal from the OS.
if r.gotInterrupt() {
return ErrInterrupt
}
// Execute the registered task.
task(id)
}
return nil
}
// gotInterrupt verifies if the interrupt signal has been issued.
func (r *Runner) gotInterrupt() bool {
select {
// Signaled when an interrupt event is sent.
case <-r.interrupt:
// Stop receiving any further signals.
signal.Stop(r.interrupt)
return true
// Continue running as normal.
default:
return false
}
}
main方法 const timeout = 3 * time.Second
// main is the entry point for the program.
func main() {
log.Println("Starting work.")
// Create a new timer value for this run.
r := runner.New(timeout)
// Add the tasks to be run.
r.Add(createTask(),createTask(),createTask())
// Run the tasks and handle the result.
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
// createTask returns an example task that sleeps for the specified
// number of seconds based on the id.
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.",id)
time.Sleep(time.Duration(id) * time.Second)
}
}
Pooling当你有一些特定的资源要共享,比如数据库连接或者内存buffers,这个模式就非常有用 goroutine要用一个资源,就去pool中去拿,用完了就还回去. 例子中的资源是只要实现了io.Closer接口即可.
New函数接受一个用来创建新资源的函数对象( Acquire函数先从pool中取资源,要是取不到用factory新建一个 func (p *Pool) Acquire() (io.Closer,error) {
select {
// Check for a free resource.
case r,_ := <-p.resources:
return r,nil
// Provide a new resource since there are none available.
default:
return p.factory()
}
}
Release函数:如果pool已经关闭,就直接return.否则就向resource这个buffered channel里发送要释放的资源.default语句是如果resource已经满了,就关闭这个pool. Close函数:当程序运行完关闭pool的时候,应该调用Close函数,这个函数首先关闭resource这个buffered channel,然后再把buffered channel中的任务关闭(io.Closer).注意这个加锁. // Pool manages a set of resources that can be shared safely by multiple goroutines.
// The resource being managed must implement the io.Closer interface.
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer,error) closed bool } // ErrPoolClosed is returned when an Acquire returns on a closed pool. var ErrPoolClosed = errors.New("Pool has been closed.") // New creates a pool that manages resources. A pool requires a // function that can allocate a new resource and the size of the pool. func New(fn func() (io.Closer,error),size uint) (*Pool,error) {
if size <= 0 {
return nil,errors.New("Size value too small.")
}
return &Pool{
factory: fn,resources: make(chan io.Closer,size),},nil
}
// Acquire retrieves a resource from the pool.
func (p *Pool) Acquire() (io.Closer,error) {
select {
// Check for a free resource.
case r,ok := <-p.resources:
log.Println("Acquire:","Shared Resource")
if !ok {
return nil,ErrPoolClosed
}
return r,nil
// Provide a new resource since there are none available.
default:
log.Println("Acquire:","New Resource")
return p.factory()
}
}
// Release places a new resource onto the pool.
func (p *Pool) Release(r io.Closer) {
// Secure this operation with the Close operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is closed,discard the resource.
if p.closed {
r.Close()
return
}
select {
// Attempt to place the new resource on the queue.
case p.resources <- r:
log.Println("Release:","In Queue")
// If the queue is already at cap we close the resource.
default:
log.Println("Release:","Closing")
r.Close()
}
}
// Close will shutdown the pool and close all existing resources.
func (p *Pool) Close() {
// Secure this operation with the Release operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is already close,don't do anything.
if p.closed {
return
}
// Set the pool as closed.
p.closed = true
// Close the channel before we drain the channel of its
// resources. If we don't do this,we will have a deadlock.
close(p.resources)
// Close the resources
for r := range p.resources {
r.Close()
}
}
main const (
maxGoroutines = 25 // the number of routines to use.
pooledResources = 2 // number of resources in the pool
)
// dbConnection simulates a resource to share.
type dbConnection struct {
ID int32
}
// Close implements the io.Closer interface so dbConnection can be managed by the pool. Close performs any resource release management.
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection",dbConn.ID)
return nil
}
// idCounter provides support for giving each connection a unique id.
var idCounter int32
// createConnection is a factory method that will be called by the pool when a new connection is needed.
func createConnection() (io.Closer,error) {
id := atomic.AddInt32(&idCounter,1)
log.Println("Create: New Connection",id)
return &dbConnection{id},nil
}
// main is the entry point for all Go programs.
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// Create the pool to manage our connections.
p,err := pool.New(createConnection,pooledResources)
if err != nil {
log.Println(err)
}
// Perform queries using connections from the pool.
for query := 0; query < maxGoroutines; query++ {
// Each goroutine needs its own copy of the query value else they will all be sharing the same query variable.
go func(q int) {
performQueries(q,p)
wg.Done()
}(query)
}
// Wait for the goroutines to finish.
wg.Wait()
// Close the pool.
log.Println("Shutdown Program.")
p.Close()
}
// performQueries tests the resource pool of connections.
func performQueries(query int,p *pool.Pool) {
// Acquire a connection from the pool.
conn,err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// Release the connection back to the pool.
defer p.Release(conn)
// Wait to simulate a query response.
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("Query: QID[%d] CID[%d]n",query,conn.(*dbConnection).ID)
}
WorkNew函数开启了固定个数(maxGoroutines)个goroutine,注意这边work是一个unbuffered channel.这个for range会阻塞直到channel中有值可以取.要是work这个channel被关闭了,这个for range就结束,然后调用wg.Done Run函数提交任务到pool中去 type Worker interface {
Task()
}
// Pool provides a pool of goroutines that can execute any Worker
// tasks that are submitted.
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// New creates a new work pool.
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
// Run submits work to the pool.
func (p *Pool) Run(w Worker) {
p.work <- w
}
// Shutdown waits for all the goroutines to shutdown.
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
main // names provides a set of names to display.
var names = []string{
"steve","bob","mary","therese","jason",}
// namePrinter provides special support for printing names.
type namePrinter struct {
name string
}
// Task implements the Worker interface.
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
// main is the entry point for all Go programs.
func main() {
// Create a work pool with 2 goroutines.
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
// Iterate over the slice of names.
for _,name := range names {
// Create a namePrinter and provide the
// specific name.
np := namePrinter{
name: name,}
go func() {
// Submit the task to be worked on. When RunTask
// returns we know it is being handled.
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
// Shutdown the work pool and wait for all existing work
// to be completed.
p.Shutdown()
}
另一种worker的写法创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。把工作发送到工作队列中去 var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,JobChannel: make(chan Job),quit: make(chan bool)}
}
// Start method starts the run loop for the worker,listening for a quit channel in case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s",err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。 type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job,maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
} (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |