scala – Spark:java.io.NotSerializableException:org.apache
Published: 2020-12-16 18:20:50 | Category: Security | Source: compiled from the web
I am creating an Avro RDD with the following code.
    def convert2Avro(data : String, schema : Schema) : AvroKey[GenericRecord] = {
      var wrapper = new AvroKey[GenericRecord]()
      var record = new GenericData.Record(schema)
      record.put("empname", "John")
      wrapper.datum(record)
      return wrapper
    }

I then create the Avro RDD as follows:

    var avroRDD = fieldsRDD.map(x => (convert2Avro(x, schema)))

At runtime, that line throws the following exception:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

Any pointers?

Solution
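The Schema.RecordSchema class does not implement Serializable, so it cannot be sent over the network. The closure passed to map captures schema, and Spark has to serialize that closure before shipping it to the executors, which is where the exception is raised. A workaround is to convert the schema to a string, pass the string to the method, and rebuild the Schema object inside the method.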
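    // Schema.toString returns the schema's JSON form; a String is serializable.
    val schemaString = schema.toString
    val avroRDD = fieldsRDD.map(x => convert2Avro(x, schemaString))

Rebuild the schema inside the method:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.avro.mapred.AvroKey

    def convert2Avro(data : String, schemaString : String) : AvroKey[GenericRecord] = {
      // Use a new Schema.Parser per call: a single parser instance cannot
      // define the same named schema twice.
      val schema = new Schema.Parser().parse(schemaString)
      val wrapper = new AvroKey[GenericRecord]()
      val record = new GenericData.Record(schema)
      record.put("empname", "John")
      wrapper.datum(record)
      wrapper
    }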
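
The approach above parses the schema string once per record. A possible refinement, sketched here under assumptions, is to parse it once per partition with mapPartitions instead. The object name AvroPartitionConverter and the helper convertPartition are hypothetical, and storing the input line in the "empname" field is an assumption for illustration (the original code always stores "John").

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

// Hypothetical helper, not part of the original answer.
object AvroPartitionConverter {

  // Converts every line of one partition, parsing the schema a single time.
  def convertPartition(lines: Iterator[String],
                       schemaString: String): Iterator[AvroKey[GenericRecord]] = {
    // Only the schema's JSON string is passed in; the Schema object is
    // rebuilt locally, so it never has to be Java-serialized.
    val schema = new Schema.Parser().parse(schemaString)
    lines.map { line =>
      val record = new GenericData.Record(schema)
      // Assumption: the input line is the employee name.
      record.put("empname", line)
      new AvroKey[GenericRecord](record)
    }
  }
}

// Intended use with Spark (assumes fieldsRDD is an RDD[String] and
// schemaString is a local String value):
//   val avroRDD = fieldsRDD.mapPartitions(lines =>
//     AvroPartitionConverter.convertPartition(lines, schemaString))
```

Because the helper takes a plain Iterator, it can be exercised without a Spark context. Whether the per-partition parse gives a measurable gain depends on the schema size and record count, so it is worth profiling before adopting it.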