0%

zmq中文指南_2

第二章——socket和模式

  在第一章我们稍微接触了一下zmq,熟悉了zmq一些主要模式的例子:request-reply,pub-sub和pipeline.在这章我们开始学习怎么把这些工具用到实际编程中。
  我们会讲到:

  • 怎么创建zmq socket,并用其工作。
  • 怎么在socket上发送和接收message。
  • 怎么用zmq的异步IO模型构建自己的app;
  • 怎么在一个线程中处理多个sockets
  • 怎么正确处理致命和非致命的错误。
  • 怎么处理像ctrl-c中断信号;
  • 怎么干净的退出一个zmq程序;
  • 怎么检查zmq程序内存泄漏;
  • 怎么发送和接收多帧信息;
  • 怎么在网络中中转信息;
  • 怎么构建一个简单的消息中间件;
  • 怎么用zmq写多线程应用;
  • 怎么在线程间用zmq;
  • 怎么在网络节点间使用zmq进行同步;
  • 怎么对pub-sub创建和使用信封;
  • 用HWM防止内存溢出。

socket的API

  说实话,zmq做了某种替换,但我们并不对此感到愧疚。它对你来说是好的,对我们来说是不好的。Zmq奉上了一组熟悉的以socket为基础的api,这让我们花了大力气去隐藏很多消息处理细节。然而,我们努力的成果会慢慢改变你去设计和编写分布式软件的世界观。

  Socket是网络编程事实上的标准api,也很吸引人。Zmq对开发者有好的一个例子就是它用socket和message的概念而不是一些其他任意的概念。Kudos to Martin Sustrik for pulling this off. It turns “Message Oriented Middleware”, a phrase guaranteed to send the whole room off to Catatonia, into “Extra Spicy Sockets!”, which leaves us with a strange craving for pizza and a desire to know more.(这段不会翻译了:( )

  像美味的菜一样,zmq的sockets很容易消化。跟BSD sockets一样,zmq的sockets生命期有4部分:

  • 创建销毁socket,协同作用组成socket的这一生(zmq_socket(), zmq_close())。
  • 操作socket,配置或者读取配置(zmq_setscokopt(), zmq_getsockopt())。
  • 创建zmq的网络连接,把sockets嵌入到网络拓扑结构中(zmq_bind(), zmq_connect())。
  • 使用socket接收发送message(zmq_msg_send(), zmq_msg_recv())。

  注意sockets总是void指针,message(我们很快就会看到了)总是结构体。所以在C中你就按原样传入sockets,但你需要在所有操作message的函数(比如zmg_msg_send(), zmq_msg_recv())中传入message的地址。要记住是很简单的,“在zmq中,所有的socket都是我们的”, 但message全都是你自己的。

  创建、销毁和配置sockets可以按照你对任何对象期望的那样工作,但记住zmq是异步、可扩展的框架。这对我们如何把socket接入到网络拓扑结构和之后我们怎么使用socket都有影响

把socket嵌入拓扑结构中

  为了在两个节点之间创建连接,你需要在一个节点用zmq_bind(),在另一个节点用zmq_connect()。一般来说,用zmq_bind()的一端是“server”,有个公开的网络地址,用zmq_connect()的一端是“client”,有个未公开或者任意的网络地址。因此我们说我们“绑定socket到一个端点上”和“连接一个socket到一个端点上”,这个“端点”就是个公开的网络地址。

  zmq的连接情况有点儿不同于经典的TCP连接。主要不同在于:

  • zmq可以使用任意的传输方式(inproc, ipc, tcp, pgm或者epgm),查看zmq_inproc(),zmq_ipc(),zmq_tcp(),zmq_pgm(),zmq_epgm()
  • 一个socket可以又很多连进和很多连出。
  • 没有zmq_accept()方法。当一个socket被绑定到一个端口上时,它就自动开始accept连接了。
  • 网络连接自动发生在后台,如果网络断开,zmq会自动重连(比如说对端掉线后又上线)。
  • 你自己的应用的代码没办法直接使用zmq的连接,他们都被封装到socket中了。

  许多架构都遵循client/server模式:server是相对固定的;client则大部分是动态的,他们可能很多时候来来去去。这涉及到定位的问题:server对client来说是可见的,但反过来就不一定。所以哪个节点用zmq_bind()(server),哪个节点用zmq_connect()(client)就很明显了。当然这也依赖于你使用的socket种类和一些不常用的网络架构。我们稍后会看不同的socket类型。

  现在,假设我们先于server启动client。在传统的网络编程中,我们会得到个大红叉,但zmq会让我们随意启动关闭client。一旦client节点做了zmq_connect()的操作,连接就存在了,并且该节点可以开始往socket中写数据了。有些情况下(希望在消息队列累积到丢失数据或者client阻塞发生之前)server会活过来,进行zmq_bind()操作,然后zmq就开始递送数据了。

  一个server节点可以绑定到很多端点上(也就是说是一个协议和地址的组合)并且它可以用一个socket做到这些。这意味着它会接收不同的传输方式:

1
2
3
zmq_bind(socket, "tcp://*:5555");
zmq_bind(socket, "tcp://*:9999");
zmq_bind(socket, "inproc://somename");

  对大多数传输方式来说,你不能重复绑定同一个端点,不像UDP的例子。然而,ipc传输确实会让一个进程绑定到一个先前进程使用过的端点上。这意味着它允许进程在崩溃后恢复。
  尽管zmq努力让哪端绑定(bind)哪端连接(connect)不那么明显,但它们确实不同。我们稍后会更详细的查看这些不同。你需要首先注意的是“servers”在你的拓扑结构中是相对固定的部分,“clients”则是那些经常变动、连接其它端点的部分,然后围绕这个模型去设计你的应用。

  socket有不同的类型。socket的类型定义了socket的语义,规定了向前向后中继数据或者组建队列的准则。你可以把特定类型的socket连接起来,比如连接一个publisher socket和一个subscriber socket。sockets共同工作在“消息模式”(messaging patterns)中。我们稍后会详细讲到。

  使用不同的方式连接socket给了zmq作为消息队列系统的基本能力。其上则有不同的层,比如说代理。但从本质上来说用zmq设计你的网络架构就像孩子搭积木一样。

发送和接收message

  你需要用zmq_msg_send()zmq_msg_recv()来发送和接收message。这些名字是常规的命名方式,但是zmq的I/O模型跟传统的TCP模型有很大的不同,你需要时间来适应它。

f9

  在处理数据的时候TCP的socket和zmq的socket主要有以下的不同点:

  • zmq像UDP那样传输数据,而不像TCP那样把数据当成字节流。一个zmq的message是特定长度的二进制数据。我们很快就会讨论到message;它们的设计考虑到了性能的优化并且使用了一些技巧。
  • zmq的socket在后台线程操作I/O。这意味着不管你的应用程序是否在忙,message都会到达本地的输入队列,也会发送到本地的输出队列。
  • 根据socket的类型,zmq的socekt有内建的1-N路由模型。

  zmq_send()操作并不会真正把message发送到连接着的socket中。它会把message加入到队列中好让I/O线程异步的发送。除了一些特殊的情况,它不会阻塞。所以当zmq_send()返回时并不意味着message就发送出去了。

单播传输

  zmq提供了一整套的单播传输(inproc, ipc, tcp)和多播传输(epgm, pgm)。多播是比较高级的技术,我们会在以后讨论到。在你清楚知道你的扇出率会使单播1-N模型失效之前不要尝试使用多播。

  大多数情况下使用tcp协议,该tcp是非连接的tcp传输。它简单可扩展并且对大多数情况来说都足够快。我们称它为非连接的是因为zmq的tcp传输在我们连接对端之前并不要求对端存在。client和server可以在任何时候绑定(bind)和连接(connect),可以随时退出加入,这些对应用程序来说都是透明的。

  进程间通信也是非连接的。但它有个局限:不能在Windows下工作。通常我们用“.ipc”的扩展名命名端点名,防止和其他文件名产生冲突。在UNIX系统中,如果你使用ipc通信你需要用适当的权限去创建ipc文件,否则有不同用户ID的进程不能共享该ipc通道。同时你也必须保证所有的进程都能找到ipc文件,比如让它们跑在同一个工作目录下。

  线程间通信inproc是个面向连接的信号传输通道。它比tcp和ipc都快。这种通信方式相比tcp和ipc来说有个特殊的限制:server必须在client connect之前bind。在将来版本中可能会修改这种设定,但目前来说这个规定了你怎样使用inproc socket:我们创建并绑定一个socket,然后启动一个连接其它socket的子线程。

zmq不是中立的传输者

  新使用zmq的人普遍问的一个问题(我也这样问过我自己)是:“我怎么用zmq写个XYZ的服务器?”比如说,“我怎么用zmq写个HTTP的服务器?”这意味着如果我们可以用传统的socket来传输HTTP的请求和回复,我们就应该可以用zmq的socket做同样的事情,仅有的变化是用zmq更快更好。

  答案通常是“这并不是zmq干的事情”。zmq不是个中立的传输者:它会在它使用的协议上组建一个帧栈。这个帧栈跟原有的用它们自己的帧栈的协议并不兼容。例如,同样是基于TCP/IP协议,HTTP的请求跟zmq的请求相比如下:

f10

  HTTP请求用CR-LF作为它最简单的帧分隔符,但zmq用长度标识帧。所以你可以用zmq的request-reply模式写一个像HTTP的协议,但它并不是HTTP协议。

f11

  但从v3.3版本开始,zmq添加了一个叫ZMQ_ROUTER_RAW的socket选项,该选项让你可以不用zmq的帧栈读写数据。你可以用这个选项读写合适的HTTP请求和回复。Hardeep Singh为了能用telnet连接他的zmq应用而写了这个更新。编写这个更新仍然需要经验,但它也显示了zmq怎样为了解决新的问题而发展的。可能下个更新就是你的。

I/O线程

  我们说过zmq是使用后台线程处理I/O操作的。一个I/O线程(对所有类型的socket来说)对几乎所有的程序都是很有效的,当然,除了那些特别极端的情况。当你创建一个新的上下文(context)时,它会开启一个I/O线程。通常一个I/O线程可以处理每秒1G的进出数据。如果需要增加I/O线程数,你需要在创建socket之前使用zmq_ctx_set():

1
2
3
4
int io_threads = 4;
void *context = zmq_ctx_new();
zmq_ctx_new(context, ZMQ_IO_THREADS, io_threads);
assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);

  我们已经知道了一个socket可以同时操作几十个甚至几千个连接。这取决于你怎么设计你的应用程序。传统的网络应用对每个远程连接来说有一个进程或线程去处理,这里的每个进程或线程操作一个socket。zmq可以让你把所有的这些架构都组装到一个单独的进程,然后根据需要把它组装到你的拓扑结构中。

  如果你只是用zmq进行线程间通信(比如说没外部socket I/O操作的多线程程序),你可以把I/O线程数设置为0,但这并不是一个显著的性能优化,更多的只是个小技巧而已。

消息模式

  在zmq的socket地下隐藏的是消息模式的世界。如果你有大型消息系统的背景,或者你对UDP很了解,你就会对zmq的消息模式感到熟悉。但大多数zmq的新手会感到很新奇,因为我们大多很熟悉TCP那种一对一的消息模式。

  让我们简要的回忆下zmq都能为你做些什么事。它能快速有效的传输数据块(message)。你可以把节点映射为线程、进程或者(物理)节点。zmq会提供给你单一一个的socket API使用,而你不用去关心真正的传输协议(比如进程内通信、进程间通信、TCP或者多播)是什么。在对端上线或者掉线时它会自动重连。在需要的时候它会在发送者和接收者两端都缓存message进队列。它会限制这些队列的长度以保证进程不会撑爆内存。它会处理socket的错误。它在后台线程处理所有的I/O操作。它在节点间的通信使用无锁技术,所以它从不会有锁、等待、信号量或者死锁的出现。

  但深入进去,我们会发现zmq使用一种叫“模式”的概念来路由和存储messag。正是这些模式才让zmq这么智能。它们基于我们在分发数据和任务的最佳方式的工作上艰辛获得的大量经验。zmq的模式是硬编码进去的,但将来可能会有用户自定义的模式

  zmq的模式是用两个相匹配的socket类型来实现的。换句话说,为了了解zmq的模式,你需要了解socket的类型和它们是如何在一起工作的。总之这些是需要学习的;因为这些并不是那么明显就能掌握的。

  内建的zmq模式有一下这些:

  • Request-reply,连接一组client到一组server,这是远程过程调用(rpc)和任务分发模型。
  • Pub-sub,连接一组publisher到一组subscriber。这是数据分发模型。
  • Pipeline,连接一组有多步骤和循环的扇入扇出模型的节点。这是平行任务分发和手机模型。
  • Exclusive pair,单独连接两个socket。这个模型是用来连接一个进程内两个线程的,不要跟“正常的”socket对混淆。

  我们在第一章中看了前三个模型,在本章后边我们会看到exclusive pair模型。zmq_socket()的手册对这些模型介绍得更清楚——很值得看上几遍。下面是有效的socket组合(任意一端都能bind):

  • PUB 和 SUB
  • REQ 和 REP
  • REQ 和 ROUTER
  • DEALER 和 REP
  • DEALER 和 ROUTER
  • DEALER 和 DEALER
  • ROUTER 和 ROUTER
  • PUSH 和 PULL
  • PAIR 和 PAIR

  你也会看到关于XPUB和XSUB socket的说明,我们会在稍后遇到(他们是PUB和SUB的原始版本)。任何其他的组合会产生未定义和不可靠的结果,在将来的zmq版本中如果你尝试使用它们可能会得到错误的返回值。你可以而且当然可以用代码桥接不同的socket类型,比如说从一个socket类型读取数据,然后写入到另外一种socket类型。

高级message模型

  这四个核心模型是内建在zmq中的。他们是zmq API的组成部分,由核心c++库实现,并且保证在各个版本都会提供。

  在这之上,我们添加了高级的message模型。我们基于zmq构建了这些高级模型,并且在各种不同语言中都做了实现。他们并不是核心库的一部分,并不会包含在zmq包中,只是存在与zmq社区中属于他们自己的空间内。例如我们会在第四章中遇到的Majordome模型,存在于zmq组织的GitHub的Manjordome项目中。

  这本书的目的之一是给你提供一套那样的高级模型,有小型的(如何稳妥的处理messages)也有大型的(如果构架可靠的pub-sub架构)。

使用messages

  libzmq核心库实际上有两个API来发送和接收message。我们已经见过并且用过了zmq_send()zmq_recv()。 我们仍会经常使用他们,但zmq_recv()在处理任意大小的message时相当麻烦:它会把message截断到你提供缓冲区大小。所以有第二类API来操作zmq_msg_t结构,这个操作虽然更丰富但也更困难些:

  • 初始化一条message: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data()
  • 发送和接收一条message: zmq_msg_send(), zmq_msg_recv()
  • 释放一条message: zmq_msg_close()
  • 获取message的内容: zmq_msg_data(), zmq_msg_size(), zmq_msg_more()
  • 操作message的属性: zmq_msg_get(), zmq_msg_set()
  • message复制: zmq_msg_copy(), zmq_msg_move()

  在传输过程中, zmq的message是任意大小(从0到内存可用的大小)的数据块。你需要用protocol buffers, msgpack, JSON或者任意其他你的程序通信需要的方式做序列化。选择一种简便的数据表现形式是明智的,但你也可以根据需要设计你自己的形式。

  在内存中,zmq的message是以zmq_msg_t结构体(或类,根据你选择的语言来定)的形式存在的。在c中使用zmq有以下一些基本原则:

  • 你创建传递的是zmq_msg_t对象, 不是数据块。
  • 读取一条message时,你需要先用zmq_msg_init()创建一个空的message, 然后把它传到zmq_msg_recv()中。
  • 用新的数据写一条message时,你要用zmq_msg-init_size()创建一条message,同时分配数据块大小的内存。然后用memcpy填充那个message,再然后把它传给zmq_msg_send()
  • 释放(不是销毁)一条message,你需要调用zmq_msg_close()。这会删除一个引用,最终zmq会销毁这条message。
  • 在获取message内容的时候,你需要用zmq_msg_data()。如果要知道这条message里有多少数据,使用zmq_msg_size()
  • 除非你认真读了手册页并且清楚的知道你需要的是什么,不要使用zmq_msg_move(), zmq_msg_copy() 或者 zmq_msg_init_data()
  • 在你把一条message传入zmq_msg_send()之后,zmq会清空这条message。比如设置其大小为0. 你不能重复发送同一条message,并且也不能在发送一条message之后再获取它的内容。
  • 如果你使用zmq_send(), zmq_recv()发送字节数据而不是message结构体,以上这些原则并不使用。

  如果你想重复发送同一条message,并且它是相同大小的,就新建第二条messag, 用zmq_msg_init()初始化它,然后用zmq_msg_copy()创建第一条message的拷贝。这并不是复制数据,只是复制了一个引用。然后你就可以发送相同的message两次了(如果你做了更多的复制,就可以发送更多),message会在最后一个副本发被发送或者关闭的时候才真正销毁掉。

  zmq也支持多帧message,这让你能用单个数据包发送或接收一个帧的列表。真实的应用中广泛地使用这项技术,我们会在第三章讨论。

  帧(在zmq的手册页中也被称为“message部件(message parts)”)是zmq的message的基本数据包格式。一个帧是一个定长的数据块,长度从0开始。如果你写过一些TCP程序的话,你就会赞同帧是“现在我该从网络socket上读取多大的数据?”这样的问题的一个很好的答案了。

  ZMTP定义了zmq在一个TCP连接上如何读取和写入的。如果你有兴趣了解它怎么工作的话可以读读,相当短的。

  回过来看,一条zmq的message就是一个帧,像UDP那样。我们稍后会用多帧message来扩展这个概念,它相当简单,只是一系列用1标识一个“more”的标志位的帧后边跟一个用0标识接收的帧。zmq API会让你在写message的时候用“more”标志位,在读取的时候检查是否有“more”的标志位。

  然而在低级的zmq API和手册页里有时候会混淆message和frame(帧)。下边是些有用的解释:

  • 一条message可以是一个或多个部分。
  • 这些部分被称作“frames”。
  • 每个部分就是一个zmq_msg_t对象。
  • 在低级API中分别发送和接收单独的部分。
  • 高级的API们提供发送多部分messages的封装。

  关于message需要知道的别的东西:

  • 你可以发送长度为0的帧,比如从一个线程发送一个信号到另一个线程。
  • zmq保证传递一条message的所有部分(一个或多个),或者一个都不传递。
  • zmq不会立即发送message(单帧或多帧的),而是在不确定的稍后。因此一条多帧message必须能放进内存中。
  • 一条message(单帧或多帧的)必须能放进内存中。如果你想发送任意大小的文件,必须把它拆开成多个部分,把每个部分当成一条单帧的message来发送。使用多帧message不会减少内存使用。
  • 在那些作用域结束之后并不会自动销毁对象的语言中使用时,接收一条message之后你必须调用zmq_msg_close()。不必在发送一条message时使用这个方法。
      再强调一下,先不要使用zmq_msg_init_data()。这是零拷贝方法,会给你造成很多问题的。zmq有更多更重要的东西等着你去学习的~
      丰富的API也可能是负担。设计这些方法是为了优化性能,并不是为了使用简单。除非你认真读了手册页,否则你铁定会用错的。所以一个好的语言绑定的主要工作就是把这些API封装成更好使用的类。

操作多socket

  到目前为止所有的例子中,主循环基本上都像这样:

  1. 在socket上等待message
  2. 处理message
  3. 回复

  如果我们想同时从多个端点读取数据该怎么办?最简单的办法是用一个socket连接所有的端点,让zmq做扇入操作(注:像漏斗一样让信息流入一个socket)。如果远处端点用相同的模型的话,这样是可行的,但把一个PULL socket连到一个PUB端点就错了。
  
  实际上,我们用zmq_poll()同时从多个socket上读取数据。更好的办法是把zmq_poll()封装成一个良好的事件驱动框架,但很明显那些工作超出了我们要讲的范围。
  
  让我们开始干些累活吧,部分是为了尝试下做不对的乐趣,但主要是我想给你看看怎么做非阻塞的socket读取。下面是一个用非阻塞socket从两个socket读取的例子。这段比较复杂的程序同时干着天气月报的更新、订阅工作和一个并行任务的工作者的工作:   

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// msreader: Multiple socket reader in C
// Reading from multiple sockets
// This version uses a simple recv loop

#include "zhelpers.h"

int main(void){
// Connect to task ventilator
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");

// Connect to weather server
void *subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5556");
zmq_setsockop(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

// Process messages from both sockets
// We prioritize traffic from the task ventilator
while(1){
char msg[256];
while(1){
int size = zmq_recv(receiver, msg, 255, ZMQ_DONTAWIT);
if(size != -1){
// Process task
}
else
break;
}
while(1){
int size = zmsg_recv(subscriber, msg, 255, ZMQ_DONTWAIT);
if(size != -1){
// Process sweather update
}
else
break;
}
// No activity, so sleep for 1 msec
s_sleep(1);
}
zmq_close(receiver);
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}

  这种实现的代价是在第一条message上有额外延迟(在进程中没有等待message时,循环结尾的sleep())。在那些毫秒级延迟都有致命缺陷的应用中这是个大问题。另外,你需要详细查查nanosleep()的相关文档或者使用那些你确信不会造成忙循环的函数。

  你可以公平的使用两个socket:先读一个,再读另一个,而用设置优先级,就像我们在这个例子中做的那样。

  现在让我们看看下边这个没什么用处的小程序使用zmq_poll()怎么干的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// mspoller: Multiple socket poller in C
// Reading from multiple sockets
// This version uses zmq_poll()

#include "zhelpers.h"

int main(void){
// Connect to task ventilator
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");

// Connect to weather server
void *subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setscokopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

// Porcess messages from both sockets
while(1){
char msg[256];
zmq_pollitem_t items[] = {
{receiver, 0, ZMQ_POLLIN, 0},
{subscriber, 0, ZMQ_POLLIN, 0}
};
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN){
int size = zmq_recv(receiver, msg, 256, 0);
if(size != -1){
// Process task
}
}
if(items[1].revents & ZMQ_POLLIN){
int size = zmq_recv(subscriber, msg, 256, 0);
if(size != -1){
// Process weather update
}
}
}
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}

  items结构体有以下四个成员:

1
2
3
4
5
6
typedef struct{
void *socket; // 0MQ socket to poll on
int fd; // OR, native file handle to poll on
short events; // Events to poll on
short revents; // Events returned after poll
}zmq_pollitem_t;

多帧message

  zmq让我们能用几个帧组成一条message, 给我们一条“多部分的message”。实际中的应用对多帧message的使用很广泛,可以用来给信息封装地址信息或者简单的序列化。我们稍后会看到回复信封。
  
  现在我们需要了解的是如何在那些不需要知道message内部信息,只需要向前传递它的应用(比如代理proxy)中透明安全的读写message。

  当你使用多帧message时,每个帧是一个zmq_msg项,比如说,如果你要发送一条有5部分的message,你必须创建,发送,销毁5个zmq_msg项。你可以提前组织每个部分(把zmq_msg项存到数组或者其他结构中),或者发送的时候一个一个组织。

  下面是我们如何发送多帧message的(我们把接收的每个帧都存放到一个message对象中):

1
2
3
4
5
zmq_msg_send(&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send(&message, socket, ZMQ_SNDMORE);
..
zmq_msg_send(&message, socket, 0);

  下面是我们怎么接收处理多帧所有部分的:

1
2
3
4
5
6
7
8
9
10
while(1){
zmq_msg_t message;
zmq_msg_init(&message);
zmq_msg_recv(&message, socket, 0);
// Process the message frame
...
zmq_msg_close(&message);
if(!zmq_msg_more(&message))
break; // Last message frame
}

  我们需要知道的关于多帧message的东西:

  • 当你发送一条多帧message的时候,第一帧(还有后边跟着的所有帧)只有在你发送完最后一部分才真正发送出去。
  • 如果你使用zmq_poll(),当你收到message的第一帧,其余所有帧也都已经到了。
  • 要不你就收到一条message的所有帧,要不就什么都收不到。
  • 一条message的每个部分都是单独的zmq_msg项。
  • 不管你是否检查“more”属性,你都会收到一条message的所有帧。
  • 在发送的时候,zmq先把message的帧存入内存队列中,直到收到最后一个帧再把他们全部发出去。
  • 除了关闭socket,你是没办法取消发送一条已经发出去一部分的message的。

中间件和代理

  zmq致力于分布式智能,但那并不意味着你的网络中间就是空白的。在分布式网络中充满了消息传输基础设施,我们常用zmq去搭建这些架构。zmq设施能适应小到简单的通道大到面向服务的中间件设施。消息传输领域把这些中间设施称为中间件,意思是在中间解决两端信息沟通的东西。在zmq中,我们在不同上下文中称它们为代理,队列,传送者,设备或者掮客。

  这种模式在显示世界中相当常见,这也就是为什么我们的社会和商业充满了那些在复杂网络中为了减少复杂和浪费而存在的中间者。真实世界的中间件常被叫做批发商,分销商,经理等。

动态发现问题

  当你设计大型分布式框架的时候你会遇到这样一个问题:各个部分怎样知道其他部分的?特别是当各个部分任意来去的时候,我们把这个问题叫做“动态发现问题”。

  有几个不同的办法去解决动态发现问题。最简单的是用硬编码(或者配置)整个网络架构,发现的问题手动解决。也就是说,当你要新加一部分的时候,你需要重新配置网络来让其他部分发现它。

f12

  在实践中,这能使架构的扩充变得脆弱难以维护。比如说你有一个发布者和一百个订阅者。你把每个订阅者通过配置绑定到发布者的端点。这还比较容易。订阅者是动态的;发布者是固定的。现在你需要添加更多的发布者。情况忽然就变得不那么简单了。如果你需要把每个订阅者都连到每个发布者上边,这样来避免动态发现问题的代价就太大了。

f13

  确实有一些解决动态发现问题的办法,但相当简单的一种做法就是添加中间件;也就是说,在网络中加入一个固定节点用于连接其它所有节点。在传统的消息传输机制中,这是message中间人的工作。zmq没有像那样的message中间人,但能让我们很简单的构建中间件。

  你可能会奇怪,如果所有的网络最够都增长到足够大而需要中间件,为什么我们不简单地为所有程序在一个固定的地方设置一个中间件呢?新手对这一般很疑惑。不考虑性能,仅仅使用星形结构,系统就能正常工作。然而message中间件是种贪婪的东西;要行使它们作为中间件的工作,他们必须变得相当复杂,相当稳定,这到最后就成了个问题。

  最好把中间件当做无状态的消息交换装置。一个好的例子就是HTTP代理;它存在,但没有任何特殊的角色。在我们的理智中加入一个pub-sub代理去解决动态发现问题。我们把这个代理放到网络结构的”中间“。该代理打开一个XSUB socket和一个XPUB socket,并把它们都绑定到已知的IP地址和端口上。然后,所有其他的进程都连接到这个代理上,而不是进程彼此相连。这样增加更多的订阅者和发布者就不会那么麻烦了。

f14

  我们需要XPUB和XSUB socket是因为zmq从发布者那里给订阅者做订阅工作。XSUB和XPUB就像SUB和PUB,除了他们会把订阅信息解释成特殊的message.该proxy必须从XSUB socket读取message,解析后写入XPUB socket传给订阅端。这是XSUB和XPUB的主要用途。

共享队列(DEALER和ROUTER socket)
  在Hello World client/server应用中,我们只有一个client跟一个server交互。然而,在真实的场景里我们通常需要允许多server和多client。这让我们需要有扩充server的能力(多线程或多进程或多节点。)。唯一的限制是server必须是无状态的,所有的状态都在请求中或者存储在一个共享的地方(比如数据库)。

f15

  有两种方法把多client连接到多server。暴力解法是把每个client直接连到每个server端。一个client socket是可以连接到多个server的socket的,然后REQ socket就会在这些server之间分发请求了。比如说你把一个client scoket连到了三个server端:A, B, C。 client发出了4个请求R1,R2,R3,R4, R1和R4去到了A,R2去到了B,R3去到了C。

  该设计能让你很方便的增加更多的client。你也能增加更多的服务。每一个client都能把它的请求分发的这些服务器上。但每个client都必须知道server的拓扑结构。如果你有100个client,然后你要增加3个server,你就必须重新配置和重启100个client,好让这些client知道新来的3个server。

  在凌晨三点我们的超级计算机集群用完所有资源,然后我们绝望的发现需要增加几百个新的服务节点这种事显然不是我们希望得到的。太多的静态部分像水泥一样:你有越多的静态部分,要改变网络拓扑结构就越难。我们想要的是有个在client和server之间的东西,它知道所有的拓扑结构。最理想的是我们应该能随时添加移除server或client而不用去改变原有拓扑结构的其他部分。

  所以我们要写个简单的消息队列中介来提供这种灵活性。这个中介绑定两个端点,一个连接client的前端点和一个连接server的后端点。它会用zmq_poll()监控这来个那个socket是否活跃,然后当socket有数据到来,它就在这两个socket之间转发数据。它并不会真正管理任何队列——zmq会在每个socket上自动的做这些工作。

  当你用REQ去连接一个REP,你就会得到一个严格的同步请求——回复对话。client发送请求,server接收请求并发送回复。client再去读取回复。如果client或者server试着做些其他的事(比如说不等接收就发送两个请求),它们就会出错。

  但我们的中介必须是非阻塞的。显然,我们可以用zmq_poll()去监控两个socket,但我们用不成REP和REQ。

f16

  幸运的是,有两个叫DEALER和ROUTER的socket能让你实现非阻塞的请求——回复。你可以在第三章中看到DEALER和ROUTER socket是怎么让你构建起所有的异步请求——回复流模型的。但现在,我们只是先看看DEALER和ROUTER怎么让我们在中间件上扩展REQ-REP的,也就是我们的小型中介。

  在这个简单的请求——回复扩展模型中,REQ连接ROUTER, DEALER连接REP。在DEALER和ROUTER之间,我们必须有段代码(像我们的中介)从一个socket中拉取message送到另一个socket中。

  这个请求——回复中介会绑定两个端点,一个让client去连接(前端socket),另一个让worker去连接(后端socket)。为了检查这个中介,你希望修改你的worker好让他们能连接到后端socket。下面的client显示的就是我的意思:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// rrclient: Request-reply client in C
// Hello World client
// Connects REQ socket to tcp://localhost:5559
// Sends "Hello" to server, expects "World" back

#include "zhelpers.h"

int main(void){
void *context = zmq_ctx_new();

// Socket to talk to server
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5559");

int request_nbr;
for(request_nbr = 0; request_nbr != 10; request_nbr++){
s_send(requester, "Hello");
char *string = s_recv(requester);
printf("Received reply %d [%s]\n", request_nbr, string);
free(string);
}
zmq_close(requester);
zmg_ctx_destroy(context);
return 0;
}

  worker在这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// rrworker: Request-reply worker in C
// Hello World worker
// Connects REP socket to tcp://*:5560
// Expects "Hello" from client, replies with "World"

#include "zhelpers.h"

int main(void){
void *context = zmq_ctx_new();

// Socket to talk to clients
void *responder = zmq_socket(context, ZMQ_REP);
zmq_connect(responder, "tcp://localhost:5560");

while(1){
// Wait for next request form client
char *string = s_recv(responder);
printf("Received request: [%s]\n", string);
free(string);

// Do some "work"
sleep(1);

// Send reply back to client
s_send(responder, "World");
}
// We never get here but clean up anyhow
zmq_close(responder);
zmq_ctx_destroy(context);
return 0;
}

  下边的是中介,它恰当的操作了多帧message:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// rrbroker: Request-reply broket in C
// Simple request-reply broker

#include "zhelpers.h"

int main(void){
// Prepare our context and sockets
void *context = zmq_ctx_new();
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_DEALER);
zmq_bind(frontend, "tcp://*:5559");
zmq_bind(backend, "tcp://*:5560");

// Initialize poll set
zmq_pollitem_t items[] ={
{frontend, 0, ZMQ_POLLIN, 0},
{backend, 0, ZMQ_POLLIN, 0}
};
// Switch messages between sockets
while(1){
zmq_msg_t message;
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN){
while(1){
// Process all parts of the message
zmq_msg_init(&message);
zmq_msg_recv(&message, frontend, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, backend, more ? ZMQ_SNDMORE: 0);
zmq_msg_close(&message);
if(!more)
break; // Last message part
}
}
if(items[1].revents &ZMQ_POLLIN){
while(1){
// Process all parts of the message
zmq_msg_init(&message);
zmq_msg_recv(&message, backend, 0);
int more = zmg_msg_more(&message);
zmq_msg_send(&message, frontend, more ? ZMQ_SNDMORE: 0);
zmq_msg_close(&message);
if(!more)
break; // Last message part
}
}
}
// We never get here, but clean up anyhow
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return 0;
}

f17

  使用一个请求——回复的中间件可以让你的C/S架构更容易扩展,因为client看不到worker,并且worker也看不到client。唯一的静态节点就是在中间的中介。

zmq的内建代理机制

  上一个部分rrbroker的核心循环看起来非常有效,并且可用。它让我们能轻松设计pub-sub的转发和共享队列或者其他小型中间件。zmq把它单独封装成了zmq_proxy():

1
zmq_proxy(frontend, backend, capture);

  这两个(如果我们想抓取数据,就要三个)必须正确地连接、绑定和配置。当我们调用zmq_proxy()方法时,它就像开始了rrbroker的主循环。让我们用zmq_proxy()重写这个请求-回复的中介,并重新定义这个为“消息队列”(人们花费很多代码但做到的很少):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// msgqueue: Message queue broker in C
// Simple message queuing broker
// Same as request-reply broker but using QUEUE device

#include "zhelpers.h"

int main(void){
void *context = zmq_ctx_new();

// Socket facing clients
void *frontend = zmq_socket(context, ZMQ_ROUTER);
int rc = zmq_bind(frontend, "tcp://*:5559");
assert(rc == 0);

// Socket facing services
void *backend = zmq_socket(context, ZMQ_DEALER);
rc = zmq_bind(backend, "tcp://*:5560");
assert(rc == 0);

// Start the proxy
zmq_proxy(frontend, backend, NULL);

// We never get here...
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return 0;
}

  如果你像大多数的zmq用户那样,在这个阶段你就回开始想:“如果我随意的在这个proxy中配置不同种类的socket,会发生什么?”简短的回答是:试试看会发生什么。在实际中,你应该总是遵守着ROUTER/DEALER, XSUB/XPUB, PULL/PUSH这样的组合。

传输桥接

  zmq用户经常有的一个请求是:“我怎么把我的zmq网络跟X技术连接起来?”这里X是其他一些网络或者消息传输技术。

f18

  一个简单的答案就是搭一座桥。一座桥就是一个小的应用,它在一个socket上跟一种协议通信,把信息转换成适合另一个socket上的通信协议。你也可以把它叫成协议解释器。zmq中的一个普遍的桥接问题是桥接两个端口或者网络。

  举个简单的例子,我们要写一个在一个发布者和一组订阅者之间的代理(proxy),桥接两个网络。前端socket(SUB)面对的是天气预报服务器所在的内网,后端(PUB)面对的是订阅者所在的外部网络。它从前端的socket订阅天气预报服务,然后发布给后端的socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//wuproxy: Weather update proxy in C
// Weather proxy device

#include "zhelpers.h"

int main(void){
void *context = zmq_ctx_new();

// This is where the weather server sits
void *frontend = zmq_socket(context, ZMQ_XSUB);
zmq_connect(frontend, "tcp://192.168.55.210:5556");

// This is our public endpoint for subscribers
void *backend = zmq_socket(context, ZMQ_XPUB);
zmq_bind(backend, "tcp://10.1.1.0:8100");

// Run the proxy until the user interrupts us
zmq_proxy(frontend, backend, NULL);

zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return 0;
}

  看起来跟早先的代理的例子很像,但关键部分是前端和苟端socket分别连接两个不同的网络。我们可以用这种模型去连接一个多播网络(pgm传输协议)和一个tcp的发布者。

处理错误和ETERM

  zmq的错误处理原则是快速失效和可靠性的结合。我们坚信,进程应该对内部错误尽可能的脆弱,对外部攻击或者错误尽可能健壮。比如说,一个活细胞如果检测到内部的一丁点错误就会自我死亡,但它会用尽办法对抗外部攻击。

  散布在zmq代码中的断言对代码的健壮至关重要,它们必须处在像细胞壁那样正确的位置上,那儿就应该有这样一堵墙。如果它不清楚这个错误是内部的还是外部的,那就是个需要修复的设计错误。在C/C++中,断言会立即中断程序,并给出一个错误。在别的语言中,你可能会得到例外或者终止。

  当zmq监测到一个外部错误,它会返回一个错误。在一些特殊的情形中,如果没有明显的从错误中恢复的策略的话,它会静默的丢掉message。

  我们目前看到的大多数C的例子里都没有错误处理逻辑。真实的代码应该在每个zmq调用上进行错误处理。如果你用了一种不是C的绑定语言,这种绑定会为你做很多错误处理。在C中,你需要自己去做这些。这里有一些关于错误处理的简单的原则,从POSIX规范开始:

  • 创建对象的方法如果出错会返回NULL
  • 处理数据的方法会返回处理过的字节数,或者在失败时返回-1
  • 其他方法会在成功时返回0,在失败时返回-1
  • 错误代码在error中或者zmq_errno()
  • 为日志输出设计的描述性文字由zmq_strerror()提供

  例如:

1
2
3
4
5
6
7
8
9
void *context = zmq_ctx_new();
assert(context);
void *socket = zmq_socket(context, ZMQ_REP);
assert(socket);
int rc = zmq_bind(socket, "tcp://*:5555");
if(rc == -1){
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}

  你需要以非致命性错误处理的两种主要的例外情形是:

  • 当你用ZMQ_DONTWAIT选项接收一条message的时候,但那时并没有数据,zmq会返回-1并把errno设置为EAGIN
  • 当一个线程调用zmq_ctx_destroy()的时候,其他线程还在做阻塞性的工作,zmq_ctx_destroy()会通知上下文关闭并且让所有的阻塞调用都退出并返回-1,并把errno设置为ETERM

  在C/C++中,断言在测试过的代码中可以全部移除,所以不要把完整的zmq调用放到assert()中。那看起来很整洁,但优化器会把断言和你想要调用的函数一并移除,你的程序就会以一种莫名其妙的方式崩溃掉。

f19

  让我们来看看怎么干净的终止一个进程。我们拿上一部分的并行管道做例子。如果我们已经在后台开启了所有的worker,现在想在作业结束的时候全部kill掉它们。让我们给worker都发送一个kill的信息吧。做这件事最好的地方是在sink,因为它知道什么时候作业会完成。

  我们怎么把sink接到worker呢? PUSH/PULL socket是唯一可用的方式了。我们可以转换成另一种socket类型,或者我们可以混合多种不同的socket流。我们稍后试试这样的:用一个pub-sub模型去发送kill信息给worker:

  • sink在一个新端口上创建一个PUB socket
  • worker把他们的输入socket绑定到sink的那个新端口上
  • 当sink检测到作业已经结束了,它会往PUB socket上发送一个kill指令
  • 当一个worker检测到这个kill指令,它会自己退出。

  在sink中并不用花费很多代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void *controller = zmq_socket(context, ZMQ_PUB);
zmq_bind(controller, "tcp://*:5559");
...
// Send kill signal to workers
s_send(controller, "KILL");

  下边是worker进程,它会操作两个socket(一个PULL socket用来获取任务,一个SUB socket用来获取控制指令),用到了我们前边看到的`zmq_poll()`:

// taskwork2: Parrallel task worker with kill signalling in C
// Task worker - design 2
// Adds pub-sub flow to receive and respond to kill signal

#include "zhelpers.h"

int main(void){
// Socket to receive messages on
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");

// Socket to send messages to
void *sender = zmq_socket(context, ZMQ_PUSH);
zmq_connect(sender, "tcp://localhost:5558");

// Socket for control input
void *controlller = zmq_socket(context, ZMQ_SUB);
zmq_connect(controlller, "tcp://localhost:5559");
zmq_setscokopt(controlller, ZMQ_SUBSCRIBE, "", 0);

// Process messages from either socket
while(1){
zmq_pollitem_t items[] = {
{receiver, 0, ZMQ_POLLIN, 0},
{controlller, 0, ZMQ_POLLIN, 0}
};
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN){
char *string = s_recv(receiver);
printf("%s.", string); // Show progress
fflush(stdout);
s_sleep(atoi(string)); // Do the work
free(string);
s_send(sender, ""); // Send results to sink
}
// Any waiting controller command acts as 'KILL'
if(items[1].revents & ZMQ_POLLIN)
break; // Exit loop
}
zmq_close(receiver);
zmq_close(sender);
zmq_close(controller);
zmq_ctx_destroy(context);
return 0;
}

  下边是一个修改过的sink程序。如果它不再收集结果,就会给所有的worker都发送一条kill指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// tasksink2: Parallel task sink with kill signaling in C
// Task sink - design 2
// Adds pub-sub flow to send kill signal to workers

#include "zhelpers.h"

int main(void){
// Socket to receive messages on
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_bind(receiver, "tcp:// *5558");

// Socket for worker control
void *controller = zmq_socket(context, ZMQ_PUB);
zmq_bind(controller, "tcp://*:5559");

// Wait for start of batch
char *string = s_recv(receiver);
free(string);

// Start our clock now
int64_t start_time = s_clock();

// Process 100 confirmations
int task_nbr;
for(task_nbr = 0; task_nbr < 100; task_nbr++){
char *string = s_recv(receiver);
free(string);
if((task_nbr / 10) * 10 == task_nbr)
printf(":");
else
printf(".");
fflush(stdout);
}
printf("Total elapsed time: %d msec\n", (int)(s_clock() - start_time));

// Send kill signal to workers
s_send(controller, "KILL");

zmq_close(receiver);
zmq_close(controller);
zmq_ctx_destroy(context);
return 0;
}

处理中断信号

  真正的应用需要在使用Ctrl-C或者产生像SIGTERM这样的信号的时候干净的退出。这些信号默认是杀死进程,意味着message不会被冲刷过去,或者文件没有被干净的关闭等。

  下面是我们通常怎么处理信号的:

1
2
3
4
5
6
7
8
9
10
11
//interrupt: Handling Ctrl-C cleanly in C

while(true){
zstr_send(client, "Hello");
char *reply = zstr_recv(client);
if(!reply)
break; // Interrupted
printf("Client: %s\n", reply);
free(reply);
sleep(1);
}

  程序提供s_catch_signals(),它会捕获Ctrl-C(SIGINT)和SIGTERM。如果任何一个信号到达,s_catch_signals()句柄会设置全局变量s_interrupted。多亏你的信号处理句柄,你的程序才不会自己挂掉。相反,你有机会做一些清理工作,然后优雅地退出。现在你就必须明确检查一个中断并且恰当的处理它。在你的主代码开始之前就调用s_catch_signals()(从interrupt.c复制过来的)。这会建立起信号处理机制。中断会对zmq调用有以下影响:

  • 如果你的代码正被一个阻塞调用阻塞着(发送一条message,接受一条message,或者正在轮询),然后当一个信号到达时,该系统调用会返回EINTR。
    - 像s_recv()那样封装好的调用,在它们被中断的时候返回NULL。

  所以检查一个EINTR的返回码,一个NULL的返回值,并且/或者s_interupted

  下面是一个典型的代码片段:

1
2
3
4
5
6
7
8
s_catch_signals();
client = zmq_socket(...);
while(!s_interupted){
char *message = s_recv(client);
if(!message)
break; // Ctrl-C used
}
zmq_close(client);

  如果你调用了s_catch_signals()并且没检查中断,你的程序会对Ctrl-C和SIGTERM信号免疫,这有时候会有用,但大多数时候是没用的。

检查内存泄露

  任何需要长时间运行的程序都需要正确管理内存,否则它会用掉所有可用内存,然后挂掉。如果你正使用一种能自动管理内存的语言,恭喜你。如果你用了C或者C++或者其他需要你自己对内存负责的语言,下面是一个使用valgrind的简短教程,它会报告你程序里边任何的泄露问题。

  • 在Ubuntu或者Debian里安装valgrind,使用下面的命令就行:

    1
    sudo apt-get install valgrind
  • 默认的zmq本身会让valgrind报告很多错误 ,要移除这些警告,需要创建一个叫vg.supp的文件,里边包含以下内容:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    {
    <socketcall_sendto>
    Memcheck:Param
    socketcall.sendto(msg)
    fun:send
    ...
    }
    {
    <socketcall_sendto>
    Memcheck:Param
    socketcall.send(msg)
    fun:send
    ...
    }
  • 修改你的程序,好让它在Ctrl-C的时候干净地退出。对任何会自己退出的程序来说,并不需要这样处理,但对那些长时间运行的应用来说,这是必须的,否则valgrind会警告所有当前已经申请的内存。

  • 如果你默认的设置里没有使用-DDEBUG,你需要设置一下。那会保证valgrind能准备指出哪里的内存泄露了。

  • 最后,像这样运行valgrind:

      valgrind --tool=memchech --leak-check=full --suppressiongs=vg.supp someprog
    

  在修复完它报告的所有错误之后,你会得到以下让人高兴的信息:

1
==30536== ERROR SUMMARY: 0 errors from 0 contexts...

zmq和多线程

  zmq可能是写多线程(MT)应用最好的办法了。如果你熟悉传统的socket编程方式,那zmq的会需要你做些适应,zmq会把你所知道的写MT应用的东西都给扔到垃圾堆上,倒上汽油,一把火点了。只有很少的书渴望着被烧掉,但关于并发编程的书确实这样希望的。(译注:并发编程相当麻烦,因此最好没有并发编程这种模式,也就是说最好没有介绍并发编程的书。。。)

  为了写出极端完美的MT程序(我着重强调),我们不需要互斥锁,加锁或者任何其他形式的线程间通信,我们只需要zmq的socket发送的message就行了。

  “极端完美的MT程序”指的是容易写容易理解的代码,那种对不同语言不同操作系统都有相同设计模式的,能在任意数量CPU上扩展的,完全没有等待状态和衰弱返回点的程序。

  如果你花了很长时间去学习那些能让你的MT代码正常工作的技巧,使用锁啊信号量啊关键区域啊,你会相当沮丧的发现这完全没用。如果说我们从三十多年的并发编程学到了什么经验的话,那就是:不要共享状态。那就像两个醉汉试着分享一瓶啤酒。不管他们是否是好朋友,迟早他们会干起架来的。你在酒桌上加越多的醉汉,他们越会为了一瓶啤酒干架。典型的MT应用就像醉汉们打架一样。

  The list of weird problems that you need to fight as you write classic shared-state MT code would be hilarious if it didn’t translate directly into stress and risk, as code that seems to work suddenly fails under pressure.(sorry。。。不会翻译了)。一个有着世界级影响力的大公司通过公开的大量代码列出了“11个在你的多线程代码中会出现的问题”,包括忘记同步,不恰当的粒度划分,读写冲突,无锁重排序,锁护送效应,two-step dance和优先级反转。

  好吧,我们只说了7个问题,不是11个。但那不是重点。重点是,你真的想让管理电力网或者股票市场的程序在一个忙碌的周四下午3点钟的时候遇到两步锁护送效应?谁会在乎这个术语到底是什么意思?这并不能让我们开始去编程,用更多复杂的技巧去跟更复杂的副作用做斗争。

  一些被广泛使用的模型——尽管是整个工业的基础——从根本上就是坏掉的,共享状态并行模型就是其中之一。那些想没有限制地扩展的代码应该像互联网做的那样,只是发送信息,而不共享任何状态——除了共享对那些坏掉的编程模型的轻视。

  使用zmq你需要遵守一些规则才能写出愉快的多线程代码:

  • 孤立数据应该在它自己的线程内私有,决不应该在多线程共享。这唯一的例外是zmq的context,它是线程安全的。
  • 跟传统的并发机制保持距离,像互斥锁啊,敏感区域啊,信号量啊等等。这些在zmq应用里是敌对部分。
  • 在你的进程开始部分创建一个zmq context,把它传进每个你想用inproc socket连接的线程。
  • 在你的应用里创建attached线程,用inprocPAIR socket连接这些线程和它们的父线程。该模型是:在父线程中绑定socket,然后创建子线程去连接。
  • detached线程去模拟独立任务,用它们自己的context。用tcp连接这些。然后你就能不用改什么代码就能把它们转移到独立的进程中。
  • 全部的线程间交互都可以用zmq的message实现,message你可以定义的多多少少正式些。
  • 不要在线程间共享zmq的socket.zmq的socket不是线程安全的。技术上来说是有可能从把一个socket从一个线程转移到另一个线程去,但这需要技巧。唯一一个在线程间共享socket相对安全的地方是那些会在socket上做垃圾收集的绑定语言中。

  如果你想在应用中启动不止一个代理,比如,你可能希望在每个线程中都跑自己的代理。这时候如果你在一个线程中创建了该代理的前端和后端socket,然后把它们传到位于另一个线程中的代理,就很容易出错。这在一开始可能会工作,但在实际使用中会随机挂掉。记住:只能在创建socket的线程中才能使用和关闭它们。

  如果你遵守了这些规则,你能很容易的构建优雅的多线程程序,然后根据需要把这些多线程分割到多进程中。应用层逻辑能存在于线程、进程或者节点中:根据你的扩展的需要。

  zmq使用操作系统原始的线程,而不是虚拟化的”绿色”线程。好处是你不需要学习任何新的线程API,zmq的线程会跟你的操作系统干净的契合。你可以用标准化工具,比如Intel的ThreadChecker去看看你的应用到底在做什么。缺点是原始的线程API不总是可移植的,如果你有大量的线程数(几千个),一些操作系统就会相当有压力。

  让我们看看在实际中这是怎样工作的。我们会把我们旧的Hello World服务器转换成某种更厉害的东西。该服务器最初运行在一个单一线程中。如果每个请求的任务比较少,这是可以的:一个zmq线程能在单核CPU上全速运行,没有等待,做着很多糟糕的工作。但实际中的服务器必须在每个请求中做最重要的事。在1000个client同时访问服务器的时候,单一核心就可能不够用了。所以现实中的服务器都会运行多个工作线程,然后它会尽可能快的接收请求,再把这些请求分发给工作线程。这些工作线程阵列会处理完任务,最后返回结果。

  当然,你也能用一个代理中介和额外的工作进程干这些工作,但在一个进程中一次启动16个核心业务线程总比启动16个进程来的容易。还有,启动工作线程会取消网络跳跃,延迟和阻塞。

  MT版本的Hello World服务主要是把broker和worker整合到一个进程中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//mtserver: Multithreaded service in C
// Multithreaded Hello World server

#include "zhelpers.h"
#include <pthread.h>

static void *worker_routine(void *context){
// Socket to talk to dispatcher
void *receiver = zmq_socket(context, ZMQ_REP);
zmq_connect(receiver, "inproc://workkers");

while(1){
char *string = s_recv(receiver);
printf("Received request: [%s]\n", string);
free(string);
// Do some 'work'
sleep(1);
// Send reply back to client
s_send(receiver, "World");
}
zmq_close(receiver);
return NULL;
}

int main(void){
void *context = zmq_ctx_new();

// Socket to talk to clients
void *clients = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(clients, "tcp://*:5555");

// Socket to talk to workers
void *workers = zmq_socket(context, ZMQ_DEALER);
zmq_bind(workers, "inproc://workers");

// Launch pool of worker threads
int thread_nbr;
for(thread_nbr = 0; thread_nbr < 5; thread_nbr++){
pthread_t worker;
pthread_create(&worker, NULL, worker_routine, context);
}
// Connect work threads to client threads via a queue proxy
zmq_proxy(clients, workers, NULL);

// We never get here, but clean up anyhow
zmq_close(clients);
zmq_close(workers);
zmq_ctx_destroy(context);
return 0;
}

f20

  所有的代码现在对你来说都应该是熟悉的。它是这样工作的:

  • 服务器启动一组工作线程。每个工作线程创建一个REP socket然后进程向这个socket发起请求。工作线程就像单线程的服务器,唯一的区别是传输协议(是inproc而不是tcp)和绑定-连接方向。
  • 服务器创建一个ROUTER socket去跟client通信,然后绑定在它的外部接口上(通过tcp)。
  • 服务器创建一个DEALER socket去跟worker通信,把它绑定在内部接口上(通过inproc)。
  • 服务器启动一个代理连接这两个socket。代理从所有的client公平的接收请求,然后分发到worker,它也会把结果返回给相应的client。

  注意创建线程的语句在大多数编程语言中都不可移植。POSIX库的是pthreads,但在Windows上你必须用另一套不同的API。在我们的例子中,pthread_create能启动一个运行worker_routine函数的新线程。我们会在第三章看到如何把它封装成一个可移植的API。

线程间信号(PAIR socket)

  当你开始用zmq编写多线程应用的时候,你会遇到怎么同步各个线程的问题。尽管你会尝试插入”sleep”语句,或者用那些像信号量或者互斥锁的多线程技术,但你唯一应该用的就是zmq的message机制。一定要记住醉汉和啤酒的故事。

  让我们启动三个线程,在它们准备好的时候就发送信号。在这个例子中,我们使用基于inproc协议的PAIR socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//mtrelay: Multithreaded relay in C
// Multithreaded relay]

#include "zhelpers.h"
#include <pthread.h>

static void *
step1 (void *context) {
// Connect to step2 and tell it we're ready
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step2");
printf ("Step 1 ready, signaling step 2\n");
s_send (xmitter, "READY");
zmq_close (xmitter);

return NULL;
}

static void *
step2 (void *context) {
// Bind inproc socket before starting step1
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step2");
pthread_t thread;
pthread_create (&thread, NULL, step1, context);

// Wait for signal and pass it on
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);

// Connect to step3 and tell it we're ready
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step3");
printf ("Step 2 ready, signaling step 3\n");
s_send (xmitter, "READY");
zmq_close (xmitter);

return NULL;
}

int main (void)
{
void *context = zmq_ctx_new ();

// Bind inproc socket before starting step2
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step3");
pthread_t thread;
pthread_create (&thread, NULL, step2, context);

// Wait for signal
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);

printf ("Test successful!\n");
zmq_ctx_destroy (context);
return 0;
}

f21

  这是用zmq写多线程程序的一个经典模型:

  1. 两个线程通过inproc通信,使用共享的context。
  2. 父线程创建一个socket,把它绑定到一个inproc://端点,这之后才启动子线程,把context传递过去。
  3. 子线程创建第二个socket,连接到那个inproc://端点,这之后才去给父线程发送信号说自己准备好了。

  注意基于这种模型的多线程代码不能扩展成多个进程。如果你使用了inproc和socket对,你正在构建的是一个紧耦合的应用,例如你的线程结构上是相互依赖的。最好只在延迟是致命的时候才这样做。另一种设计模型是松耦合应用,在那种模型中线程有自己的context,通信是通过ipctcp。你可以很容易把松耦合的线程转移到单独的进程中。

  这是我们第一次用PAIR socket做例子。为什么用PAIR呢?其他的socket组合看起来也能工作,但是它们用信号的时候都会有副作用:

  • 你可以用PUSH发送,PULL接收。这看起来很简单也能工作,但记住PUSH会把message发送给所有可能的接收者。如果你不小心启动了两个接收者(比如已经有一个接收者在跑着了,你又启动了一个),你可能会“丢失”一半的信号。PAIR的优势之一就是最多只能有一个连接,这个socket对是排他的。
  • 你可以用DEALER发送,用ROUTER接收。但是ROUTER会把你的message封装到一个”信封”中,这意味着零字节长度的信号会变成一个多帧message。如果你不关心数据,把所有接收到的东西都当成一个有效的信号,并且不会从这个socket上读取多次,那就没问题。但是,如果你想要发送真正的数据,你会忽然发现ROUTER给你的是“错误的”message。DEALER也会分发message,带来跟PUSH一样的风险。
  • 你可以用PUB发送,用SUB接收。这会正确递送你的message并且PUB不会像PUSH或DEALER那样分发数据。但是,你需要用一个空的订阅信息配置SUB,这会比较麻烦。

  综合以上理由,PAIR是线程对间同步的最佳选择。

节点同步

  当你想在网络中同步一组节点的时候,PAIR socket就不会那么有帮助了额。这是少数几个线程和节点策略的不同之一。原则上讲,节点会随时接入和退出,但线程通常是静态的。PAIR scoket不会在远端节点退出和再次接入的时候自动重连。

f22

  线程和节点之间第二个明显的不同是你通常会有固定数目的线程,但节点的数目通常是可变的。我们拿先前的一个例子(天气预报server和client),用节点同步去保证订阅者不会在启动的时候丢失数据。

  下面是该应用如何工作的:

  • 发布者提前知道会有多少个订阅者,这是从某个地方得到的魔法数据(注:凭空知道的,可能提前配置好的静态数据,不是程序通过自动计算等方式得出的)。
  • 发布者启动并等待所有的订阅者接入。这是节点同步的部分。每个订阅者建立订阅然后通过另一个socket告诉发布者它已经准备好接收数据了。
  • 当发布者知道所有的订阅者都已经准备好了,它就开始发布数据。

  在这个例子中,我们会使用REQ-REP socket流去同步订阅者和发布者,下面是发布者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// syncpub: Synchronized publisher in C
// Synchronized publisher
#include "zhelpers.h"
#define SUBSCRIBERS_EXPECTED 10 //// We wait for 10 subscribers //

int main (void){
void *context = zmq_ctx_new ();

// Socket to talk to clients
void *publisher = zmq_socket (context, ZMQ_PUB);

int sndhwm = 1100000;
zmq_setsockopt (publisher, ZMQ_SNDHWM, &sndhwm, sizeof (int));

zmq_bind (publisher, "tcp://*:5561");

// Socket to receive signals
void *syncservice = zmq_socket (context, ZMQ_REP);
zmq_bind (syncservice, "tcp://*:5562");

// Get synchronization from subscribers
printf ("Waiting for subscribers\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// - wait for synchronization request
char *string = s_recv (syncservice);
free (string);
// - send synchronization reply
s_send (syncservice, "");
subscribers++;
}
// Now broadcast exactly 1M updates followed by END
printf ("Broadcasting messages\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
s_send (publisher, "Rhubarb");

s_send (publisher, "END");

zmq_close (publisher);
zmq_close (syncservice);
zmq_ctx_destroy (context);
return 0;
}

  下面是订阅者的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// syncsub: Synchronized subscriber in C
// Synchronized subscriber

#include "zhelpers.h"
int main(void){
void *context = zmq_ctx_new ();

// First, connect our subscriber socket
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5561");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

// 0MQ is so fast, we need to wait a while…
sleep (1);

// Second, synchronize with publisher
void *syncclient = zmq_socket (context, ZMQ_REQ);
zmq_connect (syncclient, "tcp://localhost:5562");

// - send a synchronization request
s_send (syncclient, "");

// - wait for synchronization reply
char *string = s_recv (syncclient);
free (string);

// Third, get our updates and report how many we got
int update_nbr = 0;
while (1) {
char *string = s_recv (subscriber);
if (strcmp (string, "END") == 0) {
free (string);
break;
}
free (string);
update_nbr++;
}
printf ("Received %d updates\n", update_nbr);

zmq_close (subscriber);
zmq_close (syncclient);
zmq_ctx_destroy (context);
return 0;
}

  下面这个Bash shell脚本会启动10个订阅者,然后启动发布者:

1
2
3
4
5
6
echo "Starting subscribers..."
for ((a=0; a<10; a++)); do
syncsub &
done
echo "Starting publisher..."
syncpub

  它会给我们以下让人高兴的输出:

1
2
3
4
5
6
7
Starting subscribers...
Starting publisher...
Received 1000000 updates
Received 1000000 updates
...
Received 1000000 updates
Received 1000000 updates

  我们不能假定SUB连接会在REQ/REP会话建立完成时建立好。如果你使用除inproc之外的任何协议,都不能保证外部连接以某种顺序完成。所以,例子中暴力的在订阅动作之前强制的睡眠了1秒,然后才去发送REQ/REP同步信号。

  一个更健壮的模型应该是:

  • 发布者打开PUB socket,然后发送”Hello”message(不是数据)。
  • 订阅者连接SUB socket,等它们接收到一个Hello message的时候通过REQ/REP socket对告诉发布者它们准备好了。
  • 当发布者收到了所有需要的确认之后,它就开始发送真正的数据。

零拷贝

  zmq的message API能让你不用拷贝数据就直接从应用缓冲区发送和接收message。我们称这为零拷贝,其能在一些应用中提高性能。

  你应该在高频率发送大数据块(成千bytes)这种典型情况的时候考虑使用零拷贝。对较小的数据块或者较低的发送频率来说,使用零拷贝会让你的代码一团糟,弊远大于利。像所有的优化手段那样,只在你知道它有帮助的时候才使用,并且优化前后做好权衡

  要使用零拷贝,你需要用zmq_msg_init_data()去创建一条指向已经用malloc()或其他内存管理器申请过的数据块的message,然后把它传给zmq_msg_send()。当你创建message的时候,你也要传进去一个函数,zmq会在发送完调用这个函数去释放掉数据块的内存。下面是个最简单的例子,假设buffer是申请在堆上的1000字节的内存块:

1
2
3
4
5
6
7
void my_free (void *data, void *hint) {
free (data);
}
// Send message from buffer, which we allocate and 0MQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (&message, socket, 0); 

  注意你不需要在发送完message之后调用zmq_msg_close()了——libzmq会在它发送完数据之后自动做这个工作。

  没办法在接收的时候使用零拷贝:zmq会给你个缓冲区,你想用多久就用多久,但它不会把数据直接写到应用缓冲区中。

  在写程序时候,zmq的多帧message跟零拷贝能完美的协同工作。在传统的消息队列中,你需要把几个不同的缓冲区收集起来放到一个你能发送的缓冲区中。这意味着需要复制数据,使用zmq,你就可以把不同来源的缓冲区当成单独的message帧一起发送出去。把每个缓冲区当成由长度分隔的帧。对应用层来说,它就像一系列的发送和接收调用。在zmq内部,这些不同的部分是用单独的系统调用去发送到网络中和从网络中读取的。所以它非常高效。

Pub-Sub message信封

  在pub-sub模型中,我们把键分割成的单独的message帧称作信封。如果你想用pub-sub信封,需要自己去实现。这是可选的,在前一个pub-sub例子中我们并没有这么做。对简单的情况来说使用pub-sub信封有点儿麻烦,但在实际的情形中这就会比较清晰,因为那里键和数据是自然独立的东西。

f23

  回忆下订阅时候做的前缀匹配。这就是说,它们会查看”所有以XYZ开头的message”。这里一个明显的问题是:这么把键跟数据分割开来,不让前缀匹配匹配到真实数据。最好的答案是用一个信封,因为这种匹配不会超过一个帧的范围。下面这个简单的例子展示了pub-sub信封在代码中看起来是怎样的。这个发布者发送两种类型的message:A和B.

  信封持有message的类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// psenvpub: Pub-Sub envelope publisher in C
// Pubsub envelope publisher
// Note that the zhelpers.h file also provides s_sendmore

#include "zhelpers.h"

int main(void){
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5563");

while (1) {
// Write two messages, each with an envelope and content
s_sendmore (publisher, "A");
s_send (publisher, "We don't want to see this");
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
sleep (1);
}
// We never get here, but clean up anyhow
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}

  订阅者只想接收B类型的message:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// psenvsub: Pub-Sub envelope subscriber in C

// Pubsub envelope subscriber

#include "zhelpers.h"

int main(void){
// Prepare our context and subscriber
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5563");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);

while (1) {
// Read envelope with address
char *address = s_recv (subscriber);
// Read message contents
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
free (address);
free (contents);
}
// We never get here, but clean up anyhow
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}

  当你运行这两个程序,订阅者会显示以下的东西:

1
2
3
4
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...

  这个例子显示了订阅者过滤器拒绝或者接受真个的多帧message(键值加上数据)。你永远不会值得到一个多帧message的某个部分。如果你订阅了多个发布者,你想知道它们的地址以便通过另一个socket给它们发送数据(这就是个典型的应用场景),你需要创建一个三帧的message。

f24

高水位标记

  当你能在进程间快速发送message的时候,你很快就能发现内存是个稀缺的资源,会一点点被用完。进程某处的几秒延迟能让一个server由于积压数据而崩溃,除非你能理解问题出在哪儿并提前做出预防。

  这里的问题是:想象你用进程A给B高频率的发送message,B会处理接收的message。一瞬间B变得很忙很忙(垃圾收集,CPU过载,不管出于什么原因),不能在很短的时间及时处理message。它可能是几秒钟的大垃圾收集工作,或者遇到更严重的问题时的更长时间的任务,这时A进程继续发送给B的message会发生什么事呢?一部分会存放在B的网络缓冲区汇总,一部分会在传输网络中,一些会在A的网络缓冲区中,剩下的会在A的内存中积累,在A发送快速的积累起来。如果你不做些预防工作,A很快会用光内存然后挂掉。

  这是消息中间件一个始终存在的、经典的问题。表面上看B出错会让事情变得更严重,B是一个典型的为用户写的应用,A是不能控制的。

  答案是什么呢?一个解决方法是把问题抛到上游。A正从别的不知名的地方获取message,然后上游逻辑告诉它,“停!”。这叫做流控制。这听起来是可行的,但如果你正在发送一条Twitter呢?你要告诉全世界在B完成它的任务的时候都等等吗?

  流控制在一些情形能工作,但在另一些情况下并不能。传输层不能告诉应用层去“停止”,就像地铁系统并不能告诉一大单生意:“让你的人员都再工作半小时,我太忙了”。另一个答案是设置缓冲区的大小限制,然后当我们达到这些限制的时候,就采取一些而别的措施。在一些情形中(不是在一个地铁系统中),答案是放弃一些message。在别的情况下,最好的策略是等待一下。
  
  zmq用HWM(high-wather mark高水位)的概念去定义内部管道的容量。每个进入socket或者出去socket的连接都有它自己的管道和发送和/或接收的HWM,根据socket类型的不同有不同的情况。一些socket(PUB,PUSH)只有发送高缓冲区,一些(SUB,PULL,REQ,REP)只有接收缓冲区,另一些(DEALER,ROUTER,PAIR)两种都有。

  在zmq v2.x版本中,HWM被默认为无限大小。这很容易,但对一些大数据量的发布者来说这是个致命的错误。在zmq v3.x版本中,默认被设置为1,000, 这就相当灵敏了。如果你还在用zmq v2.x版本,你最好总是设置好socket的HWM为1000来跟zmq v3.x版本相适应,或者通盘考虑message大小和期望的订阅者的性能以后确定的其他数值。

  当socket到达高水位后,它会根据socket的类型来决定是阻塞住还是丢掉数据。PUB和ROUTER socket在到达高水位后会丢掉数据,其他的socket类型会阻塞住。在inproc传输协议中,发送者和接收者共享同一个缓冲区,因此实际的高水位是两端设置高水位的和。

  最后,高水位并不准确;默认你能在缓冲区中保存最多1000条message,但实际上真正的缓冲区大小会小的多(差不多一半大小),这是libzmq的队列实现方式的问题。

丢失message问题的解决办法

  当你用zmq编写应用的时候,你会不止一次遇到这个问题:没有收到你希望收到的message。我们把一些常见的原因都归结到下面这个图标中了:

f25

  下面是关于图表的一些说明:

  • 在SUB socket上,用zmq_setsocketopt()ZMQ_SUBSCRIBE设置订阅,否则你不会收到message。因为你是根据前缀订阅message的,如果你订阅了””(一个空的订阅设置),你会得到所有类型的message。
  • 如果你在PUB socket已经启动并发送数据之后才启动SUB socket(比如说建立一个跟PUB socket的连接),你会丢掉在连接建立之前的所有数据。如果这是个问题的话,你需要设计你的架构,让SUB socket先启动,然后才让PUB socket再开始发布数据。
  • 及时你对SUB和PUB socket做了同步机制,也有可能丢失数据。这是因为内部的队列在连接真正建立起来并没有被创建。如果你能换一下绑定/连接的方向,让SUB socket绑定,让PUB连接,你会发现这更符合你期望的。
  • 如果用了REP和REQ socket,并且没严格遵守发送/接收/发送/接收的同步顺序,zmq会报告错误,你可以忽略该错误。然后,那看起来就像是你丢失了message。如果你要使用REQ或者REP,需要严格遵守了发送/接收的顺序,并且在实际代码中总是检测zmq调用的错误。
  • 如果你在使用PUSH socket,你会发现第一个连接上的PULL socket会抓取一个没有公平分配的message份额。message精确的分配只在所有PULL socket成功建立连接后才会发生,这需要几毫秒的时间。作为一种PUSH/PULL的替代,对低数据量的情况,考虑使用ROUTER/DEALER和负载均衡模式
  • 如果你在线程间共享了socket,必要这么做。这样会造成随机的奇怪问题和宕机。
  • 如果你在用inproc,确保两端的socket都在同一个上下文context中。否则连接会不成功。并且,需要先绑定,后连接。inproc不像tcp那样是非连接协议。
  • 如果你在用ROUTER socket,发送不正确的认证id帧(或者忘记发送一个认证id帧)会很容易造成收不到message。通常在ROUTER socket上设置ZMQ_ROUTER_MANDATORY选项是个好主意,但也需要每次检查调用函数的返回码。
  • 最后,如果你真的找不到 哪里出错了,做一个最小的测试例子重现问题,然后去zmq社区找找帮助。