
ZeroMQ Guide, Part 4

Chapter 4 - Reliable Request-Reply Patterns

  Chapter 3, Advanced Request-Reply Patterns, covered advanced uses of ZeroMQ's request-reply pattern with working examples. This chapter looks at the general question of reliability and builds a set of reliable messaging patterns on top of ZeroMQ's core request-reply pattern.

  In this chapter, we focus on user-space request-reply patterns, reusable models that help you design your own ZeroMQ architectures:

  • The *Lazy Pirate* pattern: reliable request-reply from the client side
  • The *Simple Pirate* pattern: reliable request-reply using load balancing
  • The *Paranoid Pirate* pattern: reliable request-reply with heartbeating
  • The *Majordomo* pattern: service-oriented reliable queuing
  • The *Titanic* pattern: disk-based/disconnected reliable queuing
  • The *Binary Star* pattern: primary-backup server failover
  • The *Freelance* pattern: brokerless reliable request-reply

What is "Reliability"?

  Most people who talk about "reliability" don't really know what they mean. We can only define reliability in terms of failure. That is, if we can handle a certain set of well-defined and understood failures, then we are reliable with respect to those failures. No more, no less. So let's look at the possible causes of failure in a distributed ZeroMQ application, in roughly descending order of probability:

  • Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, leak memory, and so on.
  • System code, such as brokers we write using ZeroMQ, can die for the same reasons as application code. System code should be more reliable than application code, but it can still crash and burn, especially by running out of memory when it queues messages for slow clients.
  • Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages, and so we get "lost" messages.
  • Networks can fail (e.g., WiFi gets switched off or goes out of range). ZeroMQ will automatically reconnect in such cases, but in the meantime, messages may get lost.
  • Hardware can fail and take with it all the processes running on that box.
  • Networks can fail in exotic ways, e.g., some ports on a switch may die and those parts of the network become unreachable.
  • Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power surges or cooling failures.

  Making a software system fully reliable against all of these possible failures is an enormously difficult and expensive job and is beyond the scope of this book.

  Because the first five cases in the above list cover roughly 99.9% of real-world problems (according to a highly systematic study I ran, which also told me that 78% of statistics are made up on the spot, and that we should never trust a statistic we didn't fake ourselves), that's what we'll examine here. If you're a large company with money to spend on the last two cases, contact my company immediately! There's a big hole in my back yard waiting to be turned into a swimming pool.

Designing Reliability

  To keep things simple, we'll define reliability as "keeping the system working properly when code freezes or crashes", a situation we'll shorten to "dies". However, the things we want to keep working properly are more complex than just message delivery. We need to take each core ZeroMQ messaging pattern and see how to keep it working (if we can) even when code dies.

  Let's take them one by one:

  • Request-reply: if the server dies (while processing a request), the client can figure that out because it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, and so on. As for the client dying, we can brush that off as "somebody else's problem" for now.
  • Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pub-sub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g., via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g., warn the operator and die) if they are.
  • Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like time, only flow in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" (see the sketch after this list). If the ventilator or collector dies, whatever upstream client originally sent the work can get tired of waiting, time out, and resend the whole batch. It's not elegant, but system code should really not die often enough for this to matter much.
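  To make that pipeline recovery idea concrete, here is a small, purely hypothetical sketch (it is not part of the examples in this chapter; the endpoints and the plain-text "RESEND n" format are assumptions): a collector that pulls numbered results and, when it notices a gap in the sequence, asks the ventilator to resend the missing tasks over a separate channel:

//  Hypothetical collector with a resend back-channel (sketch only)
#include "czmq.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *results = zsocket_new (ctx, ZMQ_PULL);    //  Results from workers
    zsocket_bind (results, "tcp://*:5558");
    void *resend = zsocket_new (ctx, ZMQ_PUSH);     //  Back to the ventilator
    zsocket_connect (resend, "tcp://localhost:5559");

    int expected = 1;                               //  Next sequence we expect
    while (!zctx_interrupted) {
        char *result = zstr_recv (results);
        if (!result)
            break;                                  //  Interrupted
        int sequence = atoi (result);
        //  Any gap means tasks went missing; ask for them again
        while (expected < sequence) {
            char request [32];
            sprintf (request, "RESEND %d", expected);
            zstr_send (resend, request);
            expected++;
        }
        if (sequence == expected)
            expected++;                             //  Arrived in order
        free (result);
    }
    zctx_destroy (&ctx);
    return 0;
}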

  In this chapter we'll focus just on request-reply, which is the low-hanging fruit of reliable messaging.

  The basic request-reply pattern (a REQ client doing a blocking send/receive to a REP server) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever too.

  Request-reply is still much better than TCP, thanks to ZeroMQ's ability to reconnect peers silently, to load balance messages, and so on. But it's still not good enough for real work. The only case where you can really trust the basic request-reply pattern is between two threads in the same process, where there's no network or separate server process to die.

  However, with a little extra work, this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply (RRR) patterns that I like to call the Pirate patterns (you'll eventually get the joke, I hope).

  In my experience, there are roughly three ways to connect clients to servers, each needing a specific approach to reliability:

  • Multiple clients talking directly to a single server. Use case: a single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, and network disconnects.
  • Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, and network disconnects.
  • Multiple clients talking to multiple servers with no intermediary proxies. Use case: distributed services such as name resolution. Types of failure we aim to handle: server crashes and restarts, server busy looping, server overload, and network disconnects.

  Each of these approaches has its trade-offs, and often you'll mix them. We'll look at all three in detail.

Client-Side Reliability (Lazy Pirate Pattern)

  We can get very simple reliable request-reply with some changes to the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:

  • Poll the REQ socket and receive from it only when it's sure a reply has arrived.
  • Resend the request if no reply has arrived within a timeout period.
  • Abandon the transaction if there is still no reply after several retries.

  If you try to use a REQ socket in anything other than a strict send/receive fashion, you'll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, and so the error code is called "EFSM"). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply.
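  As a quick illustration of that EFSM behavior, here is a minimal sketch using the plain libzmq API rather than CZMQ (the inproc endpoint name is just for the demo): sending twice on a REQ socket without a receive in between fails with EFSM.

//  Demonstrates the REQ socket's send/receive state machine (sketch)
#include <assert.h>
#include <stdio.h>
#include <zmq.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *rep = zmq_socket (ctx, ZMQ_REP);
    zmq_bind (rep, "inproc://efsm-demo");
    void *req = zmq_socket (ctx, ZMQ_REQ);
    zmq_connect (req, "inproc://efsm-demo");

    int rc = zmq_send (req, "hello", 5, 0);     //  First send is fine
    assert (rc == 5);
    rc = zmq_send (req, "again", 5, 0);         //  Second send breaks the
    if (rc == -1 && zmq_errno () == EFSM)       //  send/recv ping-pong
        printf ("E: EFSM - REQ socket expects a recv before the next send\n");

    int linger = 0;
    zmq_setsockopt (req, ZMQ_LINGER, &linger, sizeof (linger));
    zmq_close (req);
    zmq_close (rep);
    zmq_ctx_destroy (ctx);
    return 0;
}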

  The pretty good brute force solution is to close and reopen the REQ socket after an error:

//lpclient: Lazy Pirate client in C
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it

#include "czmq.h"
#define REQUEST_TIMEOUT 2500 // msecs, (> 1000!)
#define REQUEST_RETRIES 3 // Before we abandon
#define SERVER_ENDPOINT "tcp://localhost:5555"

int main (void)
{
zctx_t *ctx = zctx_new ();
printf ("I: connecting to server…\n");
void *client = zsocket_new (ctx, ZMQ_REQ);
assert (client);
zsocket_connect (client, SERVER_ENDPOINT);

int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left && !zctx_interrupted) {
// We send a request, then we work to get a reply
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);

int expect_reply = 1;
while (expect_reply) {
// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Here we process a server reply and exit our loop if the
// reply is valid. If we didn't get a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:

if (items [0].revents & ZMQ_POLLIN) {
// We got a reply from the server, must match sequence
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
if (atoi (reply) == sequence) {
printf ("I: server replied OK (%s)\n", reply);
retries_left = REQUEST_RETRIES;
expect_reply = 0;
}
else
printf ("E: malformed reply from server: %s\n",
reply);

free (reply);
}
else
if (--retries_left == 0) {
printf ("E: server seems to be offline, abandoning\n");
break;
}
else {
printf ("W: no response from server, retrying…\n");
// Old socket is confused; close it and open a new one
zsocket_destroy (ctx, client);
printf ("I: reconnecting to server…\n");
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, SERVER_ENDPOINT);
// Send request again, on new socket
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}

  And here is the matching server:

// lpserver: Lazy Pirate server in C
// Lazy Pirate server
// Binds REQ socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.

#include "zhelpers.h"

int main (void)
{
srandom ((unsigned) time (NULL));

void *context = zmq_ctx_new ();
void *server = zmq_socket (context, ZMQ_REP);
zmq_bind (server, "tcp://*:5555");

int cycles = 0;
while (1) {
char *request = s_recv (server);
cycles++;

// Simulate various problems, after a few cycles
if (cycles > 3 && randof (3) == 0) {
printf ("I: simulating a crash\n");
break;
}
else
if (cycles > 3 && randof (3) == 0) {
printf ("I: simulating CPU overload\n");
sleep (2);
}
printf ("I: normal request (%s)\n", request);
sleep (1); // Do some heavy work
s_send (server, request);
free (request);
}
zmq_close (server);
zmq_ctx_destroy (context);
return 0;
}

[Figure 47: The Lazy Pirate Pattern]

  To run this test case, start the client and the server in two console windows. The server will randomly misbehave after a few messages, and you can check the client's responses. Here is typical output from the server:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

  And here is the client's response:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

  The client sequences each message and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once or out of order. Run the test a few times until you're convinced that this mechanism actually works. You don't need sequence numbers in a production application; they just help us trust our design.

  The client uses a REQ socket, and does the brute force close/reopen because REQ sockets impose a strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good design decision. First, it would mean emulating the envelope handling that REQ does for you (if you've forgotten what that is, it's a good sign you don't want to have to do it). Second, it would mean potentially getting back replies that you didn't expect.

  Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there's a permanent error, such as a dead power supply on the server hardware, this approach won't work. Because the application code in servers is the biggest source of failures in any architecture, depending on a single server is not a great idea.

  So, the pros and cons are:

  • Pro: simple to understand and implement.
  • Pro: works easily with existing client and server application code.
  • Pro: ZeroMQ automatically retries the actual reconnection until it works.
  • Con: doesn't failover to backup or alternate servers.

Basic Reliable Queuing (Simple Pirate Pattern)

  Our second approach extends the Lazy Pirate pattern with a queue proxy that lets us talk, transparently, to multiple servers, which we can more accurately call "workers". We'll start with a minimal working model, the Simple Pirate pattern.

  In all these Pirate patterns, workers are stateless. If the application requires some shared state, such as a shared database, we don't know about it as we design our messaging framework. Having a queue proxy means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice, simple topology with only one real weakness, namely the central queue itself, which can become a management problem and a single point of failure.

[Figure 48: The Simple Pirate Pattern]

  The basis for the queue proxy is the load balancing broker from Chapter 3. What is the very minimum we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client, so using the load balancing pattern will work pretty well. This fits with ZeroMQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive proxies in the middle.

  We don't need a special client; we're still using the Lazy Pirate one. Here is the queue, which is effectively the main task of the load balancing broker, specialized for this job:

// spqueue: Simple Pirate queue in C
// Simple Pirate broker
// This is identical to load-balancing pattern, with no reliability
// mechanisms. It depends on the client for recovery. Runs forever.

#include "czmq.h"
#define WORKER_READY "\001" // Signals worker is ready

int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // For clients
zsocket_bind (backend, "tcp://*:5556"); // For workers

// Queue of available workers
zlist_t *workers = zlist_new ();

// The body of this example is exactly the same as lbbroker2.
while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);

// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}

  Here is the worker, which takes the Lazy Pirate server and adapts it for the load balancing pattern (using the REQ "ready" signaling):

// spworker: Simple Pirate worker in C
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of load-balancing

#include "czmq.h"
#define WORKER_READY "\001" // Signals worker is ready

int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);

// Set random identity to make tracing easier
srandom ((unsigned) time (NULL));
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
zsocket_connect (worker, "tcp://localhost:5556");

// Tell broker we're ready for work
printf ("I: (%s) worker ready\n", identity);
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

int cycles = 0;
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted

// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) simulating a crash\n", identity);
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) simulating CPU overload\n", identity);
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: (%s) normal reply\n", identity);
sleep (1); // Do some heavy work
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return 0;
}

  To test this, start a handful of workers, a Lazy Pirate client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.

Robust Reliable Queuing (Paranoid Pirate Pattern)

[Figure 49: The Paranoid Pirate Pattern]

  The Simple Pirate queue pattern works pretty well, especially because it's just a combination of two existing patterns. Still, it does have some weaknesses:

  • It's not robust in the face of a queue crash and restart. The client will recover, but the workers won't. While ZeroMQ will reconnect the workers' sockets automatically, as far as the newly started queue is concerned the workers haven't signaled "ready", so they don't exist. To fix this, we have to do heartbeating from queue to worker so that the worker can detect when the queue has gone away.
  • The queue does not detect worker failure, so if a worker dies while idle, the queue can't remove it from its worker queue until the queue sends it a request. The client then waits and retries for nothing. It's not a critical problem, but it's not nice. To make this work properly, we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.

  We'll fix these in a properly pedantic Paranoid Pirate pattern.

  We previously used a REQ socket for the worker. For the Paranoid Pirate worker, we'll switch to a DEALER socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management (re-read Chapter 3 for background on this concept). We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue proxy:

// ppqueue: Paranoid Pirate queue in C
// Paranoid Pirate queue

#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs

// Paranoid Pirate Protocol constants
#define PPP_READY "\001" // Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat

// Here we define the worker class; a structure and a set of functions that
// act as constructor, destructor, and methods on worker objects:

typedef struct {
zframe_t *identity; // Identity of worker
char *id_string; // Printable identity
int64_t expiry; // Expires at this time
} worker_t;

// Construct new worker
static worker_t *
s_worker_new (zframe_t *identity)
{
worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
self->identity = identity;
self->id_string = zframe_strhex (identity);
self->expiry = zclock_time ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}

// Destroy specified worker object, including identity frame.
static void
s_worker_destroy (worker_t **self_p)
{
assert (self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy (&self->identity);
free (self->id_string);
free (self);
*self_p = NULL;
}
}

// The ready method puts a worker to the end of the ready list:

static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (streq (self->id_string, worker->id_string)) {
zlist_remove (workers, worker);
s_worker_destroy (&worker);
break;
}
worker = (worker_t *) zlist_next (workers);
}
zlist_append (workers, self);
}

// The next method returns the next available worker identity:

static zframe_t *
s_workers_next (zlist_t *workers)
{
worker_t *worker = zlist_pop (workers);
assert (worker);
zframe_t *frame = worker->identity;
worker->identity = NULL;
s_worker_destroy (&worker);
return frame;
}

// The purge method looks for and kills expired workers. We hold workers
// from oldest to most recent, so we stop at the first alive worker:

static void
s_workers_purge (zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (zclock_time () < worker->expiry)
break; // Worker is alive, we're done here

zlist_remove (workers, worker);
s_worker_destroy (&worker);
worker = (worker_t *) zlist_first (workers);
}
}

// The main task is a load-balancer with heartbeating on workers so we
// can detect crashed or blocked worker tasks:

int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // For clients
zsocket_bind (backend, "tcp://*:5556"); // For workers

// List of available workers
zlist_t *workers = zlist_new ();

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted

// Any sign of life from worker means it's ready
zframe_t *identity = zmsg_unwrap (msg);
worker_t *worker = s_worker_new (identity);
s_worker_ready (worker, workers);

// Validate control message, or return reply to client
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_READY, 1)
&& memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
printf ("E: invalid message from worker");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg_t *msg = zmsg_recv (frontend);
if (!msg)
break; // Interrupted
zmsg_push (msg, s_workers_next (workers));
zmsg_send (&msg, backend);
}
// We handle heartbeating after any socket activity. First, we send
// heartbeats to any idle workers if it's time. Then, we purge any
// dead workers:
if (zclock_time () >= heartbeat_at) {
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
zframe_send (&worker->identity, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, backend, 0);
worker = (worker_t *) zlist_next (workers);
}
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
s_workers_purge (workers);
}
// When we're done, clean up properly
while (zlist_size (workers)) {
worker_t *worker = (worker_t *) zlist_pop (workers);
s_worker_destroy (&worker);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}

  The queue extends the load balancing pattern with heartbeating of workers. Heartbeating is one of those "simple" things that can be difficult to get right. I'll explain a little more about that in a moment.

  Here is the Paranoid Pirate worker:

// ppworker: Paranoid Pirate worker in C
// Paranoid Pirate worker

#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff

// Paranoid Pirate Protocol constants
#define PPP_READY "\001" // Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat

// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue

static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");

// Tell queue we're ready for work
printf ("I: worker ready\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);

return worker;
}

// We have a single task that implements the worker side of the
// Paranoid Pirate Protocol (PPP). The interesting parts here are
// the heartbeating, which lets the worker detect if the queue has
// died, and vice versa:

int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);

// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

srandom ((unsigned) time (NULL));
int cycles = 0;
while (true) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

if (items [0].revents & ZMQ_POLLIN) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted

// To test the robustness of the queue implementation, we
// simulate various typical problems, such as the worker
// crashing or running very slowly. We do this after a few
// cycles so that the architecture can get up and running
// first:
if (zmsg_size (msg) == 3) {
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating a crash\n");
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating CPU overload\n");
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: normal reply\n");
zmsg_send (&msg, worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // Do some heavy work
if (zctx_interrupted)
break;
}
else
// When we get a heartbeat message from the queue, it means the
// queue was (recently) alive, so we must reset our liveness
// indicator:
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
interval = INTERVAL_INIT;
}
else
// If the queue hasn't sent us heartbeats in a while, destroy the
// socket and reconnect. This is the simplest most brutal way of
// discarding any messages we might have sent in the meantime:
if (--liveness == 0) {
printf ("W: heartbeat failure, can't reach queue\n");
printf ("W: reconnecting in %zd msec…\n", interval);
zclock_sleep (interval);

if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
worker = s_worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
printf ("I: worker heartbeat\n");
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, worker, 0);
}
}
zctx_destroy (&ctx);
return 0;
}

  Some comments about this example:

  • The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
  • The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it retries indefinitely (whereas the client retries a few times before reporting a failure).

  Try the client, queue, and workers with a script like this:

ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &

  You should see the workers crash and burn one by one as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. And no matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.

Heartbeating

  Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not an issue specific to ZeroMQ. TCP has a long timeout (around 30 minutes), which means that it can be impossible to know whether a peer has died or been disconnected.

  It's not easy to get heartbeating right. When writing the Paranoid Pirate examples, it took about five hours to get the heartbeating working properly. The rest of the request-reply chain took maybe ten minutes. It is especially easy to create "false failures", i.e., peers deciding that they are disconnected because the heartbeats aren't being sent properly.

  We'll look at the three main answers people use for heartbeating with ZeroMQ.

Shrugging It Off

  The most common approach is to do no heartbeating at all and hope for the best. Many if not most ZeroMQ applications do this. ZeroMQ encourages this by hiding peers in many cases. What problems does this approach cause?

  • When we use a ROUTER socket in an application that tracks peers, as peers disconnect and reconnect, the application will leak memory (the resources that the application holds for each peer) and get slower and slower.
  • When we use SUB- or DEALER-based data recipients, we can't tell the difference between good silence (there's no data to send) and bad silence (the other end died). When a recipient knows the other side has died, it can for example switch over to a backup route.
  • If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically, a "keep-alive" rather than a heartbeat) will keep the network alive.

One-Way Heartbeats

  A second option is to send a heartbeat message from each node to its peers every second or so. When one node hears nothing from another within some timeout (several seconds, typically), it will treat that peer as dead. Sounds good, right? Sadly, no. This works in some cases but has nasty edge cases in others.

  For pub-sub, this does work, and it's the only model you can use. SUB sockets cannot talk back to PUB sockets, but PUB sockets can happily send "I'm alive" messages to their subscribers.

  As an optimization, you can send heartbeats only when there is no real data to send. Furthermore, you can send heartbeats progressively more slowly if network activity is an issue (e.g., on mobile networks where activity drains the battery). As long as the recipient can detect a failure (a sudden halt in activity), that's fine.

  This design has a few special problems:

  • It can be inaccurate when we send large amounts of data, as heartbeats will be delayed behind that data. If heartbeats are delayed, you can get false timeouts and disconnections due to network congestion. Thus, always treat any incoming data as a heartbeat, whether or not the sender optimizes out heartbeats.
  • While the pub-sub pattern will drop messages for disappeared recipients, PUSH and DEALER sockets will queue them. So if you send heartbeats to a dead peer and it comes back online, it will get all the heartbeats you sent, which can be thousands. Whoa, whoa!
  • This design assumes that heartbeat timeouts are the same across the whole network. But that won't be accurate. Some peers will want very aggressive heartbeating in order to detect faults rapidly, and some will want very relaxed heartbeating in order to let sleeping networks lie and save power.
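  Here is a minimal sketch of the one-way variant described above (the endpoint, the UPDATE/HEARTBEAT message strings, and the randomized "real data" are all assumptions for illustration): a publisher that sends real updates when it has them and a heartbeat whenever a full interval passes without traffic, so subscribers can treat any incoming message as a sign of life:

//  One-way heartbeating from a publisher (sketch only)
#include "czmq.h"
#define HEARTBEAT_INTERVAL  1000    //  msecs

int main (void)
{
    srandom ((unsigned) time (NULL));
    zctx_t *ctx = zctx_new ();
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5557");

    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    while (!zctx_interrupted) {
        //  Pretend we only occasionally have real data to publish
        if (randof (10) == 0) {
            zstr_send (publisher, "UPDATE something happened");
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        //  Nothing sent for a whole interval, so send a heartbeat instead
        if (zclock_time () >= heartbeat_at) {
            zstr_send (publisher, "HEARTBEAT");
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        zclock_sleep (100);         //  Simulated work/idle tick
    }
    zctx_destroy (&ctx);
    return 0;
}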

Ping-Pong Heartbeats

  The third option is to use a ping-pong dialog. One peer sends a ping command to the other, which replies with a pong command. Neither command has any payload. Pings and pongs are not correlated. Because the roles of "client" and "server" are arbitrary in some networks, we normally specify that either peer can send a ping and expect a pong in response. However, because the timeouts depend on network topologies known best to dynamic clients, it is usually the client that pings the server.

  This works for all ROUTER-based brokers. The same optimization we used in the second model makes this work even better: treat any incoming data as a pong, and only send a ping when not otherwise sending data.
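  A minimal client-side sketch of the ping-pong approach might look like this (the endpoint and the PING command string are assumptions, not part of any spec): any incoming traffic counts as a pong, and we only ping when the connection has been idle for a whole interval:

//  Client side of a ping-pong heartbeat dialog (sketch only)
#include "czmq.h"
#define PING_INTERVAL   1000    //  msecs
#define PING_LIVENESS   3       //  Missed intervals before we give up

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *peer = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (peer, "tcp://localhost:5560");

    size_t liveness = PING_LIVENESS;
    while (!zctx_interrupted) {
        zmq_pollitem_t items [] = { { peer, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, PING_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;                          //  Interrupted
        if (items [0].revents & ZMQ_POLLIN) {
            //  Real data or a pong: either way the peer is alive
            char *msg = zstr_recv (peer);
            free (msg);
            liveness = PING_LIVENESS;
        }
        else {
            zstr_send (peer, "PING");       //  Idle interval: send a ping
            if (--liveness == 0) {
                printf ("W: peer looks dead, reconnect here\n");
                liveness = PING_LIVENESS;   //  ...destroy/recreate socket...
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}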

Heartbeating for Paranoid Pirate

  For Paranoid Pirate, we chose the second approach. It might not have been the simplest option: if I were designing this today, I'd probably try a ping-pong approach instead. However, the principles are similar. The heartbeat messages flow asynchronously in both directions, and either peer can decide the other is "dead" and stop talking to it.

  In the worker, this is how we handle heartbeats from the queue:

  • We calculate a liveness, which is how many heartbeats we can still miss before deciding the queue is dead. It starts at three, and we decrement it each time we miss a heartbeat.
  • We wait, in the zmq_poll loop, for one second each time, which is our heartbeat interval.
  • If there's any message from the queue during that time, we reset our liveness to three.
  • If there's no message during that time, we count down our liveness.
  • If the liveness reaches zero, we consider the queue dead.
  • If the queue is dead, we destroy our socket, create a new one, and reconnect.
  • To avoid opening and closing too many sockets, we wait for a certain interval before reconnecting, and we double that interval each time, up to a maximum of 32 seconds.

  And this is how we handle heartbeats to the queue:

  • We calculate when to send the next heartbeat; this is a single variable because we're talking to only one peer, the queue.
  • In the zmq_poll loop, whenever we pass this time, we send a heartbeat to the queue.

  Here's the essential heartbeating code for the worker:

#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff


// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

if (items [0].revents & ZMQ_POLLIN) {
// Receive any message from queue
liveness = HEARTBEAT_LIVENESS;
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);

liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
// Send heartbeat message to queue
}
}

  The queue does the same, but manages an expiration time for each worker.

  Here are some tips for your own heartbeating implementation:

  • Use zmq_poll or a reactor as the core of your application's main task.
  • Start by building in the heartbeating between peers, test it by simulating failures, and only then build in the rest of the message flow. Adding heartbeating afterwards is much trickier.
  • Use simple tracing, i.e., printing to the console, to get things working. To help you trace the flow of messages between peers, use a dump method such as zmsg offers, and number your messages incrementally so you can see whether there are gaps.
  • In a real application, heartbeating must be configurable and is usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
  • If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest) of these.
  • Do heartbeating on the same socket you use for messages, so your heartbeats also act as a keep-alive to stop the network connection from going stale (some firewalls can be unkind to silent connections).

Contracts and Protocols

  If you're paying attention, you'll have realized that Paranoid Pirate is not interoperable with Simple Pirate, because of the heartbeats. But how do we define "interoperable"? To guarantee interoperability, we need a kind of contract, an agreement that lets different teams, in different times and places, write code that is guaranteed to work together. We call this a "protocol".

  It's fun to experiment without specifications, but that's not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? Even a simple protocol will, if it's successful, evolve and become more complex.

  Lack of contracts is a sure sign of a disposable application. So let's write a contract for this protocol. How do we do that?

  There's a wiki at rfc.zeromq.org that acts as a home for public ZeroMQ protocols. To create a new specification, register on the wiki if needed, and follow the instructions. It's fairly straightforward, although writing technical texts isn't everyone's cup of tea.

  It took me about fifteen minutes to draft the new Pirate Pattern Protocol (PPP). It's not a big specification, but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible; please fix it!").

  Turning PPP into a real protocol would take more work (both changes are sketched below):

  • There should be a protocol version number in the READY command so that it's possible to distinguish between different versions of PPP.
  • Even now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would want a message structure that includes a "message type" part.
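  To make those two changes concrete, here is a purely illustrative sketch (none of these frame values come from the actual PPP specification; the "PPP/2.0" header and the extra command codes are assumptions) of what a versioned READY with an explicit command frame might look like:

//  Hypothetical versioned PPP frames (sketch only, not the real spec)
#include "czmq.h"

#define PPPX_VERSION    "PPP/2.0"   //  Assumed protocol/version header frame
#define PPPX_READY      "\001"      //  Explicit command codes so READY and
#define PPPX_HEARTBEAT  "\002"      //  HEARTBEAT can never be mistaken for
#define PPPX_REQUEST    "\003"      //  application requests or replies
#define PPPX_REPLY      "\004"

//  Worker-side READY: [version][command] instead of a bare \001 frame
static void
s_send_ready (void *worker)
{
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, PPPX_VERSION);
    zmsg_addmem (msg, PPPX_READY, 1);
    zmsg_send (&msg, worker);
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");
    s_send_ready (worker);
    zclock_sleep (100);             //  Give the message time to go out
    zctx_destroy (&ctx);
    return 0;
}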

Service-Oriented Reliable Queuing (Majordomo Pattern)

[Figure 50: The Majordomo Pattern]

  The nice thing about progress is how fast it happens when lawyers and committees aren't involved. The one-page MDP specification turns PPP into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only then write software to implement them.

  The Majordomo Protocol (MDP) extends and improves on PPP in one interesting way: it adds a "service name" to requests that the client sends, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. The nice thing about MDP is that it grew out of existing code, a simpler ancestor protocol (PPP), and a precise set of improvements, each solving a clear problem. This made it easy to implement.
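  To show what that service name means on the wire, here is a minimal hand-rolled client request (a sketch that assumes the mdp.h header from the example sources for the MDPC_CLIENT constant; it builds by hand the same frames the mdcliapi code below produces): the protocol header, then the service name, then the request body, with the REQ socket adding the empty delimiter frame for us.

//  Hand-rolled MDP client request (sketch; assumes mdp.h from the examples)
#include "czmq.h"
#include "mdp.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, "tcp://localhost:5555");

    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, MDPC_CLIENT);     //  Frame 1: "MDPCxy" header
    zmsg_addstr (request, "echo");          //  Frame 2: service name
    zmsg_addstr (request, "Hello world");   //  Frames 3+: request body
    zmsg_send (&request, client);

    zmsg_t *reply = zmsg_recv (client);     //  Reply comes back with the
    if (reply) {                            //  same header and service frames
        zmsg_dump (reply);
        zmsg_destroy (&reply);
    }
    zctx_destroy (&ctx);
    return 0;
}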

  To implement Majordomo, we need to write a framework for clients and workers. It's really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API that does the work for them.

  So while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.

  Majordomo has two halves, a client side and a worker side. Because we'll write both client and worker applications, we will need two APIs. Here is a sketch of the client API, using a simple object-oriented approach:

//  Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects

// Lets us build this source without creating a library
#include "mdcliapi.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
zmsg_t *reply = mdcli_send (session, "echo", &request);
if (reply)
zmsg_destroy (&reply);
else
break; // Interrupt or failure
}
printf ("%d requests/replies processed\n", count);
mdcli_destroy (&session);
return 0;
}

  That's it. We open a session to the broker, send a request message, get a reply message back, and eventually close the connection. Here's a sketch of the worker API:

//  Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects

// Lets us build this source without creating a library
#include "mdwrkapi.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdwrk_t *session = mdwrk_new (
"tcp://localhost:5555", "echo", verbose);

zmsg_t *reply = NULL;
while (true) {
zmsg_t *request = mdwrk_recv (session, &reply);
if (request == NULL)
break; // Worker was interrupted
reply = request; // Echo is complex… :-)
}
mdwrk_destroy (&session);
return 0;
}

  It's more or less symmetrical, but the worker dialog is a little different. The first time a worker does a recv(), it passes a null reply. Thereafter, it passes the current reply and gets a new request.

  The client and worker APIs are fairly simple to construct because they're heavily based on the Paranoid Pirate code we already developed. Here is the client API:

// mdcliapi: Majordomo client API in C
// mdcliapi class - Majordomo Protocol Client API
// Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

#include "mdcliapi.h"

// Structure of our class
// We access these properties only via class methods

struct _mdcli_t {
zctx_t *ctx; // Our context
char *broker;
void *client; // Socket to broker
int verbose; // Print activity to stdout
int timeout; // Request timeout
int retries; // Request retries
};

// Connect or reconnect to broker

void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_REQ);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s…", self->broker);
}

// Here we have the constructor and destructor for our class:

// Constructor

mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);

mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // msecs
self->retries = 3; // Before we abandon

s_mdcli_connect_to_broker (self);
return self;
}

// Destructor

void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}

// These are the class methods. We can set the request timeout and number
// of retry attempts before sending requests:

// Set request timeout

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}

// Set request retries

void
mdcli_set_retries (mdcli_t *self, int retries)
{
assert (self);
self->retries = retries;
}

// Here is the send method. It sends a request to the broker and gets
// a reply even if it has to retry several times. It takes ownership of
// the request message, and destroys it when sent. It returns the reply
// message, or NULL if there was no reply after multiple attempts:

zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;

// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
if (self->verbose) {
zclock_log ("I: send request to '%s' service:", service);
zmsg_dump (request);
}
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);

zmq_pollitem_t items [] = {
{ self->client, 0, ZMQ_POLLIN, 0 }
};
// On any blocking call, libzmq will return -1 if there was
// an error; we could in theory check for different error codes,
// but in practice it's OK to assume it was EINTR (Ctrl-C):

int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// We would handle malformed replies better in real code
assert (zmsg_size (msg) >= 3);

zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);

zframe_t *reply_service = zmsg_pop (msg);
assert (zframe_streq (reply_service, service));
zframe_destroy (&reply_service);

zmsg_destroy (&request);
return msg; // Success
}
else
if (--retries_left) {
if (self->verbose)
zclock_log ("W: no reply, reconnecting…");
s_mdcli_connect_to_broker (self);
}
else {
if (self->verbose)
zclock_log ("W: permanent error, abandoning");
break; // Give up
}
}
if (zctx_interrupted)
printf ("W: interrupt received, killing client…\n");
zmsg_destroy (&request);
return NULL;
}

  Let's see how the client API looks in action, with an example test program that does 100K request-reply cycles:

// mdclient: Majordomo client application in C
// Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects

// Lets us build this source without creating a library
#include "mdcliapi.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
zmsg_t *reply = mdcli_send (session, "echo", &request);
if (reply)
zmsg_destroy (&reply);
else
break; // Interrupt or failure
}
printf ("%d requests/replies processed\n", count);
mdcli_destroy (&session);
return 0;
}

  And here is the worker API:

// mdwrkapi: Majordomo worker API in C
// mdwrkapi class - Majordomo Protocol Worker API
// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

#include "mdwrkapi.h"

// Reliability parameters
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable

// This is the structure of a worker API instance. We use a pseudo-OO
// approach in a lot of the C examples, as well as the CZMQ binding:

// Structure of our class
// We access these properties only via class methods

struct _mdwrk_t {
zctx_t *ctx; // Our context
char *broker;
char *service;
void *worker; // Socket to broker
int verbose; // Print activity to stdout

// Heartbeat management
uint64_t heartbeat_at; // When to send HEARTBEAT
size_t liveness; // How many attempts left
int heartbeat; // Heartbeat delay, msecs
int reconnect; // Reconnect delay, msecs

int expect_reply; // Zero only at start
zframe_t *reply_to; // Return identity, if any
};

// We have two utility functions; to send a message to the broker and
// to (re)connect to the broker:

// Send message to broker
// If no msg is provided, creates one internally

static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();

// Stack protocol envelope to start of message
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
zmsg_pushstr (msg, "");

if (self->verbose) {
zclock_log ("I: sending %s to broker",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->worker);
}

// Connect or reconnect to broker

void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
if (self->worker)
zsocket_destroy (self->ctx, self->worker);
self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->worker, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s…", self->broker);

// Register service with broker
s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);

// If liveness hits zero, queue is considered disconnected
self->liveness = HEARTBEAT_LIVENESS;
self->heartbeat_at = zclock_time () + self->heartbeat;
}

// Here we have the constructor and destructor for our mdwrk class:

// Constructor

mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
assert (broker);
assert (service);

mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->service = strdup (service);
self->verbose = verbose;
self->heartbeat = 2500; // msecs
self->reconnect = 2500; // msecs

s_mdwrk_connect_to_broker (self);
return self;
}

// Destructor

void
mdwrk_destroy (mdwrk_t **self_p)
{
assert (self_p);
if (*self_p) {
mdwrk_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self->service);
free (self);
*self_p = NULL;
}
}

// We provide two methods to configure the worker API. You can set the
// heartbeat interval and retries to match the expected network performance.

// Set heartbeat delay

void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
self->heartbeat = heartbeat;
}

// Set reconnect delay

void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
self->reconnect = reconnect;
}

// This is the recv method; it's a little misnamed because it first sends
// any reply and then waits for a new request. If you have a better name
// for this, let me know.

// Send reply, if any, to broker and wait for next request.

zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
// Format and send the reply if we were provided one
assert (reply_p);
zmsg_t *reply = *reply_p;
assert (reply || !self->expect_reply);
if (reply) {
assert (self->reply_to);
zmsg_wrap (reply, self->reply_to);
s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
zmsg_destroy (reply_p);
}
self->expect_reply = 1;

while (true) {
zmq_pollitem_t items [] = {
{ self->worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->worker);
if (!msg)
break; // Interrupted
if (self->verbose) {
zclock_log ("I: received message from broker:");
zmsg_dump (msg);
}
self->liveness = HEARTBEAT_LIVENESS;

// Don't try to handle errors, just assert noisily
assert (zmsg_size (msg) >= 3);

zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);

zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPW_WORKER));
zframe_destroy (&header);

zframe_t *command = zmsg_pop (msg);
if (zframe_streq (command, MDPW_REQUEST)) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one…
self->reply_to = zmsg_unwrap (msg);
zframe_destroy (&command);
// Here is where we actually have a message to process; we
// return it to the caller application:

return msg; // We have a request to process
}
else
if (zframe_streq (command, MDPW_HEARTBEAT))
; // Do nothing for heartbeats
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_mdwrk_connect_to_broker (self);
else {
zclock_log ("E: invalid input message");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}
else
if (--self->liveness == 0) {
if (self->verbose)
zclock_log ("W: disconnected from broker - retrying…");
zclock_sleep (self->reconnect);
s_mdwrk_connect_to_broker (self);
}
// Send HEARTBEAT if it's time
if (zclock_time () > self->heartbeat_at) {
s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
self->heartbeat_at = zclock_time () + self->heartbeat;
}
}
if (zctx_interrupted)
printf ("W: interrupt received, killing worker…\n");
return NULL;
}

  Let's see how the worker API looks in action, with an example test program that implements an echo service:

//  mdworker: Majordomo worker application in C
// Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects

// Lets us build this source without creating a library
#include "mdwrkapi.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdwrk_t *session = mdwrk_new (
"tcp://localhost:5555", "echo", verbose);

zmsg_t *reply = NULL;
while (true) {
zmsg_t *request = mdwrk_recv (session, &reply);
if (request == NULL)
break; // Worker was interrupted
reply = request; // Echo is complex… :-)
}
mdwrk_destroy (&session);
return 0;
}

  Here are some things to note about the worker API code:

  • The APIs are single-threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
  • The worker API doesn't do an exponential back-off; it's not worth the extra complexity.
  • The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception, depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications, the API should be robust against invalid messages.

  You might wonder why the worker API is manually closing its socket and opening a new one, when ZeroMQ will automatically reconnect a socket if the peer disappears and comes back. Look back at the Simple Pirate and Paranoid Pirate workers to understand. Although ZeroMQ will automatically reconnect workers if the broker dies and comes back up, this isn't sufficient to re-register the workers with the broker. I know of at least two solutions. The simplest, which we use here, is for the worker to monitor the connection using heartbeats, and if it decides the broker is dead, to close its socket and start afresh with a new socket. The alternative is for the broker to ask unregistered workers to re-register, which would need protocol support.

  Now let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but forget that for now because it gets complex). Additionally, we keep a queue of waiting workers per service.

  Here is the broker:

//  mdbroker: Majordomo broker in C
// Majordomo Protocol broker
// A minimal C implementation of the Majordomo Protocol as defined in
// http://rfc.zeromq.org/spec:7 and http://rfc.zeromq.org/spec:8.

#include "czmq.h"
#include "mdp.h"

// We'd normally pull these from config data

#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 2500 // msecs
#define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

// The broker class defines a single broker instance:

typedef struct {
zctx_t *ctx; // Our context
void *socket; // Socket for clients & workers
int verbose; // Print activity to stdout
char *endpoint; // Broker binds to this endpoint
zhash_t *services; // Hash of known services
zhash_t *workers; // Hash of known workers
zlist_t *waiting; // List of waiting workers
uint64_t heartbeat_at; // When to send HEARTBEAT
} broker_t;

static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_purge (broker_t *self);

// The service class defines a single service instance:

typedef struct {
broker_t *broker; // Broker instance
char *name; // Service name
zlist_t *requests; // List of client requests
zlist_t *waiting; // List of waiting workers
size_t workers; // How many workers we have
} service_t;

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (service_t *service, zmsg_t *msg);

// The worker class defines a single worker, idle or active:

typedef struct {
broker_t *broker; // Broker instance
char *id_string; // Identity of worker as string
zframe_t *identity; // Identity frame for routing
service_t *service; // Owning service, if known
int64_t expiry; // When worker expires, if no heartbeat
} worker_t;

static worker_t *
s_worker_require (broker_t *self, zframe_t *identity);
static void
s_worker_delete (worker_t *self, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_send (worker_t *self, char *command, char *option,
zmsg_t *msg);
static void
s_worker_waiting (worker_t *self);

// Here are the constructor and destructor for the broker:

static broker_t *
s_broker_new (int verbose)
{
broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));

// Initialize broker state
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new ();
self->workers = zhash_new ();
self->waiting = zlist_new ();
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
return self;
}

static void
s_broker_destroy (broker_t **self_p)
{
assert (self_p);
if (*self_p) {
broker_t *self = *self_p;
zctx_destroy (&self->ctx);
zhash_destroy (&self->services);
zhash_destroy (&self->workers);
zlist_destroy (&self->waiting);
free (self);
*self_p = NULL;
}
}

// This method binds the broker instance to an endpoint. We can call
// this multiple times. Note that MDP uses a single socket for both clients
// and workers:

void
s_broker_bind (broker_t *self, char *endpoint)
{
zsocket_bind (self->socket, endpoint);
zclock_log ("I: MDP broker/0.2.0 is active at %s", endpoint);
}

// This method processes one READY, REPLY, HEARTBEAT, or
// DISCONNECT message sent to the broker by a worker:

static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 1); // At least, command

zframe_t *command = zmsg_pop (msg);
char *id_string = zframe_strhex (sender);
int worker_ready = (zhash_lookup (self->workers, id_string) != NULL);
free (id_string);
worker_t *worker = s_worker_require (self, sender);

if (zframe_streq (command, MDPW_READY)) {
if (worker_ready) // Not first command in session
s_worker_delete (worker, 1);
else
if (zframe_size (sender) >= 4 // Reserved service name
&& memcmp (zframe_data (sender), "mmi.", 4) == 0)
s_worker_delete (worker, 1);
else {
// Attach worker to service and mark as idle
zframe_t *service_frame = zmsg_pop (msg);
worker->service = s_service_require (self, service_frame);
worker->service->workers++;
s_worker_waiting (worker);
zframe_destroy (&service_frame);
}
}
else
if (zframe_streq (command, MDPW_REPLY)) {
if (worker_ready) {
// Remove and save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
s_worker_waiting (worker);
}
else
s_worker_delete (worker, 1);
}
else
if (zframe_streq (command, MDPW_HEARTBEAT)) {
if (worker_ready)
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
else
s_worker_delete (worker, 1);
}
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_worker_delete (worker, 0);
else {
zclock_log ("E: invalid input message");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}

// Process a request coming from a client. We implement MMI requests
// directly here (at present, we implement only the mmi.service request):

static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 2); // Service name + body

zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);

// Set reply return identity to client sender
zmsg_wrap (msg, zframe_dup (sender));

// If we got a MMI service request, process that internally
if (zframe_size (service_frame) >= 4
&& memcmp (zframe_data (service_frame), "mmi.", 4) == 0) {
char *return_code;
if (zframe_streq (service_frame, "mmi.service")) {
char *name = zframe_strdup (zmsg_last (msg));
service_t *service =
(service_t *) zhash_lookup (self->services, name);
return_code = service && service->workers? "200": "404";
free (name);
}
else
return_code = "501";

zframe_reset (zmsg_last (msg), return_code, strlen (return_code));

// Remove & save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_push (msg, zframe_dup (service_frame));
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
}
else
// Else dispatch the message to the requested service
s_service_dispatch (service, msg);
zframe_destroy (&service_frame);
}

// This method deletes any idle workers that haven't pinged us in a
// while. We hold workers from oldest to most recent so we can stop
// scanning whenever we find a live worker. This means we'll mainly stop
// at the first worker, which is essential when we have large numbers of
// workers (we call this method in our critical path):

static void
s_broker_purge (broker_t *self)
{
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
if (zclock_time () < worker->expiry)
break; // Worker is alive, we're done here
if (self->verbose)
zclock_log ("I: deleting expired worker: %s",
worker->id_string);

s_worker_delete (worker, 0);
worker = (worker_t *) zlist_first (self->waiting);
}
}

// Here is the implementation of the methods that work on a service:

// Lazy constructor that locates a service by name or creates a new
// service if there is no service already with that name.

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
assert (service_frame);
char *name = zframe_strdup (service_frame);

service_t *service =
(service_t *) zhash_lookup (self->services, name);
if (service == NULL) {
service = (service_t *) zmalloc (sizeof (service_t));
service->broker = self;
service->name = name;
service->requests = zlist_new ();
service->waiting = zlist_new ();
zhash_insert (self->services, name, service);
zhash_freefn (self->services, name, s_service_destroy);
if (self->verbose)
zclock_log ("I: added service: %s", name);
}
else
free (name);

return service;
}

// Service destructor is called automatically whenever the service is
// removed from broker->services.

static void
s_service_destroy (void *argument)
{
service_t *service = (service_t *) argument;
while (zlist_size (service->requests)) {
zmsg_t *msg = zlist_pop (service->requests);
zmsg_destroy (&msg);
}
zlist_destroy (&service->requests);
zlist_destroy (&service->waiting);
free (service->name);
free (service);
}

// This method sends requests to waiting workers:

static void
s_service_dispatch (service_t *self, zmsg_t *msg)
{
assert (self);
if (msg) // Queue message if any
zlist_append (self->requests, msg);

s_broker_purge (self->broker);
while (zlist_size (self->waiting) && zlist_size (self->requests)) {
worker_t *worker = zlist_pop (self->waiting);
zlist_remove (self->broker->waiting, worker);
zmsg_t *msg = zlist_pop (self->requests);
s_worker_send (worker, MDPW_REQUEST, NULL, msg);
zmsg_destroy (&msg);
}
}

// Here is the implementation of the methods that work on a worker:

// Lazy constructor that locates a worker by identity, or creates a new
// worker if there is no worker already with that identity.

static worker_t *
s_worker_require (broker_t *self, zframe_t *identity)
{
assert (identity);

// self->workers is keyed off worker identity
char *id_string = zframe_strhex (identity);
worker_t *worker =
(worker_t *) zhash_lookup (self->workers, id_string);

if (worker == NULL) {
worker = (worker_t *) zmalloc (sizeof (worker_t));
worker->broker = self;
worker->id_string = id_string;
worker->identity = zframe_dup (identity);
zhash_insert (self->workers, id_string, worker);
zhash_freefn (self->workers, id_string, s_worker_destroy);
if (self->verbose)
zclock_log ("I: registering new worker: %s", id_string);
}
else
free (id_string);
return worker;
}

// This method deletes the current worker.

static void
s_worker_delete (worker_t *self, int disconnect)
{
assert (self);
if (disconnect)
s_worker_send (self, MDPW_DISCONNECT, NULL, NULL);

if (self->service) {
zlist_remove (self->service->waiting, self);
self->service->workers--;
}
zlist_remove (self->broker->waiting, self);
// This implicitly calls s_worker_destroy
zhash_delete (self->broker->workers, self->id_string);
}

// Worker destructor is called automatically whenever the worker is
// removed from broker->workers.

static void
s_worker_destroy (void *argument)
{
worker_t *self = (worker_t *) argument;
zframe_destroy (&self->identity);
free (self->id_string);
free (self);
}

// This method formats and sends a command to a worker. The caller may
// also provide a command option, and a message payload:

static void
s_worker_send (worker_t *self, char *command, char *option, zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();

// Stack protocol envelope to start of message
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);

// Stack routing envelope to start of message
zmsg_wrap (msg, zframe_dup (self->identity));

if (self->broker->verbose) {
zclock_log ("I: sending %s to worker",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->broker->socket);
}

// This worker is now waiting for work

static void
s_worker_waiting (worker_t *self)
{
// Queue to broker and service waiting lists
assert (self->broker);
zlist_append (self->broker->waiting, self);
zlist_append (self->service->waiting, self);
self->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (self->service, NULL);
}

// Finally, here is the main task. We create a new broker instance and
// then process messages on the broker socket:

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));

broker_t *self = s_broker_new (verbose);
s_broker_bind (self, "tcp://*:5555");

// Get and process messages forever or until interrupted
while (true) {
zmq_pollitem_t items [] = {
{ self->socket, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Process next input message, if any
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->socket);
if (!msg)
break; // Interrupted
if (self->verbose) {
zclock_log ("I: received message:");
zmsg_dump (msg);
}
zframe_t *sender = zmsg_pop (msg);
zframe_t *empty = zmsg_pop (msg);
zframe_t *header = zmsg_pop (msg);

if (zframe_streq (header, MDPC_CLIENT))
s_broker_client_msg (self, sender, msg);
else
if (zframe_streq (header, MDPW_WORKER))
s_broker_worker_msg (self, sender, msg);
else {
zclock_log ("E: invalid message:");
zmsg_dump (msg);
zmsg_destroy (&msg);
}
zframe_destroy (&sender);
zframe_destroy (&empty);
zframe_destroy (&header);
}
// Disconnect and delete any expired workers
// Send heartbeats to idle workers if needed
if (zclock_time () > self->heartbeat_at) {
s_broker_purge (self);
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
s_worker_send (worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *) zlist_next (self->waiting);
}
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
}
if (zctx_interrupted)
printf ("W: interrupt received, shutting down…\n");

s_broker_destroy (&self);
return 0;
}

  这是目前我们看到的最复杂的例子,大概500行代码。写出这个例子并让它有一定程度的健壮性,花了我两天时间。不过,对一个完整的面向服务的中介(broker)来说,这仍然算是相当短的一段代码。

  这里有一些关于它的说明:

  • 管家模型协议让我们能在一个单独的socket上管理client和worker。这对部署和管理中介相当友好:它只位于一个zmq的端点,而不是像大多数代理那样需要两个。
  • 该中介正确实现了完整的MDP/0.1(就我目前所知),包括在broker发送无效命令时断开连接、心跳等全部处理。
  • 可以把它扩展为多线程运行,每个线程管理一个socket和一套client与worker。这对拆分大型架构可能有用。C代码已经围绕一个broker类来组织,这让扩展变得容易。
  • 实现一个主/备或者双活(live/live)的broker可靠性模型也很容易,因为broker除了服务的存在信息之外基本没有状态。如果client和worker最初选择的broker没有正常运行,由它们自己去选择另一个broker。
  • 例子中使用了5秒的心跳,主要是为了让你在追踪时少看一些输出。对大多数局域网应用来说,现实中的心跳值可以更低。但是,每次重试的间隔必须足够长,好让一个服务有时间重启,比如说至少10秒。

  我们后来改进并扩展了该协议和这个管家模型的实现,现在放在我们的GitHub项目上。如果你想认真地使用管家模型,就去看看GitHub上的项目吧。

异步管家模型

  上一节实现的管家模型相当简单且笨拙。client还是最初简单海盗模型里的那个,只是用一个漂亮的API封装了一下。我在一台测试机上同时运行client、broker和worker时,处理100,000个请求大约需要14秒。一部分原因是代码本身的问题:它经常到处复制message帧,就好像CPU周期不要钱一样。但真正的问题在于我们在做网络往返(round-trip)。zmq禁用了Nagle's algorithm,但网络往返本身仍然很慢。

  理论归理论,实践才是检验真理的唯一标准。我们用一个简单的测试程序来度量网络往返的开销。它发送一批message:第一轮每发一条就等待一条回复,第二轮把所有message作为一个批次发出去,再一次性把回复都读回来。两种方式做的事情相同,但结果很不一样。我们模拟了一个client、一个broker和一个worker:

//  tripping: Round-trip demonstrator in C
// Round-trip demonstrator
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. The client task signals to
// main when it's ready.

#include "czmq.h"

static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (client, "tcp://localhost:5555");
printf ("Setting up test…\n");
zclock_sleep (100);

int requests;
int64_t start;

printf ("Synchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 10000) / (int) (zclock_time () - start));

printf ("Asynchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 100000) / (int) (zclock_time () - start));
zstr_send (pipe, "done");
}

// Here is the worker task. All it does is receive a message, and
// bounce it back the way it came:

static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");

while (true) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// Here is the broker task. It uses the zmq_proxy function to switch
// messages between frontend and backend:

static void *
broker_task (void *args)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (frontend, "tcp://*:5555");
void *backend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (backend, "tcp://*:5556");
zmq_proxy (frontend, backend, NULL);
zctx_destroy (&ctx);
return NULL;
}

// Finally, here's the main task, which starts the client, worker, and
// broker, and then runs until the client signals it to stop:

int main (void)
{
// Create threads
zctx_t *ctx = zctx_new ();
void *client = zthread_fork (ctx, client_task, NULL);
zthread_new (worker_task, NULL);
zthread_new (broker_task, NULL);

// Wait for signal on client pipe
char *signal = zstr_recv (client);
free (signal);

zctx_destroy (&ctx);
return 0;
}

  在我的开发机器上,结果显示:

Setting up test...
Synchronous round-trip test...
9057 calls/second
Asynchronous round-trip test...
173010 calls/second

  注意client线程在开始前有一个小的停顿。这是为了绕过ROUTER socket的一个"特性":如果你向一个还没有建立连接的对端地址发送message,该message会被直接丢弃。这个例子中我们没有使用负载均衡机制,所以如果没有这个停顿,一旦worker线程连接得太慢,就会丢失message,把我们的测试搞乱。

  就像我们看到的那样,在最简单的情况下,同步往返要比"尽量往管道里推"的异步方式慢差不多20倍。让我们看看能不能把这一点应用到管家模型上,让它更快。

  首先,我们先调整client的API让发送和接收分在两个单独的方法中:

mdcli_t *mdcli_new     (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);

  真的只要花很少时间就能把同步的client的API重构成异步的:

//  mdcliapi2: Majordomo asynchronous client API in C
// mdcliapi2 class - Majordomo Protocol Client API
// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

#include "mdcliapi2.h"

// Structure of our class
// We access these properties only via class methods

struct _mdcli_t {
zctx_t *ctx; // Our context
char *broker;
void *client; // Socket to broker
int verbose; // Print activity to stdout
int timeout; // Request timeout
};

// Connect or reconnect to broker. In this asynchronous class we use a
// DEALER socket instead of a REQ socket; this lets us send any number
// of requests without waiting for a reply.

void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s…", self->broker);
}

// The constructor and destructor are the same as in mdcliapi, except
// we don't do retries, so there's no retries property.
// ---------------------------------------------------------------------
// Constructor

mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);

mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // msecs

s_mdcli_connect_to_broker (self);
return self;
}

// Destructor

void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}

// Set request timeout

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}

// The send method now just sends one message, without waiting for a
// reply. Since we're using a DEALER socket we have to send an empty
// frame at the start, to create the same envelope that the REQ socket
// would normally make for us:

int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;

// Prefix request with protocol frames
// Frame 0: empty (REQ emulation)
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
zmsg_pushstr (request, "");
if (self->verbose) {
zclock_log ("I: send request to '%s' service:", service);
zmsg_dump (request);
}
zmsg_send (&request, self->client);
return 0;
}

// The recv method waits for a reply message and returns that to the
// caller.
// ---------------------------------------------------------------------
// Returns the reply message or NULL if there was no reply. Does not
// attempt to recover from a broker failure, this is not possible
// without storing all unanswered requests and resending them all…

zmsg_t *
mdcli_recv (mdcli_t *self)
{
assert (self);

// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
return NULL; // Interrupted

// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// Don't try to handle errors, just assert noisily
assert (zmsg_size (msg) >= 4);

zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);

zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);

zframe_t *service = zmsg_pop (msg);
zframe_destroy (&service);

return msg; // Success
}
if (zctx_interrupted)
printf ("W: interrupt received, killing client…\n");
else
if (self->verbose)
zclock_log ("W: permanent error, abandoning request");

return NULL;
}

  不同之处为:

  • 我们用一个DEALER socket替换掉REQ,所以我们需要在每个请求和回复的message开始模拟一个REQ的空分割帧。
  • 我们不会重发请求。如果应用需要重试,它就会自己做。
  • 我们把同步的send方法拆分成了独立的send和recv两个方法。
  • send方法是异步的,发送之后立即返回,所以调用者可以在收到任何回复之前发送很多message。
  • recv方法等待(有超时时间)一个回复然后返回给调用者。

  下面是对应的client测试程序,它会发送100,000条message然后收到100,000条回复:

//  mdclient2: Majordomo client application in C
// Majordomo Protocol client example - asynchronous
// Uses the mdcli API to hide all MDP aspects

// Lets us build this source without creating a library
#include "mdcliapi2.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
mdcli_send (session, "echo", &request);
}
for (count = 0; count < 100000; count++) {
zmsg_t *reply = mdcli_recv (session);
if (reply)
zmsg_destroy (&reply);
else
break; // Interrupted by Ctrl-C
}
printf ("%d replies received\n", count);
mdcli_destroy (&session);
return 0;
}

  broker和worker没变因为我们一点儿都没改动协议。我们会看到性能上的显著提升。下面是同步的client处理100k请求-回复循环:

$ time mdclient
100000 requests/replies processed

real 0m14.088s
user 0m1.310s
sys 0m2.670s

  下面是异步的client,和一个单独的worker:

$ time mdclient2
100000 replies received

real 0m8.730s
user 0m0.920s
sys 0m1.550s

  快了差不多一倍。还不错,但让我们启动10个worker看看效果:

$ time mdclient2
100000 replies received

real 0m3.863s
user 0m0.730s
sys 0m0.470s

  它并不是完全异步的,因为worker仍然按照严格的"最后使用"顺序拿到message。但worker越多,它的扩展性越好。在我的PC机上,8个或更多worker之后,处理速度就不再提升了,四核机器只能扩展到这个程度。不过只花了几分钟的工作,我们就得到了4倍的性能提升。broker仍然没有优化,它把大量时间花在到处复制message帧上,而不是做零拷贝。即便如此,我们也以相当少的代价达到了每秒25K次可靠的请求/回复处理。

  然而,异步的管家模型也不是完美的。它有个根本性的弱点:不增加额外代码的话,它没办法处理broker崩溃的情况。如果你仔细看了mdcliapi2的代码,就会发现它在故障后并没有尝试重连。一个合适的重连机制需要以下这些东西:

  • 附带一个编号的请求和对应编号的回复,这个可以通过更改协议来实现。
  • client API记录所有已发出但还没有收到回复的请求。
  • 故障发生的时候,由client API把所有记录下来的请求重新发送一遍(这个列表后面有一个简单的示意)。
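  下面是一个最小的草图(并非书中或GitHub上的实现),示意client API可以怎样记录未完成的请求、并在检测到故障并重连之后把它们重发。其中tracked_client_t、s_send_tracked、s_resend_outstanding等名字以及给请求加编号的方式都是为了说明而假设的:

#include "czmq.h"

typedef struct {
    void *client;              //  连接broker的DEALER socket
    zlist_t *outstanding;      //  尚未收到回复的请求副本(zmsg_t *)
    int sequence;              //  请求编号
} tracked_client_t;

//  发送请求:先编号,再保留一份副本;收到对应编号的回复后才从列表中移除
static void
s_send_tracked (tracked_client_t *self, zmsg_t **request_p)
{
    char sequence_text [16];
    snprintf (sequence_text, sizeof (sequence_text), "%d", ++self->sequence);
    zmsg_pushstr (*request_p, sequence_text);
    zlist_append (self->outstanding, zmsg_dup (*request_p));
    zmsg_send (request_p, self->client);
}

//  检测到broker故障并重连之后,把所有未完成的请求原样重发一遍
static void
s_resend_outstanding (tracked_client_t *self)
{
    zmsg_t *request = (zmsg_t *) zlist_first (self->outstanding);
    while (request) {
        zmsg_t *copy = zmsg_dup (request);
        zmsg_send (&copy, self->client);
        request = (zmsg_t *) zlist_next (self->outstanding);
    }
}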

  这并不是什么致命缺陷(deal breaker),但它确实说明了性能往往意味着复杂性。为管家模型做这些值得吗?这取决于你的使用场景。对一个每次会话只调用一次的名字查找服务来说,不值得;但对一个要服务成千上万client的web前端来说,可能就值得。

服务发现

  我们现在有了一个很好的面向服务的broker,但还没有办法知道某个特定的服务是否可用。我们能知道一个请求失败了,但不知道为什么失败。如果能问broker一句"echo服务还在运行吗?"会很有用。最明显的办法是修改MDP/Client协议,增加查询的命令;但MDP/Client的魅力就在于简单,往里面加服务发现会让它变得跟MDP/Worker协议一样复杂。

  另一个选择是学email的做法,把无法投递的请求退回去。这在异步模型中可以工作得很好,只是同样会增加复杂性:我们需要能从普通回复中分辨出哪些是被退回的请求,并正确地处理它们。

  让我们沿用已有的思路,在MDP之上构建,而不是去修改它。服务发现本身也是一个服务。它可能只是众多管理服务(比如"禁用服务X"、"提供统计数据"等)中的一个。我们想要的是一种通用的、可扩展的解决方案,不影响现有的协议和应用。

  所以这里有一个构建在MDP之上的小RFC:管家管理接口 the Majordomo Management Interface(MMI)。我们其实已经在broker中实现了它,不过除非你通读了全部代码,否则很可能会错过。我来解释一下它在broker中是怎么工作的:

  • 当client请求一个以mmi.开头的服务时,我们不把它路由给某个worker,而是在broker内部直接处理。
  • 我们在这个broker中只处理一个这样的服务,就是mmi.service,即服务发现服务。
  • 请求的有效负载是额外服务的名字(真正的服务名字,由worker提供)。
  • 根据是否有worker注册了该项服务,broker返回"200"(OK)或者"404"(未找到服务)。

  下面是我们在应用中怎么使用服务发现机制的:

//  mmiecho: Service discovery over Majordomo in C
// MMI echo query example

// Lets us build this source without creating a library
#include "mdcliapi.c"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

// This is the service we want to look up
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");

// This is the service we send our request to
zmsg_t *reply = mdcli_send (session, "mmi.service", &request);

if (reply) {
char *reply_code = zframe_strdup (zmsg_first (reply));
printf ("Lookup echo service: %s\n", reply_code);
free (reply_code);
zmsg_destroy (&reply);
}
else
printf ("E: no response from broker, make sure it's running\n");

mdcli_destroy (&session);
return 0;
}

  分别在运行和不运行worker的情况下试试这个程序,你会看到它相应地报告"200"或者"404"。我们例子broker中的MMI实现相当简陋。比如说,如果一个worker消失了,对应的服务仍然会显示为"存在"。在实际应用中,broker应该在某项服务超过可配置的超时时间仍没有任何worker时把它移除(见下面的示意)。
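  作为一个思路上的草图(并非书中的代码),broker可以给每个服务记录最后一次有worker在线的时间,超过可配置的TTL仍然没有worker就把该服务移除。下面的service->last_worker字段、SERVICE_TTL常量和s_purge_dead_services函数都是假设出来的,broker_t和service_t沿用前文mdbroker里的结构:

#define SERVICE_TTL  60000      //  毫秒,假设的可配置超时

static void
s_purge_dead_services (broker_t *self)
{
    //  先取出所有服务名,避免在遍历哈希表的同时删除表项
    zlist_t *names = zhash_keys (self->services);
    char *name = (char *) zlist_first (names);
    while (name) {
        service_t *service = (service_t *) zhash_lookup (self->services, name);
        //  没有任何worker、且超过TTL没见到worker的服务被移除
        if (service && service->workers == 0
        &&  zclock_time () > service->last_worker + SERVICE_TTL)
            zhash_delete (self->services, name);
        name = (char *) zlist_next (names);
    }
    zlist_destroy (&names);
}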

幂等服务

"幂等"(Idempotency)并不是一种吃了就好的药。它的意思是重复执行同一个操作是安全的。查看时钟是幂等的,把信用卡借给自己的孩子就不是。很多client-to-server的使用场景是幂等的,但也有一些不是。幂等的应用场景包括:

  • 无状态的任务分发,比如说一个管道中的server是无状态的,worker单纯依赖一个请求提供的状态来计算出回复。在这个场景中,多次计算同一个请求是安全的(尽管效率不高)。
  • 一个把逻辑地址转换成端点地址去绑定或者连接的名字查找服务。在这个场景中,多次查找相同的名字是安全的。

  下面是一些不幂等的应用场景:

  • 日志服务。我们不希望同一条日志信息被记录多次。
  • 任何对下游节点有影响的服务,比如说传递信息到其他节点。如果服务多次拿到相同的请求,下游节点就会得到重复的信息。
  • 任何以非幂等方式修改共享数据的服务。比如,一个从银行账户扣款的服务,如果不做额外的处理,它就不是幂等的。

  在我们的server应用不是幂等的情况下,我们必须更仔细地考虑它们究竟会在什么时候崩溃。如果一个应用在空闲时挂掉,或者在处理请求的过程中挂掉,通常问题不大;我们可以用数据库事务来保证借记和贷记要么一起完成,要么都不发生。但如果server是在发送回复的时候挂掉,那就有问题了,因为在它看来自己的工作已经完成了。

  如果在回复要发送到client的时候网络挂掉,相同的问题也会出现。client会以为server死掉了然后重新发送请求,server就会做两次相同的工作,这是我们不想看到的。

  要处理非幂等操作,可以使用以下相对标准的解决方案去检测和拒绝重复请求:

  • client必须为每个请求都加一个基于唯一client标识和唯一message序号的标签。
  • server在发送回复之前,先以client ID和message序号的组合作为键把这个回复存储起来。
  • server收到某个client的请求时,先检查是否已经存有针对该client ID和message序号的回复。如果有,就不再处理这个请求,而是直接把之前的回复返回去(下面是一个简单的示意)。
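  下面是一个简单的示意(假设性代码,不是某个现成库的API):server端用一个zhash表,以"client ID:序号"作为键缓存回复,从而检测并拒绝重复请求。s_check_duplicate、s_store_reply等名字都是虚构的:

#include "czmq.h"

//  如果是重复请求,返回之前缓存的回复副本;否则返回NULL,表示需要真正处理
static zmsg_t *
s_check_duplicate (zhash_t *reply_cache, const char *client_id, int sequence)
{
    char key [256];
    snprintf (key, sizeof (key), "%s:%d", client_id, sequence);
    zmsg_t *reply = (zmsg_t *) zhash_lookup (reply_cache, key);
    return reply? zmsg_dup (reply): NULL;
}

//  处理完请求之后、发送回复之前,先把回复存进缓存
static void
s_store_reply (zhash_t *reply_cache, const char *client_id,
    int sequence, zmsg_t *reply)
{
    char key [256];
    snprintf (key, sizeof (key), "%s:%d", client_id, sequence);
    zhash_insert (reply_cache, key, zmsg_dup (reply));
}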

非连接可靠性(泰坦尼克模型 Titanic Pattern)

  一旦你意识到管家模型是一个"可靠的"message broker,你可能就想给它加上一点儿spinning rust(也就是以铁氧化物为基础的机械硬盘盘片)做持久化。毕竟,企业级消息系统都是这么干的。这个想法很有诱惑力,但很遗憾必须否定它,毕竟冷酷地泼冷水正是我的特长之一。你不希望在架构核心里放一个基于硬盘持久化的broker,原因如下:

  • 就像你看到的那样,懒海盗模型工作得非常好。它适用于从直接的client-to-server到分布式队列代理的各种架构。它确实假设worker是无状态且幂等的,但我们可以不依赖磁盘持久化就绕开这个限制。
  • 磁盘持久化会带来一大堆问题,从糟糕的性能,到必须额外增加需要管理和维护的部件。海盗模型的美妙之处就在于简单:它们不会崩溃。如果你还是担心硬件问题,可以换成本章稍后介绍的peer-to-peer模型,它根本就没有broker。

  虽然这么说,还是有一种合理的应用场景需要基于磁盘的可靠性,那就是异步的、非连接(disconnected)的网络。它解决了海盗模型的一个重要问题:client必须实时地等待回复。如果client和worker只是偶尔连接一下(更像email而不是即时通信),我们就不能在client和worker之间依赖无状态的网络连接,而必须把状态放在中间的某个地方。

  于是就有了泰坦尼克模型:不管client和worker的连接多么断断续续,我们都把message写到磁盘上,确保它们不会丢失。就像做服务发现时那样,我们会在MDP之上构建泰坦尼克模型,而不是扩展MDP。这出奇地简单,因为这意味着我们可以在一个专门的worker中实现这种"发射后不管"(fire-and-forget,像巡航导弹一样发射后自动寻找目标,不用人看管)的可靠性,而不用去动broker。这很好,因为:

  • 非常简单,因为分治:broker处理message路由,worker处理可靠性。
  • 它能让我们混合起来,broker用一种语言去写,worker用另一种语言。
  • 它让我们独立的开发无人看守技术。

  唯一的不足是broker和硬盘之间多了一跳网络开销,但好处完全值得。

  有很多方法去设计一个持久化的请求-回复框架。我们的目标是简单且无痛苦的那一个。我想了几个小时之后觉得能设计出来的最简单的设计是”代理服务”。也就是说,泰坦尼克一点儿不会影响worker。如果一个client想立即得到回复,它会直接跟服务通信并希望服务可用。如果一个client能等一会儿的话,那它就可以去跟泰坦尼克说”嘿,伙计,能帮我看下这条信息吗,我去买点儿东西?”

[图 f51:泰坦尼克模型]

  因此,泰坦尼克号既是worker又是一个client。在client和泰坦尼克之间的会话流向如下:

  • Client:请接收这个请求。Titanic: OK,好了。
  • Client:你有给我的回复了吗?Titanic: 是的,在这里。或者不,还没。
  • Client:好了,现在你可以清空那个请求了,我很高兴。Titanic: OK,好了。

  在Titanic和broker和worker之间的会话流向如下:

  • Titanic: 嘿,broker,有个咖啡服务吗? Broker:呃,好像有。
  • Titanic: 嘿,咖啡服务,帮我处理下这个。
  • Coffee:当然,这是结果。
  • Titanic: 真好!

  你可能遇到一些故障的情况。如果一个worker在处理请求的时候崩溃了,Titanic会无限重试。如果一个回复在某个地方丢了,Titanic也会重试。如果请求得到了处理但client并没收到回复,它会再发一遍请求。如果Titanic在处理请求或者回复的时候崩溃了,client会重试。只要请求被完整提交到了安全的储存位置,任务就不会丢失了。

  这个握手过程相当繁琐,但可以流水线化。比如,client可以使用异步的管家模型一口气提交很多请求,稍后再取回复。

  我们需要某种方式让一个client得到它自己的回复。我们会有很多client去查询相同的服务,并且client伴随不同的标识掉线和上线。这里有个简单且相对安全的解决办法:

  • 每个请求生成一个唯一的ID(universally unique ID, UUID),在Titanic把请求加入队列之后会把该UUID返回给client。
  • 当一个client需要回复的时候,它必须给出原来那个请求的UUID。

  在实际应用中,client可能想安全的存储UUID,比如放到一个本地数据库里。

  在我们跳过该部分去写另一个规范的说明的时候,先来看看client怎么跟Titanic通信的。一种方法是使用一个单独的服务,发送三种不同的请求类型。另一种相对简单一点儿的方法是使用三个服务:

  • titanic.request:储存一个请求message,返回该请求的UUID。
  • titanic.reply:如果可能,就获取一个指定请求的UUID的回复。
  • titanic.close:确保回复已经存储并被处理过了。

  我们会把它做成一个多线程的worker,而我们已经看到,用zmq做多线程是很简单的事。不过,让我们先草拟一下client怎么用zmq的message和帧来跟Titanic对话。这就给了我们泰坦尼克服务协议 Titanic Service Protocol(TSP)。

  对client来说,使用TSP跟服务通信显然要比直接用MDP麻烦得多。下面是一个最短的、健壮的"echo" client例子:

//  ticlient: Titanic client example in C
// Titanic client example
// Implements client side of http://rfc.zeromq.org/spec:9

// Lets build this source without creating a library
#include "mdcliapi.c"

// Calls a TSP service
// Returns response if successful (status code 200 OK), else NULL
//
static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
zmsg_t *reply = mdcli_send (session, service, request_p);
if (reply) {
zframe_t *status = zmsg_pop (reply);
if (zframe_streq (status, "200")) {
zframe_destroy (&status);
return reply;
}
else
if (zframe_streq (status, "400")) {
printf ("E: client fatal error, aborting\n");
exit (EXIT_FAILURE);
}
else
if (zframe_streq (status, "500")) {
printf ("E: server fatal error, aborting\n");
exit (EXIT_FAILURE);
}
}
else
exit (EXIT_SUCCESS); // Interrupted or failed

zmsg_destroy (&reply);
return NULL; // Didn't succeed; don't care why not
}

// The main task tests our service call by sending an echo request:

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

// 1. Send 'echo' request to Titanic
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
zmsg_addstr (request, "Hello world");
zmsg_t *reply = s_service_call (
session, "titanic.request", &request);

zframe_t *uuid = NULL;
if (reply) {
uuid = zmsg_pop (reply);
zmsg_destroy (&reply);
zframe_print (uuid, "I: request UUID ");
}
// 2. Wait until we get a reply
while (!zctx_interrupted) {
zclock_sleep (100);
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
zmsg_t *reply = s_service_call (
session, "titanic.reply", &request);

if (reply) {
char *reply_string = zframe_strdup (zmsg_last (reply));
printf ("Reply: %s\n", reply_string);
free (reply_string);
zmsg_destroy (&reply);

// 3. Close request
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
reply = s_service_call (session, "titanic.close", &request);
zmsg_destroy (&reply);
break;
}
else {
printf ("I: no reply yet, trying again…\n");
zclock_sleep (5000); // Try again in 5 seconds
}
}
zframe_destroy (&uuid);
mdcli_destroy (&session);
return 0;
}

  当然,这可以也应该被封装成某种框架或者API。要求普通的应用开发者去学习消息传输的全部细节并不是好事:这会伤害他们的大脑、浪费时间,而且提供了太多制造复杂bug的机会。另外,这也让以后增加智能变得很难。

  例如,这个client会在每个请求上阻塞,而在实际应用中,我们更希望在任务执行期间做一些有用的事情。这需要一些并不简单的管道工作:构建一个后台线程,并跟它干净利落地通信。这正是你希望封装在一个简洁友好的API之后的东西,让普通开发者不会用错;这也是我们设计管家模型API时用的方法(见下面的草图)。
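  作为一个草图(并非书中的实现),可以用zthread_fork把"提交请求、轮询回复"的逻辑放进一个后台线程,应用只通过pipe跟它通信,这样就不必在每个请求上阻塞。titanic_agent这个名字和pipe上的消息格式都是假设的:

static void
titanic_agent (void *args, zctx_t *ctx, void *pipe)
{
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", 0);
    while (true) {
        //  从应用拿到一个请求,交给titanic.request,再把应答(状态码+UUID)转回去
        zmsg_t *request = zmsg_recv (pipe);
        if (!request)
            break;                  //  上下文被销毁,退出
        zmsg_t *reply = mdcli_send (session, "titanic.request", &request);
        if (reply)
            zmsg_send (&reply, pipe);
        else
            zstr_send (pipe, "");   //  broker没有响应
    }
    mdcli_destroy (&session);
}

//  应用侧的用法示意:
//  void *agent = zthread_fork (ctx, titanic_agent, NULL);
//  之后向agent这个pipe发送请求并读取UUID,稍后再用同样的方式异步轮询回复。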

  下面是泰坦尼克模型的实现。server用三个线程处理三个服务。它用最暴力的方式做完整的持久化:每条message一个文件。简单到让人害怕。唯一复杂的地方是它单独维护了一个所有请求的队列,以避免反复扫描目录:

//  titanic: Titanic broker example in C
// Titanic service
// Implements server side of http://rfc.zeromq.org/spec:9

// Lets us build this source without creating a library
#include "mdwrkapi.c"
#include "mdcliapi.c"

#include "zfile.h"
#include <uuid/uuid.h>

// Return a new UUID as a printable character string
// Caller must free returned string when finished with it

static char *
s_generate_uuid (void)
{
char hex_char [] = "0123456789ABCDEF";
char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
uuid_t uuid;
uuid_generate (uuid);
int byte_nbr;
for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {
uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
}
return uuidstr;
}

// Returns freshly allocated request filename for given UUID

#define TITANIC_DIR ".titanic"

static char *
s_request_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid);
return filename;
}

// Returns freshly allocated reply filename for given UUID

static char *
s_reply_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid);
return filename;
}

// The titanic.request task waits for requests to this service. It writes
// each request to disk and returns a UUID to the client. The client picks
// up the reply asynchronously using the titanic.reply service:

static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.request", 0);
zmsg_t *reply = NULL;

while (true) {
// Send reply if it's not null
// And then get next request from broker
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

// Ensure message directory exists
zfile_mkdir (TITANIC_DIR);

// Generate UUID and save message to disk
char *uuid = s_generate_uuid ();
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (request, file);
fclose (file);
free (filename);
zmsg_destroy (&request);

// Send UUID through to message queue
reply = zmsg_new ();
zmsg_addstr (reply, uuid);
zmsg_send (&reply, pipe);

// Now send UUID back to client
// Done by the mdwrk_recv() at the top of the loop
reply = zmsg_new ();
zmsg_addstr (reply, "200");
zmsg_addstr (reply, uuid);
free (uuid);
}
mdwrk_destroy (&worker);
}

// The titanic.reply task checks if there's a reply for the specified
// request (by UUID), and returns a 200 (OK), 300 (Pending), or 400
// (Unknown) accordingly:

static void *
titanic_reply (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.reply", 0);
zmsg_t *reply = NULL;

while (true) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
if (zfile_exists (rep_filename)) {
FILE *file = fopen (rep_filename, "r");
assert (file);
reply = zmsg_load (NULL, file);
zmsg_pushstr (reply, "200");
fclose (file);
}
else {
reply = zmsg_new ();
if (zfile_exists (req_filename))
zmsg_pushstr (reply, "300"); //Pending
else
zmsg_pushstr (reply, "400"); //Unknown
}
zmsg_destroy (&request);
free (uuid);
free (req_filename);
free (rep_filename);
}
mdwrk_destroy (&worker);
return 0;
}

// The titanic.close task removes any waiting replies for the request
// (specified by UUID). It's idempotent, so it is safe to call more than
// once in a row:

static void *
titanic_close (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.close", 0);
zmsg_t *reply = NULL;

while (true) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
zfile_delete (req_filename);
zfile_delete (rep_filename);
free (uuid);
free (req_filename);
free (rep_filename);

zmsg_destroy (&request);
reply = zmsg_new ();
zmsg_addstr (reply, "200");
}
mdwrk_destroy (&worker);
return 0;
}

// This is the main thread for the Titanic worker. It starts three child
// threads; for the request, reply, and close services. It then dispatches
// requests to workers using a simple brute force disk queue. It receives
// request UUIDs from the titanic.request service, saves these to a disk
// file, and then throws each request at MDP workers until it gets a
// response.

static int s_service_success (char *uuid);

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();

void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
zthread_new (titanic_reply, NULL);
zthread_new (titanic_close, NULL);

// Main dispatcher loop
while (true) {
// We'll dispatch once per second, if there's no activity
zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
// Ensure message directory exists
zfile_mkdir (TITANIC_DIR);

// Append UUID to queue, prefixed with '-' for pending
zmsg_t *msg = zmsg_recv (request_pipe);
if (!msg)
break; // Interrupted
FILE *file = fopen (TITANIC_DIR "/queue", "a");
char *uuid = zmsg_popstr (msg);
fprintf (file, "-%s\n", uuid);
fclose (file);
free (uuid);
zmsg_destroy (&msg);
}
// Brute force dispatcher
char entry [] = "?.......:.......:.......:.......:";
FILE *file = fopen (TITANIC_DIR "/queue", "r+");
while (file && fread (entry, 33, 1, file) == 1) {
// UUID is prefixed with '-' if still waiting
if (entry [0] == '-') {
if (verbose)
printf ("I: processing request %s\n", entry + 1);
if (s_service_success (entry + 1)) {
// Mark queue entry as processed
fseek (file, -33, SEEK_CUR);
fwrite ("+", 1, 1, file);
fseek (file, 32, SEEK_CUR);
}
}
// Skip end of line, LF or CRLF
if (fgetc (file) == '\r')
fgetc (file);
if (zctx_interrupted)
break;
}
if (file)
fclose (file);
}
return 0;
}

// Here, we first check if the requested MDP service is defined or not,
// using a MMI lookup to the Majordomo broker. If the service exists,
// we send a request and wait for a reply using the conventional MDP
// client API. This is not meant to be fast, just very simple:

static int
s_service_success (char *uuid)
{
// Load request message, service will be first frame
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "r");
free (filename);

// If the client already closed request, treat as successful
if (!file)
return 1;

zmsg_t *request = zmsg_load (NULL, file);
fclose (file);
zframe_t *service = zmsg_pop (request);
char *service_name = zframe_strdup (service);

// Create MDP client session with short timeout
mdcli_t *client = mdcli_new ("tcp://localhost:5555", false);
mdcli_set_timeout (client, 1000); // 1 sec
mdcli_set_retries (client, 1); // only 1 retry

// Use MMI protocol to check if service is available
zmsg_t *mmi_request = zmsg_new ();
zmsg_add (mmi_request, service);
zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request);
int service_ok = (mmi_reply
&& zframe_streq (zmsg_first (mmi_reply), "200"));
zmsg_destroy (&mmi_reply);

int result = 0;
if (service_ok) {
zmsg_t *reply = mdcli_send (client, service_name, &request);
if (reply) {
filename = s_reply_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (reply, file);
fclose (file);
free (filename);
result = 1;
}
zmsg_destroy (&reply);
}
else
zmsg_destroy (&request);

mdcli_destroy (&client);
free (service_name);
return result;
}

要测试这个程序,需要先启动mdbroker和titanic,然后再启动ticlient。现在反复重启mdworker,你会看到client收到一个回复然后干净地退出。

关于这段代码的一些说明:

  • 注意一些循环以发送信息开始,另一些以接收开始。这是因为Titanic在不同角色里分别扮演着client和worker。
  • Titanic broker使用MMI服务发现协议只对正在运行着的服务发送请求。因为在我们的小管家broker中实现的MMI很简陋,它并不会总是正常工作。
  • 我们使用一个inproc管道把请求数据从titanic.request服务发送到主调度器。这避免了调度器必须反复扫描磁盘目录、加载所有请求文件并按日期/时间排序。

  这个例子里重要的不是性能(虽然我没测过,但肯定相当糟糕),而是它如何实现可靠的传输。试着启动mdbroker和titanic程序,然后启动ticlient,接着再启动echo服务的mdworker。你可以在运行它们时加上-v选项来查看详细的活动跟踪。你可以停止和重启除client之外的任何部分,而不会丢失任何东西。

  如果你想在实际中使用Titanic,你可能会反复问自己”我怎么能把它弄得快点儿?”

  下面是我会做的,从简单的实现开始:

  • 对所有数据使用单一文件,而不是多个。操作系统通常都是对单个大文件的处理要好于很多小文件的处理。
  • 把文件组织成一个环状的存储空间,好让新的请求能持续写入。在一个单独线程中全速写入磁盘能工作的比较快。
  • 在内存中维护索引,并在重启时从磁盘上的环形缓冲文件重建索引。这样就不需要为了把索引完整地保存在磁盘上而付出额外的磁盘写入。你会希望在每条message之后做一次fsync;或者,如果你能接受系统故障时丢失最后的若干条message,也可以每N毫秒做一次(见列表后的示意)。
  • 使用固态硬盘而不是机械硬盘。
  • 提前申请全部文件,或者在一个大块中申请,它能让环状缓存区根据需要伸缩。这避免了磁盘碎片并保证了大多数的读取和写入工作是持续的。
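  下面是一个假设性的小示意(不是书中代码),演示"顺序追加到单个日志文件、每隔N毫秒才做一次fsync"的思路。LOG_SYNC_INTERVAL和s_log_append这两个名字都是虚构的:

#include "czmq.h"
#include <unistd.h>

#define LOG_SYNC_INTERVAL  100          //  毫秒;也是能接受的最大丢失窗口

static int64_t s_last_sync = 0;

static void
s_log_append (FILE *logfile, zmsg_t *msg)
{
    zmsg_save (msg, logfile);           //  顺序追加写,避免磁头来回移动
    if (zclock_time () - s_last_sync >= LOG_SYNC_INTERVAL) {
        fflush (logfile);
        fsync (fileno (logfile));       //  把缓冲的数据真正刷到磁盘
        s_last_sync = zclock_time ();
    }
}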

  诸如此类。我不建议把message存到数据库里,哪怕是"快速"的key/value数据库,除非你特别喜欢某个数据库并且不在乎性能。你会为这层抽象付出高昂的代价:比直接操作原始磁盘文件慢十倍到上千倍。

  如果你想让Titanic更可靠一些,可以把请求复制到第二个server上;它应该离第一个server足够远,远到能躲过第一个server所在地的核爆炸,但又不至于带来太多延迟。

  如果你想把Titanic设计的更快但少些可靠性,只把请求和回复都放在内存中。这会给你一个面向非连接的服务,但如果Titanic server本身挂掉的话,那些请求就没办法保存了。

高可靠性组合(双子星模型 Binary Star Pattern)

[图 f52:双子星主备server对]

  双子星模型让两个server成为主-从配套的高可靠性组。在任何时间,这两个中的一个(活跃的那个)接收client程序的连接。另一个(不活跃的那个)什么都不做,但这两个server相互监控着对方。如果活跃着的那个掉线了,一段特定的时间之后不活跃的那个就会接手。

  我们在iMatix为我们的OpenAMQ server开发了双子星模型。我们设计它是为了:

  • 提供一种直接的高可靠性解决方案。
  • 理解和使用都足够简单。
  • 在且仅在故障的时候才使用。

  假设我们的双子星模型正在运行,下面是一些故障中可能出现的情景:

  • 运行主server的硬件发生了致命错误(断电,火灾或者某人不小心拔了插头),然后掉线了。应用程序会发现,然后重连到备用server。
  • 主server所在的网络挂掉了——可能是路由遭雷击了——然后应用程序会重连到备用server上。
  • 主server崩溃或者被管理员关掉了,并且没有自动重启。

[图 f53:故障转移中的双子星]

  从故障中恢复工作的步骤如下:

  • 管理员重启主server并且修复让它掉线的原因。
  • 管理员在某个时候关掉备用server,这会对应用造成一些混乱。
  • 当应用程序重连到主server之后,管理员重启备用server。

  恢复(让主server成为活跃的那个)是人工操作。痛苦的经历教会了我们不要奢望自动恢复,原因有以下几个:

  • 故障转移会造成应用服务的中断,大约持续10-30秒。如果真遇到紧急情况,这要远好过服务完全不可用。但如果恢复又会再造成10-30秒的中断,那最好让它发生在非高峰期,也就是用户不在线的时候。
  • 出现紧急情况时,首要任务是让正在处理故障的人对现状有绝对的把握。自动恢复会给系统管理员带来不确定性:如果不反复确认,他们没法确定到底哪台server是活跃的。
  • 自动恢复还可能出现网络先故障转移、随后又自动恢复的情况,让运维人员很难分析到底发生了什么:服务中断过,但原因并不清楚。

  话虽如此,如果主server已经重新运行起来而备用server失效了,双子星模型会故障回切(fail back)到主server上。事实上,这正是我们触发恢复的方式。

  关闭双子星进程有两种办法:

  • 先停掉不活跃(passive)的那个server,之后任意时间再停掉活跃的那个;或者
  • 按任意顺序停掉两个server,但两次停止的间隔不要超过几秒钟。

  如果先停掉活跃的server,过了比故障转移超时更长的时间才去停不活跃的那个,应用程序会先断线、再重连、然后又断线,这会让用户感到困惑。

更细节的要求

  双子星非常简单,但仍能准确的工作。事实上,现在这个设计是第三个完整的重构。每个先前的设计都过于复杂,想做的事过于多,我们就开始删减功能,直到变成这个容易理解使用也足够健壮的版本。

  下面是我们对一个高健壮性框架的一些要求:

  • 该故障转移机制针对的是灾难性的系统故障,比如硬件损坏、火灾、事故等等。对普通的server崩溃,我们前面已经讲过更简单的恢复办法。
  • 故障转移的时间应该低于60秒,最好是低于10秒
  • 故障转移必须自动运行,同时恢复必须手动进行。我们希望应用程序能自动转移到备份server上去,但我们不希望在管理员解决了问题之前系统自动切换到主server上去。
  • client的逻辑必须简单,让开发者容易理解。理想的情况是,它们应该隐藏在client API之后。
  • 在如何避免导致人格分裂症(双子星组中两个server都认为它们是活跃的server)的问题上要有清楚的网络架构设计的说明书。
  • 这两个server的启动顺序应该没有依赖关系。
  • 必须有如何停掉和重启任何一个server而不中断服务的计划(尽管client应用需要被迫重连)。
  • 管理员必须能全天候监控两个server。
  • 必须能把两个server用专用的高速网络连接起来。也就是说,故障转移同步操作必须去使用一个专门的IP路由。

  下面是一些假设:

  • 一个单独的备份server就足够了,不需要多级备份。
  • 主从server对负载的处理能力是一样的,我们不需要设计负载均衡机制。
  • 有足够的预算来配备一台几乎所有时间都无所事事的完全冗余的备份server。

  我们不准备考虑到以下东西:

  • 活动的备份server的使用或者负载均衡机制。在双子星组中,备份server是不活跃的,在主server失效之前不会做任何事。
  • 任何方式的持久化信息或事务的处理。我们假设server或双子星组所在的网络是不可靠的(也可能是不可信任的)。
  • 任何形式的网络自动探测。双子星组是人工操作的并且在网络中明确定义的,是被应用程序知道的东西(至少是在它们的配置数据中)。
  • 在server之间的状态或信息的复制。所有服务器端的状态在服务器挂掉的时候都必须由客户端重新创建。

  下面是我们在双子星模型中使用的术语:

  • 主要的 Primary: 通常或者启动时是活跃的那个server。
  • 备份的 Backup: 通常是不活跃的那个server。它只在主server从网络中消失和client应用要求连接备份server的时候才变成活跃的。
  • 活跃的 Active: 接收client连接的server,至少会有一个活跃的server。
  • 不活跃的 Passive: 如果活跃的server掉线不活跃的会接管过来。注意在一个双子星组正常运行的时候,主server是活跃的,备份server是不活跃的。当故障转移发生的时候,它们的角色就换一下。

  要配置一个双子星组,你需要:

  • 告诉主server备份server的位置。
  • 告诉备份server主server的位置。
  • 可选地,调整故障转移的响应时间,两台server必须设置成相同的值。

  这里主要需要协调的是:你想让两台server多久检查一次对端的状态,以及多快激活故障转移。在我们的例子中,故障转移超时时间的默认值是2000毫秒。如果你把它调小,备用server会更快地接管失效的主server,但也可能在主server本来能自己恢复的情况下就抢先接管。比如,你可能把主server包在一个shell脚本里,在它挂掉时由脚本自动重启,那么超时时间就应该大于重启主server所需要的时间。

  对client来说要跟双子星组配合工作,它们必须:

  1. 知道两个server的地址。
  2. 试着连接主server,如果失败,再去连接备份server。
  3. 检测失败的连接,主要使用心跳。
  4. 试着重连主server,然后才是备份server(按顺序),中间的延迟至少是服务器进行故障转移所需要的时间。
  5. 重建它们在server上所需要的所有状态。
  6. 如果需要可靠的消息机制,重发故障转移时间内丢失的数据。

  这不是个小工作,我们通常会把这些封装到一个API中,把它从实际应用代码中隔离开。

  双子星模型主要有以下限制:

  • 一个server进程不能是多于1个双子星组的一部分。
  • 一个主server只能有一个单独的备份server,不能多。
  • 不活跃的server没有做什么有用的工作,因此是浪费的。
  • 备份server必须能够处理所有的负载。
  • 故障转移不能在运行时配置。
  • client应用必须做一些额外的工作,才能从故障转移中获益。

防止人格分裂症

  人格分裂症发生在一个集群的不同部分在同一时间都认为它们自己是活跃的部分。它会让应用程序停止相互监测。双子星有个算法用于探测和消除分裂,它依赖于一个三方决策机制(一个server只会在它得到客户端连接请求和看不到同组server的时候才会变成活跃的)。

  但是,仍然可能(错误的)设计一个网络去糊弄该算法。一个典型的场景是,双子星组分布在两栋建筑中,每栋建筑中都有一套client应用并且在两栋之间只有一个单独的网络连接。切断该连接会形成两套client应用,每套有一半的双子星组,并且每个故障转移server都会成为活跃的。

  要防止分裂的情况,我们使用一个专用的网络来连接双子星组,简单的可以把它们插在一个相同的交换机上,或者复杂点儿就用线缆直接连接两台机器。

  一套更复杂的网络配置会使用两条独立的集群互联线路,而不是一条。更进一步,用于集群互联的网卡应该跟用于消息传输的网卡分开,甚至走服务器硬件上不同的通路。这样做的目标是把网络上的故障和集群本身的故障隔离开。网络端口的故障率是相对比较高的。

双子星实现

  不需要再说更多的东西了,这里有个双子星服务器的概念性实现。主server和备份server跑的是相同的代码,在运行的时候你确定它们的角色:

//  bstarsrv: Binary Star server in C
// Binary Star server proof-of-concept implementation. This server does no
// real work; it just demonstrates the Binary Star failover model.

#include "czmq.h"

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Our finite state machine
typedef struct {
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
} bstar_t;

// We send state information this often
// If peer doesn't respond in two heartbeats, it is 'dead'
#define HEARTBEAT 1000 // In msecs

// The heart of the Binary Star design is its finite-state machine (FSM).
// The FSM runs one event at a time. We apply an event to the current state,
// which checks if the event is accepted, and if so, sets a new state:

static bool
s_state_machine (bstar_t *fsm)
{
bool exception = false;

// These are the PRIMARY and BACKUP states; we're waiting to become
// ACTIVE or PASSIVE depending on events we get from our peer:
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
printf ("I: connected to backup (passive), ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to backup (active), ready passive\n");
fsm->state = STATE_PASSIVE;
}
// Accept client connections
}
else
if (fsm->state == STATE_BACKUP) {
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to primary (active), ready passive\n");
fsm->state = STATE_PASSIVE;
}
else
// Reject client connections when acting as backup
if (fsm->event == CLIENT_REQUEST)
exception = true;
}
else
// These are the ACTIVE and PASSIVE states:

if (fsm->state == STATE_ACTIVE) {
if (fsm->event == PEER_ACTIVE) {
// Two actives would mean split-brain
printf ("E: fatal error - dual actives, aborting\n");
exception = true;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
printf ("I: primary (passive) is restarting, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
printf ("I: backup (passive) is restarting, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
printf ("E: fatal error - dual passives, aborting\n");
exception = true;
}
else
if (fsm->event == CLIENT_REQUEST) {
// Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// If peer is dead, switch to the active state
printf ("I: failover successful, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
exception = true;
}
}
return exception;
}

// This is our main task. First we bind/connect our sockets with our
// peer and make sure we will get state messages correctly. We use
// three sockets; one to publish state, one to subscribe to state, and
// one for client requests/replies:

int main (int argc, char *argv [])
{
// Arguments can be either of:
// -p primary server, at tcp://localhost:5001
// -b backup server, at tcp://localhost:5002
zctx_t *ctx = zctx_new ();
void *statepub = zsocket_new (ctx, ZMQ_PUB);
void *statesub = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (statesub, "");
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
bstar_t fsm = { 0 };

if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary active, waiting for backup (passive)\n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Backup passive, waiting for primary (active)\n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
}
else {
printf ("Usage: bstarsrv { -p | -b }\n");
zctx_destroy (&ctx);
exit (0);
}
// We now process events on our two input sockets, and process these
// events one at a time via our finite-state machine. Our "work" for
// a client request is simply to echo it back:

// Set timer for next outgoing state message
int64_t send_state_at = zclock_time () + HEARTBEAT;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ statesub, 0, ZMQ_POLLIN, 0 }
};
int time_left = (int) ((send_state_at - zclock_time ()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
// Have a client request
zmsg_t *msg = zmsg_recv (frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine (&fsm) == false)
// Answer client by echoing request back
zmsg_send (&msg, frontend);
else
zmsg_destroy (&msg);
}
if (items [1].revents & ZMQ_POLLIN) {
// Have state from our peer, execute as event
char *message = zstr_recv (statesub);
fsm.event = atoi (message);
free (message);
if (s_state_machine (&fsm))
break; // Error, so exit
fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
}
// If we timed out, send state to peer
if (zclock_time () >= send_state_at) {
char message [2];
sprintf (message, "%d", fsm.state);
zstr_send (statepub, message);
send_state_at = zclock_time () + HEARTBEAT;
}
}
if (zctx_interrupted)
printf ("W: interrupted\n");

// Shutdown sockets and context
zctx_destroy (&ctx);
return 0;
}

  下面是client:

//  bstarcli: Binary Star client in C
// Binary Star client proof-of-concept implementation. This client does no
// real work; it just demonstrates the Binary Star failover model.

#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // msecs
#define SETTLE_DELAY 2000 // Before failing over

int main (void)
{
zctx_t *ctx = zctx_new ();

char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
uint server_nbr = 0;

printf ("I: connecting to server at %s…\n", server [server_nbr]);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);

int sequence = 0;
while (!zctx_interrupted) {
// We send a request, then we work to get a reply
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);

int expect_reply = 1;
while (expect_reply) {
// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// We use a Lazy Pirate strategy in the client. If there's no
// reply within our timeout, we close the socket and try again.
// In Binary Star, it's the client vote that decides which
// server is primary; the client must therefore try to connect
// to each server in turn:

if (items [0].revents & ZMQ_POLLIN) {
// We got a reply from the server, must match sequence
char *reply = zstr_recv (client);
if (atoi (reply) == sequence) {
printf ("I: server replied OK (%s)\n", reply);
expect_reply = 0;
sleep (1); // One request per second
}
else
printf ("E: bad reply from server: %s\n", reply);
free (reply);
}
else {
printf ("W: no response from server, failing over\n");

// Old socket is confused; close it and open a new one
zsocket_destroy (ctx, client);
server_nbr = (server_nbr + 1) % 2;
zclock_sleep (SETTLE_DELAY);
printf ("I: connecting to server at %s…\n",
server [server_nbr]);
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);

// Send request again, on new socket
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}

  要测试双子星,按任意顺序启动server和client:

bstarsrv -p     # Start primary
bstarsrv -b # Start backup
bstarcli

  你可以通过杀掉主server来触发故障转移,再通过重启主server并杀掉备份server来触发恢复。注意是client的投票触发了故障转移和恢复。

  双子星是由一个有限状态机驱动的。事件是对端的状态,所以"对端活跃"意味着对端server告诉我们它处于活跃状态;"client请求"意味着我们收到了一个client请求;"client投票"意味着我们收到了一个client请求、并且对端已经超过两个心跳周期不处于活跃状态。

  注意server使用PUB-SUB socket进行状态交换。没有其他的socket组合能用在这里。PUSH和DEALER如果在没有对端接收数据的时候会阻塞。PAIR在对端掉线并重新上线的时候不会进行重连。ROUTER需要在发送信息之前就有对端的地址。

[图 f54:双子星有限状态机]

双子星反应器

  双子星足够有用也足够通用,值得把它封装成一个可复用的反应器(reactor)类。反应器运行起来之后,每当有message需要处理时就会调用我们的代码。这比在每个想用双子星的server里都嵌入一遍双子星代码要优雅得多。

  在C中,我们使用之前见过的CZMQ的zloop类。zloop允许你为socket和定时器事件注册处理函数。在双子星反应器中,我们另外为投票者(voter)和状态变化(从活跃变为不活跃,或者反过来)提供处理函数。下面是bstar的API:

//  bstar class - Binary Star reactor

#include "bstar.h"

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Structure of our class

struct _bstar_t {
zctx_t *ctx; // Our private context
zloop_t *loop; // Reactor loop
void *statepub; // State publisher
void *statesub; // State subscriber
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
zloop_fn *active_fn; // Call when become active
void *active_arg; // Arguments for handler
zloop_fn *passive_fn; // Call when become passive
void *passive_arg; // Arguments for handler
};

// The finite-state machine is the same as in the proof-of-concept server.
// To understand this reactor in detail, first read the CZMQ zloop class.

// We send state information every this often
// If peer doesn't respond in two heartbeats, it is 'dead'
#define BSTAR_HEARTBEAT 1000 // In msecs

// Binary Star finite state machine (applies event to state)
// Returns -1 if there was an exception, 0 if event was valid.

static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// Primary server is waiting for peer to connect
// Accepts CLIENT_REQUEST events in this state
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: connected to backup (passive), ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to backup (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST) {
// Allow client requests to turn us into the active if we've
// waited sufficiently long to believe the backup is not
// currently acting as active (i.e., after a failover)
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
zclock_log ("I: request from client, ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
} else
// Don't respond to clients yet - it's possible we're
// performing a failback and the backup is currently active
rc = -1;
}
}
else
// Backup server is waiting for peer to connect
// Rejects CLIENT_REQUEST events in this state
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to primary (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST)
rc = -1;
}
else
// Server is active
// Accepts CLIENT_REQUEST events in this state
// The only way out of ACTIVE is death
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// Two actives would mean split-brain
zclock_log ("E: fatal error - dual actives, aborting");
rc = -1;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: primary (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: backup (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
zclock_log ("E: fatal error - dual passives, aborting");
rc = -1;
}
else
if (self->event == CLIENT_REQUEST) {
// Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
// If peer is dead, switch to the active state
zclock_log ("I: failover successful, ready as active");
self->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
rc = -1;
}
// Call state change handler if necessary
if (self->state == STATE_ACTIVE && self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
return rc;
}

static void
s_update_peer_expiry (bstar_t *self)
{
self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
}

// Reactor event handlers…

// Publish our state to peer
int s_send_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
zstr_send (self->statepub, "%d", self->state);
return 0;
}

// Receive state from peer, execute finite state machine
int s_recv_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv (poller->socket);
if (state) {
self->event = atoi (state);
s_update_peer_expiry (self);
free (state);
}
return s_execute_fsm (self);
}

// Application wants to speak to us, see if it's possible
int s_voter_ready (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
// If server can accept input now, call appl handler
self->event = CLIENT_REQUEST;
if (s_execute_fsm (self) == 0)
(self->voter_fn) (self->loop, poller, self->voter_arg);
else {
// Destroy waiting message, no-one to read it
zmsg_t *msg = zmsg_recv (poller->socket);
zmsg_destroy (&msg);
}
return 0;
}

// This is the constructor for our bstar class. We have to tell it
// whether we're primary or backup server, as well as our local and
// remote endpoints to bind and connect to:

bstar_t *
bstar_new (int primary, char *local, char *remote)
{
bstar_t
*self;

self = (bstar_t *) zmalloc (sizeof (bstar_t));

// Initialize the Binary Star
self->ctx = zctx_new ();
self->loop = zloop_new ();
self->state = primary? STATE_PRIMARY: STATE_BACKUP;

// Create publisher for state going to peer
self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->statepub, local);

// Create subscriber for state coming from peer
self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->statesub, "");
zsocket_connect (self->statesub, remote);

// Set-up basic reactor events
zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zmq_pollitem_t poller = { self->statesub, 0, ZMQ_POLLIN };
zloop_poller (self->loop, &poller, s_recv_state, self);
return self;
}

// The destructor shuts down the bstar reactor:

void
bstar_destroy (bstar_t **self_p)
{
assert (self_p);
if (*self_p) {
bstar_t *self = *self_p;
zloop_destroy (&self->loop);
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// This method returns the underlying zloop reactor, so we can add
// additional timers and readers:

zloop_t *
bstar_zloop (bstar_t *self)
{
return self->loop;
}

// This method registers a client voter socket. Messages received
// on this socket provide the CLIENT_REQUEST events for the Binary Star
// FSM and are passed to the provided application handler. We require
// exactly one voter per bstar instance:

int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// Hold actual handler+arg so we can call this later
void *socket = zsocket_new (self->ctx, type);
zsocket_bind (socket, endpoint);
assert (!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
zmq_pollitem_t poller = { socket, 0, ZMQ_POLLIN };
return zloop_poller (self->loop, &poller, s_voter_ready, self);
}

// Register handlers to be called each time there's a state change:

void
bstar_new_active (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->active_fn);
self->active_fn = handler;
self->active_arg = arg;
}

void
bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->passive_fn);
self->passive_fn = handler;
self->passive_arg = arg;
}

// Enable/disable verbose tracing, for debugging:

void bstar_set_verbose (bstar_t *self, bool verbose)
{
zloop_set_verbose (self->loop, verbose);
}

// Finally, start the configured reactor. It will end if any handler
// returns -1 to the reactor, or if the process receives SIGINT or SIGTERM:

int
bstar_start (bstar_t *self)
{
assert (self->voter_fn);
s_update_peer_expiry (self);
return zloop_start (self->loop);
}

  下面是这个类的实现:

//  bstar: Binary Star core class in C
// bstar class - Binary Star reactor

#include "bstar.h"

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Structure of our class

struct _bstar_t {
zctx_t *ctx; // Our private context
zloop_t *loop; // Reactor loop
void *statepub; // State publisher
void *statesub; // State subscriber
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
zloop_fn *active_fn; // Call when become active
void *active_arg; // Arguments for handler
zloop_fn *passive_fn; // Call when become passive
void *passive_arg; // Arguments for handler
};

// The finite-state machine is the same as in the proof-of-concept server.
// To understand this reactor in detail, first read the CZMQ zloop class.

// We send state information every this often
// If peer doesn't respond in two heartbeats, it is 'dead'
#define BSTAR_HEARTBEAT 1000 // In msecs

// Binary Star finite state machine (applies event to state)
// Returns -1 if there was an exception, 0 if event was valid.

static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// Primary server is waiting for peer to connect
// Accepts CLIENT_REQUEST events in this state
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: connected to backup (passive), ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to backup (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST) {
// Allow client requests to turn us into the active if we've
// waited sufficiently long to believe the backup is not
// currently acting as active (i.e., after a failover)
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
zclock_log ("I: request from client, ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
} else
// Don't respond to clients yet - it's possible we're
// performing a failback and the backup is currently active
rc = -1;
}
}
else
// Backup server is waiting for peer to connect
// Rejects CLIENT_REQUEST events in this state
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to primary (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST)
rc = -1;
}
else
// Server is active
// Accepts CLIENT_REQUEST events in this state
// The only way out of ACTIVE is death
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// Two actives would mean split-brain
zclock_log ("E: fatal error - dual actives, aborting");
rc = -1;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: primary (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: backup (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
zclock_log ("E: fatal error - dual passives, aborting");
rc = -1;
}
else
if (self->event == CLIENT_REQUEST) {
// Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
// If peer is dead, switch to the active state
zclock_log ("I: failover successful, ready as active");
self->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
rc = -1;
}
// Call state change handler if necessary
if (self->state == STATE_ACTIVE && self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
return rc;
}

static void
s_update_peer_expiry (bstar_t *self)
{
self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
}

// Reactor event handlers…

// Publish our state to peer
int s_send_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
zstr_send (self->statepub, "%d", self->state);
return 0;
}

// Receive state from peer, execute finite state machine
int s_recv_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv (poller->socket);
if (state) {
self->event = atoi (state);
s_update_peer_expiry (self);
free (state);
}
return s_execute_fsm (self);
}

// Application wants to speak to us, see if it's possible
int s_voter_ready (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
// If server can accept input now, call appl handler
self->event = CLIENT_REQUEST;
if (s_execute_fsm (self) == 0)
(self->voter_fn) (self->loop, poller, self->voter_arg);
else {
// Destroy waiting message, no-one to read it
zmsg_t *msg = zmsg_recv (poller->socket);
zmsg_destroy (&msg);
}
return 0;
}

// This is the constructor for our bstar class. We have to tell it
// whether we're primary or backup server, as well as our local and
// remote endpoints to bind and connect to:

bstar_t *
bstar_new (int primary, char *local, char *remote)
{
bstar_t
*self;

self = (bstar_t *) zmalloc (sizeof (bstar_t));

// Initialize the Binary Star
self->ctx = zctx_new ();
self->loop = zloop_new ();
self->state = primary? STATE_PRIMARY: STATE_BACKUP;

// Create publisher for state going to peer
self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->statepub, local);

// Create subscriber for state coming from peer
self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->statesub, "");
zsocket_connect (self->statesub, remote);

// Set-up basic reactor events
zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zmq_pollitem_t poller = { self->statesub, 0, ZMQ_POLLIN };
zloop_poller (self->loop, &poller, s_recv_state, self);
return self;
}

// The destructor shuts down the bstar reactor:

void
bstar_destroy (bstar_t **self_p)
{
assert (self_p);
if (*self_p) {
bstar_t *self = *self_p;
zloop_destroy (&self->loop);
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// This method returns the underlying zloop reactor, so we can add
// additional timers and readers:

zloop_t *
bstar_zloop (bstar_t *self)
{
return self->loop;
}

// This method registers a client voter socket. Messages received
// on this socket provide the CLIENT_REQUEST events for the Binary Star
// FSM and are passed to the provided application handler. We require
// exactly one voter per bstar instance:

int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// Hold actual handler+arg so we can call this later
void *socket = zsocket_new (self->ctx, type);
zsocket_bind (socket, endpoint);
assert (!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
zmq_pollitem_t poller = { socket, 0, ZMQ_POLLIN };
return zloop_poller (self->loop, &poller, s_voter_ready, self);
}

// Register handlers to be called each time there's a state change:

void
bstar_new_active (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->active_fn);
self->active_fn = handler;
self->active_arg = arg;
}

void
bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->passive_fn);
self->passive_fn = handler;
self->passive_arg = arg;
}

// Enable/disable verbose tracing, for debugging:

void bstar_set_verbose (bstar_t *self, bool verbose)
{
zloop_set_verbose (self->loop, verbose);
}

// Finally, start the configured reactor. It will end if any handler
// returns -1 to the reactor, or if the process receives SIGINT or SIGTERM:

int
bstar_start (bstar_t *self)
{
assert (self->voter_fn);
s_update_peer_expiry (self);
return zloop_start (self->loop);
}

  This gives us the following short main program:

//  bstarsrv2: Binary Star server, using core class in C
// Binary Star server, using bstar reactor

// Lets us build this source without creating a library
#include "bstar.c"

// Echo service
int s_echo (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
zmsg_t *msg = zmsg_recv (poller->socket);
zmsg_send (&msg, poller->socket);
return 0;
}

int main (int argc, char *argv [])
{
// Arguments can be either of:
// -p primary server, at tcp://localhost:5001
// -b backup server, at tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary active, waiting for backup (passive)\n");
bstar = bstar_new (BSTAR_PRIMARY,
"tcp://*:5003", "tcp://localhost:5004");
bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Backup passive, waiting for primary (active)\n");
bstar = bstar_new (BSTAR_BACKUP,
"tcp://*:5004", "tcp://localhost:5003");
bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
}
else {
printf ("Usage: bstarsrvs { -p | -b }\n");
exit (0);
}
bstar_start (bstar);
bstar_destroy (&bstar);
return 0;
}
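
  To try the pair out you also need a client that can fail over between the two voter endpoints. The sketch below is one minimal way to do that, assuming the endpoints and echo service used by bstarsrv2 above (tcp://localhost:5001 and tcp://localhost:5002) and a hard-coded one second timeout; it is an illustration rather than a definitive implementation.

//  bstarcli sketch (not from the original text): Lazy Pirate client that
//  fails over between the two Binary Star voter endpoints used above.
//  Endpoints and the one second timeout are assumptions for illustration.

#include "czmq.h"
#define REQUEST_TIMEOUT 1000    //  msecs

int main (void)
{
    char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
    uint server_nbr = 0;

    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, server [server_nbr]);

    while (!zctx_interrupted) {
        //  Send a request, then poll for the reply with a timeout
        zstr_send (client, "Hello");
        zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
        zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);

        if (items [0].revents & ZMQ_POLLIN) {
            char *reply = zstr_recv (client);
            if (!reply)
                break;          //  Interrupted
            printf ("I: server replied (%s)\n", reply);
            free (reply);
            zclock_sleep (1000);
        }
        else {
            //  No reply within the timeout: assume the active server is
            //  gone, close the REQ socket, and try the other server
            printf ("W: no response, failing over\n");
            zsocket_destroy (ctx, client);
            server_nbr = (server_nbr + 1) % 2;
            printf ("I: connecting to %s...\n", server [server_nbr]);
            client = zsocket_new (ctx, ZMQ_REQ);
            zsocket_connect (client, server [server_nbr]);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}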

Brokerless Reliability (Freelance Pattern)

  We often explain zmq as a "brokerless messaging" framework, so it may seem ironic to spend so much time on broker-based reliability. Yet in messaging, as in real life, the middleman is both a burden and a benefit. In practice, most messaging architectures benefit from a mix of distributed and brokered messaging, and you get the best results when you can freely weigh the trade-offs against what you are actually trying to do. This is why I can drive two hours to a wholesaler to buy five cases of wine for a party, but can also walk ten minutes to the corner store to buy one bottle for dinner. Our valuations of time, energy, and cost are essential to the real-world economy, and they are just as essential to an optimal messaging architecture.

  This is also why zmq does not impose a broker-centric architecture, although it does give you the tools to build brokers, also called proxies, and we have built quite a few different ones so far, just for practice.

  So we will end this chapter by deconstructing broker-based reliability and turning to a distributed peer-to-peer architecture that I call the Freelance pattern. Our use case is a name resolution service. This is a common problem with zmq architectures: how do we know which endpoint to connect to? Hard-coding TCP/IP addresses in code is painfully fragile, and using configuration files creates an administration nightmare. Imagine having to hand-configure every PC or mobile phone you use so that its web browser knows that "google.com" is "74.125.230.82".

  A zmq name service (and we will design a simple implementation of one) has to do the following:

  • Resolve a logical name into at least one bind endpoint and one connect endpoint. A realistic name service would provide multiple bind endpoints, and possibly multiple connect endpoints as well (a minimal, hypothetical sketch of such a lookup follows this list).
  • Let us manage multiple parallel environments, such as "test" and "production", without modifying code.
  • Be reliable, because if it is unavailable, applications will not be able to connect to the network.
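
  To make the first requirement concrete, here is a hypothetical sketch of the lookup itself, not part of the original example code: a static table maps a logical name plus an environment to a connect endpoint, and a trivial REP service answers each lookup with the matching endpoint string. The table contents, frame layout, and port are all invented for illustration.

//  Name-lookup sketch (hypothetical): resolves [environment][name]
//  to a connect endpoint. Table contents and framing are assumptions.

#include "czmq.h"

typedef struct {
    char *name;         //  Logical service name, e.g. "echo"
    char *env;          //  Environment, e.g. "test" or "production"
    char *endpoint;     //  Connect endpoint clients should use
} name_entry_t;

static name_entry_t name_table [] = {
    { "echo", "test",       "tcp://localhost:5555" },
    { "echo", "production", "tcp://192.168.1.10:5555" },
    { NULL, NULL, NULL }
};

//  Return the endpoint for an environment/name pair, or NULL
static char *
s_lookup (char *env, char *name)
{
    int index;
    for (index = 0; name_table [index].name; index++)
        if (streq (name_table [index].env, env)
        &&  streq (name_table [index].name, name))
            return name_table [index].endpoint;
    return NULL;
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *server = zsocket_new (ctx, ZMQ_REP);
    zsocket_bind (server, "tcp://*:5550");

    while (!zctx_interrupted) {
        //  Request is two frames: [environment][logical name]
        zmsg_t *request = zmsg_recv (server);
        if (!request)
            break;              //  Interrupted
        char *env = zmsg_popstr (request);
        char *name = zmsg_popstr (request);
        zmsg_destroy (&request);

        char *endpoint = (env && name)? s_lookup (env, name): NULL;
        zstr_send (server, endpoint? endpoint: "FAILED");
        free (env);
        free (name);
    }
    zctx_destroy (&ctx);
    return 0;
}

  A real name service would of course keep this table somewhere it can be updated, and would hand out bind endpoints to servers as well as connect endpoints to clients; the Freelance models below deliberately skip the lookup and use dummy requests so we can focus on reliability.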

  From some points of view it would be clever to put the name service behind a service-oriented Majordomo broker. However, it is simpler and much less surprising to expose it as servers that clients connect to directly. If we do this right, the name service becomes the only global network endpoint we ever need to hard-code in our code or configuration files.

[Figure 55: The Freelance Pattern]

  The types of failure we aim to handle are server crashes and restarts, busy servers, overloaded servers, and network problems. To get reliability, we will create a pool of name servers so that if one crashes or goes away, clients can connect to another. In practice two would be enough, but for the example we will assume the pool can be of any size.

  In this architecture, a large set of clients connect directly to a small set of servers, and each server binds to its own address. This is fundamentally different from a broker-based approach like Majordomo, where it is the workers that connect to the broker. Clients have a couple of options:

  • Use REQ sockets and the Lazy Pirate pattern. Easy, but it would need some extra intelligence so that clients do not keep reconnecting to dead servers.
  • Use DEALER sockets and blast out requests (which will be load balanced to all connected servers) until a reply comes back. Effective, but not elegant.
  • Use ROUTER sockets so that clients can address specific servers. But how does the client learn the identity of the server sockets? Either the server has to ping the client first (complex), or the server has to use a hard-coded identity known to the client (nasty).

  We will work through each of these approaches in the subsections that follow.

Model One: Simple Retry and Failover

  So our menu appears to offer: simple, brutal, complex, or nasty. Let's start with simple and then work out the kinks. We take Lazy Pirate and rewrite it to talk to multiple servers.

  Start one or several servers first, specifying a bind endpoint as the argument:

//  flserver1: Freelance server, Model One in C
// Freelance server - Model 1
// Trivial echo service

#include "czmq.h"

int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
return 0;
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);

printf ("I: echo service is ready at %s\n", argv [1]);
while (true) {
zmsg_t *msg = zmsg_recv (server);
if (!msg)
break; // Interrupted
zmsg_send (&msg, server);
}
if (zctx_interrupted)
printf ("W: interrupted\n");

zctx_destroy (&ctx);
return 0;
}

  Then start the client, specifying one or more connect endpoints as arguments:

//  flclient1: Freelance client, Model One in C
// Freelance client - Model 1
// Uses REQ socket to query one or more services

#include "czmq.h"
#define REQUEST_TIMEOUT 1000
#define MAX_RETRIES 3 // Before we abandon

static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{
printf ("I: trying echo service at %s…\n", endpoint);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, endpoint);

// Send request, wait safely for reply
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, client);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
zmsg_t *reply = NULL;
if (items [0].revents & ZMQ_POLLIN)
reply = zmsg_recv (client);

// Close socket in any case, we're done with it now
zsocket_destroy (ctx, client);
return reply;
}

// The client uses a Lazy Pirate strategy if it only has one server to talk
// to. If it has two or more servers to talk to, it will try each server just
// once:

int main (int argc, char *argv [])
{
zctx_t *ctx = zctx_new ();
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "Hello world");
zmsg_t *reply = NULL;

int endpoints = argc - 1;
if (endpoints == 0)
printf ("I: syntax: %s <endpoint> …\n", argv [0]);
else
if (endpoints == 1) {
// For one endpoint, we retry N times
int retries;
for (retries = 0; retries < MAX_RETRIES; retries++) {
char *endpoint = argv [1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Successful
printf ("W: no response from %s, retrying…\n", endpoint);
}
}
else {
// For multiple endpoints, try each at most once
int endpoint_nbr;
for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
char *endpoint = argv [endpoint_nbr + 1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Successful
printf ("W: no response from %s\n", endpoint);
}
}
if (reply)
printf ("Service is running OK\n");

zmsg_destroy (&request);
zmsg_destroy (&reply);
zctx_destroy (&ctx);
return 0;
}

  Here is a sample run:

flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556

  Although the underlying approach is just Lazy Pirate, the client's aim is simply to get one successful reply. It uses two different techniques, depending on whether you run a single server or multiple servers:

  • With a single server, the client retries several times, exactly as in Lazy Pirate.
  • With multiple servers, the client tries each server at most once, until it either receives a reply or has tried every server.

  This solves the main weakness of Lazy Pirate, namely that it could not fail over to a backup or alternate server.

  However, this design will not work well in a real application. If we are connecting many sockets and our primary name server happens to be down, we will suffer this painful timeout every time.

Model Two: Brutal Shotgun Massacre

  Let's switch our client to use a DEALER socket. The goal here is to make sure we get a reply back in the shortest possible time, no matter whether any particular server is up or down. The client takes this approach:

  • Set things up, connecting to all the servers.
  • When we have a request, blast it out as many times as we have servers.
  • Wait for the first reply, and take that.
  • Ignore all other replies.

  What happens in practice is that when all servers are running, zmq will distribute the requests so that each server gets one request and sends one reply. When any server goes offline or gets disconnected, zmq distributes the requests among the remaining servers, so a given server may end up receiving the same request more than once.

  What is more annoying for the client is that we will get multiple replies back, with no guarantee of getting any precise number of them. Requests and replies can both get lost (for example, if a server crashes while processing a request).

  So we have to number the requests and ignore any reply that does not match the request's number. The Model One server got away without this only because it was an echo server, and coincidence is not a good basis for understanding a system. So our Model Two server will chew up the incoming message and return a correctly numbered reply whose body is "OK". We will use messages of two frames: one frame for the sequence number and one for the reply body.

  Start one or more servers, specifying a bind endpoint each time:

//  flserver2: Freelance server, Model Two in C
// Freelance server - Model 2
// Does some work, replies OK, with message sequencing

#include "czmq.h"

int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
return 0;
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);

printf ("I: service is ready at %s\n", argv [1]);
while (true) {
zmsg_t *request = zmsg_recv (server);
if (!request)
break; // Interrupted
// Fail nastily if run against wrong client
assert (zmsg_size (request) == 2);

zframe_t *identity = zmsg_pop (request);
zmsg_destroy (&request);

zmsg_t *reply = zmsg_new ();
zmsg_add (reply, identity);
zmsg_addstr (reply, "OK");
zmsg_send (&reply, server);
}
if (zctx_interrupted)
printf ("W: interrupted\n");

zctx_destroy (&ctx);
return 0;
}

  Then start the client, specifying the connect endpoints as arguments:

// flclient2: Freelance client, Model Two in C
// Freelance client - Model 2
// Uses DEALER socket to blast one or more services

#include "czmq.h"

// We design our client API as a class, using the CZMQ style
#ifdef __cplusplus
extern "C" {
#endif

typedef struct _flclient_t flclient_t;
flclient_t *flclient_new (void);
void flclient_destroy (flclient_t **self_p);
void flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *flclient_request (flclient_t *self, zmsg_t **request_p);

#ifdef __cplusplus
}
#endif

// If not a single service replies within this time, give up
#define GLOBAL_TIMEOUT 2500

int main (int argc, char *argv [])
{
if (argc == 1) {
printf ("I: syntax: %s <endpoint> …\n", argv [0]);
return 0;
}
// Create new freelance client object
flclient_t *client = flclient_new ();

// Connect to each endpoint
int argn;
for (argn = 1; argn < argc; argn++)
flclient_connect (client, argv [argn]);

// Send a bunch of name resolution 'requests', measure time
int requests = 10000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flclient_request (client, &request);
if (!reply) {
printf ("E: name service not available, aborting\n");
break;
}
zmsg_destroy (&reply);
}
printf ("Average round trip cost: %d usec\n",
(int) (zclock_time () - start) / 10);

flclient_destroy (&client);
return 0;
}

// Here is the flclient class implementation. Each instance has a
// context, a DEALER socket it uses to talk to the servers, a counter
// of how many servers it's connected to, and a request sequence number:

struct _flclient_t {
zctx_t *ctx; // Our context wrapper
void *socket; // DEALER socket talking to servers
size_t servers; // How many servers we have connected to
uint sequence; // Number of requests ever sent
};

// Constructor

flclient_t *
flclient_new (void)
{
flclient_t
*self;

self = (flclient_t *) zmalloc (sizeof (flclient_t));
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
return self;
}

// Destructor

void
flclient_destroy (flclient_t **self_p)
{
assert (self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// Connect to new server endpoint

void
flclient_connect (flclient_t *self, char *endpoint)
{
assert (self);
zsocket_connect (self->socket, endpoint);
self->servers++;
}

// This method does the hard work. It sends a request to all
// connected servers in parallel (for this to work, all connections
// must be successful and completed by this time). It then waits
// for a single successful reply, and returns that to the caller.
// Any other replies are just dropped:

zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_t *request = *request_p;

// Prefix request with sequence number and empty envelope
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (request, sequence_text);
zmsg_pushstr (request, "");

// Blast the request to all connected servers
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->socket);
}
// Wait for a matching reply to arrive from anywhere
// Since we can poll several times, calculate each one
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
while (zclock_time () < endtime) {
zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// Reply is [empty][sequence][OK]
reply = zmsg_recv (self->socket);
assert (zmsg_size (reply) == 3);
free (zmsg_popstr (reply));
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence)
break;
zmsg_destroy (&reply);
}
}
zmsg_destroy (request_p);
return reply;
}

  Some things to note about the client implementation:

  • The client is wrapped in a nice little class-based API that hides the dirty work of creating the zmq context and sockets and of talking to the servers. That is, if a shotgun blast can be called "talking".
  • The client abandons the request if it cannot find any responding server within a few seconds.
  • The client has to create a valid REP envelope, i.e. push an empty frame onto the front of the message.

  The client performs 10,000 name resolution requests (fake ones, since our servers do essentially nothing) and measures the average cost. On my test box, talking to one server takes about 60 microseconds; talking to three servers takes about 80 microseconds.

  The pros and cons of our shotgun approach are:

  • Pro: it is simple, easy to implement and easy to understand.
  • Pro: it gives us failover, and it works quickly as long as at least one server is running.
  • Con: it creates redundant network traffic.
  • Con: we cannot prioritize the servers, e.g. primary first, then secondary.
  • Con: each server can handle at most one request at a time.

Model Three: Complex and Nasty

  The shotgun approach seems too good to be true. Let's be scientific and work through all the alternatives in detail. We will explore the complex/nasty option, even if only to discover in the end that we prefer brutal after all. Ah, the story of my life.

  We can solve the client's main problem by switching to a ROUTER socket. That lets us send requests to specific servers, avoid servers we know are dead, and in general be as smart as we want to be. We can also solve the server's main problem (its single-threadedness) by switching it to a ROUTER socket.

  But doing ROUTER-to-ROUTER between two anonymous sockets is not possible. Each side generates an identity (for its peer) only when it receives a first message, so neither side can talk to the other because both are waiting for that first message. The way out of this conundrum is to cheat and use a hard-coded identity in one direction. In a client/server case, the proper way to cheat is to have the client "know" the server's identity. Doing it the other way around would be impractically complex, because any number of clients may come online independently. Insane, complex, and devious are essential qualities for a dictator, but terrible ones for software.

  Rather than invent yet another concept to manage, we will use the connection endpoint as the identity. It is a unique string that both sides know without needing any more information than they already had for the shotgun model. It is a sneaky and effective way to connect two ROUTER sockets.

  Recall how zmq identities work. The server ROUTER socket sets an identity before it binds. When a client connects, the two sides perform a short handshake to exchange identities before either sends a real message. The client ROUTER socket, having not set an identity, sends a null identity to the server, and the server generates a random UUID to designate that client for its own use. The server then sends its own identity (which we have agreed will be the endpoint address string) to the client.

  This means our client can route a message to the server (i.e. send it on its own ROUTER socket with the server's address string as the identity frame) as soon as the connection is established. That is not immediately after calling zmq_connect(), but some random time afterwards. Therein lies a problem: we do not know when the server will actually be available and have completed its connection handshake. If the server is online, that could be a few milliseconds from now. If the server is down and the sysadmin is out to lunch, it could be an hour from now.

  There is a small paradox here. We need to know when a server has connected and is available for work. In the Freelance pattern, unlike the broker-based patterns we saw earlier in this chapter, servers are silent until spoken to. So we cannot talk to a server until it has told us it is online, which it will only do when we ask it.

  My solution is to mix in a little of the shotgun approach from Model Two: we fire (harmless) shots at anything we can hit, and if anything moves, we know it is alive. Rather than firing real requests, we fire a kind of ping-pong heartbeat.
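
  As a standalone illustration of these two tricks, the endpoint used as the server's identity, and a harmless PING to see whether anything is alive, here is a minimal client-side sketch. It assumes a Freelance server like the Model Three one shown below, reachable at tcp://localhost:5555, and it is a drastic simplification of what the full flcliapi agent further down actually does.

//  Ping sketch (not from the original text): ping a Freelance server from
//  a client-side ROUTER socket. The server is assumed to have set its
//  identity to its connect endpoint before binding, as flserver3 does.

#include "czmq.h"

int main (void)
{
    char *endpoint = "tcp://localhost:5555";
    zctx_t *ctx = zctx_new ();
    void *router = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_connect (router, endpoint);
    zclock_sleep (100);         //  Give the connection handshake time to finish

    //  Address the server by its identity (the endpoint string) and PING it
    zmsg_t *ping = zmsg_new ();
    zmsg_addstr (ping, endpoint);
    zmsg_addstr (ping, "PING");
    zmsg_send (&ping, router);

    //  If the server is alive, it answers [endpoint]["PONG"]
    zmq_pollitem_t items [] = { { router, 0, ZMQ_POLLIN, 0 } };
    zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *reply = zmsg_recv (router);
        char *identity = zmsg_popstr (reply);
        char *control = zmsg_popstr (reply);
        printf ("I: %s answered %s\n", identity, control);
        free (identity);
        free (control);
        zmsg_destroy (&reply);
    }
    else
        printf ("W: no answer from %s, assuming it is dead\n", endpoint);

    zctx_destroy (&ctx);
    return 0;
}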

  This takes us once more into the realm of protocols, so here is a short spec (the Freelance Protocol, http://rfc.zeromq.org/spec:10) that defines how a Freelance client and server exchange ping-pong commands and request-reply commands.

  Implementing the server side is short and sweet. Here is our echo server, Model Three, now speaking FLP:

//  flserver3: Freelance server, Model Three in C
// Freelance server - Model 3
// Uses an ROUTER/ROUTER socket but just one thread

#include "czmq.h"

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));

zctx_t *ctx = zctx_new ();

// Prepare server socket with predictable identity
char *bind_endpoint = "tcp://*:5555";
char *connect_endpoint = "tcp://localhost:5555";
void *server = zsocket_new (ctx, ZMQ_ROUTER);
zmq_setsockopt (server,
ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
zsocket_bind (server, bind_endpoint);
printf ("I: service is ready at %s\n", bind_endpoint);

while (!zctx_interrupted) {
zmsg_t *request = zmsg_recv (server);
if (verbose && request)
zmsg_dump (request);
if (!request)
break; // Interrupted

// Frame 0: identity of client
// Frame 1: PING, or client control frame
// Frame 2: request body
zframe_t *identity = zmsg_pop (request);
zframe_t *control = zmsg_pop (request);
zmsg_t *reply = zmsg_new ();
if (zframe_streq (control, "PING"))
zmsg_addstr (reply, "PONG");
else {
zmsg_add (reply, control);
zmsg_addstr (reply, "OK");
}
zmsg_destroy (&request);
zmsg_push (reply, identity);
if (verbose && reply)
zmsg_dump (reply);
zmsg_send (&reply, server);
}
if (zctx_interrupted)
printf ("W: interrupted\n");

zctx_destroy (&ctx);
return 0;
}

  The Freelance client, however, has grown rather large. For clarity it is split into an example application and a class that does the hard work. Here is the top-level application:

//  flclient3: Freelance client, Model Three in C
// Freelance client - Model 3
// Uses flcliapi class to encapsulate Freelance pattern

// Lets us build this source without creating a library
#include "flcliapi.c"

int main (void)
{
// Create new freelance client object
flcliapi_t *client = flcliapi_new ();

// Connect to several endpoints
flcliapi_connect (client, "tcp://localhost:5555");
flcliapi_connect (client, "tcp://localhost:5556");
flcliapi_connect (client, "tcp://localhost:5557");

// Send a bunch of name resolution 'requests', measure time
int requests = 1000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flcliapi_request (client, &request);
if (!reply) {
printf ("E: name service not available, aborting\n");
break;
}
zmsg_destroy (&reply);
}
printf ("Average round trip cost: %d usec\n",
(int) (zclock_time () - start) / 10);

flcliapi_destroy (&client);
return 0;
}

  And here, almost as complex and large as the Majordomo broker, is the client API class:

//  flcliapi: Freelance client API in C
// flcliapi class - Freelance Pattern agent class
// Implements the Freelance Protocol at http://rfc.zeromq.org/spec:10

#include "flcliapi.h"

// If no server replies within this time, abandon request
#define GLOBAL_TIMEOUT 3000 // msecs
// PING interval for servers we think are alive
#define PING_INTERVAL 2000 // msecs
// Server considered dead if silent for this long
#define SERVER_TTL 6000 // msecs

// This API works in two halves, a common pattern for APIs that need to
// run in the background. One half is an frontend object our application
// creates and works with; the other half is a backend "agent" that runs
// in a background thread. The frontend talks to the backend over an
// inproc pipe socket:

// Structure of our frontend class

struct _flcliapi_t {
zctx_t *ctx; // Our context wrapper
void *pipe; // Pipe through to flcliapi agent
};

// This is the thread that handles our real flcliapi class
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);

// Constructor

flcliapi_t *
flcliapi_new (void)
{
flcliapi_t
*self;

self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
return self;
}

// Destructor

void
flcliapi_destroy (flcliapi_t **self_p)
{
assert (self_p);
if (*self_p) {
flcliapi_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// To implement the connect method, the frontend object sends a multipart
// message to the backend agent. The first part is a string "CONNECT", and
// the second part is the endpoint. It waits 100msec for the connection to
// come up, which isn't pretty, but saves us from sending all requests to a
// single server, at startup time:

void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{
assert (self);
assert (endpoint);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, endpoint);
zmsg_send (&msg, self->pipe);
zclock_sleep (100); // Allow connection to come up
}

// To implement the request method, the frontend object sends a message
// to the backend, specifying a command "REQUEST" and the request message:

zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);

zmsg_pushstr (*request_p, "REQUEST");
zmsg_send (request_p, self->pipe);
zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *status = zmsg_popstr (reply);
if (streq (status, "FAILED"))
zmsg_destroy (&reply);
free (status);
}
return reply;
}

// Here we see the backend agent. It runs as an attached thread, talking
// to its parent over a pipe socket. It is a fairly complex piece of work
// so we'll break it down into pieces. First, the agent manages a set of
// servers, using our familiar class approach:

// Simple class for one server we talk to

typedef struct {
char *endpoint; // Server identity/endpoint
uint alive; // 1 if known to be alive
int64_t ping_at; // Next ping at this time
int64_t expires; // Expires at this time
} server_t;

server_t *
server_new (char *endpoint)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
self->endpoint = strdup (endpoint);
self->alive = 0;
self->ping_at = zclock_time () + PING_INTERVAL;
self->expires = zclock_time () + SERVER_TTL;
return self;
}

void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->endpoint);
free (self);
*self_p = NULL;
}
}

int
server_ping (const char *key, void *server, void *socket)
{
server_t *self = (server_t *) server;
if (zclock_time () >= self->ping_at) {
zmsg_t *ping = zmsg_new ();
zmsg_addstr (ping, self->endpoint);
zmsg_addstr (ping, "PING");
zmsg_send (&ping, socket);
self->ping_at = zclock_time () + PING_INTERVAL;
}
return 0;
}

int
server_tickless (const char *key, void *server, void *arg)
{
server_t *self = (server_t *) server;
uint64_t *tickless = (uint64_t *) arg;
if (*tickless > self->ping_at)
*tickless = self->ping_at;
return 0;
}

// We build the agent as a class that's capable of processing messages
// coming in from its various sockets:

// Simple class for one background agent

typedef struct {
zctx_t *ctx; // Own context
void *pipe; // Socket to talk back to application
void *router; // Socket to talk to servers
zhash_t *servers; // Servers we've connected to
zlist_t *actives; // Servers we know are alive
uint sequence; // Number of requests ever sent
zmsg_t *request; // Current request if any
zmsg_t *reply; // Current reply if any
int64_t expires; // Timeout for request/reply
} agent_t;

agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->servers = zhash_new ();
self->actives = zlist_new ();
return self;
}

void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy (&self->servers);
zlist_destroy (&self->actives);
zmsg_destroy (&self->request);
zmsg_destroy (&self->reply);
free (self);
*self_p = NULL;
}
}

// This method processes one message from our frontend class
// (it's going to be CONNECT or REQUEST):

// Callback when we remove server from agent 'servers' hash table

static void
s_server_free (void *argument)
{
server_t *server = (server_t *) argument;
server_destroy (&server);
}

void
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);

if (streq (command, "CONNECT")) {
char *endpoint = zmsg_popstr (msg);
printf ("I: connecting to %s…\n", endpoint);
int rc = zmq_connect (self->router, endpoint);
assert (rc == 0);
server_t *server = server_new (endpoint);
zhash_insert (self->servers, endpoint, server);
zhash_freefn (self->servers, endpoint, s_server_free);
zlist_append (self->actives, server);
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;
free (endpoint);
}
else
if (streq (command, "REQUEST")) {
assert (!self->request); // Strict request-reply cycle
// Prefix request with sequence number and empty envelope
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (msg, sequence_text);
// Take ownership of request message
self->request = msg;
msg = NULL;
// Request expires after global timeout
self->expires = zclock_time () + GLOBAL_TIMEOUT;
}
free (command);
zmsg_destroy (&msg);
}

// This method processes one message from a connected
// server:

void
agent_router_message (agent_t *self)
{
zmsg_t *reply = zmsg_recv (self->router);

// Frame 0 is server that replied
char *endpoint = zmsg_popstr (reply);
server_t *server =
(server_t *) zhash_lookup (self->servers, endpoint);
assert (server);
free (endpoint);
if (!server->alive) {
zlist_append (self->actives, server);
server->alive = 1;
}
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;

// Frame 1 may be sequence number for reply
char *sequence = zmsg_popstr (reply);
if (atoi (sequence) == self->sequence) {
zmsg_pushstr (reply, "OK");
zmsg_send (&reply, self->pipe);
zmsg_destroy (&self->request);
}
else
zmsg_destroy (&reply);
}

// Finally, here's the agent task itself, which polls its two sockets
// and processes incoming messages:

static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);

zmq_pollitem_t items [] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
// Calculate tickless timer, up to 1 hour
uint64_t tickless = zclock_time () + 1000 * 3600;
if (self->request
&& tickless > self->expires)
tickless = self->expires;
zhash_foreach (self->servers, server_tickless, &tickless);

int rc = zmq_poll (items, 2,
(tickless - zclock_time ()) * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN)
agent_control_message (self);

if (items [1].revents & ZMQ_POLLIN)
agent_router_message (self);

// If we're processing a request, dispatch to next server
if (self->request) {
if (zclock_time () >= self->expires) {
// Request expired, kill it
zstr_send (self->pipe, "FAILED");
zmsg_destroy (&self->request);
}
else {
// Find server to talk to, remove any expired ones
while (zlist_size (self->actives)) {
server_t *server =
(server_t *) zlist_first (self->actives);
if (zclock_time () >= server->expires) {
zlist_pop (self->actives);
server->alive = 0;
}
else {
zmsg_t *request = zmsg_dup (self->request);
zmsg_pushstr (request, server->endpoint);
zmsg_send (&request, self->router);
break;
}
}
}
}
// Disconnect and delete any expired servers
// Send heartbeats to idle servers if needed
zhash_foreach (self->servers, server_ping, self->router);
}
agent_destroy (&self);
}

  This API implementation is fairly sophisticated and uses a couple of techniques that we have not seen before:

  • Multithreaded API: the client API consists of two parts, a synchronous flcliapi class that runs in the application thread, and an asynchronous agent class that runs in a background thread. Remember how zmq makes it easy to create multithreaded applications: the flcliapi and agent classes talk to each other with messages over an inproc socket. All the zmq details (such as creating and destroying the context) are hidden inside the API. The agent in effect acts like a mini broker, talking to the servers in the background so that, when we make a request, it can do its best to reach a server it believes is alive and available.
  • Tickless poll timer: in previous poll loops we always used a fixed tick interval, e.g. 1 second, which is simple but not ideal on power-sensitive clients (such as notebooks or mobile phones), where each tick wakes the CPU and wastes power. For fun, and to help save the planet, the agent uses a tickless timer: it calculates the poll delay based on the next timeout we are expecting. A proper implementation would keep a sorted list of timeouts; here we just check all the timeouts and compute the poll delay for the next one (see the standalone sketch after this list).
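
  The tickless calculation itself is only a few lines. The standalone sketch below (with invented deadline values) computes the poll timeout as the distance to the earliest known deadline, which is what the agent's loop does with its request expiry and the per-server ping times.

//  Tickless timeout sketch (not from the original text): instead of waking
//  up every second, sleep exactly until the earliest known deadline.
//  The deadline values here are invented for illustration.

#include "czmq.h"

int main (void)
{
    //  Pretend these are the next ping times and request expiry we track
    int64_t deadlines [] = {
        zclock_time () + 2000,      //  ping server A in 2 seconds
        zclock_time () + 750,       //  ping server B in 750 msec
        zclock_time () + 3000       //  current request expires in 3 seconds
    };
    //  Start from a far-future cap (here one hour) and take the minimum
    int64_t tickless = zclock_time () + 1000 * 3600;
    int index;
    for (index = 0; index < 3; index++)
        if (deadlines [index] < tickless)
            tickless = deadlines [index];

    long timeout = (long) (tickless - zclock_time ());
    if (timeout < 0)
        timeout = 0;                //  A deadline already passed, return at once
    printf ("I: polling for at most %ld msec\n", timeout);

    //  zmq_poll with no items simply waits for the timeout; a real loop
    //  would pass its pipe and router sockets here
    zmq_poll (NULL, 0, timeout * ZMQ_POLL_MSEC);
    printf ("I: woke up, now handle whichever deadline is due\n");
    return 0;
}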

Conclusion

  In this chapter we have seen a variety of reliable request-reply mechanisms, each with its own costs and benefits. The example code is largely ready for real-world use, though it has not been optimized. Of all the different patterns, the two that stand out as candidates for production use are the Majordomo pattern, for broker-based reliability, and the Freelance pattern, for brokerless reliability.