使用SQLAlchemy表达式时出现Dask

使用SQLAlchemy表达式时出现Dask

本文介绍了使用SQLAlchemy表达式时出现Dask read_sql_table错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正尝试将SQLAlchemy表达式与dask的read_sql_table结合使用,以减少通过加入和过滤一些不同的表而创建的数据集。 表示应该这样做。



(下面的示例不包含任何联接,因为不需要它们来复制问题。)



  import dask.dataframe as dd 
import pandas as pd
从sqlalchemy导入create_engine
从sqlalchemy导入列,元数据,表
从sqlalchemy.sql导入选择


username ='username'
password ='密码'
服务器='产品'
数据库='my_db'

connection_string = f'postgresql + psycopg2:// {用户名}:{密码} @ {服务器} / {database}'

engine = create_engine(connection_string)

元数据= MetaData()

t = Table('my_table',元数据,
Column('id'),
schema ='my_schema')

I'能够构建选择并将其与SQLAlchemy一起使用而没有问题

 >> s = select([t])。limit(5)
> rp = engine.execute(b)
>> rp.fetchall()

[(3140757,),(3118225,),(3156070,),(3193075,),(3114614,)]

我也能够将SQLAlchey select馈送给熊猫的read_sql,它工作正常

 >> pd.read_sql(s,connection_string)

id
0 3140757
1 3118225
2 3156070
3 3193075
4 3114614

但是,当我将相同的选择传递给dask时,出现了ProgrammingError。它表明dask正在转身并调用pandas.read_sql,因此您认为它应该可以工作,但显然不起作用。

 >>> dd.read_sql_table(s,connection_string,index_col ='id')

---------------------------- -----------------------------------------------
ProgrammingError Traceback(最近一次通话最近)
C:_miniconda3\envs\my_env\lib\站点包\sqlalchemy\engine\base.py in _execute_context(自我,方言,构造函数,语句,参数,* args)
1192个参数,
-> 1193上下文)
1194除外BaseException,例如e:

C:\\ miniconda3 \\ envs \\ my_env \\ lib \\ site-packages \\ sqlalchemy \\ engine \\ default。 py in do_execute(自己,游标,语句,参数,上下文)
508 def do_execute(自己,游标,语句,参数,上下文=无):
-> 509 cursor.execute(语句,参数)
510

ProgrammingError:FROM中的子查询必须具有别名
第2行:FROM(SELECT my_schema.my_table.id AS id
^
提示:例如,FROM(SELECT ...)[AS] foo。


上面的异常是以下异常的直接原因:

编程错误回溯(最近一次通话最近)
<模块>
---->中的ipython-input-5-0db95e60f442> 1 dd.read_sql_table(s, connection_string,index_col ='id')

C:\miniconda3\envs\my_env\lib\站点包\dask\dataframe\io\sql.py read_sql_table(表,uri,index_col,除法,npartitions,限制,列,bytes_per_chunk,head_rows,schema,meta,engine_kwargs,** kwargs)
116#从前几行破坏元数据
117 q = sql .select(列).limit(head_rows).select_from(表)
-> 118 head = pd.read_sql(q,engine,** kwa rgs)
119
120如果head.empty:

C:\miniconda3\envs\my_env\lib\站点包\pandas\io readread.sql中的sql.py(sql,con,index_col,coerce_float,params,parse_dates,列,chunksize)
395 sql,index_col = index_col,params = params,
396 coerce_float = coerce_float,parse_dates = parse_dates,
-> 397 chunksize = chunksize)
398
399

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\ read_query中的sql.py(self,sql,index_col,coerce_float,parse_dates,params,chunksize)
1061 args = _convert_params(sql,params)
1062
-> 1063结果= self.execute(* args)
1064列= result.keys()
1065

C:\miniconda3\envs\my_env\lib\在execute(self,* args,** kwargs)
952 def中的``site-packagespandasioio.sql'',
952 def execute(self,* args,** kwargs):
953 到SQLAlchemy可连接的简单传递
-> 954 return self.connectable.execute(* args,** kwargs)
955
956 def read_table(self,table_name,index_col = None,coerce_float = True,

C: executeminiconda3\envs\my_env\lib站点包\sqlalchemy\engine\base.py in execute(self,statement,* multiparams,** params)
2073
2074 connection = self.contextual_connect(close_with_result = True)
-> 2075 return connection.execute(statement,* multiparams,** params)
2076
2077 def标量(self,statement ,* multiparams,** params):

C:\miniconda3\envs\my_env\lib\站点包\sqlalchemy\engine\base.py in execute( self,object,* multiparams,** params)
946提高exc.ObjectNotExecutableError(object)
947 else:
-> 948 return meth(self,multiparams,params)
949
950 def _execute_function(self,func,multiparams,params):

C:miniconda3\envs\my_env\lib\s _execute_on_connection(自已,连接,多参数,参数)中的ite-packages sqlalchemy\sql\elements.py
267 def _execute_on_connection(自已,连接,多参数,参数):
268(如果为self)。 supports_execution:
- 269 return connection._execute_clauseelement(self,multiparams,params)
270 else:
271raise exc.ObjectNotExecutableError(self)

C:\miniconda3\envs\my_env _execute_clauseelement中的lib库站点包SQLalchemyengine.base(py,self,elem,multiparams,params)
1058编译的sql,
1059蒸馏的参数,
-> ;第1060章没关系了(二更)第1062章(二更)第1062章(二更) _execute_context中的site-packagesalsqlalchemy\engine\base.py(自身,方言,构造函数,语句,参数,* args)
1198参数,
1199光标,
-> ; 1200上下文)
1201
1202如果self._has_events或self.engine._has_events:

C:\miniconda3\envs\my_env\lib\site- _handle_dbapi_exception中的包sqlalchemy\engine\base.py(自身,e,语句,参数,游标,上下文)
1411 util.raise_from_cause(
1412 sqlalchemy_exception,
-> 1413 exc_info
1414)
1415 else:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\ raise_from_cause中的compat.py(except,exc_info)
263 exc_type,exc_value,exc_tb = exc_info
264原因=如果exc_value不是异常则exc_value否则
-> 265提高(type(exception),exception,tb = exc_tb,cause = cause)
266
267 if py3k:

C:\miniconda3\envs\my_env \lib\site-packages\sqlalchemy\util\compat.py以提高(tp,value,tb,原因)
246值.__ cause __ =如果value .__ traceback__为$,则导致
247不是tb:
-> 248筹集价值.with_traceback(tb)
249筹集价值
250

C:\miniconda3\envs\my_env\lib\站点软件包\sqlalchemy _execute_context中的\engine\base.py(自身,方言,构造函数,语句,参数,* args)
1191语句,
1192参数,
-> 1193上下文)
1194除BaseException外为e:
1195 self._handle_dbapi_exception(

C:\miniconda3\envs\my_env\lib\site-packages\ do_execute中的 sqlalchemy引擎 default.py(自身,游标,语句,参数,上下文)
507
508 def do_execute(自身,游标,语句,参数,上下文=无):
-> 509 cursor.execute(语句,参数)
510
511 def do_execute_no_params(self,cursor,statement,context = None):

ProgrammingError: FROM中的(psycopg2.ProgrammingError)子查询必须具有别名
第2行:FROM(SELECT my_schema.my_table.id AS id
^
提示:例如,FROM(SELECT ...) [AS] foo。
[SQL:'SELECT id \nFROM(SELECT my_schema.my_table.id AS id \nFROM my_schema.my_table \n LIMIT%(param_1)s)\n LIMIT%(param_2 )s'] [参数:{'param_1':5,'param_2':5}](此错误的背景位于:http:// sqlalche。我/ e / f405)


解决方案

如Chris在不同的答案是,Dask用 SELECT列FROM(yourquery)的形式包装查询,这对于PostgreSQL是无效的语法,因为它期望该括号表达式的别名。无需重新实现整个 read_sql_table 方法,只需将 .alias('somename')添加到您的表达式中即可为表达式加上别名选择,即

  select([t])。limit(5).alias('foo')

该表达式由Dask包裹后,会为Postgres生成正确的语法

 从(您的查询)SELECT列为foo 


I'm trying to use an SQLAlchemy expression with dask's read_sql_table in order to bring down a dataset that is created by joining and filtering a few different tables. The documentation indicates that this should be possible.

(The example below, does not include any joins as they are not needed to replicate the problem.)

I build my connection string, create an SQLAlchemy engine and table corresponding to a table in my database. (I'm using PostgreSQL.)

import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.sql import  select


username = 'username'
password = 'password'
server = 'prod'
database = 'my_db'

connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'

engine = create_engine(connection_string)

metadata = MetaData()

t = Table('my_table', metadata,
    Column('id'),
    schema='my_schema')

I'm able to build a select and use it with SQLAlchemy with no issue

>>> s = select([t]).limit(5)
>>> rp = engine.execute(s)
>>> rp.fetchall()

[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]

I'm also able to feed the SQLAlchey select to panda's read_sql, which works fine

>>> pd.read_sql(s, connection_string)

id
0   3140757
1   3118225
2   3156070
3   3193075
4   3114614

However, when I pass the same select to dask, I get a ProgrammingError. It shows that dask is turning around and calling pandas.read_sql, so you would think it should work, but something is obviously not.

>>> dd.read_sql_table(s, connection_string, index_col='id')

---------------------------------------------------------------------------
ProgrammingError                          Traceback (most recent call last)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    508     def do_execute(self, cursor, statement, parameters, context=None):
--> 509         cursor.execute(statement, parameters)
    510

ProgrammingError: subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
             ^
HINT:  For example, FROM (SELECT ...) [AS] foo.


The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
<ipython-input-5-0db95e60f442> in <module>
----> 1 dd.read_sql_table(s, connection_string, index_col='id')

C:\miniconda3\envs\my_env\lib\site-packages\dask\dataframe\io\sql.py in read_sql_table(table, uri, index_col, divisions, npartitions, limits, columns, bytes_per_chunk, head_rows, schema, meta, engine_kwargs, **kwargs)
    116         # derrive metadata from first few rows
    117         q = sql.select(columns).limit(head_rows).select_from(table)
--> 118         head = pd.read_sql(q, engine, **kwargs)
    119
    120         if head.empty:

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_sql(sql, con, index_col, coerce_float, params, parse_dates, columns, chunksize)
    395             sql, index_col=index_col, params=params,
    396             coerce_float=coerce_float, parse_dates=parse_dates,
--> 397             chunksize=chunksize)
    398
    399

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_query(self, sql, index_col, coerce_float, parse_dates, params, chunksize)
   1061         args = _convert_params(sql, params)
   1062
-> 1063         result = self.execute(*args)
   1064         columns = result.keys()
   1065

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in execute(self, *args, **kwargs)
    952     def execute(self, *args, **kwargs):
    953         """Simple passthrough to SQLAlchemy connectable"""
--> 954         return self.connectable.execute(*args, **kwargs)
    955
    956     def read_table(self, table_name, index_col=None, coerce_float=True,

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, statement, *multiparams, **params)
   2073
   2074         connection = self.contextual_connect(close_with_result=True)
-> 2075         return connection.execute(statement, *multiparams, **params)
   2076
   2077     def scalar(self, statement, *multiparams, **params):

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
    946             raise exc.ObjectNotExecutableError(object)
    947         else:
--> 948             return meth(self, multiparams, params)
    949
    950     def _execute_function(self, func, multiparams, params):

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\sql\elements.py in _execute_on_connection(self, connection, multiparams, params)
    267     def _execute_on_connection(self, connection, multiparams, params):
    268         if self.supports_execution:
--> 269             return connection._execute_clauseelement(self, multiparams, params)
    270         else:
    271             raise exc.ObjectNotExecutableError(self)

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_clauseelement(self, elem, multiparams, params)
   1058             compiled_sql,
   1059             distilled_params,
-> 1060             compiled_sql, distilled_params
   1061         )
   1062         if self._has_events or self.engine._has_events:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1198                 parameters,
   1199                 cursor,
-> 1200                 context)
   1201
   1202         if self._has_events or self.engine._has_events:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1411                 util.raise_from_cause(
   1412                     sqlalchemy_exception,
-> 1413                     exc_info
   1414                 )
   1415             else:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
    263     exc_type, exc_value, exc_tb = exc_info
    264     cause = exc_value if exc_value is not exception else None
--> 265     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    266
    267 if py3k:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    246             value.__cause__ = cause
    247         if value.__traceback__ is not tb:
--> 248             raise value.with_traceback(tb)
    249         raise value
    250

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1191                         statement,
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:
   1195             self._handle_dbapi_exception(

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    507
    508     def do_execute(self, cursor, statement, parameters, context=None):
--> 509         cursor.execute(statement, parameters)
    510
    511     def do_execute_no_params(self, cursor, statement, context=None):

ProgrammingError: (psycopg2.ProgrammingError) subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id
             ^
HINT:  For example, FROM (SELECT ...) [AS] foo.
 [SQL: 'SELECT id \nFROM (SELECT my_schema.my_table.id AS id \nFROM my_schema.my_table \n LIMIT %(param_1)s) \n LIMIT %(param_2)s'] [parameters: {'param_1': 5, 'param_2': 5}] (Background on this error at: http://sqlalche.me/e/f405)
解决方案

As Chris said in a different answer, Dask wraps your query in something of a form SELECT columns FROM (yourquery), which is an invalid syntax for PostgreSQL, because it expects an alias for that parenthesised expression. Without reimplementing the whole read_sql_table method, the expression can be aliased simply by adding .alias('somename') to your select, i.e.

select([t]).limit(5).alias('foo')

That expression, when wrapped by Dask, generates correct syntax for Postgres

SELECT columns FROM (yourquery) AS foo

这篇关于使用SQLAlchemy表达式时出现Dask read_sql_table错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-11 13:39