k8s与监控--从telegraf改造谈golang多协程精确控制
从telegraf改造谈golang多协程精确控制前言telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。与现在流行的各种exporter+promethues监控方案相比:
目前telegraf改造工作基本上是两大部分:
在改造改造无停机动态调度input就涉及到golang多协程精确控制的问题。 一些golang常用并发手段sync包下WaitGroup具体事例: var wg sync.WaitGroup wg.Add(len(a.Config.Outputs)) for _,o := range a.Config.Outputs { go func(output *models.RunningOutput) { defer wg.Done() err := output.Write() if err != nil { log.Printf("E! Error writing to output [%s]: %sn",output.Name,err.Error()) } }(o) } wg.Wait() WaitGroup内部维护了一个counter,当counter数值为0时,表明添加的任务都已经完成。 func (wg *WaitGroup) Add(delta int) 添加任务,delta参数表示添加任务的数量。 func (wg *WaitGroup) Done() 任务执行完成,调用Done方法,一般使用姿势都是defer wg.Done(),此时counter中会减一。 func (wg *WaitGroup) Wait() 通过使用sync.WaitGroup,可以阻塞主线程,直到相应数量的子线程结束。 chan struct{},控制协程退出启动协程的时候,传递一个shutdown chan struct{},需要关闭该协程的时候,直接close(shutdown)。struct{}在golang中是一个消耗接近0的对象。 // gatherer runs the inputs that have been configured with their own // reporting interval. func (a *Agent) gatherer( shutdown chan struct{},kill chan struct{},input *models.RunningInput,interval time.Duration,metricC chan telegraf.Metric,) { defer panicRecover(input) GatherTime := selfstat.RegisterTiming("gather","gather_time_ns",map[string]string{"input": input.Config.Name},) acc := NewAccumulator(input,metricC) acc.SetPrecision(a.Config.Agent.Precision.Duration,a.Config.Agent.Interval.Duration) ticker := time.NewTicker(interval) defer ticker.Stop() for { internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration,shutdown) start := time.Now() gatherWithTimeout(shutdown,kill,input,acc,interval) elapsed := time.Since(start) GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: return case <-kill: return case <-ticker.C: continue } } } 借助chan 实现指定数量的协程或动态调整协程数量当然这里必须是每个协程是幂等,也就是所有协程做的是同样的工作。 for i := 0; i < s.workers; i++ { go func() { wQuit := make(chan struct{}) s.pool <- wQuit s.sFlowWorker(wQuit) }() } 关闭协程: func (s *SFlow) sFlowWorker(wQuit chan struct{}) { LOOP: for { select { case <-wQuit: break LOOP case msg,ok = <-sFlowUDPCh: if !ok { break LOOP } } // 此处执行任务操作 } 动态调整: for n = 0; n < 10; n++ { if len(s.pool) > s.workers { wQuit := <-s.pool close(wQuit) } } 多协程精确控制在改造telegraf过程中,要想动态调整input,每个input都是唯一的,分属不同类型插件。就必须实现精准控制指定的协程的启停。 // DelInput add input func (a *Agent) DelInput(inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _,v := range inputs { if _,ok := a.kills[v.Config.ID]; !ok { return fmt.Errorf("input: %s,未找到,无法删除",v.Config.ID) } } for _,input := range inputs { if kill,ok := a.kills[input.Config.ID]; ok { delete(a.kills,input.Config.ID) close(kill) } } return nil } 添加任务: // AddInput add input func (a *Agent) AddInput(shutdown chan struct{},inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _,ok := a.kills[v.Config.ID]; ok { return fmt.Errorf("input: %s,已经存在无法新增",input := range inputs { interval := a.Config.Agent.Interval.Duration // overwrite global interval if this plugin has it's own. if input.Config.Interval != 0 { interval = input.Config.Interval } if input.Config.ID == "" { continue } a.wg.Add(1) kill := make(chan struct{}) a.kills[input.Config.ID] = kill go func(in *models.RunningInput,interv time.Duration) { defer a.wg.Done() a.gatherer(shutdown,in,interv,a.metricC) }(input,interval) } return nil } 总结简单介绍了一下telegraf项目。后续的优化和改造工作还在继续。主要是分布式telegraf的调度算法。毕竟集中化所有exporter以后,telegraf的负载能力受单机能力限制,而且也不符合高可用的使用目标。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |