加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang常见的几种并发模型框架

发布时间:2020-12-16 18:06:58 所属栏目:大数据 来源:网络整理
导读:在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。 package mainimport ("fmt""math/rand""os""runtime""sync""sync/atomic""time")type Scenario struct {Name stringDescription []stringExamples []stringRunExample func()}var s1 = Scen

在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。

package main

import (
	"fmt"
	"math/rand"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

type Scenario struct {
	Name        string
	Description []string
	Examples    []string
	RunExample  func()
}

var s1 = &Scenario{
	Name: "s1",Description: []string{
		"简单并发执行任务",},Examples: []string{
		"比如并发的请求后端某个接口",RunExample: RunScenario1,}

var s2 = &Scenario{
	Name: "s2",Description: []string{
		"持续一定时间的高并发模型",Examples: []string{
		"在规定时间内,持续的高并发请求后端服务, 防止服务死循环",RunExample: RunScenario2,}

var s3 = &Scenario{
	Name: "s3",Description: []string{
		"基于大数据量的并发任务模型,goroutine worker pool",Examples: []string{
		"比如技术支持要给某个客户删除几个TB/GB的文件",RunExample: RunScenario3,}

var s4 = &Scenario{
	Name: "s4",Description: []string{
		"等待异步任务执行结果(goroutine+select+channel)",Examples: []string{
		"",RunExample: RunScenario4,}

var s5 = &Scenario{
	Name: "s5",Description: []string{
		"定时的反馈结果(Ticker)",Examples: []string{
		"比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",RunExample: RunScenario5,}

var Scenarios []*Scenario

func init() {
	Scenarios = append(Scenarios,s1)
	Scenarios = append(Scenarios,s2)
	Scenarios = append(Scenarios,s3)
	Scenarios = append(Scenarios,s4)
	Scenarios = append(Scenarios,s5)
}

// 常用的并发与同步场景
func main() {
	if len(os.Args) == 1 {
		fmt.Println("请选择使用场景 ==> ")
		for _,sc := range Scenarios {
			fmt.Printf("场景: %s,",sc.Name)
			printDescription(sc.Description)
		}
		return
	}
	for _,arg := range os.Args[1:] {
		sc := matchScenario(arg)
		if sc != nil {
			printDescription(sc.Description)
			printExamples(sc.Examples)
			sc.RunExample()
		}
	}
}

func printDescription(str []string) {
	fmt.Printf("场景描述: %s n",str)
}

func printExamples(str []string) {
	fmt.Printf("场景举例: %s n",str)
}

func matchScenario(name string) *Scenario {
	for _,sc := range Scenarios {
		if sc.Name == name {
			return sc
		}
	}
	return nil
}

var doSomething = func(i int) string {
	time.Sleep(time.Millisecond * time.Duration(10))
	fmt.Printf("Goroutine %d do things .... n",i)
	return fmt.Sprintf("Goroutine %d",i)
}

var takeSomthing = func(res string) string {
	time.Sleep(time.Millisecond * time.Duration(10))
	tmp := fmt.Sprintf("Take result from %s.... n",res)
	fmt.Println(tmp)
	return tmp
}

// 场景1: 简单并发任务

func RunScenario1() {
	count := 10
	var wg sync.WaitGroup

	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			doSomething(index)
		}(i)
	}

	wg.Wait()
}

// 场景2: 按时间来持续并发

func RunScenario2() {
	timeout := time.Now().Add(time.Second * time.Duration(10))
	n := runtime.NumCPU()

	waitForAll := make(chan struct{})
	done := make(chan struct{})
	concurrentCount := make(chan struct{},n)

	for i := 0; i < n; i++ {
		concurrentCount <- struct{}{}
	}

	go func() {
		for time.Now().Before(timeout) {
			<-done
			concurrentCount <- struct{}{}
		}

		waitForAll <- struct{}{}
	}()

	go func() {
		for {
			<-concurrentCount
			go func() {
				doSomething(rand.Intn(n))
				done <- struct{}{}
			}()
		}
	}()

	<-waitForAll
}

// 场景3:以 worker pool 方式 并发做事/发送请求

func RunScenario3() {
	numOfConcurrency := runtime.NumCPU()
	taskTool := 10
	jobs := make(chan int,taskTool)
	results := make(chan int,taskTool)
	var wg sync.WaitGroup

	// workExample
	workExampleFunc := func(id int,jobs <-chan int,results chan<- int,wg *sync.WaitGroup) {
		defer wg.Done()
		for job := range jobs {
			res := job * 2
			fmt.Printf("Worker %d do things,produce result %d n",id,res)
			time.Sleep(time.Millisecond * time.Duration(100))
			results <- res
		}
	}

	for i := 0; i < numOfConcurrency; i++ {
		wg.Add(1)
		go workExampleFunc(i,jobs,results,&wg)
	}

	totalTasks := 100

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < totalTasks; i++ {
			n := <-results
			fmt.Printf("Got results %d n",n)
		}
		close(results)
	}()

	for i := 0; i < totalTasks; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
}

// 场景4: 等待异步任务执行结果(goroutine+select+channel)

func RunScenario4() {
	sth := make(chan string)
	result := make(chan string)
	go func() {
		id := rand.Intn(100)
		for {
			sth <- doSomething(id)
		}
	}()
	go func() {
		for {
			result <- takeSomthing(<-sth)
		}
	}()

	select {
	case c := <-result:
		fmt.Printf("Got result %s ",c)
	case <-time.After(time.Duration(30 * time.Second)):
		fmt.Errorf("指定时间内都没有得到结果")
	}
}

var doUploadMock = func() bool {
	time.Sleep(time.Millisecond * time.Duration(100))
	n := rand.Intn(100)
	if n > 50 {
		return true
	} else {
		return false
	}
}

// 场景5: 定时的反馈结果(Ticker)
// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等

func RunScenario5() {
	totalSize := int64(0)
	totalCount := int64(0)
	totalErr := int64(0)

	concurrencyCount := runtime.NumCPU()
	stop := make(chan struct{})
	fileSizeExample := int64(10)

	timeout := 10 // seconds to stop

	go func() {
		for i := 0; i < concurrencyCount; i++ {
			go func(index int) {
				for {
					select {
					case <-stop:
						return
					default:
						break
					}

					res := doUploadMock()
					if res {
						atomic.AddInt64(&totalCount,1)
						atomic.AddInt64(&totalSize,fileSizeExample)
					} else {
						atomic.AddInt64(&totalErr,1)
					}
				}
			}(i)
		}
	}()

	t := time.NewTicker(time.Second)
	index := 0
	for {
		select {
		case <-t.C:
			index++
			tmpCount := atomic.LoadInt64(&totalCount)
			tmpSize := atomic.LoadInt64(&totalSize)
			tmpErr := atomic.LoadInt64(&totalErr)
			fmt.Printf("吞吐率: %d,成功率: %d n",tmpSize/int64(index),tmpCount*100/(tmpCount+tmpErr))
			if index > timeout {
				t.Stop()
				close(stop)
				return
			}
		}

	}
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读