scala – 通过Spark写入HBase:任务不可序列化
我正在尝试使用Spark 1.0在HBase(0.96.0-hadoop2)中编写一些简单数据,但我不断遇到序列化问题.这是相关的代码:
import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import java.util.Properties import java.io.FileInputStream import org.apache.hadoop.hbase.client.Put object PutRawDataIntoHbase{ def main(args: Array[String]): Unit = { var propFileName = "hbaseConfig.properties" if(args.size > 0){ propFileName = args(0) } /** Load properties here **/ val theData = sc.textFile(prop.getProperty("hbase.input.filename")) .map(l => l.split("t")) .map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0),a(1))) val tableName = prop.getProperty("hbase.table.name") val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.rootdir",prop.getProperty("hbase.rootdir")) hbaseConf.addResource(prop.getProperty("hbase.site.xml")) val myTable = new HTable(hbaseConf,tableName) theData.foreach(a=>{ var p = new Put(Bytes.toBytes(a(0))) p.add(Bytes.toBytes(hbaseColFamily),Bytes.toBytes("col"),Bytes.toBytes(a(1))) myTable.put(p) }) } } 运行代码会导致: Failed to run foreach at putDataIntoHBase.scala:79 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable 用map替换foreach不会崩溃,但我也不会写. 解决方法
HBaseConfiguration类表示与HBase服务器的连接池.显然,它无法序列化并发送到工作节点.由于HTable使用此池与HBase服务器通信,因此无法对其进行序列化.
基本上,有三种方法可以解决这个问题: 在每个工作节点上打开连接. 注意使用foreachPartition方法: val tableName = prop.getProperty("hbase.table.name") <......> theData.foreachPartition { iter => val hbaseConf = HBaseConfiguration.create() <... configure HBase ...> val myTable = new HTable(hbaseConf,tableName) iter.foreach { a => var p = new Put(Bytes.toBytes(a(0))) p.add(Bytes.toBytes(hbaseColFamily),Bytes.toBytes(a(1))) myTable.put(p) } } 请注意,每个工作节点必须能够访问HBase服务器,并且必须预先安装或通过ADD_JARS提供所需的jar. 另请注意,由于为每个分区打开了连接池,因此最好将分区数大致减少到工作节点数(使用coalesce函数).也可以在每个工作节点上共享一个HTable实例,但这并不是那么简单. 将所有数据序列化为一个盒子并将其写入HBase 即使数据不适合内存,也可以使用单台计算机从RDD写入所有数据.详细解释在这个答案中解释:Spark: Best practice for retrieving big data from RDD to local machine 当然,它比分布式写入要慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,可能是最好的方法. 使用HadoopOutputFormat 可以为HBase创建自定义HadoopOutputFormat或使用现有的HadoopOutputFormat.我不确定是否存在符合您需求的东西,但谷歌应该在这里提供帮助. 附:顺便说一下,地图调用不会崩溃,因为它没有得到评估:在你调用带副作用的函数之前,不会评估RDD.例如,如果你调用theData.map(….).persist,它也会崩溃. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |