【原创】packetbeat 之“协议数据包分析每次输出结果均不同”问
问题描述通过 测试命令为 # ./packetbeat -c ./packetbeat.yml -e -I redis_xg-bjdev-rediscluster-2_prot-7101_20161222110723_20161222110733.pcap -E packetbeat.protocols.redis.ports=7101 -t 输出结果如下 ... // 第一次 2017/01/12 04:18:56.634130 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=13859 redis.unmatched_responses=23 tcp.dropped_because_of_gaps=4 2017/01/12 04:18:56.634143 logp.go:246: INFO Uptime: 1.210802009s ... // 第二次 2017/01/12 04:21:10.543460 logp.go:245: INFO Total non-zero values: redis.unmatched_responses=23 libbeat.publisher.published_events=14619 tcp.dropped_because_of_gaps=4 2017/01/12 04:21:10.543478 logp.go:246: INFO Uptime: 1.216717998s ... // 第三次 2017/01/12 04:22:49.583149 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=15006 tcp.dropped_because_of_gaps=4 redis.unmatched_responses=23 2017/01/12 04:22:49.583160 logp.go:246: INFO Uptime: 1.628717709s 从上面的输出中可以看到:
能够确认的是:
源码分析针对如下每次发生变化的输出日志,进行代码反查: 2017/01/12 04:18:56.634130 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=13859 redis.unmatched_responses=23 tcp.dropped_because_of_gaps=4 2017/01/12 04:18:56.634143 logp.go:246: INFO Uptime: 1.210802009s 在 func LogTotalExpvars(cfg *Logging) { if cfg.Metrics.Enabled != nil && *cfg.Metrics.Enabled == false { return } vals := map[string]int64{} prevVals := map[string]int64{} // 将注册到 expvar 中的全部 Int 类型内容保存到 vals 中 snapshotExpvars(vals) // 构建“从开始运行到结束运行”的整个时间段内 // 所有 Int 类型 expvar 变量的 delta 差值字符串 metrics := buildMetricsOutput(prevVals,vals) // 输出“问题”打印 Info("Total non-zero values: %s",metrics) // 输出“从开始运行到结束运行”的时间长度 Info("Uptime: %s",time.Now().Sub(startTime)) }
在 func (b *Beat) launch(bt Creator) error { ... // 标识 packetbeat 开始运行 logp.Info("%s start running.",b.Name) // 标识 packetbeat 结束运行 defer logp.Info("%s stopped.",b.Name) // 在结束运行之前,输出当前基于 expvar 记录 // 的 metrics 变化值 defer logp.LogTotalExpvars(&b.Config.Logging) return beater.Run(b) }
推演:如果输出过程没有问题,那么只能是计算过程出了问题; 在 ... // Metrics that can retrieved through the expvar web interface. // 用于计算 publish_events 值的 expvar 变量 var ( publishedEvents = expvar.NewInt("libbeat.publisher.published_events") ) ... func (c *client) PublishEvent(event common.MapStr,opts ...ClientOption) bool { // 向 event 中添加自定义字段内容 c.annotateEvent(event) // 基于配置的 Processors 进行定制化 event 过滤 // 由于我没有配置这个,因为不会有 event 被过滤掉 publishEvent := c.filterEvent(event) if publishEvent == nil { return false } // 根据配置获取一种投递 event 的管道 ctx,pipeline := c.getPipeline(opts) // 将 publish_events 统计变量 +1 publishedEvents.Add(1) // 将 event 封装成 message 投递到管道中 return pipeline.publish(message{ client: c,context: ctx,datum: outputs.Data{Event: *publishEvent},}) } ... func (c *client) PublishEvents(events []common.MapStr,opts ...ClientOption) bool { data := make([]outputs.Data,len(events)) // 针对 N 个 event 的循环处理 for _,event := range events { c.annotateEvent(event) publishEvent := c.filterEvent(event) if publishEvent != nil { data = append(data,outputs.Data{Event: *publishEvent}) } } ctx,pipeline := c.getPipeline(opts) if len(data) == 0 { logp.Debug("filter","No events to publish") return true } // 将 publish_events 变量 +N publishedEvents.Add(int64(len(data))) return pipeline.publish(message{client: c,data: data}) } ...
那么谁调用了 在 ... func (p *PacketbeatPublisher) onTransaction(event common.MapStr) { // 确认 event 的有效性,即特定字段校验 if err := validateEvent(event); err != nil { logp.Warn("Dropping invalid event: %v",err) return } // 针对 event 中的地址信息进行统一化处理 if !p.normalizeTransAddr(event) { return } // 将 event 发布到管道中 p.client.PublishEvent(event) } func (p *PacketbeatPublisher) onFlow(events []common.MapStr) { pub := events[:0] // 循环处理 N 个 event for _,event := range events { if err := validateEvent(event); err != nil { logp.Warn("Dropping invalid event: %v",err) continue } if !p.addGeoIPToFlow(event) { continue } pub = append(pub,event) } p.client.PublishEvents(pub) } ...
func (p *PacketbeatPublisher) Start() { p.wg.Add(1) go func() { defer p.wg.Done() for { select { case <-p.done: return // 从名为 trans 的 channel 获取一个 event case event := <-p.trans: p.onTransaction(event) } } }() p.wg.Add(1) go func() { defer p.wg.Done() for { select { case <-p.done: return // 从名为 flows 的 channel 获取 N 个 event case events := <-p.flows: p.onFlow(events) } } }() }
相应代码如下 type PacketbeatPublisher struct { .... trans chan common.MapStr flows chan []common.MapStr } ... func NewPublisher( pub publisher.Publisher,hwm,bulkHWM int,ignoreOutgoing bool,) (*PacketbeatPublisher,error) { ... return &PacketbeatPublisher{ pub: pub,topo: topo,geoLite: topo.GeoLite(),ignoreOutgoing: ignoreOutgoing,client: pub.Connect(),done: make(chan struct{}),// trans channel 的 buffer 长度为 hwm trans: make(chan common.MapStr,hwm),// flows channel 的 buffer 长度为 bulkHWM flows: make(chan []common.MapStr,bulkHWM),},nil } 在 // init packetbeat components func (pb *packetbeat) init(b *beat.Beat) error { ... // This is required as init Beat is called before the beat publisher is initialised b.Config.Shipper.InitShipperConfig() // hwm 即 QueueSize 的值; // bulkHWM 即 BulkQueueSize 的值; pb.pub,err = publish.NewPublisher(b.Publisher,*b.Config.Shipper.QueueSize,*b.Config.Shipper.BulkQueueSize,pb.config.IgnoreOutgoing) if err != nil { return fmt.Errorf("Initializing publisher failed: %v",err) } ... } 在 type ShipperConfig struct { ... // internal publisher queue sizes QueueSize *int `config:"queue_size"` BulkQueueSize *int `config:"bulk_queue_size"` ... } ... // 默认值 const ( DefaultQueueSize = 1000 DefaultBulkQueueSize = 0 ) ... // 初始化函数 func (config *ShipperConfig) InitShipperConfig() { // TODO: replace by ucfg // QueueSize 的值在 packetbeat.yml 中定义 if config.QueueSize == nil || *config.QueueSize <= 0 { queueSize := DefaultQueueSize config.QueueSize = &queueSize } if config.BulkQueueSize == nil || *config.BulkQueueSize < 0 { bulkQueueSize := DefaultBulkQueueSize config.BulkQueueSize = &bulkQueueSize } } 在搞清楚了配置位置后,就剩下最后一个问题,trans 和 flows channel 中的消息来自哪里; func (p *PacketbeatPublisher) PublishTransaction(event common.MapStr) bool { select { case p.trans <- event: return true default: // drop event if queue is full // 这个注释说明很关键:如果 queue 满了,event 会被丢弃 return false } } func (p *PacketbeatPublisher) PublishFlows(event []common.MapStr) bool { select { case p.flows <- event: return true case <-p.done: // drop event,if worker has been stopped return false } } 在 // 将 request 和 response 进行关联 func (redis *redisPlugin) correlate(conn *redisConnectionData) { // drop responses with missing requests if conn.requests.empty() { for !conn.responses.empty() { debugf("Response from unknown transaction. Ignoring") unmatchedResponses.Add(1) conn.responses.pop() } return } // merge requests with responses into transactions for !conn.responses.empty() && !conn.requests.empty() { requ := conn.requests.pop() resp := conn.responses.pop() if redis.results != nil { // 构建 transaction 消息内容(JSON 格式) event := redis.newTransaction(requ,resp) // 将 event 发送到 trans channel 中 redis.results.PublishTransaction(event) } } }
至此,消息处理流程梳理完毕:
问题原因
解决办法
其他
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |