加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

03 . Go开发一个日志平台之Elasticsearch使用及kafka消费消息发送ES

发布时间:2020-12-16 09:18:45 所属栏目:大数据 来源:网络整理
导读:Elasticsearch使用 详细使用请看我写的Go操作Elasticsearch专篇 https://www.cnblogs.com/you-men/p/13391265.html example1 package mainimport ("context""fmt""github.com/olivere/elastic/v7")var eshost = "http://192.168.43.176:9200"var client *ela

Elasticsearch使用

详细使用请看我写的Go操作Elasticsearch专篇

https://www.cnblogs.com/you-men/p/13391265.html

example1

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)


var (
	// eshost is the Elasticsearch endpoint used by this example.
	eshost = "http://192.168.43.176:9200"
	// client is the shared Elasticsearch client; main assigns it.
	client *elastic.Client
)

// Tyweet is the sample document indexed by this example: a user name and
// the message text.
type Tyweet struct {
	User, Message string
}


// main connects to Elasticsearch at eshost and indexes one Tyweet document
// with id "1" into the "user" index under the "tweet" type.
//
// A connection failure is reported and ends the program; an indexing failure
// panics.
func main() {
	var err error
	// Create the client with sniffing disabled, pointing at eshost.
	client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(eshost))
	if err != nil {
		fmt.Println("connect es error", err)
		// Stop here: client was not created, so the index call below would
		// dereference a nil pointer.
		return
	}

	// Build the document from a struct.
	tweet := Tyweet{User: "youmen", Message: "Take Five"}
	_, err = client.Index().
		Index("user").
		Type("tweet").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("Insert index success")
}

example2

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

var (
	// client is the shared Elasticsearch client; init assigns it.
	client *elastic.Client

	// host is the Elasticsearch endpoint used by this example.
	host = "http://192.168.43.176:9200"
)


// Employee is the sample document stored in the "megacorp" index.
// The struct tags give the JSON field names used when the value is serialized.
type Employee struct {
	FirstName string   `json:"first_name"`
	LastName  string   `json:"last_name"`
	Age       int      `json:"age"`
	About     string   `json:"about"`
	Interests []string `json:"interests"`
}

//初始化
func init() {
	//errorlog := log.New(os.Stdout,"APP",log.LstdFlags)
	var err error
	//这个地方有个小坑 不加上elastic.SetSniff(false) 会连接不上
	client,elastic.SetURL(host))
	if err != nil {
		panic(err)
	}
	_,_,err = client.Ping(host).Do(context.Background())
	if err != nil {
		panic(err)
	}
	//fmt.Printf("Elasticsearch returned with code %d and version %sn",code,info.Version.Number)

	_,err = client.ElasticsearchVersion(host)
	if err != nil {
		panic(err)
	}
	//fmt.Printf("Elasticsearch version %sn",esversion)
}

//创建
func create() {

	//使用结构体
	e1 := Employee{"Jane","Smith",32,"I like to collect rock albums",[]string{"music"}}
	put1,err := client.Index().
		Index("megacorp").
		Type("employee").
		Id("1").
		BodyJson(e1).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Indexed tweet %s to index s%s,type %sn",put1.Id,put1.Index,put1.Type)

	//使用字符串
	e2 := `{"first_name":"John","last_name":"Smith","age":25,"about":"I love to go rock climbing","interests":["sports","music"]}`
	put2,err := client.Index().
		Index("megacorp").
		Type("employee").
		Id("2").
		BodyJson(e2).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Indexed tweet %s to index s%s,put2.Id,put2.Index,put2.Type)

	e3 := `{"first_name":"Douglas","last_name":"Fir","age":35,"about":"I like to build cabinets","interests":["forestry"]}`
	put3,err := client.Index().
		Index("megacorp").
		Type("employee").
		Id("3").
		BodyJson(e3).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Indexed tweet %s to index s%s,put3.Id,put3.Index,put3.Type)
}

// main runs the indexing example by calling create.
func main() {
	create()
}

kafka消费消息发送ES

kafka消费消息
package Initial

import (
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	"time"
)

// Run starts one partition consumer for every partition of kafkaClient.topic,
// beginning at the newest offset, and forwards each received message to
// Elasticsearch through sendToES.
//
// It returns an error if the partition list cannot be fetched or a partition
// consumer cannot be started. Otherwise it blocks until every consumer's
// message channel has been closed. A failed send is logged as a warning and
// does not stop consumption.
func Run() (err error) {
	partitionList, err := kafkaClient.client.Partitions(kafkaClient.topic)
	if err != nil {
		logs.Error("Failed to get the list of partitions: ", err)
		return err
	}

	// Range over the values: the slice holds the partition ids, and the slice
	// index is not guaranteed to equal the id.
	for _, partition := range partitionList {
		pc, errRet := kafkaClient.client.ConsumePartition(kafkaClient.topic, partition, sarama.OffsetNewest)
		if errRet != nil {
			logs.Error("Failed to start consumer for partition %d: %s\n", partition, errRet)
			return errRet
		}
		// Deferred on purpose: the consumers must stay open until Run returns.
		defer pc.AsyncClose()

		// Register the goroutine before starting it so Wait cannot run first.
		kafkaClient.wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer kafkaClient.wg.Done()
			for msg := range pc.Messages() {
				logs.Debug("Partition:%d,Offset:%d,Key:%s,Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))

				// Send the log entry to Elasticsearch. The error stays local
				// so that goroutines do not race on Run's named result.
				if sendErr := sendToES(kafkaClient.topic, msg.Value); sendErr != nil {
					logs.Warn("send to es failed,err:%v", sendErr)
				}
			}
		}(pc)
	}

	kafkaClient.wg.Wait()
	// NOTE(review): kept from the original. By this point every consumer has
	// stopped, so this only delays the return; consider removing it together
	// with the "time" import.
	time.Sleep(time.Hour)
	return nil
}
发送到es
package Initial

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

// esclient is the package-level Elasticsearch client; InitEs assigns it and
// sendToES uses it.
var esclient *elastic.Client


// LogMessage is the document written to Elasticsearch for each Kafka
// message. sendToES fills in Topic and Message.
type LogMessage struct {
	App, Topic, Message string
}


// Tyweet is a sample document type holding a user name and a message text.
type Tyweet struct {
	User, Message string
}

//创建
func InitEs(addr string) (err error) {
	esclient,elastic.SetURL(addr))
	if err != nil {
		fmt.Println("connect es error",err)
	}
	return
}

// sendToES wraps data in a LogMessage and indexes it into Elasticsearch,
// using topic as both the index name and the document type. No document id
// is set on the request. The App field is left empty.
//
// It returns the indexing error, wrapped with the topic, instead of
// panicking, so the caller decides how to handle a failed send.
func sendToES(topic string, data []byte) (err error) {
	msg := &LogMessage{
		Topic:   topic,
		Message: string(data),
	}

	_, err = esclient.Index().
		Index(topic).
		Type(topic).
		//Id(fmt.Sprintf("%d",i)).
		BodyJson(msg).
		Do(context.Background())
	if err != nil {
		return fmt.Errorf("index log message for topic %q: %w", topic, err)
	}
	return nil
}
验证kafka消息是否被消费并发送到es

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读