加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang基础-kafka、zookeeper搭建、go终端发送数据给kafka

发布时间:2020-12-16 09:43:06 所属栏目:大数据 来源:网络整理
导读:zookeeper搭建 kafka搭建 kafka链接zookeeper 在go终端写入kafka zookeeper搭建 1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 3、重命名conf/zoo_sample.cfg 为conf/zo
        • zookeeper搭建
        • kafka搭建
        • kafka链接zookeeper
        • 在go终端写入kafka

zookeeper搭建

1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的)
2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/
3、重命名conf/zoo_sample.cfg 为conf/zoo.cfg
4、编辑 conf/zoo.cfg,修改dataDir=D:zookeeper-3.3.6data
4、运行bin/zkServer.cmd

启动zookeeper如下

kafka搭建

1、打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2
2、打开config目录下的server.properties, 修改log.dirs为D:kafka_logs,

############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
#log.dirs=E:developkafkakafka_logs

3、修改advertised.host.name=服务器ip

# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

advertised.host.name=192.168.11.28

4、启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties

kafka链接zookeeper

在go终端写入kafka

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    msg := &sarama.ProducerMessage{}
    msg.Topic = "nginx_log"
    msg.Value = sarama.StringEncoder("this is a good test,my message is good")

    client,err := sarama.NewSyncProducer([]string{"192.168.11.28:9092"},config)
    if err != nil {
        fmt.Println("producer close,err:",err)
        return
    }

    defer client.Close()

    pid,offset,err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed,",err)
        return
    }

    fmt.Printf("pid:%v offset:%vn",pid,offset)
}

终端执行如下:我这里有个问题

PS E:golanggo_prosrcsafly> go run demo.go
producer close,err: kafka: client has run out of available brokers to talk to (Is your cluster reacha
ble?)
PS E:golanggo_prosrcsafly>

可能是我在启动kafka时,出现如下的问题:具体还没解决?网友有知道解决方法的话,告知下多谢~~~
Invalid character ‘:’ in value part of property
ogDirectory=E:tmpkafka-logs我用的默认的、或者修改成自定义配置都会出这个问题

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
#log.dirs=E:developkafkakafka_logs
[2017-11-18 20:13:02,517] WARN Error processing kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=E:tmpkafka-logs (com.yammer.metrics.reporting.JmxReporter)
javax.management.MalformedObjectNameException: Invalid character ':' in value part of property
        at javax.management.ObjectName.construct(ObjectName.java:618)
        at javax.management.ObjectName.<init>(ObjectName.java:1382)
        at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
        at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
        at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
        at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
        at kafka.metrics.KafkaMetricsGroup.newGauge(KafkaMetricsGroup.scala:74)
        at kafka.metrics.KafkaMetricsGroup.newGauge$(KafkaMetricsGroup.scala:73)
        at kafka.log.LogManager.newGauge(LogManager.scala:50)
        at kafka.log.LogManager.$anonfun$new$1(LogManager.scala:122)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.log.LogManager.<init>(LogManager.scala:116)
        at kafka.log.LogManager$.apply(LogManager.scala:814)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
        at kafka.Kafka$.main(Kafka.scala:92)
        at kafka.Kafka.main(Kafka.scala)
[2017-11-18 20:13:02,522] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读