scala – 使用ProtoBuf字段的Spark,Kryo序列化问题
发布时间:2020-12-16 08:45:18 所属栏目:安全 来源:网络整理
导读:在转换RDD时运行与protobuf字段序列化有关的spark作业时,我发现错误. com.esotericsoftware.kryo.KryoException:java.lang.UnsupportedOperationException 序列化跟踪: otherAuthors_(com.thomsonreuters.kraken.medusa.dbor.proto.Book $DBBooks) 此时似
在转换RDD时运行与protobuf字段序列化有关的spark作业时,我发现错误.
com.esotericsoftware.kryo.KryoException:java.lang.UnsupportedOperationException 此时似乎创建了错误: val booksPerTier: Iterable[(TimeTier,RDD[DBBooks])] = allTiers.map { tier => (tier,books.filter(b => isInTier(endOfInterval,tier,b) && !isBookPublished(o)).mapPartitions( it => it.map{ord => (ord.getAuthor,ord.getPublisherName,getGenre(ord.getSourceCountry))})) } val averagesPerAuthor = booksPerTier.flatMap { case (tier,opt) => opt.map(o => (tier,o._1,PublisherCompanyComparison,o._3)).countByValue() } val averagesPerPublisher = booksPerTier.flatMap { case (tier,PublisherComparison(o._2),o._3)).countByValue() } 该字段是protobuf中指定的列表,如下所示: otherAuthors_ = java.util.Collections.emptyList() 正如您所看到的,代码实际上并未使用Book Protobuf中的那个字段,尽管它仍然通过网络传输. 有没有人对此有任何建议? 解决方法
好的,老问题,但这里是后代的答案.默认的kryo序列化程序不适用于某些集合.有一个第三方库可以帮助它:
kryo-serializers
在您的情况下,您可能需要在创建spark配置时提供自定义kryo registrator: val conf = new SparkConf() conf.set("spark.kryo.registrator","MyKryoRegistrator") 在您的registrator中进行所需的自定义注册: class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register( Collections.EMPTY_LIST.getClass(),new CollectionsEmptyListSerializer() ); // Probably should use proto serializer for your proto classes kryo.register( Book.class,new ProtobufSerializer() ); } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |