bigdata – Failed to write offset data to ZooKeeper in kafka-storm
Published: 2020-12-14 05:16:20 · Category: Big Data · Source: compiled from the web
I am setting up a Storm cluster to compute real-time trends and other statistics, but I am having trouble adding a "recovery" feature to the project, i.e. letting the kafka-spout remember the offset it last read (the kafka-spout source comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka). I start my kafka-spout this way:
```java
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
```

The default settings should take care of this, but apparently not in my case: every time I start the project, the PartitionManager tries to look up the stored offsets and finds nothing:

```
2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset
```

It then starts reading from the latest available offset. That would be fine if my project never failed, but it is not exactly what I want.

I also took a look at the PartitionManager class, which uses the ZkState class to write the offsets, in these snippets:

PartitionManager

```java
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);
        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
```

ZkState

```java
public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```

As far as I can see, for the first message the writeBytes method enters the if block and tries to create the path, and for the second message it enters the else block, which all seems fine. But when I start the project again, the same message mentioned above appears: no partition information can be found.

Solution
I had the same problem. It turned out I was running in local mode, which uses an in-memory ZooKeeper instead of the ZooKeeper that Kafka is using.
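In other words, the offsets were committed to Storm's in-process ZooKeeper, and the read on restart went to a fresh throw-away instance, so the lookup returned null. For reference, the znode path involved is built roughly as shown below (my own simplified reconstruction of what storm-kafka's `committedPath()` does, not the actual source; the class and method names are illustrative):

```java
public class CommittedPath {
    // Simplified sketch: storm-kafka stores each partition's offset under
    // <zkRoot>/<spoutId>/partition_<n>. If the spout reads from a different
    // ZooKeeper ensemble (or a different zkRoot) than the one it wrote to,
    // the startup read returns null and the spout falls back to the
    // configured starting offset.
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // With the question's SpoutConfig (zkRoot "", id "test"):
        System.out.println(committedPath("", "test", 1)); // /test/partition_1
    }
}
```

So the fix is not in the write logic at all: `writeBytes` works, it just writes into a ZooKeeper that disappears when local mode shuts down.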
To make sure the KafkaSpout does not use Storm's ZooKeeper for the ZkState that stores the offsets, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort and SpoutConfig.zkRoot in addition to the ZkHosts. For example:

```java
import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;

...

final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
final Integer zkPort = serverInetAddresses.get(0).getPort();
for (InetSocketAddress serverInetAddress : serverInetAddresses) {
    serverAddresses.add(serverInetAddress.getHostName());
}

final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);
spoutConfig.zkServers = serverAddresses;
spoutConfig.zkPort = zkPort;
spoutConfig.zkRoot = kafkaZnode;
```
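If you would rather not depend on ZooKeeper's ConnectStringParser just to split the connect string, the parsing it performs can be sketched in plain Java. This is an assumed approximation of its behaviour, covering only a comma-separated host:port list with an optional chroot suffix:

```java
import java.util.ArrayList;
import java.util.List;

public class ZkConnect {
    // Extract the hostnames from e.g. "zk1:2181,zk2:2181/kafka".
    static List<String> hosts(String connect) {
        String hostPart = connect.split("/", 2)[0]; // drop the chroot, if any
        List<String> out = new ArrayList<>();
        for (String hp : hostPart.split(",")) {
            out.add(hp.split(":")[0]);
        }
        return out;
    }

    // Take the port of the first server, defaulting to ZooKeeper's 2181.
    static int port(String connect) {
        String first = connect.split("/", 2)[0].split(",")[0];
        return first.contains(":") ? Integer.parseInt(first.split(":")[1]) : 2181;
    }

    public static void main(String[] args) {
        String zk = "zk1:2181,zk2:2181/kafka";
        System.out.println(hosts(zk)); // [zk1, zk2]
        System.out.println(port(zk));  // 2181
    }
}
```

The resulting host list and port are what you would assign to `spoutConfig.zkServers` and `spoutConfig.zkPort`, exactly as the answer above does with the values obtained from ConnectStringParser.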