加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

postgresql – 从Postgres JDBC表中读取Spark的速度很慢

发布时间:2020-12-13 16:08:23 所属栏目:百科 来源:网络整理
导读:我试图从PostgreSQL数据库加载大约1M行到Spark.使用Spark时需要大约10秒.但是,使用psycopg2驱动程序加载相同的查询需要2s.我正在使用 postgresql jdbc驱动程序版本42.0.0 def _loadFromPostGres(name): url_connect = "jdbc:postgresql:"+dbname properties
我试图从PostgreSQL数据库加载大约1M行到Spark.使用Spark时需要大约10秒.但是,使用psycopg2驱动程序加载相同的查询需要2s.我正在使用 postgresql jdbc驱动程序版本42.0.0

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres","password": "postgres"}
    df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect,table=name,properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId","companyId","userId","score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出是 –

--- 10.7214591503 seconds ---
1076131

使用psycopg2 –

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出是 –

--- 2.27961301804 seconds ---
1076131

测量功能 –

def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

请帮我找到这个问题的原因.

编辑1

我做了一些基准测试.使用Scala和JDBC –

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long,Long,Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId,companyId,userId,score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出是 –

Elapsed time: 1.922102285s
1143402

当我在Spark Scala中收集了() –

import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getOrCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url",url)
          .option("dbtable",sqlText)
          .option("user","postgres")
          .option("password","postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

输出是

Elapsed time: 1.486141076s
1143445

因此,在Python序列化中花费了4倍的额外时间.我知道会有一些惩罚,但这似乎太多了.

解决方法

原因很简单,有两个原因.

首先,我将介绍psycopg2的工作原理.

这个lib psycopg2的工作方式与任何其他lib一样,可以连接到RDMS.这个lib会将查询发送到你的postgres的引擎,它会将数据返回给你.像这样直接前行.

Conn -> Query -> ReturnData -> FetchData

当你使用spark时,两种方式有点不同. Spark不像是在单个线程中运行的编程语言.它有一个分布式系统可以工作.即使您在本地计算机上运行.请参阅Spark有Driver(Master)和Workers的基本概念.

驱动程序接收执行查询到Postgres的请求,驱动程序不会请求每个工作人员请求来自Postgres的信息.

如果您看到文档here,您将看到如下注释:

Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

此注释表示每个工作人员都有责任为您的postgres请求数据.这是开始这个??过程的一小部分开销,但没有什么大不了的.但是这里有一个开销,将数据发送给每个工作者.

第二点,您收集这部分代码:

print measure(lambda : len(df.collect()))

collect函数将为您的所有工作人员发送命令,以将数据发送给您的驱动程序.要存储在驱动程序的内存中,它就像一个Reduce,它会在进程的中间创建Shuffle. Shuffle是将数据发送给其他工作人员的过程的一个步骤.在收集的情况下,每个工作人员都会将其发送给您的司机.

所以你的代码的JDBC在JDBC中的步骤是:

(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver)
Request the Data -> (Workers) Shuffle -> (Driver) Collect

好吧,在Spark中发生的其他一些事情,比如QueryPlan,构建DataFrame和其他东西.

这就是你在简单的Python代码中比Spark更快响应的原因.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读