G MEMCACHEQ AS MESSAGE QUEUE

PHP,消息队列,MEMCACHEQ

使用消息队列(MESSAGE QUEUE)可以把某些耗时的工作推后,然后在后台慢慢地去执行,
这样就不会让你的用户等待太久。

今天介绍PHP的消息队列: MEMCACHEQ。

MEMCACHEQ

MEMCACHEQ的特性:
1 简单易用
2 处理速度快
3 多条队列
4 并发性能好
5 与memcache的协议兼容。这就意味着只要装了memcache的extension就可以了,不需要额外的插件。
6 在zend framework中使用也很方便。

MEMCACHEQ依赖于libevent和BerkleyDB。
BerkleyDB用于持久化存储队列的数据。 这样在MEMCACHEQ崩溃或者服务器挂掉的时候,
不至于造成数据的丢失。这一点很重要,很重要。

MemcacheQ will run as a daemon and communicates over the UDP protocol. It can handle concurrent connections and can be used in a load-balanced environment.

启动参数:

memcacheq -d -r -H /data/memcacheq -N -R -v -L 1024 -u nobody &> \
/data/memcacheq/error.log
解释一下:
-d 守护进程
-r Maximize core file limit
-v 详细输出
-u 以用户的身份来运行
-H BDB 文件的保存目录
-N Performance improvement
-R 太久的日志文件会被删除。
-L 日志缓存大小,默认是32K。 1024表示1024K。

其它参数:
-h help
-vv 更加详细的输出
不使用-d的话,输出会直接显示到控制台。

ZEND_QUEUE

zend framework有一个与MEMCACHEQ通信的adapter:
Zend_Queue_Adapter_Memcacheq

http://framework.zend.com/manual/en/zend.queue.adapters.html

下面用一个真实的例子来演示如何在zend framework中使用MEMCACHEQ。

一个新闻网站,每条新闻显示的时候,都要显示出来此新闻的访问量,同时还要把它的访问量加1。

这个访问量是保存到news表中的,与news的其它信息保存在一起。
这个数据变化得非常快,缓存的意义不大。
如果每次用户查看新闻的时候,都去数据库中执行一次update visitCount+1的操作,
肯定是比较费力的。
用户必须等待update完成之后,才能看到新闻的内容。

使用MEMCACHEQ之后呢,我们可以把每一次访问都记录到消息队列中,然后在后台再周期性去更新数据库。
写入消息队列的速度是非常快的,比直接更新mysql快得多得多。

在viewNews.php中:

<?php
// MemcacheQ config
$queueOptions = array(
'name' => 'example-queue',
'driverOptions' => array(
'host' => '127.0.0.1',
'port' => 22201
)
); // Instantiate Zend_Queue
$queue = new Zend_Queue('MemcacheQ', $queueOptions); // Find out if the queue exists
if (!$queue->getAdapter()->isExists($queueOptions['name']))
{
// If not, create it
$queue->createQueue($queueOptions['name']);
} // Build a query string (key=val&key=val) because we need a scalar value
//用户在$timestamp访问了$page页面。
$params = http_build_query(
array(
'timestamp' => $timestamp,
'page' => $page
)
);
// Send the data to the queue
$queue->send($params);

这样就把这一次访问保存到了消息队列[example-queue]中。

然后再搞个cron,去处理队列中的消息。

<?php
// MemcacheQ config
$queueOptions = array(
'name' => 'example-queue',
'driverOptions' => array(
'host' => '127.0.0.1',
'port' => 22201
)
); // Instantiate Zend_Queue
$queue = new Zend_Queue('MemcacheQ', $queueOptions); // Retrieve 5 items from the queue
$messages = $queue->receive(5); // $message is now a instance of Zend_Queue_Message_Iterator
// @TODO: Use a nice FilterIterator ;)
foreach ($messages as $job)
{
if ('creating queue' == $job->body || false === $job->body)
{
// Skip message
continue;
}
// Parse the query string to a array
parse_str($job->body, $data); // Execute the heavy process
//更新数据库
$this->registerHit($data['timestamp'], $data['page']);
}
05-08 07:58