介绍性的话我这里就不翻译了, 总结起来就是zmq很cool, 你应该尝试一下.

如何安装与使用zmq

在Linux和Mac OS上, 请通过随机附带的包管理软件, 或者home brew安装zmq. 包名一般就叫zmq, 安装上就好.

安装后, 以Mac OS为例, 会出现一个新的头文件 /usr/local/include/zmq.h , 和一个链接库 /usr/local/lib/libzmq.a.

所以, 如果你使用C语言, 那么很简单, 写代码的时候加上头文件 #include <zmq.h> 就好了, 链接的时候加上库 -lzmq 就好了.

如果你使用的不是C语言, 那么也很简单, 去复习一下C语言, 然后再回来看这个教程. 需要注意的是, 这个教程里的所有示例代码在编译的时候需要指定 -std=c99.

一问一答例子入门

先放一个一问一答的例子来让你感受一下

这是服务端代码

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h> int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REP);
zmq_bind(socket, "tcp://*:5555"); while(1)
{
char buffer[10];
int bytes = zmq_recv(socket, buffer, 10, 0);
buffer[bytes] = '\0';
printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer); sleep(1); const char * replyMsg = "World";
bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg);
} zmq_close(socket);
zmq_ctx_destroy(context); return 0;
}

这是客户端代码

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h> int main(void)
{
printf("Connecting to server...\n"); void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:5555"); for(int i = 0; i < 10; ++i)
{
char buffer[10];
const char * requestMsg = "Hello";
int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0);
printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg); bytes = zmq_recv(socket, buffer, 10, 0);
buffer[bytes] = '\0';
printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer); } zmq_close(socket);
zmq_ctx_destroy(context); return 0;
}

这是makefile

all: client server
%: %.c
gcc -std=c99 $^ -o $@ -lzmq

这个例子就很简单, 要点有以下

服务端上:

  1. 服务端创建一个context, 再用context去创建一个socket, 再把socket绑定到tcp的5555端口上
  2. 先从socket中读取数据, 再向socket中写入数据

客户端上

  1. 客户端也是创建一个context, 再用context去创建一个socket, 与服务端不同的, 客户端是用tcp协议连接到本机的5555端口上, 这也是服务端监听的网络地址
  2. 客户端先向socket里写入数据, 再从socket中读取数据
  3. 客户端执行 写入-读出 这样的操作十遍后, 关闭socket, 销毁context, 退出程序

看起来套路和你从<Unix 网络编程>里学到的差不多嘛. 不过, 你可以试试, 先启动客户端, 然后再启动服务端, 你会发现, 程序没有崩溃. 这就是zmq高明的地方, 把操作系统原生脆弱的网络编程接口进行了封装. 并且实际上不止于此, 后面我们会学到更多. 这只是开胃小菜.

注意: 网络通信中没有字符串!

你可能注意到了我们上面的例子里, 其实客户端与服务端互相传输的数据里, 并没有包含C风格字符串最后一位的'\0'. 请时刻谨记这一点, 网络通信中, 流动在网络编程API上的数据, 对于API本身来说, 都是字节序列而已, 如何解释这些字节序列, 是网络编程API的使用者的责任. 比如上面, 我们需要在每次接收数据的时候记录接收的数据的大小, 并且在buffer中为接收到的数据之后的一个字节赋值为0, 即人为的把接收到的数据解释为字符串. 而对于zmq_sendzmq_recv来说, 它并不关心客户端与服务端传输的数据具体是什么.

这在所有网络编程API中都是这个套路, 不光是zmq, linux socket, winsock, 都是这样. 字符串? 不存在的. 我能看见的, 只是字节序列而已.

获取zmq的版本信息

当你要把zmq应用到实际项目中的时候, 版本号注是一个你必须关注的事情了. 当然, 项目初期你可以不关心它, 或者项目规模较小的时候你可以不关心它. 但随着项目的进展, 项目中使用到的库的版本号就成了所有人必须关心的事情. 实际上所有第三方库的版本都是一个需要项目owner关心的事情, 因为总有一些sb会做出以下的事情:

  1. 当一个sb需要一个额外功能的时候, 他会以光速引入一个库, 并且从不检查这个库是否已经被引入到项目中.
  2. 当这个sb引入这个第三方库的时候, 这个sb只关心自己写的代码能不能顺利编译运行
  3. 很大概率这个sb不会仔细阅读项目的构造工具脚本, 这个sb只关心如何把这坨他看不懂的东西, 搞的不报错, 能运行起来.
  4. 很在可能这个sb引入的这个第三方库, 项目已经在先前引入了, 经过这个sb这次操作, 项目中会存在不同版本的两个同名库的引用.
  5. 一般情况下这个sb由于追求cool, 会引入最新的版本, 甚至是beta版
  6. 多数情况下, 这次操作引入的负面影响会在几个月后爆发.

所以, 在这里衷心的建议你, 时刻关注你项目中使用的所有第三方库, 搞清楚你的项目构造工具链的运行过程. 而对于zmq来说, 要获得zmq的版本, 需要如下调用一些函数

#include <zmq.h>
#include <stdio.h> int main(void)
{
int major = 0;
int minor = 0;
int patch = 0; zmq_version(&major, &minor, &patch); printf("ZMQ_VERSION == %d.%d.%d\n", major, minor, patch); return 0;
}

在我写(抄)这个教程的时候, 我使用的版本号是4.2.5

封装一些工具函数, 阅读manpage, 并关心zmq API的返回值

有三件事我建议你养成习惯

  1. 封装一些工具函数, 并且在你的编程生涯中不断的改进它们
  2. 多查阅编程手册, 在*nix平台上, 多查阅manpage
  3. 对于C网络的API, 多关心函数的返回值的意义. 这里的返回值包括但不限于: 函数的返回值, errno, errmsg等

现在我要写三个工具函数, 这三个函数都不完美, 但它们都会出现大后续的示例程序里, 用于缩减示例程序的篇幅:

第一个工具函数: 向zmq socket发送字符串数据, 但不带结尾的'\0'

/*
* 把字符串作为字节数据, 发送至zmq socket, 但不发送字符串末尾的'\0'字节
* 发送成功时, 返回发送的字节数
*/
static inline int s_send(void * socket, const char * string)
{
return zmq_send(socket, string, strlen(string), 0);
}

第二个工具函数: 从zmq socket中接收数据, 并把其解释为一个字符串

/*
* 从zmq socket中接收数据, 并将其解释为C风格字符串
* 注意: 该函数返回的字符串是为在堆区创建的字符串
* 请在使用结束后手动调用free将其释放
*/
static inline char * s_recv(void * socket)
{
char buffer[256];
int length = zmq_recv(socket, buffer, 255, 0);
if(length == -1)
{
return NULL;
} buffer[length] = '\0'; return strndup(buffer, sizeof(buffer) - 1);
}

第三个函数: 在取值范围 [0, x) 中随机生成一个整数

/*
* 生成一个位于 [0, num)区间的随机数
*/
#define randof(num) (int)((float)(num) * random() / (RAND_MAX + 1.0))

这些工具函数都会以静态内联函数的形式写在一个名为 "zmq_helper.h" 的头文件中, 在后续用得着这些工具函数的时候, 示例程序将直接使用, 而不做额外的说明. 对应的, 当新增一个工具函数的时候, 工具函数本身的源代码会在合适的时候贴出

什么是模式? pattern?

相信以Java为主要工作语言的同学, 在毕业面试的时候基本上都被面试官问过各种设计模式, design patterns. 不知道你们有没有思考过一个哲学问题: 什么是模式? 什么是pattern? 为什么我们需要设计模式?

我在这里给出我的理解: 模式并不高大上, 模式其实就是"套路". 所谓的设计模式就是在面向对象程序设计架构中, 前人总结出来的一些惯用套路.

网络编程中也有这样的套路, 也被称之为模式, pattern. ZMQ作为一个像消息库的网络库, 致力于向你提供套路, 或者说, 向你提供一些便于实现套路的工具集. 下面, 我们来看我们接触的第二个套路: 发布-订阅套路. (第一个套路是 请求-应答 套路)

发布-订阅 套路

发布-订阅套路中有两个角色: 发布者, 订阅者. 或者通俗一点: 村口的大喇叭, 与村民.

发布者, 与村口的大喇叭的共性是: 只生产消息, 不接收消息. 而订阅者与村民的共性是: 只接收消息, 而不生产消息(好吗, 村民会生产八卦消息, 抬杠就没意思了). ZMQ提供了两种特殊的socket用于实现这个模式, 这个套路, 下面是一个例子:

村口的大喇叭循环播放天气预报, 播放的内容很简单: 邮编+温度+相对温度. 各个村民只关心自己村的天气情况, 他们村的邮编是10001, 对于其它地区的天气, 村民不关心.

发布者/村口的大喇叭:

#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include "zmq_helper.h" int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_PUB);
zmq_bind(socket, "tcp://*:5556"); srandom((unsigned)time(NULL)); while(1)
{
int zipcode = randof(100000); // 邮编: 0 ~ 99999
int temp = randof(84) - 42; // 温度: -42 ~ 41
int relhumidity = randof(50) + 10; // 相对湿度: 10 ~ 59 char msg[20];
snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity);
s_send(socket, msg);
} zmq_close(socket);
zmq_ctx_destroy(context); return 0; }

订阅者/村民:

#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h" int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_SUB);
zmq_connect(socket, "tcp://localhost:5556"); char * zipcode = "10001";
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode)); for(int i = 0; i < 50; ++i)
{
char * string = s_recv(socket);
printf("[Subscriber] Received weather report msg: %s\n", string);
free(string);
} zmq_close(socket);
zmq_ctx_destroy(context); return 0;
}

makefile

all: publisher subscriber
%: %.c
gcc -std=c99 $^ -o $@ -lzmq

这个例子中需要特别注意的点有:

  1. 村民必须通过zmq_setsockopt函数设置一个过滤器, 以说明关心哪些消息. 如果不设置过滤器, 那么什么消息都不会收到
  2. 即便你先启动村民, 再给大喇叭上电, 村民还是会遗漏掉大喇叭最初始发送的一些消息..呃, 这么讲吧, 大概能丢失几万条这样.这是因为tcp建立连接需要时间. 在建立连接这段时间内, 大喇叭已经向外疯狂发送了很多消息. 在后续章节, 大概在第三章, 我们将会学到如何严格同步村民与喇叭. 让喇叭在所有村民就绪之后再开始发送消息.

另外, 关于这个例子中的两种socket类型, 有以下特点

  1. ZMQ_PUB类型的socket, 如果没有任何村民与其相连, 其所有消息都将被简单就地抛弃
  2. ZMQ_SUB类型的socket, 即是村民, 可以与多个ZMQ_PUB类型的socket相连, 即村民可以同时收听多个喇叭, 但必须为每个喇叭都设置过滤器. 否则默认情况下, zmq认为村民不关心喇叭里的所有内容.
  3. 当一个村民收听多个喇叭时, 接收消息采用公平队列策略
  4. 如果存在至少一个村民在收听这个喇叭, 那么这个喇叭的消息就不会被随意抛弃: 这句话的意思是, 当消息过多, 而村民的消化能力比较低的话, 未发送的消息会缓存在喇叭里.
  5. 在ZMQ大版本号在3以上的版本里, 当喇叭与村民的速度不匹配时. 若使用的传输层协议是tcpipc这种面向连接的协议, 则堆积的消息缓存在喇叭里, 当使用epgm这种协议时, 堆积的消息缓存了村民里. 在ZMQ 大版本号为2的版本中, 所有情况下, 消息都将堆积在村民里. 后续章节我们会学习到, 如何以"高水位阈值"来保护喇叭.

ZMQ里的ZMQ_PUB型的发布者, 也就是喇叭, 其发送消息的能力是很炸的, zmq的作者在官方的guide里讲到, 发布者与订阅者位于同台机器上, 通过tcp://locahost连接, 发布者发布一千万条消息, 大概用时4秒多. 这还是一台2011年的i5处理器的笔记本电脑. 还不是IDC机房里的服务器...你大致感受一下..这个时候有人就跳出来说了, 这同台机器走了loopback, 肯定效率高啊.

如果你也冒出这样的想法, pong友, 看来你没理解zmq的作者想表达的意思. 显然, 如果采用以太网作链路层, 这个数据不可能这么炸裂, 但作者只是想向你表达: ZMQ本身绝对不会成为性能的瓶颈, 瓶颈肯定在网络IO上, 而不是ZMQ库, 甚至于说操作系统协议栈上. 应用程序的性能瓶颈, 99.9999%都不在协议栈与网络库上, 而是受限于物理规格的网络IO.

性能低? 你不买个几百张82599武装你的机房, 性能低你怪谁? 心里没一点i3数吗?

分治套路

分治套路里有三个角色:

  1. Ventilator. 包工头, 向手下各个工程队分派任务. 一个.
  2. Worker. 工程队, 从包工头里接收任务, 干活. 多个.
  3. Sink. 甲方监理, 工程队干完活后, 向甲方监理报告. 所以工程队的活干完之后, 监理统一收集所有工程队的成果. 一个.

在介绍这一节的示例代码之前, 我们先引入了两个工具函数:

/*
* 获取当时时间戳, 单位ms
*/
static inline int64_t s_clock(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (int64_t)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
} /*
* 使当前进程睡眠指定毫秒
*/
static inline void s_sleep(int ms)
{
struct timespec t;
t.tv_sec = ms/1000;
t.tv_nsec = (ms % 1000) * 1000000; nanosleep(&t, NULL);
}

分治套路也被称为流水线套路. 下面是示例代码:

包工头代码:

#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include "zmq_helper.h" int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
zmq_connect(socket_to_sink, "tcp://localhost:5558");
zmq_bind(socket_to_worker, "tcp://*:5557"); printf("Press Enter when all workers get ready:");
getchar();
printf("Sending tasks to workers...\n"); s_send(socket_to_sink, "Get ur ass up"); // 通知监理, 干活了 srandom((unsigned)time(NULL)); int total_ms = 0;
for(int i = 0; i < 100; ++i)
{
int workload = randof(100) + 1; // 工作需要的耗时, 单位ms
total_ms += workload;
char string[10];
snprintf(string, sizeof(string), "%d", workload);
s_send(socket_to_worker, string); // 将工作分派给工程队
} printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink);
zmq_close(socket_to_worker);
zmq_ctx_destroy(context); return 0;
}

工程队代码:

#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h" int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
zmq_connect(socket_to_ventilator, "tcp://localhost:5557");
zmq_connect(socket_to_sink, "tcp://localhost:5558"); while(1)
{
char * msg = s_recv(socket_to_ventilator);
printf("Received msg: %s\n", msg);
fflush(stdout);
s_sleep(atoi(msg)); // 干活, 即睡眠指定毫秒
free(msg);
s_send(socket_to_sink, "DONE"); // 活干完了通知监理
} zmq_close(socket_to_ventilator);
zmq_close(socket_to_sink);
zmq_ctx_destroy(context); return 0;
}

监理代码:

#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h" int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL);
zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558"); char * msg = s_recv(socket_to_worker_and_ventilator);
printf("Received msg: %s", msg); // 接收来自包工头的开始干活的消息
free(msg); int64_t start_time = s_clock(); for(int i = 0; i < 100; ++i)
{
// 接收100个worker干完活的消息
char * msg = s_recv(socket_to_worker_and_ventilator);
free(msg); if(i / 10 * 10 == i)
printf(":");
else
printf(".");
fflush(stdout);
} printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time)); zmq_close(socket_to_worker_and_ventilator);
zmq_ctx_destroy(context); return 0;
}

这个示例程序的逻辑流程是这样的:

  1. 包工头向两个角色发送消息: 向工程队发送共计100个任务, 向监理发送消息, 通知监理开始干活
  2. 工程队接收来自包工头的消息, 并按消息里的数值, 睡眠指定毫秒. 每个任务结束后都通知监理.
  3. 监理先是接收来自包工头的消息, 开始计时. 然后统计来自工程队的消息, 当收集到100个任务完成的消息后, 计算实际耗时.

包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.

这里个例子中需要注意的点有:

  1. 这个例子中使用了ZMQ_PULLZMQ_PUSH两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点类似于发布-订阅套路, 具体之间的区别后续章节会讲到.
  2. 工程队上接包工头, 下接监理. 在任务执行过程中, 你可以随意的增加工程队的数量.
  3. 我们通过让包工头通知监理, 以及手动输入enter来启动任务分发的方式, 手动同步了工程队/包工头/监理. PUSH/PULL模式虽然和PUB/SUB不一样, 不会丢失消息. 但如果不手动同步的话, 最先建立连接的工程队将几乎把所有任务都接收到手, 导致后续完成连接的工程队拿不到任务, 任务分配不平衡.
  4. 包工头分派任务使用的是轮流/平均分配的方式.这是一种简单的负载均衡
  5. 监理接收多个工程队的消息, 使用的是公平队列策略.

所以, 你大致能看出来, 分治套路里有一个核心问题, 就是任务分发者与任务执行者之间的同步. 如果在所有执行者均与分发者建立连接后, 进行分发, 那么任务分发是比较公平的. 这就需要应用程序开发者自己负责同步事宜. 关于这个话题进一步的技巧将在第三章进一步讨论.

使用ZMQ的一点建议

现在我们写了三个例子, 分别是请求-回应套路, 发布-订阅套路, 流水线套路. 在继续进一步学习之前, 有必要对一些点进行强调

  1. 学习ZMQ请慢慢学. 不要着急. 其实学习所有库工具都是如此, learn it by hard way. 很多程序员总是看不到 "看懂" 和 "学会" 这两个层次之间的十万公里距离, 觉得"看懂"了, 再抄点代码, 复制粘贴一下, 就算是"精通"ZMQ了, 不, 不, 不, 差得远, 当年你就这这样学C语言的, 所以除了数据结构实验, 你写不出任何有用的代码. 我建议你一步一步的学习, 不要急功近利, 仔细的写代码, 琢磨, 体会, 理解.
  2. 养成良好的编程风格, 不要写屎一样的代码.
  3. 重试自测, 无论是在工作还是在学习上, 用各种测试手段来保证代码质量, 不要从心理上过度依赖debug
  4. 学会抽象, 无论是工作还是学习中, 积累代码, 自己动手写一些函数, 封装, 并随着时间去精炼它们, 慢慢的, 虽然最终你会发现你写的代码99%都是屎, 但这个沉淀的过程对你一定有很大的帮助.
  5. 上面四条是zmq guide原作者的建议, 我表示比较赞同.

正确的处理context

你大致注意到了, 在上面的所有示例代码中, 每次都以zmq_ctx_new()函数创建出一个名为context的变量, 目前你不需要了解它的细节, 这只是ZMQ库的标准套路. 甚至于你将来都不需要了解这个context里面到底是什么. 但你必须要遵循zmq中关于这个context的一些编程规定:

  1. 在一个进程起始时调用zmq_ctx_new()创建context
  2. 在进程结束之前调用zmq_ctx_destroy()销毁掉它

每个进程, 应该持有, 且应该只持有, 一个context. 当然, 目前来说, 你这样理解就行了, 后续章节或许我们会深入探索一下context, 但目前, 请谨记, one context per process.

如果你在代码中调用了fork系统调用, 那么请在子进程代码区的开始处调用zmq_ctx_new(), 为子进程创建自己的context

把屁股擦干净

网络编程和内存泄漏简直就是一对狗男女, 要避免这些狗血的场景, 写代码的时候, 时刻要谨记: 把屁股擦干净.在使用ZMQ编程的过程中, 我建议你:

  1. 在调用zmq_ctx_destroy()之前, 先调用zmq_close()关闭掉所有的zmq socket. 否则zmq_ctx_destroy可能会被一直阻塞着
  2. 尽量使用zmq_send()zmq_recv()来收发消息, 尽量避免使用与zmq_msg_t相关的API接口. 是的, 那些接口有额外的特性, 有额外的性能提升, 但在性能瓶颈不在这些细枝末节的时候, 不要过度造作.
  3. 假如你非得用zmq_msg_t相关的接口收发消息, 那么请在调用zmq_msg_recv()之后, 尽快的调用zmq_msg_close()释放掉消息对象
  4. 如果你在一个进程中开了一堆堆的socket, 那么你就需要在架构上思考一下, 你的程序是不是有设计不合理的地方.
  5. 在进程退出的时候, 时刻谨记关闭socket, 销毁context
  6. 不要在多个线程间共享socket.
  7. 用完socket之后记得关闭.
  8. 上面是zmq guide作者给出的建议, 下面, 我再给你一条: 熟读相关接口的manpage, 注意接口的返回值, 做好调用失败后的灾后重建工作

当然, 上面主要是对C语言作者的一些建议, 对于其它语言, 特别是有GC的语言, 使用ZMQ相关接口之前建议确认相关的binding接口是否正确处理了资源句柄.

你为什么需要ZMQ

网络编程, 特别是*nix平台的网络编程, 99%程序员的启蒙始于<Unix网络编程>这本书, 90%里的项目充斥着linux socket, epoll与fd. 是的, 2018年了, 他们还是这么干的. 我们就从这个视角来列举一下, 使用*nix平台原生的网络API与多路IO接口, 你在写服务端程序时需要头疼的事情:

  1. 如何处理IO. 阻塞式IO太低效, 异步IO代码不好写.
  2. 如何平滑的向你的服务增删机器, 平行扩容
  3. 如何传递消息? 如何设计消息结构? 通讯协议?
  4. 消息传递过程中如何缓冲? 生产消费速度不一致时采用何种策略?
  5. 如何处理消息丢失? 如何保证通讯的可靠性?
  6. 如何处理多种三层四层协议之间的协同?
  7. 消息如何路由? 如何负载均衡? 如何实现有状态的会话?
  8. 如何处理多编程语言的协同?
  9. 如何使消息在多种架构机器上能通用读写? 如何实现了, 如何保证效率和成本?
  10. 如何处理网络错误?

我问你, 你头大不大? 想不想死?

读过开源项目吗? 比如Hadoop Zookeeper, 你去观摩一下zookeeper.c, 真是看的人头大想死. 你再翻翻其它开源项目, 特别是用C/C++写的Linux端程序, 每个都要把网络库事件库重新写一遍.

所以矛盾很突出, 为什么不能造一个大家都用的轮子呢? 原因很简单, 有两个方面:

  1. 对于大佬来说, 操作系统提供的网络API和事件API已经算是轮子了
  2. 真的要做一个通用的网络库, 或者消息库, 其实难度非常大. AMQP就是一个例子, 你可以去感受一下.

那么ZMQ解决了什么问题呢? ZMQ给上面提出的问题都给了完美答案吗? 理性的说, 肯定没有, 但是ZMQ是这样回答这些问题的:

  1. ZMQ用后台线程实现了IO的异步处理. 应用间的通信使用了无锁的数据结构.
  2. 集群中的结点动态增删的时候, ZMQ能默默的正确处理重连/断连等脏活.
  3. ZMQ努力的对消息做了队列缓存, 多数情况下, 默认的ZMQ行为为你提供了便利, 也足够应付你的应用场景.
  4. 当缓冲队列爆掉时, ZMQ提供了"高水位阈值"这个机制. 这个机制在队列爆掉时将自动阻塞发送者, 或者静静的扔掉数据. 具体哪种行为, 取决于你使用的socket的类型
  5. ZMQ可以欢快的跑在多种传输层协议上, 更改协议甚至不需要怎么改代码(好吧, 至少要改那么一两行)
  6. ZMQ在多种套路下, 都会像爸爸看儿子那样小心翼翼的照顾那些低能儿(处理消息的速度比较慢的那些结点)
  7. 有多种现成的套路让你实现花式负载均衡. 比如请求回应套路, 发布订阅套路.
  8. ZMQ可以很简单的创建代理, 代理是一种有效降低网络局部复杂度的技术.
  9. ZMQ保证消息传递的原子性. 要么所有消息都收到, 要么你一根毛都收不到.
  10. ZMQ本身并不引入二进制消息的规范. 你如何解释消息, 那完全是你的自由.
  11. ZMQ多数情况下可以妥善的处理网络异常, 比如在合适的场合进行合适的重试重传, 这些脏活对于你来说, 都是透明的, 不可见的.
  12. ZMQ能有效降低你IDC里的碳排放. 保护环境人人有责.

总之, 就是很好, 当然了没有一个框架库的作者会说自己的产品不好, 而具体好不好, 学了用了之后才会知道, 上面的点看一看得了, 别当真.

socket的可扩展性

在发布-订阅套路由, 当你开启多个村民的时候, 你会发现, 所有村民都能收到消息, 而村口的喇叭也工作正常. 这就是zmq socket的可扩展性. 对于发布端来讲, 开发人员始终面对的是一个socket, 而不用去管连接我到底下面会有多少订阅用户. 这样极大简化了开发人员的工作, 实际发布端程序跑起来的时候, 会自主进行适应, 并执行最合理的行为. 更深层次一点, 你可能会说, 这样的功能, 我用epoll在linux socket上也能实现, 但是, 当多个订阅者开始接收数据的时候, 你仔细观察你cpu的负载, 你会发现发布端进程不光正确接纳了所有订阅者, 更重要的是把工作负载通过多线程均衡到了你电脑的多个核心上. 日最大程度的榨干了你的cpu性能. 如果你单纯的用epoll和linux socket来实现这个功能, 发布端只会占用一个核心, 除非你再写一坨代码以实现多线程或多进程版的村口大喇叭.

04-24 21:53