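I strongly suspect I'm not doing this in the most efficient way, which is why I tagged this plpgsql. I need to run this on 2 billion rows for 1000 measurement systems.
You have measurement systems that tend to report the previous value when they lose connectivity. They lose connectivity often in short spurts, but sometimes for a long time. You need to aggregate, but when you do, you need to look at how long a value was repeating and build various filters based on that information. Say you are measuring a car's mpg, but it sits at 20 mpg for an hour instead of moving to 20.1 and so on. You want to evaluate its accuracy while it is stuck. You could also have alternative rules, for example for when the car is on the highway: with window functions you can generate the car's "state" and get something to group by. Without further ado: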

--here's my data, you have different systems, the time of measurement, and the actual measurement
--the raw data also records whether or not each row is a repeat (hence the included window function)
select * into temporary table cumulative_repeat_calculator_data
FROM
    (
    select
    system_measured, time_of_measurement, measurement,
    case when
     measurement = lag(measurement,1) over (partition by system_measured order by time_of_measurement asc)
     then 1 else 0 end as repeat
    FROM
    (
    SELECT 5 as measurement, 1 as time_of_measurement, 1 as system_measured
    UNION
    SELECT 150 as measurement, 2 as time_of_measurement, 1 as system_measured
    UNION
    SELECT 5 as measurement, 3 as time_of_measurement, 1 as system_measured
    UNION
    SELECT 5 as measurement, 4 as time_of_measurement, 1 as system_measured
    UNION
    SELECT 5 as measurement, 1 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 2 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 3 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 4 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 150 as measurement, 5 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 6 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 7 as time_of_measurement, 2 as system_measured
    UNION
    SELECT 5 as measurement, 8 as time_of_measurement, 2 as system_measured
    ) as data
) as data;

--unfortunately you can't nest window functions, so I had to break it down into subqueries
--what we need is something to partition on, the 'state' of the system if you will, so I ran a running total of the nonrepeats
--this creates a column that stays the same while your data is repeating - aka something you can partition/group on
select * into temporary table cumulative_repeat_calculator_step_1
FROM
    (
    select
    *,
    sum(case when repeat = 0 then 1 else 0 end) over (partition by system_measured order by time_of_measurement asc) as cumlative_sum_of_nonrepeats_by_system
    from cumulative_repeat_calculator_data
    order by system_measured, time_of_measurement
) as data;

--finally, the query. I didn't bother showing my desired output, because this (finally) produces it
--I wanted a sequential count of repeats that restarts when it stops repeating, and starts with the first repeat
--what you can do now is, for example, take the average measurement under some condition based on how long it was repeating
select *,
case when repeat = 0 then 0
else
row_number() over (partition by cumlative_sum_of_nonrepeats_by_system, system_measured order by time_of_measurement) - 1
end as ordered_repeat
from cumulative_repeat_calculator_step_1
order by system_measured, time_of_measurement;

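So, what would you do differently to run this on a huge table, or what alternative tools would you use? I'm considering plpgsql because I suspect this needs to be done in the database, or during the data-insertion process, although I usually work with the data after it has been loaded. Is there any way to do this in one pass without resorting to subqueries?
I've tested an alternative method, but it still relies on a subquery, and I think this one is faster. For that method you create a "starts and stops" table with start_timestamp, end_timestamp, system. Then you join it to the larger table, and if the timestamp falls between the two, you classify the row as being in that state, which is essentially an alternative to cumlative_sum_of_nonrepeats_by_system. But when you do that, you're joining on 1=1 for thousands of devices and thousands of "events". Do you think that's a better method?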
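A minimal sketch of that "starts and stops" idea, using the answer's three-column layout (system_measured, time_of_measurement, measurement) and its sample rows; the setup lines insert them only into an empty data table. The names runs, start_time, end_time and readings_with_run are hypothetical. The join matches on system equality plus a time range rather than 1=1, so an index on runs (system_measured, start_time) can restrict each lookup to one system's runs; whether this beats the running-sum approach at two billion rows is something to measure, not assume.

```sql
-- Sample data (same rows as the answer's test case); inserted only if the table is empty
CREATE TEMP TABLE IF NOT EXISTS data (
   system_measured int, time_of_measurement int, measurement int);
INSERT INTO data
SELECT * FROM (VALUES (1, 1, 5), (1, 2, 150), (1, 3, 5), (1, 4, 5)
                    , (2, 1, 5), (2, 2, 5), (2, 3, 5), (2, 4, 5)
                    , (2, 5, 150), (2, 6, 5), (2, 7, 5), (2, 8, 5)) v
WHERE  NOT EXISTS (SELECT 1 FROM data);

-- "Starts and stops": one row per run of identical consecutive readings
CREATE TEMP TABLE runs AS
SELECT system_measured
     , min(time_of_measurement) AS start_time
     , max(time_of_measurement) AS end_time
FROM  (
   SELECT *, sum(step) OVER (PARTITION BY system_measured
                             ORDER BY time_of_measurement) AS grp
   FROM  (
      SELECT *, CASE WHEN lag(measurement) OVER (PARTITION BY system_measured
                                                 ORDER BY time_of_measurement)
                          = measurement
                     THEN 0 ELSE 1 END AS step   -- 1 = a new run starts here
      FROM   data
      ) x
   ) y
GROUP  BY system_measured, grp;

CREATE INDEX ON runs (system_measured, start_time);

-- Attach each reading to its run: equality on system plus a time range, not 1 = 1
CREATE TEMP VIEW readings_with_run AS
SELECT d.*, r.start_time, r.end_time   -- (system_measured, start_time) identifies the "state"
FROM   data d
JOIN   runs r ON r.system_measured = d.system_measured
             AND d.time_of_measurement BETWEEN r.start_time AND r.end_time;
```

Because runs of one system never overlap, each reading matches exactly one run, and (system_measured, start_time) plays the role of cumlative_sum_of_nonrepeats_by_system.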

Best answer

Test case
First, a more useful way to present your data, or even better, in an sqlfiddle, ready to use:

CREATE TEMP TABLE data(
   system_measured int
 , time_of_measurement int
 , measurement int
);

INSERT INTO data VALUES
 (1, 1, 5)
,(1, 2, 150)
,(1, 3, 5)
,(1, 4, 5)
,(2, 1, 5)
,(2, 2, 5)
,(2, 3, 5)
,(2, 4, 5)
,(2, 5, 150)
,(2, 6, 5)
,(2, 7, 5)
,(2, 8, 5);

Simplified query
As the question remains unclear, I am assuming only the above.
Next, I simplified your query to:
WITH x AS (
   SELECT *, CASE WHEN lag(measurement) OVER (PARTITION BY system_measured
                               ORDER BY time_of_measurement) = measurement
                  THEN 0 ELSE 1 END AS step
   FROM   data
   )
   , y AS (
   SELECT *, sum(step) OVER(PARTITION BY system_measured
                            ORDER BY time_of_measurement) AS grp
   FROM   x
   )
SELECT * ,row_number() OVER (PARTITION BY system_measured, grp
                             ORDER BY time_of_measurement) - 1 AS repeat_ct
FROM   y
ORDER  BY system_measured, time_of_measurement;
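For comparison, the same repeat_ct can be sketched with the well-known "difference of row numbers" gaps-and-islands technique. This is a swapped-in alternative, not part of the original answer. It needs one query level fewer, but it still sorts the data more than once, so it is not the single pass the question asks for, and any speed-up would have to be measured. It assumes non-NULL measurements: PARTITION BY groups NULLs together, so a run of NULLs would be counted as repeats here, whereas the lag() comparison above never treats NULL as equal. The view name repeat_ct_rn_diff and the column island are hypothetical.

```sql
-- Sample data (same rows as the test case); inserted only if the table is empty
CREATE TEMP TABLE IF NOT EXISTS data (
   system_measured int, time_of_measurement int, measurement int);
INSERT INTO data
SELECT * FROM (VALUES (1, 1, 5), (1, 2, 150), (1, 3, 5), (1, 4, 5)
                    , (2, 1, 5), (2, 2, 5), (2, 3, 5), (2, 4, 5)
                    , (2, 5, 150), (2, 6, 5), (2, 7, 5), (2, 8, 5)) v
WHERE  NOT EXISTS (SELECT 1 FROM data);

-- Within one run of equal values both row numbers advance together,
-- so their difference stays constant and labels the run ("island").
CREATE OR REPLACE TEMP VIEW repeat_ct_rn_diff AS
SELECT system_measured, time_of_measurement, measurement
     , row_number() OVER (PARTITION BY system_measured, measurement, island
                          ORDER BY time_of_measurement) - 1 AS repeat_ct
FROM  (
   SELECT *
        , row_number() OVER (PARTITION BY system_measured
                             ORDER BY time_of_measurement)
        - row_number() OVER (PARTITION BY system_measured, measurement
                             ORDER BY time_of_measurement) AS island
   FROM   data
   ) sub
ORDER  BY system_measured, time_of_measurement;
```

A later run of the same value gets a different island number, because the per-system counter has moved on while other values were reported in between.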

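Now, while pure SQL is nice and shiny, this will be much faster with a plpgsql function, because it can do the job in a single table scan, whereas this query needs at least three passes over the data.
Faster with a plpgsql function: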
CREATE OR REPLACE FUNCTION x.f_repeat_ct()
  RETURNS TABLE (
    system_measured int
  , time_of_measurement int
  , measurement int, repeat_ct int
  )  LANGUAGE plpgsql AS
$func$
DECLARE
   r    data;     -- table name serves as record type
   r0   data;
BEGIN

-- SET LOCAL work_mem = '1000 MB';  -- uncomment and adapt if needed, see below!

repeat_ct := 0;   -- init

FOR r IN
   SELECT * FROM data d ORDER BY d.system_measured, d.time_of_measurement
LOOP
   IF  r.system_measured = r0.system_measured
       AND r.measurement = r0.measurement THEN
      repeat_ct := repeat_ct + 1;   -- same system, same value: count up
   ELSE
      repeat_ct := 0;               -- start new count
   END IF;

   RETURN QUERY SELECT r.*, repeat_ct;

   r0 := r;                         -- remember last row
END LOOP;

END
$func$;

Call:
SELECT * FROM x.f_repeat_ct();

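Be sure to always table-qualify column names in this kind of plpgsql function: we use the same names as the output parameters, so unqualified references are ambiguous. By default (plpgsql.variable_conflict = error) PostgreSQL raises an error for such references; with use_variable, the parameters would take precedence over the columns.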
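If you would rather write unqualified column names inside the function, PL/pgSQL lets you pick the resolution rule per function with the #variable_conflict directive at the top of the body. The sketch below uses #variable_conflict use_column, so the names in the loop query resolve to table columns. It also swaps the per-row RETURN QUERY for assigning the OUT parameters and calling RETURN NEXT, which should avoid running a separate SELECT for every row, and it tracks the previous row in two scalar variables. The function name f_repeat_ct_use_column is hypothetical; the setup lines reuse the test-case rows and insert them only into an empty data table.

```sql
-- Sample data (same rows as the test case); inserted only if the table is empty
CREATE TEMP TABLE IF NOT EXISTS data (
   system_measured int, time_of_measurement int, measurement int);
INSERT INTO data
SELECT * FROM (VALUES (1, 1, 5), (1, 2, 150), (1, 3, 5), (1, 4, 5)
                    , (2, 1, 5), (2, 2, 5), (2, 3, 5), (2, 4, 5)
                    , (2, 5, 150), (2, 6, 5), (2, 7, 5), (2, 8, 5)) v
WHERE  NOT EXISTS (SELECT 1 FROM data);

CREATE OR REPLACE FUNCTION f_repeat_ct_use_column()
  RETURNS TABLE (system_measured int, time_of_measurement int
               , measurement int, repeat_ct int)
  LANGUAGE plpgsql AS
$func$
#variable_conflict use_column
DECLARE
   r          data;   -- table name serves as row type
   _last_sys  int;    -- system of the previous row (NULL at start)
   _last_val  int;    -- measurement of the previous row (NULL at start)
BEGIN
   FOR r IN
      -- unqualified names resolve to columns because of use_column
      SELECT * FROM data ORDER BY system_measured, time_of_measurement
   LOOP
      IF r.system_measured = _last_sys AND r.measurement = _last_val THEN
         repeat_ct := repeat_ct + 1;   -- same system, same value: count up
      ELSE
         repeat_ct := 0;               -- new system or new value: restart
      END IF;
      system_measured     := r.system_measured;   -- fill the OUT parameters
      time_of_measurement := r.time_of_measurement;
      measurement         := r.measurement;
      RETURN NEXT;                                -- emit the current OUT values
      _last_sys := r.system_measured;
      _last_val := r.measurement;
   END LOOP;
END
$func$;
```

Usage: SELECT * FROM f_repeat_ct_use_column();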
Billions of rows
With billions of rows you may want to split this operation up. I quote the manual:
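Note: The current implementation of RETURN NEXT and RETURN QUERY stores the entire result set before returning from the function, as discussed above. That means that if a PL/pgSQL function produces a very large result set, performance might be poor: data will be written to disk to avoid memory exhaustion, but the function itself will not return until the entire result set has been generated. A future version of PL/pgSQL might allow users to define set-returning functions that do not have this limitation. Currently, the point at which data begins being written to disk is controlled by the work_mem configuration variable. Administrators who have sufficient memory to store larger result sets in memory should consider increasing this parameter.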
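Consider generating rows for one system at a time, or set work_mem high enough to cope with the load. See the manual's description of work_mem for details.
One way would be to set a very high value for work_mem with SET LOCAL in your function, which is only effective for the current transaction. I added a commented line to the function. Do not set it very high globally, as this could nuke your server. Read the manual.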
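A sketch of the "one system at a time" variant, with hypothetical names throughout (f_repeat_ct_one, _system, _last). The function takes a system id, so each call materializes only that system's rows, and a LATERAL join calls it once per system. Instead of SET LOCAL, it uses the SET clause of CREATE FUNCTION, which sets work_mem when the function is entered and restores the previous value when it exits; 64MB is a placeholder to tune. On the real table, a small table of system ids would likely be cheaper than SELECT DISTINCT over billions of rows, and an index on data (system_measured, time_of_measurement) is assumed for the per-system query.

```sql
-- Sample data (same rows as the test case); inserted only if the table is empty
CREATE TEMP TABLE IF NOT EXISTS data (
   system_measured int, time_of_measurement int, measurement int);
INSERT INTO data
SELECT * FROM (VALUES (1, 1, 5), (1, 2, 150), (1, 3, 5), (1, 4, 5)
                    , (2, 1, 5), (2, 2, 5), (2, 3, 5), (2, 4, 5)
                    , (2, 5, 150), (2, 6, 5), (2, 7, 5), (2, 8, 5)) v
WHERE  NOT EXISTS (SELECT 1 FROM data);

CREATE OR REPLACE FUNCTION f_repeat_ct_one(_system int)
  RETURNS TABLE (system_measured int, time_of_measurement int
               , measurement int, repeat_ct int)
  LANGUAGE plpgsql
  SET work_mem = '64MB'   -- set on entry, restored on exit; placeholder value
AS
$func$
DECLARE
   r      data;   -- table name serves as row type
   _last  int;    -- previous measurement of this system (NULL at start)
BEGIN
   FOR r IN
      SELECT * FROM data d                -- table-qualified column names
      WHERE  d.system_measured = _system
      ORDER  BY d.time_of_measurement
   LOOP
      IF r.measurement = _last THEN
         repeat_ct := repeat_ct + 1;      -- value repeated: count up
      ELSE
         repeat_ct := 0;                  -- new value: restart count
      END IF;
      system_measured     := r.system_measured;
      time_of_measurement := r.time_of_measurement;
      measurement         := r.measurement;
      RETURN NEXT;
      _last := r.measurement;
   END LOOP;
END
$func$;

-- Usage: one function call per system
SELECT f.*
FROM   (SELECT DISTINCT system_measured FROM data) s
CROSS  JOIN LATERAL f_repeat_ct_one(s.system_measured) f;
```

Leaving ORDER BY off the outer query avoids one big sort over all systems; add it only if the consumer needs a global order.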

10-06 16:13