
scala – Spark Kafka producer serializable

I ran into the following exception:

ERROR yarn.ApplicationMaster: User class threw exception:
org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
    at com.Boot$.test(Boot.scala:60)
    at com.Boot$.main(Boot.scala:36)
    at com.Boot.main(Boot.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@77624599)
    - field (class: com.Boot$$anonfun$test$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.Boot$$anonfun$test$1,)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//    @transient
val sparkConf = new SparkConf()

// Kryo only affects data serialization; closures are still serialized with
// the Java serializer, so this setting does not help with the error above.
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

//    @transient
val sc = new SparkContext(sparkConf)

val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")

//    @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
//    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
//    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// "producer.type" is a legacy setting from the old Scala producer; the new
// KafkaProducer always sends asynchronously and ignores this key.
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")

// The producer is created on the driver, so the closure below captures it
// and Spark has to serialize it -- which fails.
//    @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

requestSet.foreachPartition((partitions: Iterator[String]) => {
  partitions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception =>
        log.warn(ex.getMessage, ex)
    }
  })
})

producer.close()

In this program I try to read records from an HDFS path and send them to Kafka.
The problem is that when I remove the code that sends records to Kafka, it runs fine.
What am I missing?

Solution

KafkaProducer is not serializable. You need to move the creation of the instance inside foreachPartition, so that one producer is built on each executor instead of being shipped from the driver:

requestSet.foreachPartition((partitions: Iterator[String]) => {
  // Created inside the closure, so the producer lives on the executor
  // and never has to be serialized.
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
  partitions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception =>
        log.warn(ex.getMessage, ex)
    }
  })
  // Close per partition to flush buffered records before the task ends.
  producer.close()
})
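
The commented-out @transient annotations in the question point at a related pattern worth knowing: wrap the producer in a serializable holder whose producer field is a @transient lazy val, so the field is skipped when the closure is serialized and rebuilt once per executor JVM. A minimal sketch of that pattern, assuming the same props map and String keys and values; KafkaSink is an illustrative name, not part of the original answer:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative wrapper: only the small factory function is serialized with
// the closure; the producer itself is re-created lazily on each executor.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  @transient lazy val producer: KafkaProducer[String, String] = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  // java.util.HashMap is serializable, so capturing props here is safe.
  def apply(props: HashMap[String, Object]): KafkaSink =
    new KafkaSink(() => new KafkaProducer[String, String](props))
}

With this wrapper a single instance can even be broadcast from the driver (sc.broadcast(KafkaSink(props))) and reused by every task without hitting the serialization error.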

Note also that KafkaProducer.send returns a Future[RecordMetadata], and the only exception that can propagate from the call itself is a SerializationException, thrown when the key or value cannot be serialized; the try/catch above will therefore not see broker-side failures.
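
If broker-side failures matter, the overload of send that takes a Callback surfaces them asynchronously. A minimal sketch, reusing the producer, topic, and log from the answer above:

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

producer.send(new ProducerRecord[String, String]("testtopic", line), new Callback {
  // Invoked when the broker acknowledges the record (exception == null)
  // or when the send ultimately fails (metadata is then null).
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) log.warn(exception.getMessage, exception)
})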
