scala – Kryo:反序列化旧版本的类
我需要通过添加两个新参数来修改类.这个类是用Kryo序列化的.
每当我停止我的流时,我目前正在持续保存与此课程相关的信息,作为RDD. 当我重新启动流时,我加载了以前持久化的信息,并使用它们在我停止和重新启动之间保持一致. 由于类I持久化需要这些新参数,我通过添加新的kryo.writeObject(输出,对象,ObjectSerializer)和kryo.readObject(输入,classOf [Object],ObjectSerializer)来更改类和序列化程序.参数. 现在,每当我重新启动流时,我都会获得一个异常:“遇到未注册的类……”. 这似乎是显而易见的,因为我试图反序列化一个对象,这个对象在我停止流时我坚持的信息中没有包含. 有没有办法避免这种异常? 谢谢 编辑: 我找到了一些我以前没见过的有用的东西: 这个人通过简单地插入一个很长的定义他应该使用哪个版本的反序列化器来实现版本控制. 如果有人能提出更好的解决方案,请告诉我. 编辑2: 仍然有这种情况的问题. 解决方法
这个问题的解决方案是几个月前发现的.所以我想尽快发布这个问题的答案.
问题在于,由于代码中的错误,该类被使用标准的Kryo FieldSerializer序列化,该标准不是向前兼容的. 我们必须执行以下操作来反序列化旧类并将其转换为新的序列化类. 情况是: case class ClassA(field1 : Long,field2 : String) 它像这样序列化: object ClassASerializer extends Serializer[ClassA] with Serializable{ override def write(kryo: Kryo,output: Output,t: ClassA) = { output.writeLong { t.field1 } output.writeString { t.field2 } } override def read(kryo: Kryo,input: Input,aClass: Class[ClassA]) = classA( field1 = input.readLong(),field2 = input.readLong() ) 并且循环包含要使用序列化程序序列化的类的Seq,以便为所有类注册所有序列化程序. protected def registry: Seq[aClass: Class[A],serializer: Serializer[A]] = ... final def register(kryo: Kryo) = { registry.foreach { registrable => kryo.register(registrable.aClass,registrable.serializer) } } 需要通过添加新字段来修改该类,该字段是另一个案例类的实例. 为了执行此类更改,我们必须使用与Kryo库“可选”相关的注释, ... import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional import scala.annotation.meta.field ... case class ClassA(field1 : Long,field2 : String,@(Optional @field)("field3") field3 : ClassB) 序列化程序被修改,例如在读取旧的序列化类时,它可以使用默认值实例化field3,并且在写入时,写入这样的默认值: object ClassASerializer extends Serializer[ClassA] with Serializable{ override def write(kryo: Kryo,t: ClassA) = { output.writeLong { t.field1 } output.writeString { t.field2 } kryo.writeObject(output,Option { t.field3 } getOrElse ClassB.default,ClassBSerializer) } override def read(kryo: Kryo,aClass: Class[ClassA]) = ClassA( field1 = input.readLong(),field2 = input.readLong(),field3 = ClassB.default ) 还修改了kryo序列化程序注册以注册可选字段: protected def registry: Seq[aClass: Class[A],serializer: Serializer[A]] = ... def optionals = Seq("field3") final def register(kryo: Kryo) = { optionals.foreach { optional => kryo.getContext.asInstanceOf[ObjectMap[Any,Any]].put(optional,true) } registry.foreach { registrable => kryo.register(registrable.aClass,registrable.serializer) } } 结果我们能够编写序列化类的新版本. 与此同时,我们纠正了通过FieldSerializer强制序列化的代码中的错误,但这不在问题的范围内. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |