加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang基础-beego读取配置_输出log日志、tailf组件读取log、配置

发布时间:2020-12-16 09:42:49 所属栏目:大数据 来源:网络整理
导读:1加载配置文件loadConf封装结构体 2初始化beego的log组件 3初始化tailf 4初始化kafka 5tailf读取 6发送数据kafka 7启动zookeeperkafka测试 8查看测试效果 代码区 在前面3篇博文中已经学习了 golang基础-tailf日志组件使用 golang基础-beego读取配置、log日志
        • 1加载配置文件loadConf封装结构体
        • 2初始化beego的log组件
        • 3初始化tailf
        • 4初始化kafka
        • 5tailf读取
        • 6发送数据kafka
        • 7启动zookeeperkafka测试
        • 8查看测试效果
        • 代码区

在前面3篇博文中已经学习了

golang基础-tailf日志组件使用

golang基础-beego读取配置、log日志输出

golang基础-kafka、zookeeper搭建、go终端发送数据给kafka

今天我们来整合这些demo,写一个log日志收集发送kafka的小项目

项目的流程框架图

项目的结构图:

1、加载配置文件loadConf,封装结构体

[logs]
log_level=debug
log_path=E:golanggo_prologslogagent.log

[collect]
log_path=E:golanggo_prologslogagent.log
topic=nginx_log
chan_size=100

[kafka]
server_addr=192.168.21.8:9092

简单说下配置信息的作用
[logs]是log输出级别,以及log输出的文件地址路径
[collect]是要读取的log日志地址,然后利用topic,启动goroutine发送给kafka
[kafka]kafka关联的ip端口

我们将配置信息封装成结构体,然后在定义一个全局变量来进行使用

var (
    appConfig *Config
)

type Config struct {
    logLevel string
    logPath string

    chanSize int
    kafkaAddr string
    collectConf []tailf.CollectConf
}

在结构体中collectConf 是一个数组,因为我们发送kafka时候,可能是多个不同路径+topic(此例我们只用了一个)

type CollectConf struct { LogPath string Topic string }

2、初始化beego的log组件

func initLogger()(err error) {

    config := make(map[string]interface{})
    config["filename"] = appConfig.logPath
    config["level"] = convertLogLevel(appConfig.logLevel)

    configStr,err := json.Marshal(config)
    if err != nil {
        fmt.Println("initLogger failed,marshal err:",err)
        return
    }

    logs.SetLogger(logs.AdapterFile,string(configStr))
    //{"filename":"E:golanggo_prologslogagent.log","level":7}
    fmt.Println(string(configStr))
    return
}

以上代码就简单了,之前运行过demo的
http://www.52php.cn/article/p-dnrlmbej-bqx.html

3、初始化tailf

在初始化goroutine模块,输出log日志,我们需要设计几个结构体
在初始化配置信息中提到了结构体Config,里面的 collectConf []tailf.CollectConf
我们在封装如下2个结构体

TailObj 结构体是利用tail.Lines读取CollectConf路径下的信息

type TailObj struct { tail *tail.Tail conf CollectConf }

TailObjMgr 结构体是tail.Lines读取CollectConf路径下的信息时候, 存放到chan管道中,tailObjs 这可能是多个不同路径+topic(此例我们只用了一个)

type TailObjMgr struct { tailObjs []*TailObj msgChan chan *TextMsg }

然后将tailf初始化的操作贴出来

func InitTail(conf []CollectConf,chanSize int) (err error) {

    if len(conf) == 0 {
        err = fmt.Errorf("invalid config for log collect,conf:%v",conf)
        return
    }

    tailObjMgr = &TailObjMgr{
        msgChan: make(chan*TextMsg,chanSize),}
    ////appConfig.collectConf [{E:golanggo_prologslogagent.log nginx_log}]
    for _,v := range conf {
        obj := &TailObj{
            conf: v,}
        //v--- {E:golanggo_prologslogagent.log nginx_log}
        fmt.Println("v---",v)
        tails,errTail := tail.TailFile(v.LogPath,tail.Config{
            ReOpen:    true,Follow:    true,//Location: &tail.SeekInfo{Offset: 0,Whence: 2},
            MustExist: false,Poll:      true,})

        if errTail != nil {
            err = errTail
            return
        }

        obj.tail = tails
        tailObjMgr.tailObjs = append(tailObjMgr.tailObjs,obj)

        go readFromTail(obj)
    }

    return
}

func readFromTail(tailObj *TailObj) {
    for true {
        line,ok := <-tailObj.tail.Lines
        if !ok {
            logs.Warn("tail file close reopen,filename:%sn",tailObj.tail.Filename)
            time.Sleep(100 * time.Millisecond)
            continue
        }

        textMsg := &TextMsg{
            Msg:line.Text,Topic: tailObj.conf.Topic,}

        tailObjMgr.msgChan <- textMsg
    }
}

4、初始化kafka

/*初始化kafka*/
func InitKafka(addr string) (err error){

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client,err = sarama.NewSyncProducer([]string{addr},config)
    if err != nil {
        logs.Error("init kafka producer failed,err:",err)  
        return
    }
    //记录步骤信息
    logs.Debug("init kafka succ")
    return
}

以前写过关于kafka的例子,就不再详细介绍了

5、tailf读取

从管道中读取即可

//从chan中取出
        msg := tailf.GetOneLine()
        fmt.Println(msg)
func GetOneLine()(msg *TextMsg) {
    msg = <- tailObjMgr.msgChan
    return
}

利用fmt.Println(msg)进行测试,我同时输出到控制台上

6、发送数据kafka

func SendToKafka(data,topic string)(err error) { msg := &sarama.ProducerMessage{}
    msg.Topic = topic
    msg.Value = sarama.StringEncoder(data)

    pid,offset,err := client.SendMessage(msg)
    if err != nil {
        logs.Error("send message failed,err:%v data:%v topic:%v",err,data,topic)
        return
    }

    logs.Debug("send succ,pid:%v offset:%v,topic:%vn",pid,topic)
    return
}

发送kafka的操作也是写过demo例子的,这里就不在详细介绍了

7、启动zookeeper,kafka测试


8、查看测试效果

终端输出+log日志输出目录

以下是输出到kafka的效果图(但是没有输出,我这kafka配置有问题,我用的最新的jdk也找不到错误),虽然没有输出,但是kafka控制台显示了相关的接受信息

可能是如下的错误,有知道解决方案的网友感谢告知

代码区

main.go

package main

import(
    "fmt"
    "github.com/astaxie/beego/logs"
    "logagent/kafka"
    "logagent/tailf"
    // "time"
)

func main() {
    /* 加载配置文件logagent.conf信息 */
    filename := "E:/golang/go_pro/logagent.conf"
    err := loadConf("ini",filename)
    if err != nil {
        fmt.Printf("load conf failed,err:%vn",err)
        panic("load conf failed")
        return
    }

    /* 初始化beego/logs的一些功能,设定输出目录 */
    err = initLogger()
    if err != nil {
        fmt.Printf("load logger failed,err)
        panic("load logger failed")
        return
    }

    /*先测试将log输出配置正确,输出到logagent.log中*/
    logs.Debug("load conf succ,config:%v",appConfig)

    /*初始化tailf日志组件 */
    //appConfig.collectConf [{E:golanggo_prologslogagent.log nginx_log}]
    fmt.Println("appConfig.collectConf",appConfig.collectConf)
    err = tailf.InitTail(appConfig.collectConf,appConfig.chanSize)
    if err != nil {
        logs.Error("init tail failed,err:%v",err)
        return
    }
    /*先测试将tailf配置正确,输出到logagent.log中*/
    logs.Debug("initialize tailf succ")

    /*初始kafka的工作*/
    err = kafka.InitKafka(appConfig.kafkaAddr)
    if err != nil {
        logs.Error("init tail failed,err)
        return
    }

    logs.Debug("initialize all succ")

    err = serverRun()
    if err != nil {
        logs.Error("serverRUn failed,err)
        return
    }

    logs.Info("program exited")
}

config.go

package main


import(
    "fmt"
    "errors"
    "github.com/astaxie/beego/config"
    "logagent/tailf"
)

var (
    appConfig *Config
)

type Config struct {
    logLevel string
    logPath string

    chanSize int
    kafkaAddr string
    collectConf []tailf.CollectConf
}

func loadCollectConf(conf config.Configer) (err error ) {

    var cc tailf.CollectConf
    cc.LogPath = conf.String("collect::log_path")
    if len(cc.LogPath) == 0 {
        err = errors.New("invalid collect::log_path")
        return
    }

    cc.Topic = conf.String("collect::topic")
    if len(cc.LogPath) == 0 {
        err = errors.New("invalid collect::topic")
        return
    }

    appConfig.collectConf = append(appConfig.collectConf,cc)
    return
}

/*
    加载配置文件信息
    [logs]
    log_level=debug
    log_path=E:golanggo_prologslogagent.log
    [collect]
    log_path=E:golanggo_prologslogagent.log
    topic=nginx_log

    chan_size=100
    [kafka]
    server_addr=192.168.21.8:9092
*/
func loadConf(confType,filename string) (err error) {

    conf,err := config.NewConfig(confType,filename)
    if err != nil {
        fmt.Println("new config failed,err)
        return
    }
    /*定义一个全局变量保存
    var appConfig *Config
    */
    appConfig = &Config{}
    appConfig.logLevel = conf.String("logs::log_level")
    if len(appConfig.logLevel) == 0 {
        appConfig.logLevel = "debug"
    }

    appConfig.logPath = conf.String("logs::log_path")
    if len(appConfig.logPath) == 0 {
        appConfig.logPath = "E:golanggo_prologslogagent.log"
    }

    appConfig.chanSize,err = conf.Int("collect::chan_size")
    if err != nil {
        appConfig.chanSize = 100
    }

    appConfig.kafkaAddr = conf.String("kafka::server_addr")
    if len(appConfig.kafkaAddr) == 0 {
        err = fmt.Errorf("invalid kafka addr")
        return
    }

    err = loadCollectConf(conf)
    if err != nil {
        fmt.Printf("load collect conf failed,err)
        return
    }
    return 
}

log.go

package main


import (
    "encoding/json"
    "fmt"
    "github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {

    switch (level) {
        case "debug":
            return logs.LevelDebug
        case "warn":
            return logs.LevelWarn
        case "info":
            return logs.LevelInfo
        case "trace":
            return logs.LevelTrace
    }

    return  logs.LevelDebug
}

/* 初始化beego/logs的一些功能,设定输出目录 */
func initLogger()(err error) {

    config := make(map[string]interface{})
    config["filename"] = appConfig.logPath
    config["level"] = convertLogLevel(appConfig.logLevel)

    configStr,"level":7}
    fmt.Println(string(configStr))
    return
}

tailf.go

package tailf

import (
    "github.com/hpcloud/tail"
    "github.com/astaxie/beego/logs"
    "fmt"
    "time"
)

type CollectConf struct {
    LogPath string
    Topic   string
}
/*{E:golanggo_prologslogagent.log nginx_log} 每条配置 */
type TailObj struct {
    tail *tail.Tail
    conf CollectConf
}

type TextMsg struct {
    Msg string
    Topic string
}

type TailObjMgr struct {
    tailObjs []*TailObj
    msgChan chan *TextMsg
}

var (
    tailObjMgr* TailObjMgr
)

func GetOneLine()(msg *TextMsg) {
    msg = <- tailObjMgr.msgChan
    return
}
/*初始化Tail组件一些功能*/
func InitTail(conf []CollectConf,}

        tailObjMgr.msgChan <- textMsg
    }
}

kafka.go

package kafka

import(
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego/logs"
)

var (
    client sarama.SyncProducer 
)

/*初始化kafka*/
func InitKafka(addr string) (err error){

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client,err)  
        return
    }
    //记录步骤信息
    logs.Debug("init kafka succ")
    return
}

/* 发送到kafak */
func SendToKafka(data,topic string)(err error) {

    msg := &sarama.ProducerMessage{}
    msg.Topic = topic
    msg.Value = sarama.StringEncoder(data)

    pid,err := client.SendMessage(msg)
    if err != nil {
        logs.Error("send message failed,data,topic)
        return
    }

    logs.Debug("send succ,topic)
    return
}

server.go

package main
import(
    "logagent/tailf"
    "logagent/kafka"
    "github.com/astaxie/beego/logs"
    "time"
    "fmt"
)


func serverRun() (err error){
    for {
        //从chan中取出
        msg := tailf.GetOneLine()
        fmt.Println(msg)

        err = kafka.SendToKafka(msg.Msg,msg.Topic)

        if err != nil {
            logs.Error("send to kafka failed,err)
            time.Sleep(time.Second)
            continue
        }
    }
    return
}

logagent.conf

[logs]
log_level=debug
log_path=E:golanggo_prologslogagent.log

[collect]
log_path=E:golanggo_prologslogagent.log
topic=nginx_log
chan_size=100

[kafka]
server_addr=192.168.21.8:9092

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读