加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Kafka分区键无法正常工作

发布时间:2020-12-16 18:37:25 所属栏目:安全 来源:网络整理
导读:我正在努力如何正确使用分区键机制.我的逻辑是将分区号设置为3,然后创建三个分区键为“0”,“1”,“2”,然后使用分区键创建三个KeyedMessage,如 KeyedMessage(主题,“0”,消息) KeyedMessage(主题,消息) 在此之后,创建一个生产者实例以发送所有KeyedMessage.
我正在努力如何正确使用分区键机制.我的逻辑是将分区号设置为3,然后创建三个分区键为“0”,“1”,“2”,然后使用分区键创建三个KeyedMessage,如

> KeyedMessage(主题,“0”,消息)
> KeyedMessage(主题,消息)

在此之后,创建一个生产者实例以发送所有KeyedMessage.

我希望每个KeyedMessage都应该根据不同的分区键进入不同的分区,这意味着

> KeyedMessage(主题,消息)转到分区0
> KeyedMessage(主题,消息)转到分区1
> KeyedMessage(主题,消息)转到分区2

我正在使用Kafka-web-console来观察主题状态,但结果并不像我期望的那样. KeyedMessage仍然随机进入分区,有时候两个KeyedMessage将进入同一个分区,即使它们有不同的分区键.

为了使我的问题更清楚,我想发布一些我现有的Scala代码,并且我正在使用Kafka 0.8.2-beta和Scala 2.10.4.

这是生产者代码,我没有使用自定义partitioner.class:

val props = new Properties()

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec",codec.toString)
  props.put("producer.type",if(synchronously) "sync" else "async")
  props.put("metadata.broker.list",brokerList)
  props.put("batch.num.messages",batchSize.toString)
  props.put("message.send.max.retries",messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef,AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte],partition: Array[Byte]): KeyedMessage[AnyRef,AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String,partition: String = null): Unit = send(message.getBytes("UTF8"),if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte],partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message,partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }

以下是我如何使用生产者,创建生成器实例,然后使用此实例发送三条消息.目前我将分区键创建为Integer,然后将其转换为Byte Arrays:

val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic,"localhost:9092")

  val numList = List(0,1,2);
  for (a <- numList) {
    // Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"),key)
  }

不确定我的逻辑是不正确还是我没有正确理解分区键机制.任何人都可以提供一些示例代码或解释会很棒!

解决方法

有同样的问题 – 只需切换到ByteArrayParitioner:

props.put("partitioner.class","kafka.producer.ByteArrayPartitioner")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读