我有一个Postgres查询(通过SQLAlchemy),它使用复杂的条件选择匹配的行:

original_query = session.query(SomeTable).filter(*complex_filters)

我不知道查询是如何构造的,我只能访问生成的查询实例。
现在,我想使用这个“不透明”查询(这个问题中的黑框)来构造其他查询,从同一个表中使用完全相同的条件,但是在匹配的original_query行上有额外的逻辑。例如,顶部有SELECT DISTINCT(column)
another_query = session.query(SomeTable.column).distinct().?select_from_query?(original_query)


SELECT SUM(tab_value) FROM (
    SELECT tab.key AS tab_key, tab.value AS tab_value -- inner query, fixed
    FROM tab
    WHERE tab.product_id IN (1, 2)  -- simplified; the inner query is quite complex
) AS tbl
WHERE tab_key = 'length';


SELECT tab_key, COUNT(*) FROM (
    SELECT tab.key AS tab_key, tab.value AS tab_value
    FROM tab
    WHERE tab.product_id IN (1, 2)
) AS tbl
GROUP BY tab_key;

等。
如何在SQLAlchemy中干净地实现?select_from_query?部分?
基本上,如何在SqlAlchemy中执行SELECT dynamic FROM (SELECT fixed)
动机:内部查询对象来自代码的不同部分。我无法控制它是如何构造的,并且希望避免对每个必须在其上运行的SELECT重复其特殊逻辑。我想重复使用这个查询,但是在上面添加额外的逻辑(如上面的例子所示)。

最佳答案

original_query只是一个SQLAlchemy query API object,您可以对此应用其他筛选器和条件。查询API是生成的;每个Query()实例操作都返回一个新的(不可变的)实例,并且您的起点(original_query)不受影响。
这包括使用Query.distinct()添加DISTINCT()子句,Query.with_entities()更改哪些列是查询的一部分,Query.values()执行查询,但只返回特定的单列值。
使用.distinct(<column>).with_entities(<column>)创建新的查询对象(可以进一步重用):

another_query = original_query.distinct(SomeTable.column).with_entities(SomeTable.column)

或者使用.distinct(<column>).values(<column>)在那里获得(column_value,)元组结果的迭代器,然后:
distinct_values = original_query.distinct(SomeTable.column).values(SomeTable.column)

请注意,.values()会立即执行查询,就像.all()会立即执行查询一样,而.with_entities()会返回一个只有一列的新Query对象(然后.all()或迭代或切片将执行并返回结果)。
演示,使用一个人工的Foo模型(对sqlite执行以使其更易于快速演示):
>>> from sqlalchemy import *
>>> from sqlalchemy.ext.declarative import declarative_base
>>> from sqlalchemy.orm import sessionmaker
>>> Base = declarative_base()
>>> class Foo(Base):
...     __tablename__ = "foo"
...     id = Column(Integer, primary_key=True)
...     bar = Column(String)
...     spam = Column(String)
...
>>> engine = create_engine('sqlite:///:memory:', echo=True)
>>> session = sessionmaker(bind=engine)()
>>> Base.metadata.create_all(engine)
2019-06-10 13:10:43,910 INFO sqlalchemy.engine.base.Engine PRAGMA table_info("foo")
2019-06-10 13:10:43,910 INFO sqlalchemy.engine.base.Engine ()
2019-06-10 13:10:43,911 INFO sqlalchemy.engine.base.Engine
CREATE TABLE foo (
    id INTEGER NOT NULL,
    bar VARCHAR,
    spam VARCHAR,
    PRIMARY KEY (id)
)


2019-06-10 13:10:43,911 INFO sqlalchemy.engine.base.Engine ()
2019-06-10 13:10:43,913 INFO sqlalchemy.engine.base.Engine COMMIT
>>> original_query = session.query(Foo).filter(Foo.id.between(17, 42))
>>> print(original_query)  # show what SQL would be executed for this query
SELECT foo.id AS foo_id, foo.bar AS foo_bar, foo.spam AS foo_spam
FROM foo
WHERE foo.id BETWEEN ? AND ?
>>> another_query = original_query.distinct(Foo.bar).with_entities(Foo.bar)
>>> print(another_query)  # print the SQL again, don't execute
SELECT DISTINCT foo.bar AS foo_bar
FROM foo
WHERE foo.id BETWEEN ? AND ?
>>> distinct_values = original_query.distinct(Foo.bar).values(Foo.bar)  # executes!
2019-06-10 13:10:48,470 INFO sqlalchemy.engine.base.Engine SELECT DISTINCT foo.bar AS foo_bar
FROM foo
WHERE foo.id BETWEEN ? AND ?
2019-06-10 13:10:48,470 INFO sqlalchemy.engine.base.Engine (17, 42)

在上面的演示中,原始查询将选择带有Foo过滤器的特定BETWEEN实例,但是添加.distinct(Foo.bar).values(Foo.bar)之后将仅对DISTINCT foo.bar列执行查询,但要使用相同的BETWEEN过滤器类似地,通过使用.with_entities(),我们得到了一个仅用于该单个列的新查询对象,但过滤器仍然是该新查询的一部分。
您添加的示例的工作方式相同;实际上不需要在那里有子选择,因为相同的查询可以表示为:
SELECT sum(tab.value)
FROM tab
WHERE tab.product_id IN (1, 2) AND tab_key = 'length';

只需添加额外的过滤器,然后使用.with_entities()将所选列替换为SUM()即可:
summed_query = (
    original_query
    .filter(Tab.key == 'length')  # add a filter
    .with_entities(func.sum(Tab.value)

或者,就上述演示而言:
>>> print(original_query.filter(Foo.spam == 42).with_entities(func.sum(Foo.bar)))
SELECT sum(foo.bar) AS sum_1
FROM foo
WHERE foo.id BETWEEN ? AND ? AND foo.spam = ?

有子查询的用例(例如限制联接中特定表的结果),但这不是其中之一。
如果您确实需要子查询,那么查询API具有Foo(对于更简单的情况)和Query.from_self()
例如,如果您只需要从原始查询中选择聚合行,并通过Query.subselect()对聚合值进行筛选,然后将结果与每个组的最高行id的另一个表连接,并进行进一步筛选,则需要一个子查询:
summed_col = func.sum(SomeTable.some_column)
max_id = func.max(SomeTable.primary_key)
summed_results_by_eggs = (
    original_query
    .with_entities(max_id, summed_col)      # only select highest id and the sum
    .group_by(SomeTable.other_column)       # per group
    .having(summed_col > 10)                # where the sum is high enough
    .from_self(summed_col)                  # give us the summed value as a subselect
    .join(                                  # join these rows with another table
        OtherTable,
        OtherTable.foreign_key == max_id    # using the highest id
    )
    .filter(OtherTable.some_column < 1000)  # and filter some more
)

以上只会选择大于10的总和HAVING值,以及每组中最高的SomeTable.some_column值。此查询必须使用子查询,因为您希望在与另一个表联接之前限制符合条件的SomeTable.id行。
为了演示这个,我添加了第二个表:
>>> from sqlalchemy.orm import relationship
>>> class Eggs(Base):
...     __tablename__ = "eggs"
...     id = Column(Integer, primary_key=True)
...     foo_id = Column(Integer, ForeignKey(Foo.id))
...     foo = relationship(Foo, backref="eggs")
...
>>> summed_col = func.sum(Foo.bar)
>>> max_id = func.max(Foo.id)
>>> print(
...     original_query
...     .with_entities(max_id, summed_col)
...     .group_by(Foo.spam)
...     .having(summed_col > 10)
...     .from_self(summed_col)
...     .join(Eggs, Eggs.foo_id==max_id)
...     .filter(Eggs.id < 1000)
... )
SELECT anon_1.sum_2 AS sum_1
FROM (SELECT max(foo.id) AS max_1, sum(foo.bar) AS sum_2
FROM foo
WHERE foo.id BETWEEN ? AND ? GROUP BY foo.spam
HAVING sum(foo.bar) > ?) AS anon_1 JOIN eggs ON eggs.foo_id = anon_1.max_1
WHERE eggs.id < ?

SomeTable方法接受在外部查询中使用的新实体,如果省略了这些实体,则会拉出所有列。在上面的代码中,我提取了summatedcolumn值;如果没有这个参数,也将选择Eggs列。

关于python - 如何在SqlAlchemy中没有JOIN的情况下嵌套SELECT?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56525884/

10-12 22:06
查看更多