Problem description
I am trying to group events by one of its properties and over time using the KSQL Windowed Aggregation, specifically the Session Window.
I have a STREAM made from a Kafka topic with the TIMESTAMP property properly specified.
When I try to create a STREAM with session windowing, using a query like:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
I always get the error:
Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.
Is it possible to create a STREAM with a windowed aggregation?
When I try, as suggested, to create a TABLE and then a STREAM that contains all the session-start events, with a query like:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL tells me:
KSQL does not support persistent queries on windowed tables
How can I create a STREAM of the events that start a session window in KSQL?
Recommended answer
If you switch your CREATE STREAM statement to a CREATE TABLE statement, it will create a table that is constantly being updated. The sink topic SESSION_STREAM will contain the stream of changes to the table, i.e. its changelog.
ksqlDB models this as a TABLE, because it has TABLE semantics, i.e. only a single row can exist in the table with any specific key. However, the changelog will contain the STREAM of changes that have been applied to the table.
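To make the table/changelog distinction concrete, here is a small Python model (not ksqlDB code): the changelog is an ordered list of (key, value) updates, and replaying it into a dict keeps only the latest value per key. The data and the `materialize` helper are hypothetical, for illustration only.

```python
from typing import Dict, List, Tuple

# A changelog modeled as an ordered list of (key, value) updates, like the
# records a ksqlDB table writes to its sink topic. Data below is made up.
Changelog = List[Tuple[int, int]]


def materialize(changelog: Changelog) -> Dict[int, int]:
    """Replay a changelog into a table: a later update to a key replaces the earlier one."""
    table: Dict[int, int] = {}
    for key, value in changelog:
        table[key] = value  # TABLE semantics: at most one row per key
    return table


# Hypothetical per-user counts emitted as the table is updated.
changelog: Changelog = [(1, 1), (2, 1), (1, 2), (1, 3)]

print(len(changelog))          # 4 change records in the stream
print(materialize(changelog))  # {1: 3, 2: 1}: one row per key
```

The stream of changes keeps every update, while the table view collapses them to the current value for each key.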
If what you want is a topic containing all the sessions, then something like this will create it:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSIONS AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
This will create a SESSIONS topic that contains the changes to the SESSIONS table, i.e. its changelog.
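As a rough illustration of what ends up in that changelog, the following Python sketch models `COUNT(USER_ID)` over `WINDOW SESSION (5 SECONDS)` for a few made-up events. It is a simplified model, not ksqlDB's implementation: it assumes events arrive in timestamp order, and it leaves out any delete (tombstone) records that Kafka Streams may emit when a session window is replaced by an extended one. `session_changelog` and `SessionRow` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class SessionRow:
    user_id: int
    window_start: int  # ms, plays the role of WINDOWSTART
    window_end: int    # ms, plays the role of WINDOWEND
    count: int


def session_changelog(events: List[Tuple[int, int]], gap_ms: int) -> List[SessionRow]:
    """Simplified model of COUNT(...) over WINDOW SESSION (gap) GROUP BY user_id.

    Assumes (user_id, timestamp_ms) events arrive in timestamp order and emits
    one changelog row per event; tombstones for replaced windows are ignored.
    """
    open_sessions: Dict[int, SessionRow] = {}
    changelog: List[SessionRow] = []
    for user_id, ts in events:
        current = open_sessions.get(user_id)
        if current is not None and ts - current.window_end <= gap_ms:
            # Within the inactivity gap: extend the existing session.
            current = SessionRow(user_id, current.window_start, ts, current.count + 1)
        else:
            # First event for this user, or the gap has elapsed: new session.
            current = SessionRow(user_id, ts, ts, 1)
        open_sessions[user_id] = current
        changelog.append(current)
    return changelog


events = [(1, 0), (1, 2_000), (2, 3_000), (1, 20_000)]
for row in session_changelog(events, gap_ms=5_000):
    print(row)
```

In this model each event produces one changelog row, and a row whose `window_start` equals its `window_end` marks the first event of a session, which is what a `WHERE WINDOWSTART = WINDOWEND` filter relies on.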
If you want to convert this into a stream of session-start events, then unfortunately ksqlDB doesn't yet allow you to create a stream directly from the table, but you can create a stream over the table's changelog:
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
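The `WHERE WINDOWSTART = WINDOWEND` filter keeps only changelog records whose window has zero length. A plain-Python equivalent over a hand-written list of records is sketched below; the records are hypothetical data shaped like the `SESSIONS` changelog with a 5-second gap, and `SessionRecord` and `session_starts` are illustrative names, not ksqlDB APIs.

```python
from typing import List, NamedTuple


class SessionRecord(NamedTuple):
    """One record of a session-windowed changelog (hypothetical shape)."""
    user_id: int
    window_start: int  # ms
    window_end: int    # ms
    count: int


def session_starts(changelog: List[SessionRecord]) -> List[SessionRecord]:
    """Keep only records with WINDOWSTART = WINDOWEND.

    A session is first emitted with start == end when its opening event
    arrives; events with later timestamps push window_end forward.
    """
    return [r for r in changelog if r.window_start == r.window_end]


# Made-up changelog: user 1 starts two sessions, user 2 starts one.
changelog = [
    SessionRecord(1, 0, 0, 1),
    SessionRecord(1, 0, 2_000, 2),
    SessionRecord(2, 3_000, 3_000, 1),
    SessionRecord(1, 20_000, 20_000, 1),
]

for record in session_starts(changelog):
    print(record)
```

In this model, a second event with exactly the same timestamp as a session's first event would also produce a zero-length window, so it would pass the filter too.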
Note: with the upcoming 0.10 release, you'll be able to give the key column in SESSION_STREAM its proper name (USER_ID instead of ROWKEY):
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');