Problem Description
I have the following task:
- load data from the same table across multiple schemas
- use PySpark
- use a single user that has access to all schemas in the database
I am using the following code (more or less):
def connect_to_oracle_db(self, db_query):
    # assumes the SparkSession is stored on self (the original passed it as an argument)
    return self.spark_session.read \
        .format("jdbc") \
        .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>") \
        .option("user", "<user>") \
        .option("password", "<pass>") \
        .option("dbtable", db_query) \
        .option("driver", "oracle.jdbc.driver.OracleDriver")

def run(self):
    all_schemes = <list of all available schemes>
    for str_schema in all_schemes:
        db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
        df_table1 = self.connect_to_oracle_db(db_query).load()
        # process df_table1
There are around 300 schemas, and it is quite slow because a new connection is created and closed on every iteration. I want to find a way to reuse the existing connection or somehow create a connection pool. This looks quite inefficient to me.
Do you have any idea how to reuse the connection or create a connection pool for PySpark?
Recommended Answer
There is no place for a connection pool in the classical sense in a distributed environment like Spark. You have to remember that each partition can be processed by a different physical node, a different logical container (if applicable on a given cluster manager) and, finally, a different JVM.
Not that a connection pool could really help in such a case anyway. Since Spark is intended for massive imports, the utilization of each individual connection is already pretty high.
There are, however, several visible problems here (and possibly further ones that are not obvious from the snippet, since the code you've shown doesn't actually fetch the data):
- You didn't configure fetchsize, so the default of the particular driver will be used. For Oracle it is 10, which is completely unfit for large scale processing:
return spark_session.read \
    .format("jdbc") \
    .option("fetchsize", some_reasonable_value) \
    ...
- You didn't configure partitioning, so Spark will process all the data using only a single partition. You can read about possible solutions in How to optimize partitioning when migrating data from JDBC source?; a sketch of the relevant options follows below.
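As a minimal sketch, still using the placeholders from the question: partitionColumn, lowerBound, upperBound and numPartitions are the standard Spark JDBC options, but the column name ID, the bounds, the partition count and the fetchsize value below are illustrative assumptions, not values from the original answer.

df_table1 = (spark_session.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>")
    .option("user", "<user>")
    .option("password", "<pass>")
    .option("dbtable", db_query)
    .option("fetchsize", "10000")          # illustrative value, to be tuned
    .option("partitionColumn", "ID")       # assumed numeric column in TABLE1
    .option("lowerBound", "1")             # assumed known min/max of that column
    .option("upperBound", "1000000")
    .option("numPartitions", "16")         # how many JDBC reads Spark may run in parallel
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .load())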
- You've modeled this as a sequential process. Unless the datasets are somehow combined downstream, it would be best to submit a separate job for each table and let the scheduler optimize things according to the available resources; one way to do that is sketched below.
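One possible way to follow that advice, sketched under the assumption that each schema can be handled independently: a small standalone driver that loads just one schema and is submitted once per schema, e.g. spark-submit load_one_schema.py SCHEMA_NAME. The script name, argument handling and fetchsize value are hypothetical.

# load_one_schema.py -- hypothetical standalone job, submitted once per schema
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    str_schema = sys.argv[1]    # schema name passed on the command line
    spark = SparkSession.builder.appName("load_" + str_schema).getOrCreate()
    db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
    df_table1 = (spark.read
        .format("jdbc")
        .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>")
        .option("user", "<user>")
        .option("password", "<pass>")
        .option("dbtable", db_query)
        .option("fetchsize", "10000")    # illustrative value, to be tuned
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .load())
    # process df_table1 here
    spark.stop()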
- You can also consider processing the tables in parallel within a single application, as sketched below.
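A sketch of that in-application variant, assuming the class from the question: a thread pool drives several Spark jobs concurrently from the same SparkSession. The pool size of 8 is an assumption; the threads only submit jobs, while the reads themselves stay distributed across the cluster.

from concurrent.futures import ThreadPoolExecutor

def run(self):
    all_schemes = <list of all available schemes>   # same placeholder as in the question

    def load_and_process(str_schema):
        # Only the job submission happens in this thread; the work is distributed.
        db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
        df_table1 = self.connect_to_oracle_db(db_query).load()
        # process df_table1 here
        return str_schema

    # A modest pool is usually enough; Spark's scheduler shares executors
    # between the concurrently running jobs.
    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(load_and_process, all_schemes))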
And just to reiterate: Spark is lazy, so the core problem may be somewhere else entirely, and the issues listed above may only be secondary.