如何使用scala将postgreSQL数据库连接到Apache Spark?
我想知道如何在scala中做以下事情?
>使用Spark scala连接到postgreSQL数据库。 我知道使用scala来做,但是如何在打包时将psql scala的连接器jar导入sbt? 解决方法
我们的目标是运行Spark工作人员的并行SQL查询。
构建设置 将连接器和JDBC添加到build.sbt中的libraryDependencies中。我只是用MySQL这样做,所以我将在我的例子中使用,但Postgres应该是一样的。 libraryDependencies ++= Seq( jdbc,"mysql" % "mysql-connector-java" % "5.1.29","org.apache.spark" %% "spark-core" % "1.0.1",// etc ) 码 当您创建SparkContext时,您会告诉它要将哪些jar复制到执行程序。包括连接器罐。一个好看的方式来做到这一点: val classes = Seq( getClass,// To get the jar with our own code. classOf[mysql.jdbc.Driver] // To get the connector. ) val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath()) val conf = new SparkConf().setJars(jars) 现在,Spark已经准备好连接到数据库了。每个执行者将运行部分查询,以便结果准备好进行分布式计算。 这有两个选择。较旧的方法是使用 val rdd = new org.apache.spark.rdd.JdbcRDD( sc,() => { sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") },"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",1000,10,row => row.getString("BOOK_TITLE") ) 查看参数的文档。简单地说 >你有SparkContext。 由于Apache Spark版本1.3.0可以通过DataFrame API提供另一种方法。而不是JdbcRDD,您将创建一个 val df = sqlContext.load("jdbc",Map( "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred","dbtable" -> "BOOKS")) 有关选项的完整列表,请参见https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases(可以像JdbcRDD那样设置分区的关键范围和数量)。 更新 JdbcRDD不支持更新。但是你可以在foreachPartition中简单地做。 rdd.foreachPartition { it => val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?") for (bookTitle <- it) { del.setString(1,bookTitle) del.executeUpdate } } (这会为每个分区创建一个连接,如果是这个问题,请使用连接池!) DataFrames通过createJDBCTable和insertIntoJDBC方法支持更新。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |