scala – Bulk loading into Phoenix using Spark
I am trying to write some utilities to bulk load data through HFiles from Spark RDDs.
I modeled the approach on the CSVBulkLoadTool from Phoenix. I managed to generate some HFiles and load them into HBase, but I cannot see the rows using sqlline (whereas with the hbase shell it is possible). I would be grateful for any suggestions.

BulkPhoenixLoader.scala:

class BulkPhoenixLoader[A <: ImmutableBytesWritable : ClassTag, T <: KeyValue : ClassTag](rdd: RDD[(A, T)]) {

  def createConf(tableName: String, inConf: Option[Configuration] = None): Configuration = {
    val conf = inConf.map(HBaseConfiguration.create).getOrElse(HBaseConfiguration.create())
    val job: Job = Job.getInstance(conf, "Phoenix bulk load")

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

    // initialize credentials to possibly run in a secure env
    TableMapReduceUtil.initCredentials(job)

    val htable: HTable = new HTable(conf, tableName)

    // Auto configure partitioner and reducer according to the Main Data table
    HFileOutputFormat2.configureIncrementalLoad(job, htable)
    conf
  }

  def bulkSave(tableName: String, outputPath: String, conf: Option[Configuration]) = {
    val configuration: Configuration = createConf(tableName, conf)
    rdd.saveAsNewAPIHadoopFile(
      outputPath,
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[HFileOutputFormat2],
      configuration)
  }
}

ExtendedProductRDDFunctions.scala:

class ExtendedProductRDDFunctions[A <: scala.Product](data: org.apache.spark.rdd.RDD[A])
  extends ProductRDDFunctions[A](data) with Serializable {

  def toHFile(tableName: String,
              columns: Seq[String],
              conf: Configuration = new Configuration,
              zkUrl: Option[String] = None): RDD[(ImmutableBytesWritable, KeyValue)] = {
    val config = ConfigurationUtil.getOutputConfiguration(tableName, columns, zkUrl, Some(conf))
    val tableBytes = Bytes.toBytes(tableName)
    val encodedColumns = ConfigurationUtil.encodeColumns(config)
    val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config))

    val conn = DriverManager.getConnection(jdbcUrl)
    val query = QueryUtil.constructUpsertStatement(tableName, columns.toList.asJava, null)
    data.flatMap(x => mapRow(x, jdbcUrl, encodedColumns, tableBytes, query))
  }

  def mapRow(product: Product,
             jdbcUrl: String,
             encodedColumns: String,
             tableBytes: Array[Byte],
             query: String): List[(ImmutableBytesWritable, KeyValue)] = {
    val conn = DriverManager.getConnection(jdbcUrl)
    val preparedStatement = conn.prepareStatement(query)

    val columnsInfo = ConfigurationUtil.decodeColumns(encodedColumns)
    columnsInfo.zip(product.productIterator.toList).zipWithIndex.foreach(setInStatement(preparedStatement))
    preparedStatement.execute()

    val uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true)
    val hRows = uncommittedDataIterator.asScala.filter(kvPair =>
      Bytes.compareTo(tableBytes, kvPair.getFirst) == 0
    ).flatMap(kvPair => kvPair.getSecond.asScala.map(
      kv => {
        val byteArray = kv.getRowArray.slice(kv.getRowOffset, kv.getRowOffset + kv.getRowLength - 1) :+ 1.toByte
        (new ImmutableBytesWritable(byteArray, 0, kv.getRowLength), kv)
      }))

    conn.rollback()
    conn.close()
    hRows.toList
  }

  def setInStatement(statement: PreparedStatement): (((ColumnInfo, Any), Int)) => Unit = {
    case ((c, v), i) =>
      if (v != null) {
        // Both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
        val (finalObj, finalType) = v match {
          case dt: DateTime => (new Date(dt.getMillis), PDate.INSTANCE.getSqlType)
          case d: util.Date => (new Date(d.getTime), PDate.INSTANCE.getSqlType)
          case _ => (v, c.getSqlType)
        }
        statement.setObject(i + 1, finalObj, finalType)
      } else {
        statement.setNull(i + 1, c.getSqlType)
      }
  }

  private def getIndexTables(conn: Connection, qualifiedTableName: String): List[(String, String)] = {
    val table: PTable = PhoenixRuntime.getTable(conn, qualifiedTableName)
    val tables = table.getIndexes.asScala.map(x => x.getIndexType match {
      case IndexType.LOCAL => (x.getTableName.getString, MetaDataUtil.getLocalIndexTableName(qualifiedTableName))
      case _ => (x.getTableName.getString, x.getTableName.getString)
    }).toList
    tables
  }
}

I load the generated HFiles with the utility tool from hbase as follows:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles path/to/hfile tableName
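For completeness, the same load step can also be triggered programmatically rather than from the shell. A minimal sketch, assuming an HBase 1.x client on the classpath; the HFile path and table name below are placeholders:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Load the HFiles written by bulkSave into the target table.
val conf = HBaseConfiguration.create()
val loader = new LoadIncrementalHFiles(conf)
val table = new HTable(conf, "tableName")            // placeholder table name
loader.doBulkLoad(new Path("path/to/hfile"), table)  // placeholder output path
table.close()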
Solution

You can convert the csv file into an RDD of Product and use the .saveToPhoenix method. That is generally how I load csv data into phoenix.
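For illustration, a minimal sketch of that approach, assuming the phoenix-spark module is on the classpath; the CSV path, table name, columns and ZooKeeper quorum are placeholders:

import org.apache.spark.SparkContext
import org.apache.phoenix.spark._  // adds saveToPhoenix to RDDs of Product

val sc = new SparkContext("local", "phoenix-csv-load")

// Parse each CSV line into a tuple (a Product) matching the target columns.
sc.textFile("hdfs:///path/to/data.csv")        // placeholder input path
  .map(_.split(","))
  .map(fields => (fields(0).toLong, fields(1), fields(2).toInt))
  .saveToPhoenix(
    "OUTPUT_TABLE",                            // placeholder table
    Seq("ID", "COL1", "COL2"),                 // placeholder columns
    zkUrl = Some("zookeeper-host:2181")        // placeholder ZooKeeper quorum
  )

Because saveToPhoenix writes through Phoenix's own output format (issuing UPSERTs), the rows are encoded the way Phoenix expects and show up in sqlline.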
See: https://phoenix.apache.org/phoenix_spark.html