加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何使用scala将postgreSQL数据库连接到Apache Spark?

发布时间:2020-12-16 09:46:14 所属栏目:安全 来源:网络整理
导读:我想知道如何在scala中做以下事情? 使用Spark scala连接到postgreSQL数据库。 编写SQL查询,如SELECT,UPDATE等,以修改其中的表 那个数据库。 我知道使用scala来做,但是如何在打包时将psql scala的连接器jar导入sbt? 解决方法 我们的目标是运行Spark工作
我想知道如何在scala中做以下事情?

>使用Spark scala连接到postgreSQL数据库。
>编写SQL查询,如SELECT,UPDATE等,以修改其中的表
那个数据库。

我知道使用scala来做,但是如何在打包时将psql scala的连接器jar导入sbt?

解决方法

我们的目标是运行Spark工作人员的并行SQL查询。

构建设置

将连接器和JDBC添加到build.sbt中的libraryDependencies中。我只是用MySQL这样做,所以我将在我的例子中使用,但Postgres应该是一样的。

libraryDependencies ++= Seq(
  jdbc,"mysql" % "mysql-connector-java" % "5.1.29","org.apache.spark" %% "spark-core" % "1.0.1",// etc
)

当您创建SparkContext时,您会告诉它要将哪些jar复制到执行程序。包括连接器罐。一个好看的方式来做到这一点:

val classes = Seq(
  getClass,// To get the jar with our own code.
  classOf[mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)

现在,Spark已经准备好连接到数据库了。每个执行者将运行部分查询,以便结果准备好进行分布式计算。

这有两个选择。较旧的方法是使用org.apache.spark.rdd.JdbcRDD

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,() => {
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",1000,10,row => row.getString("BOOK_TITLE")
)

查看参数的文档。简单地说

>你有SparkContext。
>然后创建连接的函数。每个工作人员将调用这个连接到数据库。
>然后SQL查询。这必须与示例类似,并且包含起始和结束键的占位符。
>然后指定键的范围(在我的例子中为0到1000)和分区数。范围将在分区之间划分。所以一个执行者线程将在示例中最终执行SELECT * FROM FOO WHERE 0< = KEY AND KEY< = 100。
>最后我们有一个将ResultSet转换成某个东西的函数。在这个例子中,我们把它转换成一个String,所以你最终得到一个RDD [String]。

由于Apache Spark版本1.3.0可以通过DataFrame API提供另一种方法。而不是JdbcRDD,您将创建一个org.apache.spark.sql.DataFrame

val df = sqlContext.load("jdbc",Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred","dbtable" -> "BOOKS"))

有关选项的完整列表,请参见https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases(可以像JdbcRDD那样设置分区的关键范围和数量)。

更新

JdbcRDD不支持更新。但是你可以在foreachPartition中简单地做。

rdd.foreachPartition { it =>
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1,bookTitle)
    del.executeUpdate
  }
}

(这会为每个分区创建一个连接,如果是这个问题,请使用连接池!)

DataFrames通过createJDBCTable和insertIntoJDBC方法支持更新。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读