0%

zmq中文指南_3

高级request-reply模型

  在第二章我们通过开发一系列小程序来全部了解了使用zmq的基础,每次都去发现zmq的一个新的方面。我们在这章会继续这种探索,不过这次探索的是基于zmq核心的请求-回复模式的高级模式。

  我们会讨论到:

  • 请求-回复机制是怎么工作的
  • 怎么组合使用REQ, REP, DEALER, ROUTER socket
  • 详细的查看ROUTER socket是如何工作的
  • 负载均衡模型
  • 构建一个简单的负载均衡消息中介
  • 为zmq设计一个高级API
  • 构建一个异步的请求-回复服务器
  • 一个详细的内中介路由例子

请求-回复机制

  我们已经简单看过多帧message了,现在我们看看一个主要的应用场景,就是回复message信封。信封是把数据用地址包装起来又不去修改数据本身的一种安全手段。通过把地址装进信封中,我们可以实现一些通用功能的中介设施,比如能创建、读取、移除地址而不管message信息的本身是什么或者结构是什么的API或者代理。

  在请求-回复模型中,信封持有回复信息的地址。这就是无状态的zmq网络如何去创建往返请求-回复对话的。

  当你用REQ和REP socket的时候你甚至看不到信封。这些socket 会自动处理信封的。但对大多数有意义的请求-回复模型,你会很想了解信封,特别是在用ROUTER socket的时候。我们会一步步讲解这些的。

最简单的回复信封

  一个请求-回复的交换包含一个请求message,和一个最后的回复message。在这个简单的请求-回复模型中,每个请求都会有一个回复。在更高级的模型中,请求和回复可以完全异步的。但回复信封总是以相同方式工作。

  zmq的回复信封通常包含0个或多个回复地址,然后是一个空帧(信封分隔符),再然后才是message本身(0个或多个帧)。信封是由在一个链上协同工作的多个socket创建的。我们接下来会详细说明。

  我们从REQ socket发出”Hello”开始。REQ socket会创建最简单的回复信封,该信封没有地址,只有一个空的分隔帧和包含”Hello”字符串的message帧。这是个双帧message。

f26

  REP socket会做匹配工作:它会剥去信封,直到找到分割帧,保存全部的信封(包括分割帧),然后把”Hello”字符串向上传递给应用层。因此在我们最开始内部用请求-回复信封的Hello World例子中应用层并没有看到信封。

  如果你检查了在hwclient和hwserver之间的网络数据流,你会发现:每个请求和每个回复实际上是两个帧,一个空帧然后才是message体。这看起来对简单的REQ-REP会话没什么意义。然而当我们开始探索ROUTER和DEALER怎么处理信封的时候你就会知道原因了。

扩展的回复信封

  现在让我们在中间用ROUTER-DEALER代理扩展REQ-REP对,看看回复信封的影响。这是我们在第二章已经看到过的扩展的请求-回复模型。事实上,我们可以插入任意个数的代理步骤,整个机制是相同的。

f27

  用伪代码表示这个代理做的工作:

1
2
3
4
5
6
7
8
9
prepare context, frontend and backend sockets
while true:
poll on both sockets
if frontend had input:
read all frames from frontend
send to backend
if backend had input:
read all frames from backend
send to frontend

  ROUTER socket跟其他的socket不同,它会跟踪它有的每个连接,并且把这告诉给调用者。它告诉调用者的方法是在收到的每个message最开头添加一个连接的标识(indetity)帧。一个标识帧,有时候也被称作一个地址,只是一个无意义的二进制字符串除了告诉你”这是一个独一无二的连接句柄”。然后,当你通过ROUTER socket发送message的时候,你需要首先发送一个标识帧。

  zmq_socket()的手册页如下:

当收到message的时候,ZMQ_ROUTER socket会在把message传递给应用之前自动在message的前边加上一个标识对端的标识帧。message采用公平队列的方式从所有连接上来的对端接收。当发送message的时候,ZMQ_ROUTER socket会自动移除第一个帧,并用它去决定这个message该路由给谁。

  需要注意的是zmqv2.2及以前的版本用UUID作为标识,zmqv3.3及以后的使用short int作为标识。这对网络性能有一定影响,但只是对你用多代理节点的时候才些影响,这种情况很少见的。最大的改变是移除了libzmq库对UUID库的依赖。

  标识是个比较难理解的概念,但对你成为zmq专家至关重要。ROUTER socket会为跟它相连的每个连接创建一个随机标识。如果有3个REQ socket连接到一个ROUTER socket上,它就会创建3个随机标识,对应每个REQ socket。

  因此如果我们继续已经研究着的例子,假设REQ socket有一个3字节的标识ABC。在内部,这意味着ROUTER socket会保持一个hash表,它可以在hash表中找到ABC,并因此找到对应REQ socket的TCP连接。

  当我们从ROUTER socket上收到message的时候,我们会得到三个帧。

f28

  代理循环的核心是”从一个socket读取数据,写到另一个socket中去”,所以我们一个个把这三个帧发送到DEALER socket中去。如果你现在抓取一下网络数据包,你会发现有三个帧从DEALER socket发送到REP socket。REP socket跟以前做的一样,截取包括新回复地址的全部信封,再把”Hello”递送给调用者。

  顺便说下REQ socket指能每次处理一个请求-回复会话,如果你想一次读取多个请求或发送多个回复,而没严格遵守发送-接收循环的话,就会收到错误。

  现在你就应该能看到回复路径了。当hwserver回复”World”,REP socket用它保存的信封封装这个帧,发送给DEALER socket一个三帧的回复message。

f29

  现在DEALER读取这三个帧,通过ROUTER socket把它们全部发送出去。ROUTER socket把第一帧拿出来,也就是ABC标识,然后根据这个标识查找其连接,然后ROUTER socket会把剩下的两个帧发送出去。

f30

  REQ socket收到这两个帧的message,检查第一帧是不是空分割帧,在例子中确实是。REQ socket丢掉空分割帧,把”World”递送给调用程序,把它神奇的显示给新接触zmq的我们。

这样的好处是什么?

  老实说,严格的请求-回复模型或者扩展的请求回复模型的应用场景多少有点儿限制。全都是因为该模型没有从服务器崩溃这种常见错误中恢复过来的简单办法。我们会在第四章看到更多关于这种问题的讨论。但一旦你掌握了这四种socket是如何处理信封的,以及它们怎么相互通信的,你就能做很多有用的事情。我们已经看过了ROUTER是怎么利用回复信封去判断该把回复返回给哪个client REQ socket去。现在我们换个角度来看看:

  • 每次ROUTER会给你个message,它会通过标识帧告诉你该message来自哪个对端。
  • 你可以用一个hash表(用标识作为键)来跟踪新建立连接的对端。
  • 如果你在message的第一帧加上标识帧,ROUTER会把它收到的message异步的发给连接上来的对端。

  ROUTER socket不关心整个信封。它们并不知道空分割帧。它们只关心能让它们找到需要把message发送给哪个连接对端的标识帧。

复习下请求-回复socket

  让我们来复习以下:

  • REQ socket发送一个第一帧是空分割帧的message到网络中。REQ socket是同步的。REQ socket总是发送一个请求然后就等着一个回复。REQ socket每次只跟一个对端通信。如果你把一个REQ socket连到多个对端上,请求会被分散开来,每次从一个对端收到一个回复。(注:REQ socket连接n个对端,发出去1个请求,该请求不会被复制成n份,只会保持1份,会被随机分配——其实是平均分发——到其中一个对端,并期望从该对端接收一个回复。)
  • REP socket会读取并保存包括空分割帧在内的信封(注:也包括所有的标识帧),然后把剩下的帧递送给调用者。REP socket是同步的,每次只跟一个对端通信。如果你把一个REP socket连到多个对端,它会从这些对端按公平队列的顺序读取请求,并且回复会被送到最近一个发来请求的那个相同的对端。
  • DEALER socket会忽略回复信封,把它当成普通的多帧message。DEALER socket跟PUSH和PULL组合一样是异步的。它们在所有的连接中平均分发message,并从所有连接中按公平队列的方式接收message。
  • ROUTER socket跟DEALER一样会忽略回复信封。它会为连接创建标识,把该标识作为收到的message的第一帧递送给调用者。反过来,当调用者要发送一个message,它会把message的第一帧作为标识去查找要发送到哪个连接去。ROUTER是异步的。

请求-回复组合

  我们有四个请求-回复的socket,每个都有特定的行为。我们已经看过它们在简单的和扩展的请求-回复模型中是怎么连接的了。但这些socket构建的模块可以用来解决很多问题。

  合法的组合是以下这些:

  • REQ to REQ
  • DEALER to REP
  • REQ to ROUTER
  • DEALER to ROUTER
  • DEALER to DEALER
  • ROUTER to ROUTER

  下面这些组合是非法的(我会解释为什么):

  • REQ to REQ
  • REQ to DEALER
  • REP to REP
  • REP to ROUTER

  有一些技巧来记住这些组合。DEALER像一个异步的REQ socket,ROUTER就像异步的REP socket。使用REQ socket的地方,就可以用DEALER——只不过需要我们自己去读取写入信封。使用REP socket的地方就可以用ROUTER——我们必须自己去管理标识帧。

  把REQ和DEALER socket想象成”client”,把REP和ROUTER socket想象成”server”。大多数时候,你都可以绑定REP和ROUTER socket,让REQ和DEALER socket去连接它们。情况不总是这么简单的,但这是个简单的开始。

REQ to REP组合

  我们已经知道了REQ client怎么跟REP server通信的,但现在让我们来看看另一个方面:REQ client 必须初始化message流。一个REP server在没有收到请求的时候是不能发信息给REQ client的。技术上来说,这是不可能的,如果你试下就知道API会返回EFSM错误给你。

DEALER to REP组合

  现在,用DEALER代替REQ client。这给了我们一个异步的client,它能跟多个REP server通信。如果我们用DEALER重写”Hello World” client,我们就能够不用等待回复而发送任意数目的”Hello”请求了。

  当我们用DEALER跟一个REP socket通信,我们必须准确模拟像REQ socket那样的信封,否则REP socket会把这些message当做无效的数据丢掉。所以,要发送一个message,我们需要:

  • 发送一个带有MORE标志的空message帧,然后
  • 发送message本身。

  当我们收到一个message的时候,需要:

  • 接收第一帧,如果它不为空,就丢掉整个message;
  • 接收剩余的帧,把它(们)递送给调用者。

REQ to ROUTER 组合

  跟我们用DEALER代替REQ一样,我们也可以用ROUTER代替REP。这给了我们一个异步的server,它能同时跟多个REQ client通信。如果我们用ROUTER重写”Hello World”,我们就能够并行的处理任意数目的”Hello”了。我们在第二章mtserver的例子看到过这种用法。

  我们可以用两种不同的方法使用ROUTER:

  • 在前端和后端socket之间作为代理交换message。
  • 当做应用读取message,并处理message。

  在第一种情景中,ROUTER只是简单的读取所有帧,包括添加的标识帧,然后传送过去。在第二种情景中,ROUTER必须知道它要回复的信封的格式。因为对端是个REQ socket, ROUTER先拿到标识帧,然后是一个空分割帧,再然后是数据帧。

DEALER to ROUTER组合

  现在我们用DEALER和ROUTER换出REQ和REP来得到最强大的socket组合,就是DEALER跟ROUTER通信。它给了我们异步的client跟异步的server的通信,两端对message格式都有完全的控制权。

  因为DEALER和ROUTER都能操作任意格式的message,如果你希望安全的使用它们,你必须多多少少成为个协议设计者。至少你需要决定是不是希望模拟REQ/REP回复信封。它依赖于你是否真的需要发送回复。

DEALER to DEALER 组合

  你可以用一个ROUTER换掉REP,你也可以用一个DEALER换掉REP,只要这个DEALER只跟一个对端通信。

  当你用DEALER换掉REP,你的worker会忽然变成全异步的,可以返回任意数量的回复。代价就是你必须自己管理回复信封,正确得到它们,否则什么都不会正常工作。我们稍后会看一个已经能工作的例子。现在对我们来说DEALER to DEALER是个有点儿技巧的模式,让人高兴的是我们很少需要它。

ROUTER to ROUTER 组合

  这听起来是个完美的N-to-N连接,但这是最难用的组合。除非你非常了解zmq,否则别去用它。我们会在第四章Freelance模式中看到一个例子,在第八章有个用DEALER to ROUTER的端对端连接的替代的设计。

非法组合

  大多数情况下,尝试去连接client和client,或者server和server是个坏主意且不能正常工作。但相对于只给出模糊的警告,我会更详细的介绍一下:

  • REQ to REQ:两端都想一开始就发送message给对方,这只会在你排好序让对端在同时交换message的时候才工作。只是想想就够头疼的了。
  • REQ to DEALER:理论上你可以这样做,但这会在你加入第二个REQ的时候崩溃掉,因为DEALER不知道要把回复发送给哪一个对端。因此REQ socket会疑惑收到的回复message是不是给自己的。
  • REQ to REP:两端都在等着对方发送第一条message。
  • REP to ROUTER:ROUTER socket理论上会启动这个对话并能发送一个合法格式的请求,只要它知道REP socket已经建立了连接并且它知道连接的标识帧。相对DEALER to ROUTER的组合,这完全是一团糟,并不会有什么新东西加进来。

  关于合法和不合法之间区别的一个通用的判别思想是zmq socket连接总是偏向绑定的那一端,而不是连接的那一端。更进一步来说,哪一端绑定哪一端连接并不是随意的,而是遵循自然形成的模型。我们希望”总是在那儿”的那一端要绑定:它可能是server,中介,发布者或者一个收集者。”来来去去”的那一端要连接:它可能是client或者worker。记住这些会帮你设计更好的zmq应用架构。

深入理解ROUTER socket

  让我们更深入的来看下ROUTER socket。我们已经知道它们怎么把指定的message路由到特定的连接对端去。我会更详细的解释我们怎么标识这些连接的,以及ROUTER socket在它不能发送message的时候会做什么。

标识和位置

  在zmq中标识的概念特指ROUTER socket怎么定位区分跟它相连的连接。更广泛的说,标识在回复信封中被当做地址使用。在多数情况下,标识是任意的,只对ROUTER socket有效:它会在hash表中查找键。单独来讲,一个对端可能有的地址可以是物理性的(一个网络端点比如”tcp://192.168.55.117:5670”)或逻辑性的(一个UUID或email地址或其他唯一key)。

  一个用ROUTER socket的应用在建立起来必须的hash表之后,就可以把逻辑地址转换成一个标识去跟指定的对端通信了。因为ROUTER socket只在对端发送一个message之后才知道该连接的标识,所以你只能回复过去message,不能主动发起连接。(注:在实际中可以这样做,使用ROUTER的一端在连接建立之前就知道了标识是什么,该标识应该是预先设定好的,那在连接建立之后,ROUTER一端就可以首先发送message。)

  即使你调转下用法,让ROUTER去连接对端而不是等着对端连接过来这也是真的。但你可以强制ROUTER socket使用一个逻辑地址而不是它自己生成的标识。zmq_setsockopt手册页称这为设置socket标识。工作流程如下:

  • 对端应用在绑定或者连接之前设置它自己的socket(DEALER或者REQ)的ZMQ_IDENTITY选项。
  • 通常是对端去连接已经绑定好的ROUTER socket。但ROUTER也能反过来连接对端。
  • 在连接的时候,对端socket告诉ROUTER socket:”该连接请用这个标识”。
    - 如果对端没有那样说,ROUTER就会为该连接生成一个随机标识。
    - ROUTER socket现在为从该连接发过来的所有message都提供这个逻辑地址作为message的前缀标识帧。
    - ROUTER socket也希望出去的message的第一个标识帧都是该逻辑地址。

  下面是两个对端连接一个ROUTER socket的例子,一个强制用了逻辑地址”PEER2”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// identity: Identity check in C
// Demonstarte request-reply identities

#include "zhelpers.h"

int main(void){
void *context = zmq_ctx_new ();
void *sink = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (sink, "inproc://example");

// First allow 0MQ to set the identity
void *anonymous = zmq_socket (context, ZMQ_REQ);
zmq_connect (anonymous, "inproc://example");
s_send (anonymous, "ROUTER uses a generated UUID");
s_dump (sink);

// Then set the identity ourselves
void *identified = zmq_socket (context, ZMQ_REQ);
zmq_setsockopt (identified, ZMQ_IDENTITY, "PEER2", 5);
zmq_connect (identified, "inproc://example");
s_send (identified, "ROUTER socket uses REQ's socket identity");
s_dump (sink);

zmq_close (sink);
zmq_close (anonymous);
zmq_close (identified);
zmq_ctx_destroy (context);
return 0;
}

  下面是该程序打印出来的信息:

1
2
3
4
5
6
7
8
----------------------------------------
[005] 006B8B4567
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] PEER2
[000]
[038] ROUTER uses REQ's socket identity

ROUTER错误处理

  ROUTER socket对那些没办法发送出去的message有一些野蛮的处理方式:直接静默丢掉。在工作代码中这是种合理的态度,但这让调试变得很难。我们学习的时候常常会弄错”第一帧必须是标识帧”这个前提,那在我们弄错的时候ROUTER也不会报错就相当没有建设性了。

  从zmq v3.2版本开始有个socket选项,你可以设置来捕捉该错误:ZMQ_ROUTER_MANDATORY。在ROUTER socket上设置该选项,然后当你发送一个不可到达的标识的时候,socket会发出一个EHOSTUNREACH的错误。

负载均衡模型

  现在让我们来看一些代码。我们会看到怎么把一个ROUTER socket连到一个REQ socket,然后是一个DEALER socket上。这两个例子有着相同的逻辑,这就是负载均衡模型。该模型是我们第一个介绍用ROUTER socket来做特定路由的例子,不仅仅是当做一个回复通道来用。

  负载均衡模型非常通用,我们会在本书看到好几次。它用简单的轮询路由(像PUSH和DEALER提供的那样)解决了在任务没有大致花费相同时间的情况下轮训会变得低效的问题。

  我们分析下邮局的例子。邮局里每个柜台都会有一队人,这些人有的在买邮票(快速简单的交易),一些人在开户(非常慢的交易),然后你会发现买邮票的人被卡在队伍中间了。跟邮局的例子一样,如果你的消息传输架构是不公平的,人们会变得很恼火。

  邮局里的解决办法就是新建一个单独的队列,好让一两个用户在等着慢交易的同时,其他用户会根据先来先服务的顺序继续得到服务。

  PUSH和DEALER使用过分简单的实现方式的一个原因就是效率问题。如果你到任何一个美国主要机场,你会发现等着通关的很长的队伍。机场工作人员会提前把人们引到柜台前的队伍中,而不是用一个单独的队列。提前多走50步会让每个乘客节省一到两分钟。因为每个过关检查的时间大概都差不多,这差不多是公平的。这就是PUSH和DEALER的策略:尽可能早的发送工作任务,好让传输距离更短。

  zmq有个经常出现的主题:现实世界中的问题是不同的,你需要对不同问题使用不同的解决方法。机场的情况跟邮局的不同,不能相提并论。

  让我们回到一个worker(DEALER或者REQ)跟一个中介(ROUTER)连接的例子来。该中介必须知道worker什么时候准备好了,维持一个worker的列表,好每次选择*最久没用过的(the least recently used)*一个worker。

  事实上解决办法相当简单:worker在它们启动的时候和完成一个任务的时候都发送一个”ready”的消息。中介一个个拿到这些消息,每当它拿到一个消息,该消息就是从最久没用过的worker发来的。因为我们使用的是一个ROUTER socket,我们能拿到一个标识好让我们能重新返回给该worker新的任务。

  这是请求-回复模型的一个调整,因为任务是在回复的时候发出去的,针对该任务的回复是用一个新请求返回的。下面的代码例子应该会让它清楚点儿。

ROUTER 中介和REQ worker

  下面这个例子是一个ROUTER 中介跟一组REQ worker通信的使用负载均衡模型的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// rtreq:ROUTER-to-REQ in C
// ROUTER-to-REQ example

#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10

static void *
worker_task (void *args)
{
void *context = zmq_ctx_new ();
void *worker = zmq_socket (context, ZMQ_REQ);
s_set_id (worker); // Set a printable identity
zmq_connect (worker, "tcp://localhost:5671");

int total = 0;
while (1) {
// Tell the broker we're ready for work
s_send (worker, "Hi Boss");

// Get workload from broker, until finished
char *workload = s_recv (worker);
int finished = (strcmp (workload, "Fired!") == 0);
free (workload);
if (finished) {
printf ("Completed: %d tasks\n", total);
break;
}
total++;

// Do some random work
s_sleep (randof (500) + 1);
}
zmq_close (worker);
zmq_ctx_destroy (context);
return NULL;
}

// While this example runs in a single process, that is only to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.

int main (void)
{
void *context = zmq_ctx_new ();
void *broker = zmq_socket (context, ZMQ_ROUTER);

zmq_bind (broker, "tcp://*:5671");
srandom ((unsigned) time (NULL));

int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
// Run for five seconds and then tell workers to end
int64_t end_time = s_clock () + 5000;
int workers_fired = 0;
while (1) {
// Next message gives us least recently used worker
char *identity = s_recv (broker);
s_sendmore (broker, identity);
free (identity);
free (s_recv (broker)); // Envelope delimiter
free (s_recv (broker)); // Response from worker
s_sendmore (broker, "");

// Encourage workers until it's time to fire them
if (s_clock () < end_time)
s_send (broker, "Work harder");
else {
s_send (broker, "Fired!");
if (++workers_fired == NBR_WORKERS)
break;
}
}
zmq_close (broker);
zmq_ctx_destroy (context);
return 0;
}

  该例子运行5s,然后每个worker会打印出它们总共处理了多少各任务,我们希望得到一个相对平均的任务量:

1
2
3
4
5
6
7
8
9
10
Completed: 20 tasks
Completed: 18 tasks
Completed: 21 tasks
Completed: 23 tasks
Completed: 19 tasks
Completed: 21 tasks
Completed: 17 tasks
Completed: 17 tasks
Completed: 25 tasks
Completed: 19 tasks

例子中为了跟worker通信,我们必须创建一个跟REQ类似的信封,也就是包含一个标识帧加上一个空分割帧。

f31

ROUTER 中介和DEALER workers

  你能用REQ的地方就能用DEALER。主要有两个不同:

  • REQ socket总是在任何数据帧前边添加一个空帧;DEALER 不是。
  • REQ socket在收到回复之前只会发送一个信息; DEALER是全异步的。

  异步和同步行为对我们的例子没影响,因为我们用的是严格的request-reply模型。它对我们后边会提到的从失败中恢复有比较大的相关性,我们会在第四章提到。

  现在我们来看一个差不多一样的例子,但是用DEALER socket代替了REQ socket:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// rtdealer: ROUTER-to-DEALER in C
// ROUTER-to-DEALER example

#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10

static void *
worker_task (void *args)
{
void *context = zmq_ctx_new ();
void *worker = zmq_socket (context, ZMQ_DEALER);
s_set_id (worker); // Set a printable identity
zmq_connect (worker, "tcp://localhost:5671");

int total = 0;
while (1) {
// Tell the broker we're ready for work
s_sendmore (worker, "");
s_send (worker, "Hi Boss");

// Get workload from broker, until finished
free (s_recv (worker)); // Envelope delimiter
char *workload = s_recv (worker);
int finished = (strcmp (workload, "Fired!") == 0);
free (workload);
if (finished) {
printf ("Completed: %d tasks\n", total);
break;
}
total++;

// Do some random work
s_sleep (randof (500) + 1);
}
zmq_close (worker);
zmq_ctx_destroy (context);
return NULL;
}

// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.

int main (void)
{
void *context = zmq_ctx_new ();
void *broker = zmq_socket (context, ZMQ_ROUTER);

zmq_bind (broker, "tcp://*:5671");
srandom ((unsigned) time (NULL));

int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
// Run for five seconds and then tell workers to end
int64_t end_time = s_clock () + 5000;
int workers_fired = 0;
while (1) {
// Next message gives us least recently used worker
char *identity = s_recv (broker);
s_sendmore (broker, identity);
free (identity);
free (s_recv (broker)); // Envelope delimiter
free (s_recv (broker)); // Response from worker
s_sendmore (broker, "");

// Encourage workers until it's time to fire them
if (s_clock () < end_time)
s_send (broker, "Work harder");
else {
s_send (broker, "Fired!");
if (++workers_fired == NBR_WORKERS)
break;
}
}
zmq_close (broker);
zmq_ctx_destroy (context);
return 0;
}

  除了worker使用了DEALER socket代码差不多是一样的,并且也在数据帧之前写入读取空帧。这是因为我在实现的时候想跟REQ worker保持兼容。

  然而,记住使用空帧的理由:它允许多节点去扩展request,该request会在一个REP socket结束,该REP socket使用空帧去截取数据帧。

  如果我们从来不需要用REP socket,我们可以在两端都去掉空帧,这会让事情变得容易些。这就是我对纯DEALER 到 ROUTER 协议使用的设计。

一个负载均衡的消息broker

  前一个例子是不完整的。它可以用虚拟的请求回复管理一组worker,但是他没有办法通知client。如果我们加上了第二个前端ROUTER socket,让它来接收client的请求,然后把我们的例子转换成一个能在前端后端交换信息的proxy,我们就得到了一个有用可复用的小型负载均衡broker。

f32

  Broker做以下任务:

  • 接收一组client的连接
  • 接收一组worker的连接
  • 接收从client来的请求,并保存在一个单独的队列中
  • 使用负载均衡模块给worker发送请求
  • 从worker接收返回的信息
  • 把这些返回的信息返回给最开始发出任务的client

  Broker代码相当长,但值得去理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// lbbroker: Load balancing broker in C
// Load-balancing broker
// Clients and workers are shown here in-process

#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 10
#define NBR_WORKERS 3

// Dequeue operation for queue implemented as array of anything
#define DEQUEUE(q) memmove (&(q)[0], &(q)[1], sizeof (q) - sizeof (q [0]))

// Basic request-reply client using REQ socket
// Because s_send and s_recv can't handle 0MQ binary identities, we
// set a printable text identity to allow routing.
//
static void *
client_task (void *args)
{
void *context = zmq_ctx_new ();
void *client = zmq_socket (context, ZMQ_REQ);
s_set_id (client); // Set a printable identity
zmq_connect (client, "ipc://frontend.ipc");

// Send request, get reply
s_send (client, "HELLO");
char *reply = s_recv (client);
printf ("Client: %s\n", reply);
free (reply);
zmq_close (client);
zmq_ctx_destroy (context);
return NULL;
}

// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
// This is the worker task, using a REQ socket to do load-balancing.
// Because s_send and s_recv can't handle 0MQ binary identities, we
// set a printable text identity to allow routing.

static void *
worker_task (void *args)
{
void *context = zmq_ctx_new ();
void *worker = zmq_socket (context, ZMQ_REQ);
s_set_id (worker); // Set a printable identity
zmq_connect (worker, "ipc://backend.ipc");

// Tell broker we're ready for work
s_send (worker, "READY");

while (1) {
// Read and save all frames until we get an empty frame
// In this example there is only 1, but there could be more
char *identity = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);

// Get request, send reply
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);

s_sendmore (worker, identity);
s_sendmore (worker, "");
s_send (worker, "OK");
free (identity);
}
zmq_close (worker);
zmq_ctx_destroy (context);
return NULL;
}

// This is the main task. It starts the clients and workers, and then
// routes requests between the two layers. Workers signal READY when
// they start; after that we treat them as ready when they reply with
// a response back to a client. The load-balancing data structure is
// just a queue of next available workers.

int main (void)
{
// Prepare our context and sockets
void *context = zmq_ctx_new ();
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "ipc://frontend.ipc");
zmq_bind (backend, "ipc://backend.ipc");

int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
pthread_t client;
pthread_create (&client, NULL, client_task, NULL);
}
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
// Here is the main loop for the least-recently-used queue. It has two
// sockets; a frontend for clients and a backend for workers. It polls
// the backend in all cases, and polls the frontend only when there are
// one or more workers ready. This is a neat way to use 0MQ's own queues
// to hold messages we're not ready to process yet. When we get a client
// reply, we pop the next available worker and send the request to it,
// including the originating client identity. When a worker replies, we
// requeue that worker and forward the reply to the original client
// using the reply envelope.

// Queue of available workers
int available_workers = 0;
char *worker_queue [10];

while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, available_workers ? 2 : 1, -1);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Queue worker identity for load-balancing
char *worker_id = s_recv (backend);
assert (available_workers < NBR_WORKERS);
worker_queue [available_workers++] = worker_id;

// Second frame is empty
char *empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);

// Third frame is READY or else a client reply identity
char *client_id = s_recv (backend);

// If client reply, send rest back to frontend
if (strcmp (client_id, "READY") != 0) {
empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
char *reply = s_recv (backend);
s_sendmore (frontend, client_id);
s_sendmore (frontend, "");
s_send (frontend, reply);
free (reply);
if (--client_nbr == 0)
break; // Exit after N messages
}
free (client_id);
}
// Here is how we handle a client request:

if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to last-used worker
// Client request is [identity][empty][request]
char *client_id = s_recv (frontend);
char *empty = s_recv (frontend);
assert (empty [0] == 0);
free (empty);
char *request = s_recv (frontend);

s_sendmore (backend, worker_queue [0]);
s_sendmore (backend, "");
s_sendmore (backend, client_id);
s_sendmore (backend, "");
s_send (backend, request);

free (client_id);
free (request);

// Dequeue and drop the next worker identity
free (worker_queue [0]);
DEQUEUE (worker_queue);
available_workers--;
}
}
zmq_close (frontend);
zmq_close (backend);
zmq_ctx_destroy (context);
return 0;
}

  这段程序难理解的部分在于(a)每个socket读写的信封,(b)负载均衡算法。我们会一个个讨论,从信封格式开始。

  让我们从client到worker再反过来疏通一遍request-reply链。代码中我们设置了client和worker socket 的id用来追踪消息帧。事实上,我们可以让ROUTER socket自己去为建立的连接创建id。我们假设client的id是“CLIENT”,worker的id是“WORKER”。Client程序发送一个包含“hello”的单帧。

f33

  因为REQ socket加上了它自己的空帧,ROUTER socket加上了它自己的连接id,proxy从前端ROUTER socket能督导client地址、空帧和数据帧部分。

f34

  Broker 把这坨东西发给worker,加上选择的worker地址和一个空帧,好让worker那一端的REQ高兴(REQ看到一个空帧就高兴)。

f35

  这个复杂的信封栈在后端的ROUTER socket第一次被消化一点,移走了第一个帧。然后worker端的REQ socket移走了空帧,把余下的东西给了worker程序使用。

f36

  Worker必须保存信封(包括空帧),然后就可以对数据帧为所欲为了。注意REQ socket会自动的做这些事情,但是我们正在使用的是REQ-ROUTER模式,这样就能用负载均衡了。

  在返回的路径上,消息跟他们来时一样,比如说后端socket给broker一个5帧的消息,broker给前端socket一个3帧的消息,client会得到一个1帧的消息。

  现在让我们来瞧瞧负载均衡算法。它要求client和worker都使用REQ socket,好让worker正确的存储回复他们得到的信封。算法是这样的:

  • 创建一个总是选择后端的选择池,和一个只在后端有一个或更多的worker可用时才选择前端的选择池。
  • 无限等待事件发生
  • 如果后端有事件发生,我们会得到一个“ready”或者一个回复。不论哪种情形,我们都把worker的地址存到工作队列中,如果剩下的部分是个对client的回复,我们就通过前端socket把它发给client。
  • 如果前端有事件发生,我们会得到一个client的请求,pop出下一个worker(最后使用的),把请求发给后端。这意味着要发送worker 地址,空帧,和三部分的client请求。

  现在你就能看出来你可以复用和通过worker提供的多样化的初始化“ready”信息扩展负载均衡算法。例如,worker可能启动后做一个自我的性能测试,告诉broker他们多快,然后broker就可以选择最快的可用worker而不是最后被使用的。

一个针对ZMQ的高级API

  我们先把request-reply拍在沙滩上,来看另一个不同的领域:ZMQ 的API本身。这样绕道是因为:我们写越来越复杂的例子,低级的ZMQ API开始看起来发了疯式的增长。看下我们前边的写负载均衡broker中的worker线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
while (true) {
// Get one address frame and empty delimiter
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);

// Get request, send reply
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);

s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}

该代码甚至不可复用,因为它仅仅处理了信封里一个reply地址,并且在zmq api上做了一些包装。如果我们用原生的libzmq API,必须这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
while (true) {
// Get one address frame and empty delimiter
char address [255];
int address_size = zmq_recv (worker, address, 255, 0);
if (address_size == -1)
break;

char empty [1];
int empty_size = zmq_recv (worker, empty, 1, 0);
zmq_recv (worker, &empty, 0);
assert (empty_size <= 0);
if (empty_size == -1)
break;

// Get request, send reply
char request [256];
int request_size = zmq_recv (worker, request, 255, 0);
if (request_size == -1)
return NULL;
request [request_size] = 0;
printf ("Worker: %s\n", request);

zmq_send (worker, address, address_size, ZMQ_SNDMORE);
zmq_send (worker, empty, 0, ZMQ_SNDMORE);
zmq_send (worker, "OK", 2, 0);
}

  代码越长越写的慢,也更难理解。到目前为止,我一直用原生的API,因为作为ZMQ的使用者,我们需要知道熟悉的了解。但是当它挡着我们的路的时候,就必须当个问题去解决了。

  我们不能简单地改变ZMQ API,它已经书面化了并被很多人认同和遵守了。相反,我们可以在我们的经验上构建一个高级API,更具体的是我们写那些复杂的request-reply模型时的经验。

  我们希望有个API让我们一次收发一整个消息,包括任意数量的回复地址的消息;让我们使用最少的代码写出我们想要的功能。

  构建一个好的消息API相当困难。我们有个概念上的问题:zmq使用“message”去描述多帧消息和消息的一个帧。我们有个期望的问题:有时候它需要自然的打印出消息的字符串数据和二进制块。还有很对技术问题,特别是我们不想过多复制数据。

  构建一个好的API是所有语言的挑战,尽管我用的是C。不管你使用哪种语言,认真想想你怎么去做你的语言绑定。

高级API的特性

  我的解决办法是用三个相对自然和显然的概念:string(在s_send 和s_recv中就已经使用的),frame(一个消息帧),和message(一个或多个消息帧)。下面是worker的代码,用这些概念及API重写的:

1
2
3
4
5
while (true) {
zmsg_t *msg = zmsg_recv (worker);
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}

  减少要写和要读的代码量是相当爽的:结果就是更易读易理解。让我们继续这个过程来审查zmq的其他方面。下面是我希望一个高级API应有的素养,基于我对使用zmq的经验:

  • 自动处理socket。我发现手动关闭socket相当笨蛋,并且有时候必须明确规定linger超时时间(并不是所有情况都需要)。如果当我关闭context的时候就能自动关闭socket就很好啦。
  • 方便的线程管理。每一个重要的zmq程序都会使用线程,但是POSIX线程不方便。所以一个好的高级API 应该能把这些隐藏在一个方便的层里。
  • 从父线程通向子线程的通道。这是个经常性的问题:怎么在父子线程之间发送信号。我们的API应该提供一个zmq消息通道(使用PAIR socket和inproc自动创建)。
  • 方便的clock。甚至得到精确到毫秒的时间的解决方法,或者睡眠个毫秒级都是不方便的。现实的zmq程序需要方便的时钟,所以我们的api需要提供他们。
  • 用反射代替zmq_poll。Poll的循环是简单的,但是太复杂。通过写了很多这样的东西,我们发现是在一遍一遍做相同的事情:计算计时器,当socket准备好以后唤醒代码。一个带socket读取和计时器的反射机制可以节省很多重复性的工作。
  • 恰当的处理Ctrl-C,我们已经见识了怎么捕获一个中断。如果这发生在所有的应用中会很有用。

CZMQ高级API

  把上边这些愿望列表转换成实际的c语言代码就是CZMQ。这种高级封装事实上是从先前的例子版本中发展而来的。它用一些方便的层组合了zmq的一些工作机制,并且提供了一些像hash表和list一样的容器(对c很重要,对其他语言就不咋地)。

  下面是用高级API重写的负载均衡broker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// lbbroker2: Load balancing broker using high-level API in C
// Load-balancing broker
// Demonstrates use of the CZMQ API

#include "czmq.h"

#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define WORKER_READY "\001" // Signals worker is ready

// Basic request-reply client using REQ socket
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");

// Send request, get reply
while (true) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}

// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");

// Tell broker we're ready for work
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// Now we come to the main task. This has the identical functionality to
// the previous lbbroker broker example, but uses CZMQ to start child
// threads, to hold the list of workers, and to read and send messages:

int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "ipc://frontend.ipc");
zsocket_bind (backend, "ipc://backend.ipc");

int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Queue of available workers
zlist_t *workers = zlist_new ();

// Here is the main loop for the load balancer. It works the same way
// as the previous example, but is a lot shorter because CZMQ gives
// us an API that does more with fewer calls:
while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);

// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}

  CZMQ提供的一个很有用的东西是干净的处理中断。这意味着Ctrl-C会使任何阻塞着的zmq调用返回-1并置errno为EINTR。这种情况下高级API会返回NULL。所以你就可以像这样干净的退出loop:

1
2
3
4
5
6
7
8
9
while (true) {
zstr_send (client, "Hello");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}

或者,如果你调用了zmq_poll(),检查返回码:

1
2
if (zmq_poll (items, 2, 1000 * 1000) == -1)
break; // Interrupted

  前一个例子仍然用了zmq_poll()。那反射机制呢?CZMQ的zloop反射机制很简洁但很强大。它能让你:

  • 在任何一个socket上设置reader,比如不管什么时候socket有输入就调用读的代码。
  • 取消一个socket上的reader。
  • 设置一个一次性计时器或者有特定间隔时间的多个计时器。
  • 取消一个计时器。

  Zloop内部使用zmq_poll()实现。每次你添加或移除reader它都会重置自己的选择池,并且计算选择超时时间去匹配下次的计时器。然后,它会唤醒reader或者计时器操作函数去处理关注的事件。

  当我们用反射机制时,我们的代码就完全十八变了。主逻辑看起来就像是这样的:

1
2
3
4
zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

  真正处理信息的逻辑放在了专用的函数或方法中。你可能不喜欢这种风格——只是口味问题。它最大的作用就是把时钟和socket事件混在了一起。在本文剩余的部分中,我们会在简单的场景中使用zmq_poll(),在复杂的例子中使用zloop。

  下面是再次重写的负载均衡,这次使用zloop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// lbbroker3: Load balancing broker using zloop in C
// Load-balancing broker
// Demonstrates use of the CZMQ API and reactor style
//
// The client and worker tasks are identical from the previous example.

#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define WORKER_READY "\001" // Signals worker is ready

// Basic request-reply client using REQ socket
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");

// Send request, get reply
while (true) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}

// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");

// Tell broker we're ready for work
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// Our load-balancer structure, passed to reactor handlers
typedef struct {
void *frontend; // Listen to clients
void *backend; // Listen to workers
zlist_t *workers; // List of ready workers
} lbbroker_t;

// In the reactor design, each time a message arrives on a socket, the
// reactor passes it to a handler function. We have two handlers; one
// for the frontend, one for the backend:

// Handle input from client, on frontend
int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
lbbroker_t *self = (lbbroker_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
zmsg_send (&msg, self->backend);

// Cancel reader on frontend if we went from 1 to 0 workers
if (zlist_size (self->workers) == 0) {
zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
zloop_poller_end (loop, &poller);
}
}
return 0;
}

// Handle input from worker, on backend
int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
// Use worker identity for load-balancing
lbbroker_t *self = (lbbroker_t *) arg;
zmsg_t *msg = zmsg_recv (self->backend);
if (msg) {
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (self->workers, identity);

// Enable reader on frontend if we went from 0 to 1 workers
if (zlist_size (self->workers) == 1) {
zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
zloop_poller (loop, &poller, s_handle_frontend, self);
}
// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, self->frontend);
}
return 0;
}

// And the main task now sets up child tasks, then starts its reactor.
// If you press Ctrl-C, the reactor exits and the main task shuts down.
// Because the reactor is a CZMQ class, this example may not translate
// into all languages equally well.

int main (void)
{
zctx_t *ctx = zctx_new ();
lbbroker_t *self = (lbbroker_t *) zmalloc (sizeof (lbbroker_t));
self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
self->backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (self->frontend, "ipc://frontend.ipc");
zsocket_bind (self->backend, "ipc://backend.ipc");

int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Queue of available workers
self->workers = zlist_new ();

// Prepare reactor and fire it up
zloop_t *reactor = zloop_new ();
zmq_pollitem_t poller = { self->backend, 0, ZMQ_POLLIN };
zloop_poller (reactor, &poller, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

// When we're done, clean up properly
while (zlist_size (self->workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
zframe_destroy (&frame);
}
zlist_destroy (&self->workers);
zctx_destroy (&ctx);
free (self);
return 0;
}

  当我们发送Ctrl-C时让程序正确的关掉需要点儿技巧。如果你使用zctx类它会自动注册信号处理进程,但你的代码必须合作。你当zmq_poll返回-1或者zstr_recv, zframe_recv, zmsg_recv中的任何一个返回NULL时你必须中断任何循环。如果你有嵌套的循环,在外层变量上设置!zctx_interrupted条件会很有用。

  如果你在使用子线程,他们不会收到中断信号。为了让他们关闭掉,你可以:

  • 如果他们共享同一个context,销毁context,这种情况下任何阻塞的调用都会结束,附带ETERM。
  • 如果你他们正在使用他们自己的context,给他们发送关闭的指令。这时候你需要额外的socket管道。

异步 client/server 模型

  在ROUTER和DEALER例子中,我们看到server异步连接多个worker的1-N情景。我们也可以反过来得到一个N-1的多client连接一个server的异步架构。

f37

  它是这样工作的:

  • client连接server并且发出请求。
  • 对每个请求,server发送0或多个回复
  • client可以发出多个请求而不需要非得等着有回复
  • server可以发送多个回复而不需要非得等着有请求。

  Show me the code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// asyncsrv: Asynchronous client/server in C
// Asynchronous client-to-server (DEALER to ROUTER)
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.

#include "czmq.h"

// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.

static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_DEALER);

// Set random identity to make tracing easier
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zsocket_set_identity (client, identity);
zsocket_connect (client, "tcp://localhost:5570");

zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
while (true) {
// Tick once per second, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 100; centitick++) {
zmq_poll (items, 1, 10 * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (client);
zframe_print (zmsg_last (msg), identity);
zmsg_destroy (&msg);
}
}
zstr_send (client, "request #%d", ++request_nbr);
}
zctx_destroy (&ctx);
return NULL;
}

// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.

static void server_worker (void *args, zctx_t *ctx, void *pipe);

void *server_task (void *args)
{
// Frontend socket talks to clients over TCP
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5570");

// Backend socket talks to workers over inproc
void *backend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (backend, "inproc://backend");

// Launch pool of worker threads, precise number is not critical
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++)
zthread_fork (ctx, server_worker, NULL);

// Connect backend to frontend via a proxy
zmq_proxy (frontend, backend, NULL);

zctx_destroy (&ctx);
return NULL;
}

// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:

static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "inproc://backend");

while (true) {
// The DEALER socket gives us the reply envelope and message
zmsg_t *msg = zmsg_recv (worker);
zframe_t *identity = zmsg_pop (msg);
zframe_t *content = zmsg_pop (msg);
assert (content);
zmsg_destroy (&msg);

// Send 0..4 replies back
int reply, replies = randof (5);
for (reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
zclock_sleep (randof (1000) + 1);
zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send (&content, worker, ZFRAME_REUSE);
}
zframe_destroy (&identity);
zframe_destroy (&content);
}
}

// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

int main (void)
{
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (client_task, NULL);
zthread_new (server_task, NULL);
zclock_sleep (5 * 1000); // Run for 5 seconds then quit
return 0;
}

  例子运行在一个进程中,用多线程模拟出真实的多进程架构。当你运行例子的时候,你会看到三个client(每个都有个随机ID),打印出他们从server得到的回复。仔细看你会发现每个client请求会得到0或多个回复。

  对这段代码有些说明:

  • client每秒发送一个请求,得到0或多个回复。用zmq_poll()时,我们不能简单的等1秒,否则我们会在收到最后一条回复后1s才能发出新的请求。所以我们定了一个较高的频率(每次会等1/100s)。
  • server使用了一个worker的线程池,每个worker线程同步的处理一个请求。它用一个内部队列连接到前端socket。它用zmq_proxy()连接前端和后端socket。

f38

注意我们在client和server之间用的是DEALER to ROUTER会话,但在server主线程和worker之间我们用的是DEALER to DEALER。如果worker是严格的同步的,我们会选用REP。然而,由于需要发送多个回复,我们就需要用一个异步的socket。我们不想路由回复,它们总是回复到发送给我们请求的一个单独的server线程中。

让我们来看看路由的信封。client发送一个只有一个帧的message,server线程收到一个2帧message(原始的message前边加上client的标识)。我们把这两个帧继续发送给worker,worker会把这个message当做一个常规的回复信封,然后把它作为一个2帧message返回给我们。我们再把第一帧当做标识去把第二帧当做回复路由到原来的client。

上边的过程像下边图示那样:

f381

  现在考虑socket:我们可以用负载均衡的ROUTER to DEALER模型去跟worker通信,但这比较多余。在这个例子中,一个DEALER to DEALER模型就可以了:优势是每个请求有较低的延迟,但缺点是高概率的负载失衡。在这个例子中简单化处理就行了。

  当你在构建需要维持跟client有状态通信的server的时候,你会遇到一个经典的问题。如果server保持有每个client的一些状态,并且client持续的接入退出,那最后server就会用掉所有可用资源。甚至相同的client持续接入,如果你用默认的标识,那每个连接看起来也会像一个新的连接的。

  我们在上边的例子中其实做了点儿欺骗性的事情,我们只是把状态保持了很短的时间(一个worker处理一个请求的时间)然后就把状态丢掉了。但这对很多情形是不合适的。为了在一个有状态的异步server中正确的管理client的状态,你必须:

  • 建立client到server的心跳机制。在我们的例子中,我们每秒发送一个请求,这个确实可以当心跳用。
  • 把client的标识(不管是默认生成的还是明确设置的)作为key存储状态。
  • 检测停止的心跳。如果从client在两秒内(举个例子)没有发来请求,server就能检测到并且销毁它保持的关于那个client的所有状态。

实际工作的例子:内置中介路由机制

  让我们把到目前学习到的这些知识组装起来构建一个真正的应用。我们会经过好几次迭代一步步构架该应用。我们最好的客户很着急的找我们设计一个大型的云计算集群。他介绍了一下大致情况,一个横跨很多数据中心的云,每个数据中心都是client和worker的集群,每个数据中心作为一个整体进行工作。因为我们很明智的知道实践总会打击理论,我们就计划用zmq搭建一个能工作的模拟系统。客户急于在他老板改变主意之前就确定下来这个交易,并且他已经在Twitter上读了很多关于zmq的东西,就同意了我们的做法。

确定细节

  几次通过喝咖啡讨论之后,我们就想进入代码编写的阶段了,但一个小声音提醒我们在把这个大型解决方案完全弄错之前最好先得到足够多的细节。”云做的是哪种工作?”我们问道。

  客户解释到:

  • worker跑在各种不同的硬件上,但都能处理任何种类的任务。每个集群都有几百个worker,总共有几十个集群。
  • client给worker创建任务。每个任务都是一个独立的任务单元,所有的client都想找个能用的worker,尽可能快的给它发送任务。会有很多client,并且能任意连上去掉。
  • 真正的困难是在任何时候都能增加和移除集群。一个集群能带着它自己的worker和client立即离开或者加入云。
  • 如果在它们自己的及群众没有可用的worker,client的任务就会发送到云里边其他可用的worker。
  • client每次发送一个任务,等着回复。如果在X秒内没有得到回复,就会再发送一次该任务。这不是我们要关心的,clinet的API已经做好了这些。
  • worker每次处理一个任务,它们是很简单的野兽。如果它们挂掉了,就会用某种脚本重启。

  所以我们又检查了一遍以确保正确理解了需求:

  • “有某种超级网关把集群相互连接起来,对不?”我们问道。客户说:”当然,我们并不是傻瓜。”
  • “我们讨论的是什么量级的系统?”我们问。客户回复说:”每个集群最多有1000个client,每个每秒最多10个请求,请求信息很小,回复信息也很小,每个不会超过1k字节。”

  我们做了一点儿计算,看看该系统是不是能用纯TCP协议很好的工作。2500client X 10/s X 1000bytes X 2个方向 = 50MB/s或者400Mb/s,对一个1Gb的网络来说没问题。

  一个摆在面前的问题是要求不能用其他的硬件和协议,只是靠某种聪明的路由算法和良好的设计。我们先来设计一个集群(一个数据中心),然后看看怎么把集群连在一起。

单集群架构

  worker和client是同步的。我们想用负载均衡模型把任务路由给worker。worker都是完全一样的:我们的系统没有其他的服务。worker是匿名的:client从不会直接定位到它们。我们在这里不需要提供递送、回复等操作的确认。

  基于我们已经检查过的一些原因,client和worker并不会相互之间直接通信。需要做一些东西让节点能动态的添加移除。所以我们的基础模型包括了先前看到过的请求-回复message中介。

f39

扩展到多集群

  现在我们把该集群扩展到多集群。每个集群都有一套client和worker,一个broker(中介)把它们结合起来。

f40

  问题是:我们怎么让一个集群中的client跟另一个集群中的worker通信呢?下面是一些可能的实现,每种都有利有弊:

  • client能直接跟两个broker连接。好处是不需要修改broker或者worker,但client会变得很复杂,而且需要对整体的拓扑结构非常小心。如果我们想加入第三个或第四个集群,那所有的client都会收到影响。另外我们必须把路由和故障转移逻辑移到client,这看起来一点儿都不美妙。
  • worker可以直接跟两个broker连接。但REQ worker做不到,它们只能回复到一个broker。我们可以用REP,但REP没有像负载均衡那样的可定制的broker-to-worker路由机制,它只有内建的负载均衡功能。那是不行的:如果我们想把任务发送给空闲的worker,我们恰巧就需要负载均衡机制。一个解决办法是让worker节点用ROUTER socket。我们把这个想法标记为”Idea #1”。
  • 两个broker之间能相互连接。这看起来最简洁,因为它需要的附加连接最少。我们不能在运行时添加集群,但这可能超出了我们需要考虑的范围。现在client和worker不需要知道真实的网络拓扑结构,broker会在它们有多余的处理能力的时候就相互通知。我们把这个标记为”Idea #2”。

  我们先详细看下 Idea #1。在这个模型中,woker会跟两个broker连接,从任何一个都可以接收任务。

f41

  这看起来是可行的。但,它没有提供我们想要的:client在可能的时候首先拿到本地的worker,在需要等的时候才去找其他集群的worker。并且worker可能给两个broker都发送了”准备就绪”的信号,然后一次得到两个任务,同时其他的worker还是空闲着。看起来该设计又失败了,因为我们又把路由逻辑放到了边缘。

  现在来看看Idea #2。我们只把broker之间相互连接,不去动client或者worker,这就像我们熟悉的REQ那样。

f42

  该设计很吸引人因为问题都在一个地方集中解决了,对其他地方都没影响。主要来说,broker会开启跟其他集群的秘密通道,悄悄说”嘿,我有一些多余的容量,如果你有太多client,叫我一声我们会帮你解决的。”

  事实上这是一个更复杂的路由算法:broker互相成为对方的次级承包商。在我们编写代码之前,关于该设计还有一些其他的事情要做:

  • 它对正常情况(client和worker在同一个集群中)采取默认处理,对非正常的情况(在集群之间转移任务)需要做些额外的工作。
  • 它能让我们为不同类型的工作处理不同的信息流。这意味这我们能分别处理它们,比如说,使用不同类型的网络连接。
  • 感觉上它能平滑的扩展。相互之间连接三个或更多个broker并不会过度复杂。如果发现这是个问题的化,也可以通过加一个超级broker去解决。

 现在我们要做一个能工作的例子了。我们会把整个集群都包装到一个进程中。那显然不是实际的样子,但它能让模拟更简单点儿,并且模拟能继续扩展成真正的多个进程。这正式zmq美妙的地方——你能在一个微小的规模上设计,然后把它扩展成一个庞大规模的系统。线程变成进程,然后成为物理节点,这期间模型和逻辑都不会改变。每个”集群”进程都包含client线程,worker线程和一个broker线程。

  到目前我们已经很好的了解了该系统的基础模型:

  • REQ client(REQ)线程创建任务并把它们发送给broker(ROUTER)。
  • REQ worker(REQ)线程处理任务并把结果返回给broker(ROUTER)。
  • broker用负载均衡模型把任务加进队列病分发出去。

联邦制vs对等连接(Federation Versus Peering)

  相互连接broker有几种可能的方法。我们想要的是能够告诉其他broker”我们有容量”,然后能收到多个任务。我们也需要能告诉别的broker”停,我们满载了”。它并不需要很完美:有时候我们可以接收一些不能立即处理的工作,然后再尽快去处理它们。

  最简单的互连方式是联邦制,在该方法中broker互相模拟成对方的client和worker。我们可以通过把一个broker的前端socket跟其他broker的后端socket相连实现。注意同时把一个socket绑定到一个端点上并把它连接到其他端点上是合法的。

f43

  这给了我们在两个broker都简单的逻辑和一个合理的机制:有(译注:此处原文为when there are no workers,应该是错的)worker的时候,就告诉其他broker自己”准备就绪”,然后从其他broker那里接收一个任务。但问题也是它太简单了。一个联邦制的broker只能一次处理一个任务。如果broker模拟成一个锁同步(lock-step)的client和worker,它当然就是锁同步的了,如果有很多空闲的worker也不会被用到(译注:因为锁同步的时候只有一次只能处理一个任务)。我们的broker需要全异步的连接。

  联邦模型对有些路由情景很有用,特别是服务导向架构(SOAs),该架构是根据服务名称和远近关系而不是负载均衡或者轮询来路由数据的。所以别认为它没有一点儿用,它只是对我们面对的情况不适用而已。

  抛开联邦模型,让我们看看对等连接的办法,在该办法中broker对其他每个broker都非常了解并通过特权通道跟它们通信。我们来详细看看,假设我们想互联N个broker。每个broker都有(N-1)个对端,所有的broker都有完全一样的代码和逻辑。在broker之间有两个独立的信息流:

  • 任何时候每个broker都需要告诉它的对端它现在有多少个可用的worker。这会是相当简单的信息——只是一个周期刷新的数量。显然(并且正确)的socket模型是pub-sub。所以每个broker启动一个PUB socket,并在它上边发布状态信息,每个broker同时启动一个SUB socket并且连接到每个其他broker的PUB socket上,然后从它的对端接收状态信息。
  • 每个broekr需要一种异步的把任务递送给一个对端然后得到回复的方法。我们用ROUTER socket做这件事:没有其他的组合能用。每个broker有两个这种socket:一个等着接收任务,另一个去委派任务。如果我们不使用两个socket,那在每个时刻都需要做更多的工作去判断我们是在读一个请求还是一个回复信息。那意味这需要在message信封中增加更多的控制信息。

  这里同样也有在broekr和它本地的client跟worker之间的信息流。

命名仪式

  在broker中我们必须管理三个信息流 X 每个流两个socket = 6 个socket。选择好的命名对一个多socket协作的程序至关重要。socket一些事情,它们做的事情应该是命名的基础。良好的命名能让你在几周之后某个寒冷周一早上读懂这些代码,并且不会感到很痛苦。

  让我们为socket举行一个萨满的命名仪式。这三个信息流是:

  • 一个local请求-回复流,在broker和它的client、worker之间。
  • 一个cloud请求-回复流,在broker和它对等的broker之间。
  • 一个状态流,在broekr和它对等的broker之间。

  找到那些有意义的并且等长的名字意味着我们的代码能很好的对齐。这并不是个大问题,但注意到细节会有帮助的。对每个信息流broker有两个socket,我们可以正交的称它们为frontendbackend。我们以前经常使用这两个名字。frontend接收信息或者任务,backend把它们发送给其他对端。概念上流是从前端流向后端(回复是反方向的)。

  在我们为这个指南里写的所有代码中,我们会使用以下这些socket名字:

  • localfelocalbe:local流。
  • cloudfecloudbe:cloud流。
  • statefestatebe:state流。

  对我们的传输协议来说,因为我们要在一个节点中模拟所有的东西,我们会都使用ipc协议。它有像tcp那样面向连接的优势(不像inproc那样是非连接的),并且我们也不需要IP地址或者DNS名字,这些在这里是让人痛苦的。相反,我们称ipc端点为something-local, something-cloud, something-state,这里something是我们模拟的集群的名字。

  你可能会以为命名很费事。为什么不直接叫它们s1,s2,s3,s4呢?答案是如果你的大脑不是个完美的机器,你就需要在读代码的时候有很多帮助,然后我们就会发现这些名字很有帮助。很容易记着”三个流,两个方向”而不是”六个不同的socket”。

f44

  注意我们把cloudbe连接到每个其他broker的cloudfe上,同理又把statebe连接到每个其他broker的statefe上。

状态流原型

  因为每个socket流因为粗心都会有它自己的一点陷阱,我们就需要在实际代码中一个一个检验,而不是把它们全弄出来再去检验。当我们对每个信息流都调试好以后,就可以把它们放到一个完整的程序中。

f45

  下面是它怎么工作的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//peering1: Prototype state flow in C
// Broker peering simulation (part 1)
// Prototypes the state flow

#include "czmq.h"

int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
//
if (argc < 2) {
printf ("syntax: peering1 me {you}…\n");
return 0;
}
char *self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));

zctx_t *ctx = zctx_new ();

// Bind state backend to endpoint
void *statebe = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (statebe, "ipc://%s-state.ipc", self);

// Connect statefe to all peers
void *statefe = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (statefe, "");
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to state backend at '%s'\n", peer);
zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
}
// The main loop sends out status messages to peers, and collects
// status messages back from peers. The zmq_poll timeout defines
// our own heartbeat:

while (true) {
// Poll for activity, or 1 second timeout
zmq_pollitem_t items [] = { { statefe, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Handle incoming status messages
if (items [0].revents & ZMQ_POLLIN) {
char *peer_name = zstr_recv (statefe);
char *available = zstr_recv (statefe);
printf ("%s - %s workers free\n", peer_name, available);
free (peer_name);
free (available);
}
else {
// Send random values for worker availability
zstr_sendm (statebe, self);
zstr_send (statebe, "%d", randof (10));
}
}
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}

  注意这段代码:

  • 每个broker有一个标识,我们用来构建ipc端点名字。一个真实的broker需要用TCP并且会有一个更复杂的模式。我们会在本书稍后地方看看这种模式,但现在,使用自动生成的ipc名称会让我们忽略到哪儿去拿TCP/IP地址或名字的问题。
  • 我们使用一个zmq_poll()循环作为程序的核心。该循环处理进来的message和发送状态message。我们只在没有得到任何进入的信息并且已经等了1秒的时候才发送状态信息。如果我们每得到一个任务就发送一条状态信息,就会造成信息风暴。
  • 我们使用一个2帧的pub-sub message包装发送者的地址和数据。注意我们需要知道发布者的地址好给它发送任务,并且发送任务的唯一办法就是发送的message包含该确切地址。
  • 我们不在订阅者上边设置标识,因为如果我们这样做了,就会在连接到运行着的broker的时候得到过时的状态信息。
  • 我们没有在发布者上边设置HWM,但如果我们在用着zmq v2.x版本,那就是个明智的决定。

  现在编译这个小程序,让他跑三次,好模拟三个集群,分别称它们为DC1,DC2,DC3(名字是任意的)。我们在三个单独的窗口中分别跑这三行命令(每个窗口一行命令):

1
2
3
peering1 DC1 DC2 DC3  #  Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3 # Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2 # Start DC3 and connect to DC1 and DC2

  你会看到每个集群向对端报告自己的状态,过几秒之后它们就会每秒打印一次随机数字了。自己尝试一下,你会发现三个broker都会匹配同步到每秒更新一次状态。

  在实际中,我们并不会周期性的发送状态信息,只在我们状态改变的时候,比如说当一个worker变成可用或者不可用的时候才发送。这看起来会有很多的信息流,但状态信息很小,并且我们已经建立的集群间通信是超级快的。

如果我们想以确定的周期发送状态信息,可以写个子线程,在那个子线程启动statebe socket。然后我们在主线程不规律地把状态信息更新发给该子线程,让子线程去把它们合并成规律的出信息流。这比我们在这儿需要的要多些工作。

local和cloud流原型

  现在来设计通过本地和云端 socket的任务流原型。程序从客户端拉取请求,然后随机分发到本地worker或者云端对端。

f46

  这段代码会比较复杂,在我们了解代码之前,让我们先详细拆解一下核心路由逻辑,并把它精简到一个还算健壮的设计。

  我们需要两个队列,一个存储本地client的请求,一个存储云端client的请求,需要一个从本地和云端前端socket拉取message的操作,把它们存放到各自的队列中去。但这种操作是多余的,因为zmq本身就已经是自带队列的。所以,我们把zmq socket的缓存区当做队列使用好了。

    这就是我们在负载均衡broker中使用的技术,它能很好的工作。只当有地方发送请求的时候我们才从两个前端读取信息。我们会总是从后端读取信息,因为它们给我们的是路由返回来的回复。只要后端没有通知我们,就没有必要去查看前端。

  来看看我们的主循环:

  • 轮询后端是否活跃。当我们得到一条message,它可能是来自worker的”准备就绪”或者是一个回复。如果是回复,通过本地或者云端前端路由回去。
  • 如果一个worker回复了,它就成为可用的了,我们把它加进队列并计数。
  • 当有worker可用,可以获取一个请求;如果可用的worker,那请求就会路由到本地worker或者随机的一个云端对端去。

  随机的把任务分发到一个对端broekr而不是本地worker是通过集群模拟的任务分摊。它是静默的,但对该策略是良好的。

  我们用broker的标识来在broker之间路由message。该原型中每个broker都会有一个我们在命令行提供的名字。只要这些名字不跟zmq自动为client节点生成的UUID重复,我们就能确定到底是把回复路由给一个client还是一个broker。

  下面是代码怎么工作的。有意思的部分从”Interesting part”注释标注的地方开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// peering2: Prototype local and cloud flow in C
// Broker peering simulation (part 2)
// Prototypes the request-reply flow

#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define WORKER_READY "\001" // Signals worker is ready

// Our own name; in practice this would be configured per node
static char *self;

// The client task does a request-reply dialog using a standard
// synchronous REQ socket:

static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);

while (true) {
// Send request, get reply
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}

// The worker task plugs into the load-balancer using a REQ
// socket:

static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);

// Tell broker we're ready for work
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted

zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// The main task begins by setting-up its frontend and backend sockets
// and then starting its client and worker tasks:

int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
//
if (argc < 2) {
printf ("syntax: peering2 me {you}…\n");
return 0;
}
self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));

zctx_t *ctx = zctx_new ();

// Bind cloud frontend to endpoint
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);

// Connect cloud backend to all peers
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to cloud frontend at '%s'\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Prepare local frontend and backend
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

// Get user to tell us when we can start…
printf ("Press Enter when all brokers are started: ");
getchar ();

// Start local workers
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Start local clients
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);

// Here, we handle the request-reply flow. We're using load-balancing
// to poll workers at all times, and clients only when there are one //
// or more workers available.//

// Least recently used queue of available workers
int capacity = 0;
zlist_t *workers = zlist_new ();

while (true) {
// First, route any waiting replies from workers
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// If we have no workers, wait indefinitely
int rc = zmq_poll (backends, 2,
capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // Interrupted

// Handle reply from local worker
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);
capacity++;

// If it's READY, don't route the message any further
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Or handle reply from peer broker
else
if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Interrupted
// We don't use peer broker identity for anything
zframe_t *identity = zmsg_unwrap (msg);
zframe_destroy (&identity);
}
// Route reply to cloud if it's addressed to a broker
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// Route reply to client if we still need to
if (msg)
zmsg_send (&msg, localfe);

// Now we route as many client requests as we have worker capacity
// for. We may reroute requests from our local frontend, but not from //
// the cloud frontend. We reroute randomly now, just to test things
// out. In the next version, we'll do this properly by calculating
// cloud capacity://

while (capacity) {
zmq_pollitem_t frontends [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll (frontends, 2, 0);
assert (rc >= 0);
int reroutable = 0;
// We'll do peer brokers first, to prevent starvation
if (frontends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudfe);
reroutable = 0;
}
else
if (frontends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localfe);
reroutable = 1;
}
else
break; // No work, go back to backends

// If reroutable, send to cloud 20% of the time
// Here we'd normally use cloud status information
//
if (reroutable && argc > 2 && randof (5) == 0) {
// Route to random broker peer
int peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [peer], strlen (argv [peer]));
zmsg_send (&msg, cloudbe);
}
else {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
capacity--;
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}

  比如在两个窗口各启动一个实例来运行该原型:

1
2
peering2 me you
peering2 you me

  对这段代码的一些说明:

  • 在最后的C代码中,使用zmsg类让生活更美好,并让我们的代码更短。它显然是一种抽象。如果你用C去写zmq应用,你应该使用CZMQ。
  • 因为我们没从对端获取任何状态信息,我们默认它们是运行着的。当你要启动所有的broker的时候代码会提示你确认。在实际情况中,我们不会向那些没告诉我们它们存在的broker发送任何东西。

  你会发现这段代码会永远运行下去。如果有任何错误路由的message的时候,client会锁住,并且broker会打印追踪信息并停止。你可以通过断掉任何一个broker来证明。其他的broker会试着向云端发送请求,接着它的client一个接一个都被锁上,等着回复信息。

全部整合到一起

  让我们把这些都整合到一个单独的程序里去。跟以前一样,我们会把整个集群跑在一个进程中。我们要把先前的两个例子合并到一个合适的能工作的设计中去,让你能模拟任意数量的集群。

  该程序大小是前边两个原型的综合,大约270行代码。这样来模拟一个包含client和worker和云端负载分摊的集群相当好。下面是代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
// peering3: Full cluster simulation in C
// Broker peering simulation (part 3)
// Prototypes the full flow of status and tasks

#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define WORKER_READY "\001" // Signals worker is ready

// Our own name; in practice, this would be configured per node
static char *self;

// This is the client task. It issues a burst of requests and then
// sleeps for a few seconds. This simulates sporadic activity; when
// a number of clients are active at once, the local workers should
// be overloaded. The client uses a REQ socket for requests and also
// pushes statistics to the monitor socket:

static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
void *monitor = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);

while (true) {
sleep (randof (5));
int burst = randof (15);
while (burst--) {
char task_id [5];
sprintf (task_id, "%04X", randof (0x10000));

// Send request with random hex ID
zstr_send (client, task_id);

// Wait max ten seconds for a reply, then complain
zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

if (pollset [0].revents & ZMQ_POLLIN) {
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
// Worker is supposed to answer us with our task id
assert (streq (reply, task_id));
zstr_send (monitor, "%s", reply);
free (reply);
}
else {
zstr_send (monitor,
"E: CLIENT EXIT - lost task %s", task_id);
return NULL;
}
}
}
zctx_destroy (&ctx);
return NULL;
}

// This is the worker task, which uses a REQ socket to plug into the
// load-balancer. It's the same stub worker task that you've seen in
// other examples:

static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);

// Tell broker we're ready for work
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted

// Workers are busy for 0/1 seconds
sleep (randof (2));
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// The main task begins by setting up all its sockets. The local frontend
// talks to clients, and our local backend talks to workers. The cloud
// frontend talks to peer brokers as if they were clients, and the cloud
// backend talks to peer brokers as if they were workers. The state
// backend publishes regular state messages, and the state frontend
// subscribes to all state backends to collect these messages. Finally,
// we use a PULL monitor socket to collect printable messages from tasks:

int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
if (argc < 2) {
printf ("syntax: peering3 me {you}…\n");
return 0;
}
self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));

// Prepare local frontend and backend
zctx_t *ctx = zctx_new ();
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);

void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

// Bind cloud frontend to endpoint
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);

// Connect cloud backend to all peers
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to cloud frontend at '%s'\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Bind state backend to endpoint
void *statebe = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (statebe, "ipc://%s-state.ipc", self);

// Connect state frontend to all peers
void *statefe = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (statefe, "");
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to state backend at '%s'\n", peer);
zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
}
// Prepare monitor socket
void *monitor = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (monitor, "ipc://%s-monitor.ipc", self);

// After binding and connecting all our sockets, we start our child
// tasks - workers and clients:

int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Start local clients
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);

// Queue of available workers
int local_capacity = 0;
int cloud_capacity = 0;
zlist_t *workers = zlist_new ();

// The main loop has two parts. First, we poll workers and our two service
// sockets (statefe and monitor), in any case. If we have no ready workers,
// then there's no point in looking at incoming requests. These can remain //
// on their internal 0MQ queues://

while (true) {
zmq_pollitem_t primary [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 },
{ statefe, 0, ZMQ_POLLIN, 0 },
{ monitor, 0, ZMQ_POLLIN, 0 }
};
// If we have no workers ready, wait indefinitely
int rc = zmq_poll (primary, 4,
local_capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // Interrupted

// Track if capacity changes during this iteration
int previous = local_capacity;
zmsg_t *msg = NULL; // Reply from local worker

if (primary [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);
local_capacity++;

// If it's READY, don't route the message any further
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Or handle reply from peer broker
else
if (primary [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Interrupted
// We don't use peer broker identity for anything
zframe_t *identity = zmsg_unwrap (msg);
zframe_destroy (&identity);
}
// Route reply to cloud if it's addressed to a broker
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// Route reply to client if we still need to
if (msg)
zmsg_send (&msg, localfe);

// If we have input messages on our statefe or monitor sockets, we
// can process these immediately:

if (primary [2].revents & ZMQ_POLLIN) {
char *peer = zstr_recv (statefe);
char *status = zstr_recv (statefe);
cloud_capacity = atoi (status);
free (peer);
free (status);
}
if (primary [3].revents & ZMQ_POLLIN) {
char *status = zstr_recv (monitor);
printf ("%s\n", status);
free (status);
}
// Now route as many clients requests as we can handle. If we have
// local capacity, we poll both localfe and cloudfe. If we have cloud
// capacity only, we poll just localfe. We route any request locally
// if we can, else we route to the cloud.

while (local_capacity + cloud_capacity) {
zmq_pollitem_t secondary [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
if (local_capacity)
rc = zmq_poll (secondary, 2, 0);
else
rc = zmq_poll (secondary, 1, 0);
assert (rc >= 0);

if (secondary [0].revents & ZMQ_POLLIN)
msg = zmsg_recv (localfe);
else
if (secondary [1].revents & ZMQ_POLLIN)
msg = zmsg_recv (cloudfe);
else
break; // No work, go back to primary

if (local_capacity) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
local_capacity--;
}
else {
// Route to random broker peer
int peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [peer], strlen (argv [peer]));
zmsg_send (&msg, cloudbe);
}
}
// We broadcast capacity messages to other peers; to reduce chatter,
// we do this only if our capacity changed.

if (local_capacity != previous) {
// We stick our own identity onto the envelope
zstr_sendm (statebe, self);
// Broadcast new capacity
zstr_send (statebe, "%d", local_capacity);
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}

这是个重要的程序,大约花费了一天的时间让它工作起来。下面是一些重点:

  • client线程检测和报告一个失败的请求。它们会轮询一个回复,如果在一个循环(10秒)结束还没收到回复,就打印一个错误信息。

  • client线程并不直接打印,而是向一个监控socket(PUSH)发送一条信息,主循环再去收集(PULL)这些信息并打印出来。这是我们看到的第一个用zmq socket做监控和日志记录的例子:这是个很大的应用场景,我们稍后会详细介绍。

  • client会模拟不同的负载任务,让集群能在某个随机时刻100%负荷,好让任务能转移到云端。client和worker的数量以及在client和worker线程之间的延迟控制着这些。多试试,看你能不能做出个更贴近实际的模拟系统。

  • 主循环使用了两个轮询集。事实上可以用三个的:信息流,前端和后端。但在早期原型中,如果后端没有容量那前端就没必要接收信息了。

    在开发过程中出现了一些新的问题:

  • client会被冻结,原因是请求或者回复在某个地方丢了。回忆下ROUTER socket会把那些它不能路由的信息丢掉。这里第一个办法是调整client线程,让它去检测并报告这样的问题。第二,我在主循环中每次接收和每次发送message之前都调用了zmsg_dump(),直到该问题解决才清理。

  • 主循环会错误的从不止一个准备就绪的socket中读取信息。这会造成第一个信息丢失。我通过只读第一个准备好的socket修复了该问题。

  • zmsg类没有正确的把UUID编码为C字符串。这使UUID只有0字节长度。我通过修改zmsg让它把UUID编码成可打印的十六进制字符串修复了这个问题。

    这个模拟没有检测一个云端对端退出的情况。如果你启动了好几个对端,然后停掉其中一个,因为它可能已经通知了其他broker它有容量,其他broker在它退出之后仍向它发送任务。你可以试试,然后会发现client报告丢失了请求。解决办法有两个:第一个,指把容量信息保存很短的时间,那如果一个对端退出,它的容量信息也能很快归零。第二种,增加爱可靠的请求-回复链。我们会在下一章了解可靠性的问题。