scala – Kafka分区键无法正常工作
我正在努力如何正确使用分区键机制.我的逻辑是将分区号设置为3,然后创建三个分区键为“0”,“1”,“2”,然后使用分区键创建三个KeyedMessage,如
> KeyedMessage(主题,“0”,消息) 在此之后,创建一个生产者实例以发送所有KeyedMessage. 我希望每个KeyedMessage都应该根据不同的分区键进入不同的分区,这意味着 > KeyedMessage(主题,消息)转到分区0 我正在使用Kafka-web-console来观察主题状态,但结果并不像我期望的那样. KeyedMessage仍然随机进入分区,有时候两个KeyedMessage将进入同一个分区,即使它们有不同的分区键. 为了使我的问题更清楚,我想发布一些我现有的Scala代码,并且我正在使用Kafka 0.8.2-beta和Scala 2.10.4. 这是生产者代码,我没有使用自定义partitioner.class: val props = new Properties() val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec props.put("compression.codec",codec.toString) props.put("producer.type",if(synchronously) "sync" else "async") props.put("metadata.broker.list",brokerList) props.put("batch.num.messages",batchSize.toString) props.put("message.send.max.retries",messageSendMaxRetries.toString) props.put("request.required.acks",requestRequiredAcks.toString) props.put("client.id",clientId.toString) val producer = new Producer[AnyRef,AnyRef](new ProducerConfig(props)) def kafkaMesssage(message: Array[Byte],partition: Array[Byte]): KeyedMessage[AnyRef,AnyRef] = { if (partition == null) { new KeyedMessage(topic,message) } else { new KeyedMessage(topic,partition,message) } } def send(message: String,partition: String = null): Unit = send(message.getBytes("UTF8"),if (partition == null) null else partition.getBytes("UTF8")) def send(message: Array[Byte],partition: Array[Byte]): Unit = { try { producer.send(kafkaMesssage(message,partition)) } catch { case e: Exception => e.printStackTrace System.exit(1) } } 以下是我如何使用生产者,创建生成器实例,然后使用此实例发送三条消息.目前我将分区键创建为Integer,然后将其转换为Byte Arrays: val testMessage = UUID.randomUUID().toString val testTopic = "sample1" val groupId_1 = "testGroup" print("starting sample broker testing") val producer = new KafkaProducer(testTopic,"localhost:9092") val numList = List(0,1,2); for (a <- numList) { // Create a partition key as Byte Array var key = java.nio.ByteBuffer.allocate(4).putInt(a).array() //Here I give a Array[Byte] key //so the second "send" function of producer will be called producer.send(testMessage.getBytes("UTF8"),key) } 不确定我的逻辑是不正确还是我没有正确理解分区键机制.任何人都可以提供一些示例代码或解释会很棒! 解决方法
有同样的问题 – 只需切换到ByteArrayParitioner:
props.put("partitioner.class","kafka.producer.ByteArrayPartitioner") (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |