加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala操作HBase2.0

发布时间:2020-12-16 09:13:14 所属栏目:安全 来源:网络整理
导读:在前面: scala:2.12 hbase:2.0.2 开发工具:IDEA 准备工作: 1、将生产上的hbase中的conf/hbase-site.xml文件拷贝到idea中的src/resources目录下 2、将生产环境中hbase中的$HBASE_HOME/lib下的*.jar文件加载到IDEA中 3、点击libraries-中间的"+" -java 4、

  在前面:

  scala:2.12

  hbase:2.0.2

  开发工具:IDEA 

准备工作:

  1、将生产上的hbase中的conf/hbase-site.xml文件拷贝到idea中的src/resources目录下

    

  2、将生产环境中hbase中的$HBASE_HOME/lib下的*.jar文件加载到IDEA中

    

  3、点击libraries->中间的"+" ->java

    

  4、选择jar包所放的位置,点击OK

    

  5、继续点击ok即可

  6、进行连接代码编写:

    

package spark._core

import java.io.IOException
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.SubstringComparator
import org.apache.hadoop.hbase.util.Bytes
import parquet.org.slf4j.LoggerFactory

/**
  * Author Mr. Guo
  * Create 2018/11/5 - 19:08
  */
object Operator_Hbase {

  def LOG = LoggerFactory.getLogger(getClass)

  def getHbaseConf: Configuration = {
    val conf: Configuration = HBaseConfiguration.create
    conf.addResource(".mainresourceshbase-site.xml")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    /*conf.set("spark.executor.memory","3000m")
    conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
    conf.set("hbase.master","master:60000")
    conf.set("hbase.rootdir","Contant.HBASE_ROOTDIR")*/
    conf
  }

  //创建一张表
  @throws(classOf[MasterNotRunningException])
  @throws(classOf[ZooKeeperConnectionException])
  @throws(classOf[IOException])
  def createTable(hbaseconn: Connection,tableName: String,columnFamilys: Array[String]) = {
    //建立一个数据库操作对象
    var admin: Admin = hbaseconn.getAdmin;
    var myTableName: TableName = TableName.valueOf(tableName)
    if (admin.tableExists(myTableName)) {
      LOG.info(tableName + "Table exists!")
    } else {
      val tableDesc: HTableDescriptor = new HTableDescriptor(myTableName)
      tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation")
      for (columnFamily <- columnFamilys) {
        val columnDesc: HColumnDescriptor = new HColumnDescriptor(columnFamily)
        tableDesc.addFamily(columnDesc)
      }
      admin.createTable(tableDesc)
      LOG.info(tableName + "create table success!")
    }
    admin.close()
  }

  //载入数据
  def addRow(table: Table,rowKey: String,columnFamily: String,quorm: String,value: String) = {
    val rowPut: Put = new Put(Bytes.toBytes(rowKey))
    if (value == null) {
      rowPut.addColumn(columnFamily.getBytes,quorm.getBytes,"".getBytes())
    } else {
      rowPut.addColumn(columnFamily.getBytes,value.getBytes)
    }
    table.put(rowPut)
  }

  //获取数据
  def getRow(table: Table,rowKey: String): Result = {
    val get: Get = new Get(Bytes.toBytes(rowKey))
    val result: Result = table.get(get)
    for (rowKv <- result.rawCells()) {
      println("Famiily:" + new String(rowKv.getFamilyArray,rowKv.getFamilyOffset,rowKv.getFamilyLength,"UTF-8"))
      println("Qualifier:" + new String(rowKv.getQualifierArray,rowKv.getQualifierOffset,rowKv.getQualifierLength,"UTF-8"))
      println("TimeStamp:" + rowKv.getTimestamp)
      println("rowkey:" + new String(rowKv.getRowArray,rowKv.getRowOffset,rowKv.getRowLength,"UTF-8"))
      println("Value:" + new String(rowKv.getValueArray,rowKv.getValueOffset,rowKv.getValueLength,"UTF-8"))
    }
    return result
  }

  //批量添加数据
  def addDataBatch(table: Table,list: java.util.List[Put]) = {
    try {
      table.put(list)
    } catch {
      case e: RetriesExhaustedWithDetailsException => {
        LOG.error(e.getMessage)
      }
      case e: IOException => {
        LOG.error(e.getMessage)
      }
    }
  }

  //查询全部
  def queryAll(table: Table): ResultScanner = {
    val scan: Scan = new Scan
    try {
      val s = new Scan()
      val result: ResultScanner = table.getScanner(s)
      return result
    } catch {
      case e: IOException => {
        LOG.error(e.toString)
      }
    }
    return null
  }

  //查询条记录
  def queryBySingleColumn(table: Table,queryColumn: String,value: String,columns: Array[String]): ResultScanner = {
    if (columns == null || queryColumn == null || value == null) {
      return null
    }
    try {
      val filter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(queryColumn),Bytes.toBytes(queryColumn),CompareOp.EQUAL,new SubstringComparator(value))
      val scan: Scan = new Scan()
      for (columnName <- columns) {
        scan.addColumn(Bytes.toBytes(columnName),Bytes.toBytes(columnName))
      }
      scan.setFilter(filter)
      return table.getScanner(scan)
    } catch {
      case e: Exception => {
        LOG.error(e.toString)
      }
    }
    return null
  }

  //删除表
  def dropTable(hbaseconn: Connection,tableName: String) = {
    try {
      val admin: HBaseAdmin = hbaseconn.getAdmin.asInstanceOf[HBaseAdmin]
      admin.disableTable(TableName.valueOf(tableName))
      admin.deleteTable(TableName.valueOf(tableName))
    } catch {
      case e: MasterNotRunningException => {
        LOG.error(e.toString)
      }
      case e: ZooKeeperConnectionException => {
        LOG.error(e.toString)
      }
      case e: IOException => {
        LOG.error(e.toString)
      }
    }

  }

  def main(args: Array[String]): Unit = {
    val conf: Configuration = getHbaseConf
    val conn = ConnectionFactory.createConnection(conf)
    //定义表名称
    val table:Table = conn.getTable(TableName.valueOf("test"))
    try {
      //列族fam1,fam2
      val familyColumn:Array[String] = Array[String]("info1","info2")
      //建表
//      createTable(conn,"test",familyColumn)

      val uuid:UUID = UUID.randomUUID()
      val s_uuid:String = uuid.toString

      //载入数据
//      addRow(table,s_uuid,"info","column1A",s_uuid+"_1A")

      //获取表中所有数据
//      getRow(table,"9ec78ac4-6042-4c34-8862-f5aca3e")
      //删除表
//     dropTable(conn,"test")
    }catch{
      case e:Exception => {
        if (e.getClass == classOf[MasterNotRunningException]){
          System.out.println("MasterNotRunningException")
        }
        if (e.getClass == classOf[ZooKeeperConnectionException]){
          System.out.println("ZooKeeperConnectionException")
        }
        if (e.getClass == classOf[IOException]){
          System.out.println("IOException")
        }
        e.printStackTrace()
      }
    }finally{
      if (null != table){
        table.close()
      }
    }
  }
}

  7、实际操作的过程中可能会遇到如下问题:

    

   解决方案:

    

    双击打开该文件,找到标签?<component name="PropertiesComponent">?, 在标签里加一行??<property name="dynamic.classpath" value="true" />

    

    保存即可

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读