11.3 管理 pgq-queues
Skytools 的一个核心组件是pgq。它提供了一个通用排队接口,它可以让您把消息从一个消息提供者传送到一个任意数目的接收者。
现在的问题是:一般来说,一个队列的要点是什么?队列有一些很不错的特征。首先,它可以保证消息的传递。除此之外,它将确保消息被放入队列的顺序会被保留。在复制的情况下,这是非常重要的,因为我们必须确保小不会相互追赶对方。
队列的思想是能够使任何东西从整个生产数据被发送到任何别的参与系统的主机。这不仅适用于复制,使用与很多—您可以把pgq 作为一个灵活调度信息的设施。这种情况的实际例子可以是,购物车购买,银行转帐,或者用户信息。在这种意义上复制一个完整的表或多或少是一种特殊情况。
一般来说,队列知道两个操作:
• Enqueue: 将消息放入队列
• Dequeue: 从队列获取消息(这也被称为“消耗”一个消息)。
这两个操作是每个基于队列的应用程序的核心。
[在 Skytools中,我们把什么定义为一个队列,在 JMS 的术语汇中,您称之为“ 主题(topic)”]
11.3.1 运行 pgq
要在数据库内部使用pgq,您必须把它作为一个正常的PostgreSQL扩展来安装。如果安装过程工作正常,您可以简单地运行下面的指令:
test=# CREATE EXTENSION pgq;
CREATE EXTENSION
既然所有的模块已经加载到数据库中了,我们可以创建一个简单的队列。
创建队列与添加数据
出于示例的目的,我们创建一个名为DemoQueue的队列:
test=# SELECT pgq.create_queue('DemoQueue');
create_queue
--------------
1
(1 row)
如果已经成功创建队列,将会返回一个号码。在内部,队列只是一个一些pgq记账表的内部入口。
test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.queue;
-[ RECORD 1 ]------------+------------------------------
queue_id | 1
queue_name | DemoQueue
queue_ntables | 3
queue_cur_table | 0
queue_rotation_period | 02:00:00
queue_switch_step1 | 489693
queue_switch_step2 | 489693
queue_switch_time | 2013-05-14 16:35:38.132693+02
queue_external_ticker | f
queue_disable_insert | f
queue_ticker_paused | f
queue_ticker_max_count | 500
queue_ticker_max_lag | 00:00:03
queue_ticker_idle_period | 00:01:00
queue_per_tx_limit |
queue_data_pfx | pgq.event_1
queue_event_seq | pgq.event_1_id_seq
queue_tick_seq | pgq.event_1_tick_seq
记账表列出了我们的队列内部的一些基本信息。在这个具体的例子中,它会告诉我们pgq将使用多少内部表来处理我们的队列,此刻哪个表是活跃的,它被切换的频率,等等。实际上,这些信息与普通用户并不相关—这是内部的事情。
一旦队列被创建,我们可以向队列中添加数据。这样做的函数有三个参数:第一个参数是队列的名字。第二个与第三个参数是要入队的数值。在许多情况下,这里使用两个值是很有意义的。第一个值可以代表一个键,而第二个值可以被看作该消息的负载。下面是一个例子:
test=# BEGIN;
BEGIN
test=# SELECT pgq.insert_event('DemoQueue',
'some_key_1', 'some_data_1');
insert_event
--------------
1
(1 row)
test=# SELECT pgq.insert_event('DemoQueue',
'some_key_2', 'some_data_2');
insert_event
--------------
2
(1 row)
test=# COMMIT;
COMMIT
添加接收者
在我们的例子中,我们已经添加了两行样本数据。现在,我们可以注册两个接收者,它们应该以正确的顺序获取那些消息:
test=# BEGIN;
BEGIN
test=# SELECT pgq.register_consumer('DemoQueue',
'Consume_1');
register_consumer
-------------------
1
(1 row)
test=# SELECT pgq.register_consumer('DemoQueue',
'Consume_2');
register_consumer
-------------------
1
(1 row)
test=# COMMIT;
COMMIT
两个接收者已经创建完毕。只要两个接收者都获取消息并把它标记为已经接受,消息就被标记为已处理。
配置 ticker
在我们可以清楚地看到消息如何被接收之前,我们要简明地讨论一下pgq的工作方式。接收者怎么知道那些行数据是发送给它的呢?管理队列并不简单。试想一下,两个并发的事务添加数据行。如果依赖的所有事物被复制,一个事务只能被复制。
下面是一个例子:
Connection 1: Connection 2:
INSERT ... VALUES (1)
BEGIN;
BEGIN;
INSERT ... VALUES (2)
INSERT ... VALUES (3)
COMMIT;
INSERT ... VALUES (4)
COMMIT;
请记住,如果我们要管理队列,我们必须确保保持总的顺序。所以如果写行数据4的事务已经被提交,我们只能提供行数据3给接收者。如果我们在第一个连接中的第二个事务完成之前,把行数据3提供给接收者,行数据3将反超行数据2,不可如此。
在pgq的情况下,一个所谓的ticker进程将处理这些细节。
ticker(pgqd)进程将为我们处理队列,并决定谁准备接收什么。为了使ticker 进程工作,我们创建两个目录。一个将保持日志文件,而另一个将存储由ticker进程创建的 pid 文件:
hs@hs-VirtualBox:~$ mkdir log
hs@hs-VirtualBox:~$ mkdir pid
一旦我们创建了这些目录,我们必须为ticker创建一个配置文件:
[pgqd]
logfile = ~/log/pgqd.log
pidfile = ~/pid/pgqd.pid
## optional parameters ##
# libpq connect string without dbname=
base_connstr = host=localhost
# startup db to query other databases
initial_database = postgres
# limit ticker to specific databases
database_list = test
# log into syslog
syslog = 0
syslog_ident = pgqd
## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
maint_period = 120
# how often to run ticker
ticker_period = 1
正如我们已经提到的,ticker负责这些队列。要确保它顺利工作,我们必须把ticker指向 PostgreSQL 实例。请记住,连接字符串将自动完成(基础设施已经知晓部分信息,并且它被用于自动完成)。理想情况下,您会使用这里的 database_list 指令来确保只有那些真正需要的数据库被采用。
至于连接的日志,这里,您有两个选择。您可以直接传送日志到syslog 或者发送日志到一个日志文件。在我们的例子中,我们决定不使用syslog(在我们的路配置文件中,syslog已经被设置为0了)。最后,还有一些配置队列应该被执行需要维持的频率的参数,等等。
ticker启动起来很容易:
hs@hs-VirtualBox:~/skytools$ pgqd ticker.ini
2013-05-14 17:01:38.006 23053 LOG Starting pgqd 3.1.4
2013-05-14 17:01:38.059 23053 LOG test: pgq version ok: 3.1.3
2013-05-14 17:02:08.010 23053 LOG {ticks: 30, maint: 1, retry: 0}
2013-05-14 17:02:38.012 23053 LOG {ticks: 30, maint: 0, retry: 0}
这里最重要的事情是,ticker也可以直接作为守护进程启动。命令行选项 –d 将自动把那个进程发送到后端,并把它从有源终端解耦。
接收消息
只是增加消息队列可能不是我们想要的。在某些时候,我们也要接收这些消息。要做到这一点,我们可以调用pgq.next_bach。系统会返回一个数字标识批处理。
test=# BEGIN;
BEGIN
test=# SELECT pgq.next_batch('DemoQueue', 'Consume_1');
next_batch
------------
1
(1 row)
一旦我们得到了批处理的ID,我们就可以获取数据本身:
test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.get_batch_events(1);
-[ RECORD 1 ]----------------------------
ev_id | 1
ev_time | 2013-05-14 16:43:39.854199+02
ev_txid | 489695
ev_retry |
ev_type | some_key_1
ev_data | some_data_1
ev_extra1 |
ev_extra2 |
ev_extra3 |
ev_extra4 |
-[ RECORD 2 ]----------------------------
ev_id | 2
ev_time | 2013-05-14 16:43:39.854199+02
ev_txid | 489695
ev_retry |
ev_type | some_key_2
ev_data | some_data_2
ev_extra1 |
ev_extra2 |
ev_extra3 |
ev_extra4 |
test=# COMMIT;
COMMIT
在我们的例子中,一批有两个消息组成。在单独的事务中或者通过许多不同的连接已经入队的消息,仍可能在终止于接收者的同一个工作包。这完全是预期的行为。正确的顺序将被 保留。
一旦一个批处理已经被接收者处理了,它就可以被标记为已完成的。
test=# SELECT pgq.finish_batch(1);
finish_batch
--------------
1
(1 row)
这意味,数据已经从队列出队—逻辑上 pgq.get_batch_events 将为这个批处理ID返回一个错误:
test=# SELECT * FROM pgq.get_batch_events(1);
ERROR: batch not found
CONTEXT: PL/pgSQL function pgq.get_batch_events(bigint) line 16 at
assignment
[该消息只是因为这个接收者出队了。其它接收者将仍然能够再次接受它。]
删除队列
如果一个队列不再需要了。它就可以被丢弃了。但是,您不能简单地调用pgq.drop_queue。如果所有的接收者都被注销了,删除队列为唯一可能的。
test=# SELECT pgq.drop_queue('DemoQueue');
ERROR: cannot drop queue, consumers still attached
CONTEXT: PL/pgSQL function pgq.drop_queue(text) line 10 at RETURN
要注销接收者,我们可以做如下几点:
test=# SELECT pgq.unregister_consumer('DemoQueue',
'Consume_1');
unregister_consumer
---------------------
1
(1 row)
test=# SELECT pgq.unregister_consumer('DemoQueue',
'Consume_2');
unregister_consumer
---------------------
1
(1 row)
现在我们可以安全地删除队列。
test=# SELECT pgq.drop_queue('DemoQueue');
drop_queue
------------
1
(1 row)
为大项目使用pgq
如果您要模拟一个要传输的消息流,pgq已经被证明是特别有用的。pgq的妙处在于,您可以把一切基本的东西都放到一个队列中—您可以自由地根据消息的类型个格式决定(只要您在使用文本)。
看到pgq不仅仅是单纯地和复制相关的工具是很重要的—它有一个广泛的范围,并为无数的应用程序提供了坚实的技术基础。