postgresql – 从Postgres JDBC表中读取Spark的速度很慢
我试图从PostgreSQL数据库加载大约1M行到Spark.使用Spark时需要大约10秒.但是,使用psycopg2驱动程序加载相同的查询需要2s.我正在使用
postgresql jdbc驱动程序版本42.0.0
def _loadFromPostGres(name): url_connect = "jdbc:postgresql:"+dbname properties = {"user": "postgres","password": "postgres"} df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect,table=name,properties=properties) return df df = _loadFromPostGres(""" (SELECT "seriesId","companyId","userId","score" FROM user_series_game WHERE "companyId"=655124304077004298) as user_series_game""") print measure(lambda : len(df.collect())) 输出是 – --- 10.7214591503 seconds --- 1076131 使用psycopg2 – import psycopg2 conn = psycopg2.connect(conn_string) cur = conn.cursor() def _exec(): cur.execute("""(SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298)""") return cur.fetchall() print measure(lambda : len(_exec())) cur.close() conn.close() 输出是 – --- 2.27961301804 seconds --- 1076131 测量功能 – def measure(func) : start_time = time.time() x = func() print("--- %s seconds ---" % (time.time() - start_time)) return x 请帮我找到这个问题的原因. 编辑1 我做了一些基准测试.使用Scala和JDBC – import java.sql._; import scala.collection.mutable.ArrayBuffer; def exec() { val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000") val conn = DriverManager.getConnection(url,"postgres","postgres"); val sqlText = """SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298""" val t0 = System.nanoTime() val stmt = conn.prepareStatement(sqlText,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY) val rs = stmt.executeQuery() val list = new ArrayBuffer[(Long,Long,Double)]() while (rs.next()) { val seriesId = rs.getLong("seriesId") val companyId = rs.getLong("companyId") val userId = rs.getLong("userId") val score = rs.getDouble("score") list.append((seriesId,companyId,userId,score)) } val t1 = System.nanoTime() println("Elapsed time: " + (t1 - t0) * 1e-9 + "s") println(list.size) rs.close() stmt.close() conn.close() } exec() 输出是 – Elapsed time: 1.922102285s 1143402 当我在Spark Scala中收集了() – import org.apache.spark.sql.SparkSession def exec2() { val spark = SparkSession.builder().getOrCreate() val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000") val sqlText = """(SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298) as user_series_game""" val t0 = System.nanoTime() val df = spark.read .format("jdbc") .option("url",url) .option("dbtable",sqlText) .option("user","postgres") .option("password","postgres") .load() val list = df.collect() val t1 = System.nanoTime() println("Elapsed time: " + (t1 - t0) * 1e-9 + "s") print (list.size) } exec2() 输出是 Elapsed time: 1.486141076s 1143445 因此,在Python序列化中花费了4倍的额外时间.我知道会有一些惩罚,但这似乎太多了. 解决方法
原因很简单,有两个原因.
首先,我将介绍psycopg2的工作原理. 这个lib psycopg2的工作方式与任何其他lib一样,可以连接到RDMS.这个lib会将查询发送到你的postgres的引擎,它会将数据返回给你.像这样直接前行.
当你使用spark时,两种方式有点不同. Spark不像是在单个线程中运行的编程语言.它有一个分布式系统可以工作.即使您在本地计算机上运行.请参阅Spark有Driver(Master)和Workers的基本概念. 驱动程序接收执行查询到Postgres的请求,驱动程序不会请求每个工作人员请求来自Postgres的信息. 如果您看到文档here,您将看到如下注释:
此注释表示每个工作人员都有责任为您的postgres请求数据.这是开始这个??过程的一小部分开销,但没有什么大不了的.但是这里有一个开销,将数据发送给每个工作者. 第二点,您收集这部分代码: print measure(lambda : len(df.collect())) collect函数将为您的所有工作人员发送命令,以将数据发送给您的驱动程序.要存储在驱动程序的内存中,它就像一个Reduce,它会在进程的中间创建Shuffle. Shuffle是将数据发送给其他工作人员的过程的一个步骤.在收集的情况下,每个工作人员都会将其发送给您的司机. 所以你的代码的JDBC在JDBC中的步骤是:
好吧,在Spark中发生的其他一些事情,比如QueryPlan,构建DataFrame和其他东西. 这就是你在简单的Python代码中比Spark更快响应的原因. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |