scala – Redis on Spark: Task not serializable
We are using Redis on Spark to cache our key-value pairs. Here is the code:
import com.redis.RedisClient

val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
  val arr = x.split(" ")
  val readId = arr(0).toInt
  val refId = arr(1).toInt
  val start = arr(2).toInt
  val end = arr(3).toInt
  val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
  val readStr = r.hmget("readStr", readId).get(readId)
  val realend = if (end > refStr.length - 1) refStr.length - 1 else end
  val refOneStr = refStr.substring(start, realend)
  (readStr, refOneStr, refId, start, realend, readId)
})

But the compiler gave me feedback like this:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.ynu.App$.main(App.scala:511)
    at com.ynu.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

Could someone tell me how to serialize the data fetched from Redis? Thank you.

Solution
In Spark, the functions applied to an RDD (such as the map here) are serialized and sent to the executors for processing. This means that everything those operations reference must be serializable.
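To see that rule in isolation, here is a minimal sketch (not from the original answer; it assumes a SparkContext named sc and a small hypothetical Handle class that is available on the executors' classpath). Anything referenced inside the map closure is serialized together with it, so capturing a non-serializable value fails in exactly the same way as above, while constructing the value inside the closure avoids shipping it, at the cost of one instance per record:

// Hypothetical, non-serializable helper used only to illustrate closure capture.
class Handle { def tag(i: Int): String = "h-" + i }

val h = new Handle

// Fails at the map call with "Task not serializable": the closure captures h,
// and h would have to be serialized and shipped to the executors.
sc.parallelize(1 to 10).map(x => h.tag(x)).collect()

// Works: nothing non-serializable is captured, but a new Handle is built per record.
sc.parallelize(1 to 10).map(x => new Handle().tag(x)).collect()

The answer below applies the same idea to the Redis client, but moves the construction to a coarser granularity than once per record.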
The Redis connection here is not serializable: it opens a TCP connection to the target database, and that connection is bound to the machine on which it was created.

The solution is to create these connections on the executors, in the local execution context. There are a few ways to do that; two that come to mind are:

> rdd.mapPartitions: lets you process a whole partition at once, so the cost of creating the connection is amortized over all the elements in the partition
> Singleton connection managers: one connection per worker JVM, created lazily and reused across tasks

mapPartitions is easier, as it only requires a small change to the structure of the program:

val perhit = perhitFile.mapPartitions{ partition =>
  val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartitions operation
  val res = partition.map{ x =>
    ...
    val refStr = r.hmget(...) // use r to process the local data
  }
  r.close // take care of resources
  res
}

A singleton connection manager can be modeled with an object holding a lazy reference to the connection (note: a mutable reference would also work):

object RedisConnection extends Serializable {
  lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

This object can then be used to instantiate one connection per worker JVM, and it is referenced inside the operation's closure as a Serializable object:

val perhit = perhitFile.map{ x =>
  val param = f(x)
  val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}

The advantage of the singleton object is lower overhead, since the connection is created only once per JVM (instead of once per RDD partition).

There are also some drawbacks:

> Cleaning up the connections is tricky (shutdown hooks / timers); one possible approach is sketched below

(*) Code is for illustration purposes only. It has not been compiled or tested.
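On the cleanup drawback mentioned above: one possible (untested) variation, in the same spirit as the answer's sketches, is to register a JVM shutdown hook when the lazy connection is first created, so each executor closes its client when its JVM exits. The close call simply mirrors the r.close used in the mapPartitions sketch; depending on the scala-redis version the method may be named disconnect instead.

import com.redis.RedisClient

object RedisConnection extends Serializable {
  lazy val conn: RedisClient = {
    val client = new RedisClient("192.168.1.101", 6379)
    // Runs once per worker JVM, the first time conn is touched on that executor.
    sys.addShutdownHook {
      client.close // best-effort release of the TCP connection when the executor JVM exits
    }
    client
  }
}

A timer that closes idle connections is the other option hinted at above, at the cost of extra bookkeeping.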