
The ZeroMQ Guide in Chinese, Part 5

Chapter 5 - Advanced Pub-Sub Patterns

  In Chapter 3 - Advanced Request-Reply Patterns and Chapter 4 - Reliable Request-Reply Patterns we looked at advanced use of ZeroMQ's request-reply pattern. If you managed to digest all that, congratulations. In this chapter we'll focus on publish-subscribe, and extend ZeroMQ's core pub-sub pattern with higher-level patterns for performance, reliability, state distribution, and monitoring.

  We'll cover:

  • When to use publish-subscribe
  • How to handle too-slow subscribers (the Suicidal Snail pattern)
  • How to design high-speed subscribers (the Black Box pattern)
  • How to monitor a pub-sub network (the Espresso pattern)
  • How to build a shared key-value store (the Clone pattern)
  • How to use reactors to simplify complex servers
  • How to use the Binary Star pattern to add failover to a server

Pros and Cons of Pub-Sub

  ZeroMQ's low-level patterns each have their own character. Pub-sub addresses an old messaging problem: multicast or group messaging. It has that unique mix of meticulous simplicity and brutal indifference that characterizes ZeroMQ. It's worth understanding the trade-offs pub-sub makes, how they benefit us, and how to work around them when needed.

  First, PUB sends each message to "all of many" peers, whereas PUSH and DEALER send each message to "one of many". You cannot simply replace PUSH with PUB or vice versa and hope that things will work. This mistake keeps coming up because people really do keep suggesting it.

  Second, pub-sub is aimed at scalability, which means large volumes of data sent rapidly to many recipients. If you need to push millions of messages per second to thousands of nodes, pub-sub is exactly what you want.

  To get that scalability, pub-sub uses the same trick as push-pull: it gets rid of back-chatter. That is, receivers don't talk back to senders. There are a few exceptions; for example, SUB sockets send subscription messages to PUB sockets, but these are small and anonymous.

  Removing back-chatter is essential to real scalability. With pub-sub, it is what lets the pattern map cleanly onto the PGM multicast protocol, which is handled by the network switch. In other words, subscribers don't connect to the publisher at all; they connect to a multicast group on the switch, and the publisher sends its messages to that group.

  Because we removed back-chatter, our whole message flow becomes much simpler, which lets us build simpler APIs and simpler protocols, and reach many more people. But we also removed any possibility of coordinating senders and receivers. This means:

  • Publishers can't tell when subscribers are successfully connected, both on initial connections and on reconnections after network failures.
  • Subscribers can't tell publishers anything that would let publishers control the rate of messages they send. A publisher has only one setting, full speed; subscribers must either keep up or lose messages.
  • Publishers can't tell when subscribers have disappeared due to crashed processes, broken networks, and so on.

  The downside is that we would need all of these to build reliable multicast. ZeroMQ's pub-sub pattern will lose messages arbitrarily while a subscriber is connecting, when a network failure occurs, when the subscriber or network can't keep up with the publisher, or at pretty much any other inconvenient time.

  The upside is that there are many use cases where almost-reliable multicast is good enough. When we do need back-chatter, we can either switch to ROUTER-DEALER (which I recommend for most normal-volume cases), or add a separate channel for synchronization (we'll see an example of this later in the chapter).

  Pub-sub is like a radio broadcast: you miss everything before you join, and how much you get afterwards depends on the quality of your reception. Surprisingly, this model is useful precisely because it maps so well to real-world information distribution. Think of Facebook and Twitter, the BBC World Service, and the sports results.

  As we did for request-reply, let's define reliability in terms of what can go wrong. Here are the classic failure cases for pub-sub:

  • Subscribers join late, so they miss messages the server already sent.
  • Subscribers fetch messages too slowly, so queues build up and then overflow.
  • Subscribers drop off and lose messages while they are away.
  • Subscribers crash and restart, and lose whatever data they had already received.
  • Networks become overloaded and drop data (specifically, for PGM).
  • Networks become too slow, so publisher-side queues overflow and publishers crash.

  A lot more can go wrong, but these are the typical failures we see in real systems. Since v3.x, ZeroMQ enforces default limits on its internal buffers (the so-called high-water mark, or HWM), so publisher crashes are rare unless you deliberately set the HWM to infinite.
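  For illustration, here is a minimal sketch of tuning that limit on a publisher socket (the socket name and the value are placeholders, not from the original text):

// Sketch: raising the send-side high-water mark on a PUB socket.
// Setting it to 0 means "no limit", which reintroduces the risk of the
// publisher exhausting memory under sustained pressure.
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_set_sndhwm (publisher, 100000); // queue at most 100,000 messages
zsocket_bind (publisher, "tcp://*:5556");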

  All of these failure cases have answers, though not always simple ones. Most of the time we don't need complex reliability guarantees, which is why ZeroMQ doesn't try to provide them out of the box (even if there were one global design for reliability, which there isn't).

Pub-Sub Tracing (Espresso Pattern)

  Let's start this chapter by looking at a way to trace pub-sub networks. In Chapter 2 we saw a simple proxy that used zmq_proxy() to do transport bridging. The zmq_proxy() method takes three arguments: the frontend and backend sockets it bridges together, and a capture socket to which it sends all messages.

  The code is deceptively simple:

//  espresso: Espresso Pattern in C
// Espresso Pattern
// This shows how to capture data using a pub-sub proxy

#include "czmq.h"

// The subscriber thread requests messages starting with
// A and B, then reads and counts incoming messages.

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to "A" and "B"
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:6001");
zsocket_set_subscribe (subscriber, "A");
zsocket_set_subscribe (subscriber, "B");

int count = 0;
while (count < 5) {
char *string = zstr_recv (subscriber);
if (!string)
break; // Interrupted
free (string);
count++;
}
zsocket_destroy (ctx, subscriber);
}

// The publisher sends random messages starting with A-J:

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:6000");

while (!zctx_interrupted) {
char string [10];
sprintf (string, "%c-%05d", randof (10) + 'A', randof (100000));
if (zstr_send (publisher, string) == -1)
break; // Interrupted
zclock_sleep (100); // Wait for 1/10th second
}
}

// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connect
// attached child threads. In other languages your mileage may vary:

static void
listener_thread (void *args, zctx_t *ctx, void *pipe)
{
// Print everything that arrives on pipe
while (true) {
zframe_t *frame = zframe_recv (pipe);
if (!frame)
break; // Interrupted
zframe_print (frame, NULL);
zframe_destroy (&frame);
}
}

// The main task starts the subscriber and publisher, and then sets
// itself up as a listening proxy. The listener runs as a child thread:

int main (void)
{
// Start child threads
zctx_t *ctx = zctx_new ();
zthread_fork (ctx, publisher_thread, NULL);
zthread_fork (ctx, subscriber_thread, NULL);

void *subscriber = zsocket_new (ctx, ZMQ_XSUB);
zsocket_connect (subscriber, "tcp://localhost:6000");
void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_bind (publisher, "tcp://*:6001");
void *listener = zthread_fork (ctx, listener_thread, NULL);
zmq_proxy (subscriber, publisher, listener);

puts (" interrupted");
// Tell attached threads to exit
zctx_destroy (&ctx);
return 0;
}

  The Espresso pattern works by creating a listener thread that reads a PAIR socket and prints everything it gets. That PAIR socket is one end of a pipe; the other end (another PAIR) is the socket we pass to zmq_proxy(). In practice, you would filter the stream to pull out just the messages you want to track (hence the name of the pattern).

  The subscriber thread subscribes to "A" and "B" messages, receives five messages, and then destroys its socket. When you run this example, the listener prints two subscription messages, five data messages, two unsubscribe messages, and then silence:

[002] 0141
[002] 0142
[007] B-91164
[007] B-12979
[007] A-52599
[007] A-06417
[007] A-45770
[002] 0041
[002] 0042

  This shows neatly how the publisher socket stops sending data when there are no subscribers for it. The publisher thread keeps sending messages; the socket just drops them silently.

Last Value Caching

If you've used commercial pub-sub systems, you may be used to some features that are missing from ZeroMQ's pub-sub model. One of these is last value caching (LVC). This solves the problem of how a new subscriber catches up when it joins the network. The theory is that the publisher gets notified when a new subscriber joins and subscribes to some specific topics, and then rebroadcasts the last message for those topics.

  I've already explained why the publisher doesn't get notified when new subscribers join: in large pub-sub systems, the volume of data makes it impossible. To build genuinely large-scale pub-sub networks you need a protocol like PGM, which exploits a high-end Ethernet switch's ability to multicast data to thousands of subscribers. Making TCP unicast connections from the publisher to thousands of subscribers simply doesn't scale: you get unfair distribution (some subscribers receiving a message before others), network congestion, and other unpleasantness.

  PGM is a one-way protocol: the publisher sends a message to a multicast address at the switch, and the switch rebroadcasts it to all interested subscribers. The publisher never sees when subscribers join or leave: all of that happens in the switch, which we don't really want to start reprogramming.

  However, in a lower-volume network with a few subscribers and a limited number of topics, we can use TCP, and then the XSUB and XPUB sockets really do talk to each subscriber, as we just saw in the Espresso pattern.

  Can we build an LVC with ZeroMQ? Yes, if we build a proxy that sits between the publisher and the subscribers: an analog of the PGM switch, but one we can program ourselves.

  I'll start by building a publisher and subscriber that highlight the worst-case scenario. The publisher is pathological: as soon as it starts, it sends messages for a thousand different topics, and then it sends one update per second to a random topic. A subscriber connects and subscribes to one topic. Without LVC, a subscriber would have to wait an average of 500 seconds to get any data. To add some drama, let's pretend there's an escaped convict called Gregor threatening to rip the head off Roger the toy bunny if we can't fix that 8.3 minutes' delay.

  Here's the publisher code. Note that it has a command line option to connect to some address, but otherwise binds to an address. We'll use this later to connect it to our last value cache:

//  pathopub: Pathologic Publisher in C
// Pathological publisher
// Sends out 1,000 topics and then one random update per second

#include "czmq.h"

int main (int argc, char *argv [])
{
zctx_t *context = zctx_new ();
void *publisher = zsocket_new (context, ZMQ_PUB);
if (argc == 2)
zsocket_bind (publisher, argv [1]);
else
zsocket_bind (publisher, "tcp://*:5556");

// Ensure subscriber connection has time to complete
sleep (1);

// Send out all 1,000 topic messages
int topic_nbr;
for (topic_nbr = 0; topic_nbr < 1000; topic_nbr++) {
zstr_sendfm (publisher, "%03d", topic_nbr);
zstr_send (publisher, "Save Roger");
}
// Send one random update per second
srandom ((unsigned) time (NULL));
while (!zctx_interrupted) {
sleep (1);
zstr_sendfm (publisher, "%03d", randof (1000));
zstr_send (publisher, "Off with his head!");
}
zctx_destroy (&context);
return 0;
}

  Here's the subscriber:

//  pathosub: Pathologic Subscriber in C
// Pathological subscriber
// Subscribes to one random topic and prints received messages

#include "czmq.h"

int main (int argc, char *argv [])
{
zctx_t *context = zctx_new ();
void *subscriber = zsocket_new (context, ZMQ_SUB);
if (argc == 2)
zsocket_connect (subscriber, argv [1]);
else
zsocket_connect (subscriber, "tcp://localhost:5556");

srandom ((unsigned) time (NULL));
char subscription [5];
sprintf (subscription, "%03d", randof (1000));
zsocket_set_subscribe (subscriber, subscription);

while (true) {
char *topic = zstr_recv (subscriber);
if (!topic)
break;
char *data = zstr_recv (subscriber);
assert (streq (topic, subscription));
puts (data);
free (topic);
free (data);
}
zctx_destroy (&context);
return 0;
}

  Try building and running these: first the subscriber, then the publisher. You'll see the subscriber reports getting "Save Roger":

./pathosub &
./pathopub

  It's when you run a second subscriber that you understand Roger's predicament: you have to leave it an awfully long time before it reports getting any data. So here is our last value cache. As promised, it's a proxy that binds to two sockets and handles messages on both:

//  lvcache: Last Value Caching Proxy in C
// Last value cache
// Uses XPUB subscription messages to re-send data

#include "czmq.h"

int main (void)
{
zctx_t *context = zctx_new ();
void *frontend = zsocket_new (context, ZMQ_SUB);
zsocket_bind (frontend, "tcp://*:5557");
void *backend = zsocket_new (context, ZMQ_XPUB);
zsocket_bind (backend, "tcp://*:5558");

// Subscribe to every single topic from publisher
zsocket_set_subscribe (frontend, "");

// Store last instance of each topic in a cache
zhash_t *cache = zhash_new ();

// We route topic updates from frontend to backend, and
// we handle subscriptions by sending whatever we cached,
// if anything:
while (true) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
if (zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC) == -1)
break; // Interrupted

// Any new topic data we cache and then forward
if (items [0].revents & ZMQ_POLLIN) {
char *topic = zstr_recv (frontend);
char *current = zstr_recv (frontend);
if (!topic)
break;
char *previous = zhash_lookup (cache, topic);
if (previous) {
zhash_delete (cache, topic);
free (previous);
}
zhash_insert (cache, topic, current);
zstr_sendm (backend, topic);
zstr_send (backend, current);
free (topic);
}
// When we get a new subscription, we pull data from the cache:
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *frame = zframe_recv (backend);
if (!frame)
break;
// Event is one byte 0=unsub or 1=sub, followed by topic
byte *event = zframe_data (frame);
if (event [0] == 1) {
char *topic = zmalloc (zframe_size (frame));
memcpy (topic, event + 1, zframe_size (frame) - 1);
printf ("Sending cached topic %s\n", topic);
char *previous = zhash_lookup (cache, topic);
if (previous) {
zstr_sendm (backend, topic);
zstr_send (backend, previous);
}
free (topic);
}
zframe_destroy (&frame);
}
}
zctx_destroy (&context);
zhash_destroy (&cache);
return 0;
}

  Now run the proxy, and then the publisher:

./lvcache &
./pathopub tcp://localhost:5557

  Now run as many instances of the subscriber as you like, each time connecting to the proxy on port 5558:

./pathosub tcp://localhost:5558

  Each subscriber happily reports "Save Roger", and Gregor the escaped convict slinks back to his seat for dinner and a nice cup of tea, which is all he really wanted in the first place.

  One note: by default, an XPUB socket does not report duplicate subscriptions, which is what you want when you're naively connecting an XPUB to an XSUB. Our example sneakily gets around this by using random topics, so the chance of it not working is one in a million. In a real LVC proxy, you'd want to use the ZMQ_XPUB_VERBOSE option, which we leave as an exercise.
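  A minimal sketch of that option on the proxy's XPUB socket, assuming the same CZMQ API used elsewhere in this chapter:

// Ask the XPUB socket to pass through every subscription message,
// including duplicates, so the proxy sees each new subscriber even
// for topics it already knows about
zsocket_set_xpub_verbose (backend, 1);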

Slow Subscriber Detection (Suicidal Snail Pattern)

  A common problem you'll hit in real life is the slow subscriber. In an ideal world, data streams at full speed from publisher to subscriber. In reality, subscriber applications are often written in interpreted languages, or just do a lot of work, or are simply badly written, to the point that they can't keep up with the publisher.

  How do we handle a slow subscriber? The ideal fix is to make the subscriber faster, but that could take a lot of work and time. Some classic strategies for handling slow subscribers are:

  • Queue messages on the publisher. This is what Gmail does when I don't read my email for a few hours. But in high-volume messaging, queuing upstream is of little use and can make the publisher run out of memory and crash, especially when there are many subscribers and flushing to disk isn't an option for performance reasons.
  • Queue messages on the subscriber. This is much better, and it's what ZeroMQ does by default when the network can keep up. If anyone is going to run out of memory and crash, it will be the subscriber rather than the publisher, which is fairer. This is perfect for "peaky" streams, where a subscriber falls behind for a while but can catch up when the stream slows down. However, it's no answer to a subscriber that is simply too slow in general.
  • Stop queuing new messages after a while. This is what Gmail does when my mailbox overflows its storage: new messages just get rejected or dropped. It's a great strategy from the publisher's perspective, and it's what ZeroMQ does when the publisher sets a HWM. However, it still doesn't help us fix the slow subscriber; now we just get gaps in the message stream.
  • Punish slow subscribers with disconnection. This is what Hotmail (remember that?) did when I didn't log in for two weeks, which is why I was on my fifteenth Hotmail account when it hit me that there was perhaps a better way. It's a nice brutal strategy that forces subscribers to sit up and pay attention, and would be ideal, but ZeroMQ doesn't do this, and there's no way to layer it on top, because subscribers are invisible to publisher applications.

  None of these classic strategies fit, so we need to get creative. Rather than disconnecting the publisher, let's convince the subscriber to kill itself. This is the Suicidal Snail pattern. When a subscriber detects that it's running too slowly (where "too slowly" is presumably a configured option that really means "so slowly that if you ever get here, shout really loudly, because I need to know so I can fix this!"), it croaks and dies.

  How can a subscriber detect this? One way is to sequence messages (number them in order) and use a HWM at the publisher. Now, if the subscriber detects a gap (i.e., the numbering isn't consecutive), it knows something is wrong. We then tune the HWM to the "croak and die if you hit this" level.
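  Here is a sketch of that gap check on the subscriber side; it assumes each message body starts with a decimal sequence number, which is an assumption, not part of the example programs below:

// Sketch: croak loudly if the incoming sequence numbers have a gap
static int64_t expected = 0;
char *string = zstr_recv (subscriber);
if (string) {
    int64_t sequence = atoll (string);
    free (string);
    if (expected && sequence != expected) {
        fprintf (stderr, "E: gap detected (%d messages lost), croaking\n",
            (int) (sequence - expected));
        exit (EXIT_FAILURE); // the snail commits suicide
    }
    expected = sequence + 1;
}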

  There are two problems with this solution. One: if we have many publishers, how do we sequence messages? The fix is to give each publisher a unique ID and add it to the sequencing. Two: if subscribers use ZMQ_SUBSCRIBE filters, they will get gaps by definition, and our precious sequencing will be useless.

  Some use cases don't use filters, and sequencing works for them. A more general solution, though, is for the publisher to timestamp each message. When a subscriber receives a message, it checks the time, and if the difference exceeds, say, one second, it does the "croak and die" thing, perhaps firing off a squawk to some operator console first.

  The Suicidal Snail pattern works especially well when subscribers have their own clients and service-level agreements, and need to guarantee certain maximum latencies. Aborting a subscriber may not seem like a constructive way to guarantee a maximum latency, but it's the assertion model: abort today, and the problem gets fixed. Let late data flow downstream, and it may cause wider damage and take longer to show up on the radar.

  Here is a minimal example of the Suicidal Snail:

//  suisnail: Suicidal Snail in C
// Suicidal Snail

#include "czmq.h"

// This is our subscriber. It connects to the publisher and subscribes
// to everything. It sleeps for a short time between messages to
// simulate doing too much work. If a message is more than one second
// late, it croaks.

#define MAX_ALLOWED_DELAY 1000 // msecs

static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to everything
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5556");

// Get and process messages
while (true) {
char *string = zstr_recv (subscriber);
if (!string)
break; // Interrupted
printf ("%s\n", string);
int64_t clock;
int terms = sscanf (string, "%" PRId64, &clock);
assert (terms == 1);
free (string);

// Suicide snail logic
if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
fprintf (stderr, "E: subscriber cannot keep up, aborting\n");
break;
}
// Work for 1 msec plus some random additional time
zclock_sleep (1 + randof (2));
}
zstr_send (pipe, "gone and died");
}

// This is our publisher task. It publishes a time-stamped message to its
// PUB socket every millisecond:

static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
// Prepare publisher
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");

while (true) {
// Send current clock (msecs) to subscribers
char string [20];
sprintf (string, "%" PRId64, zclock_time ());
zstr_send (publisher, string);
char *signal = zstr_recv_nowait (pipe);
if (signal) {
free (signal);
break;
}
zclock_sleep (1); // 1msec wait
}
}

// The main task simply starts a client and a server, and then
// waits for the client to signal that it has died:

int main (void)
{
zctx_t *ctx = zctx_new ();
void *pubpipe = zthread_fork (ctx, publisher, NULL);
void *subpipe = zthread_fork (ctx, subscriber, NULL);
free (zstr_recv (subpipe));
zstr_send (pubpipe, "break");
zclock_sleep (100);
zctx_destroy (&ctx);
return 0;
}

  Here are some notes about the Suicidal Snail example:

  • The message here consists simply of the current system clock as a number of milliseconds. In a realistic application, you would have at least a message header with the timestamp and a message body with the data; see the sketch after this list.
  • The example has the subscriber and publisher in a single process, as two threads. In reality they would be separate processes. Threads are just convenient for the demonstration.
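  For instance, a two-frame message with a timestamp header and a separate body might look like this sketch (the payload string is a placeholder):

// Sketch: frame 0 carries the millisecond timestamp, frame 1 the data
char stamp [20];
sprintf (stamp, "%" PRId64, zclock_time ());
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, stamp); // header: timestamp
zmsg_addstr (msg, "application payload"); // body: actual data
zmsg_send (&msg, publisher);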

High-Speed Subscribers (Black Box Pattern)

  Now let's look at a way to make our subscribers faster. A common use case for pub-sub is distributing large data streams, like market data from stock exchanges. A typical setup has a publisher connected to a stock exchange, taking price quotes and sending them out to a number of subscribers. With a handful of subscribers, we can use TCP. With many subscribers, we would use reliable multicast, i.e., PGM.

[Figure 56: The Simple Black Box Pattern]

  Let's imagine our feed averages 100,000 100-byte messages per second. That's a typical rate after filtering out market data the subscribers don't need. Now we decide to record a day's data (maybe 250 GB over 8 hours) and then replay it to a simulation network, i.e., a small group of subscribers. While 100K messages a second is easy for a ZeroMQ application, we want to replay it much faster.

  So we set up our architecture with a bunch of boxes: one for the publisher, and one for each subscriber. These are well-specified boxes: eight cores for the subscribers, twelve for the publisher.

  As we pump data into the subscribers, we notice two things:

  1. Doing even the slightest amount of work with a message slows the subscriber down to the point where it can no longer catch up with the publisher.
  2. We hit a ceiling, at both publisher and subscriber, of around 6M messages per second, even after careful optimization and TCP tuning.

  The first thing to do is break the subscriber into a multithreaded design, so that one thread reads messages while a separate set of threads processes them. Typically, we don't want to process every message the same way; instead, the subscriber filters some messages, perhaps by prefix key. When a message matches some criterion, the subscriber calls a worker to deal with it. In ZeroMQ terms, this means sending the message to a worker thread.

  So the subscriber looks something like a queue device. We could use various sockets to connect the subscriber and its workers. If we assume one-way traffic and workers that are all identical, we can use PUSH and PULL and delegate all the routing work to ZeroMQ. This is the simplest and fastest approach.

  The subscriber talks to the publisher over TCP or PGM. The subscriber talks to its workers, which are all in the same process, over inproc://.
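  A minimal sketch of that wiring, with a hypothetical inproc endpoint name:

// In the subscriber thread: push filtered messages to the workers
void *output = zsocket_new (ctx, ZMQ_PUSH);
zsocket_bind (output, "inproc://workers");

// In each worker thread: pull messages from the subscriber thread
void *input = zsocket_new (ctx, ZMQ_PULL);
zsocket_connect (input, "inproc://workers");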

[Figure 57: Mad Black Box Pattern]

  Now to break through that ceiling. The subscriber thread hits 100% CPU, and being a single thread it cannot use more than one core. A single thread will always hit a ceiling, be it at 2M, 6M, or more messages per second. We want to split the work across multiple threads that run in parallel.

  The approach used by many high-performance products, and which works here, is sharding. With sharding, we split the work into parallel and independent streams, such as half the topic keys in one stream and half in another. We could use many streams, but performance won't scale unless we have spare cores. So let's look at sharding into two streams.

  With two streams, both working at full speed, we would configure ZeroMQ as follows:

  • Two I/O threads, rather than one.
  • Two network interfaces (NICs), one per subscriber.
  • Each I/O thread bound to a specific NIC.
  • Two subscriber threads, bound to specific cores.
  • Two SUB sockets, one per subscriber thread.
  • The remaining cores assigned to worker threads.
  • Worker threads connected to both subscribers' PUSH sockets.

  Ideally, we want to match the number of fully-loaded threads in our architecture to the number of cores. Once threads start fighting over cores and CPU cycles, the cost of adding more threads outweighs the benefit. There would be no point, for example, in creating more I/O threads.
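  As a sketch, the I/O-thread side of that configuration looks like this in CZMQ (the affinity bitmap value is an assumption for illustration):

zctx_t *ctx = zctx_new ();
zctx_set_iothreads (ctx, 2); // must be set before any socket is created

void *subscriber = zsocket_new (ctx, ZMQ_SUB);
// Bit 0 set: handle this socket's traffic on the first I/O thread only
zsocket_set_affinity (subscriber, 1);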

Reliable Pub-Sub (Clone Pattern)

  As a larger worked example, building on everything so far, let's design a reliable pub-sub architecture. We'll develop it in stages. The goal is to let a set of applications share some common state. Here are the technical challenges:

  • We have a large set of client applications, say thousands or tens of thousands.
  • They join and leave the network arbitrarily.
  • These applications must share a single, consistent state.
  • Any client can update the state at any point in time.

  Let's say that updates are reasonably low-volume, that we have no real-time goals, and that the whole state fits in memory. Some plausible use cases are:

  • A configuration shared by a group of cloud servers.
  • Some game state shared by a group of players.
  • Exchange rate data updated in real time.

Centralized Versus Decentralized

  The first decision we have to make is whether to use a central server. It makes a big difference in the resulting design. The trade-offs are:

  • Conceptually, a central server is simpler to understand, because networks are not naturally symmetrical. With a central server, we avoid all questions of discovery, bind versus connect, and so on.
  • Generally, a fully-distributed architecture is technically more challenging but ends up with simpler protocols. That is, each node must act as server and client in exactly the right way, which is delicate. When done right, the result is simpler and cleaner than with a central server. We saw this in the Freelance pattern in Chapter 4.
  • A central server can become a bottleneck in high-volume use cases. If we need to handle on the order of millions of messages per second, we should aim for decentralization right away.
  • Conversely, a centralized architecture scales to more nodes more easily than a decentralized one. It's much easier to connect 10,000 nodes to one server than to each other.

Representing State as Key-Value Pairs

  We'll develop the Clone pattern in stages, solving one problem at a time. First, let's look at how to update a shared state across a set of clients. We need to decide how to represent the state, and the updates to it. The simplest plausible form is a key-value store, where each key-value pair represents an atomic unit of change to the shared state.

  We had a simple pub-sub example in Chapter 1: the weather server and client. Let's change the server to send key-value pairs, and the client to store them in a hash table. That lets us send updates from one server to a set of clients using the classic pub-sub model.

  An update is either a new key-value pair, a modified value for an existing key, or a deleted key. For now we can assume the whole store fits in memory, and that applications access it by key, e.g., via a hash table or dictionary. For larger stores or persistence we would put the state in a database, but that's not relevant here.

  This is the server:

//  clonesrv1: Clone server, Model One in C
// Clone server Model One

#include "kvsimple.c"

int main (void)
{
// Prepare our context and publisher socket
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
zclock_sleep (200);

zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
srandom ((unsigned) time (NULL));

while (!zctx_interrupted) {
// Distribute as key-value message
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
}
printf (" Interrupted\n%d messages out\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

  And here is the client:

//  clonecli1: Clone client, Model One in C
// Clone client Model One

#include "kvsimple.c"

int main (void)
{
// Prepare our context and updates socket
zctx_t *ctx = zctx_new ();
void *updates = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (updates, "");
zsocket_connect (updates, "tcp://localhost:5556");

zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;

while (true) {
kvmsg_t *kvmsg = kvmsg_recv (updates);
if (!kvmsg)
break; // Interrupted
kvmsg_store (&kvmsg, kvmap);
sequence++;
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

[Figure 58: Publishing State Updates]

  Here are some things to note about this first model:

  • All the hard work is done in a kvmsg class. This class works with key-value message objects, which are ZeroMQ multipart messages structured as three frames: a key (a ZeroMQ string), a sequence number (64 bits, in network byte order), and a binary body (holding everything else).
  • The server generates messages with a randomized 4-digit key, which lets us simulate a large but not enormous hash table (10K entries).
  • We don't implement deletions in this version: all messages are inserts or updates.
  • The server does a 200 millisecond pause after binding its socket, to prevent slow joiner syndrome, where the subscriber loses messages while it connects. We'll remove that in later versions of the Clone code.
  • We use the terms publisher and subscriber in the code to refer to sockets. This will help later, when we have multiple sockets doing different things.

  Here is the kvmsg class, in its simplest form for now:

//  kvsimple: Key-value message class in C
// kvsimple class - key-value message class for example applications

#include "kvsimple.h"
#include "zlist.h"

// Keys are short strings
#define KVMSG_KEY_MAX 255

// Message is formatted on wire as 3 frames:
// frame 0: key (0MQ string)
// frame 1: sequence (8 bytes, network order)
// frame 2: body (blob)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3

// The kvmsg class holds a single key-value message consisting of a
// list of 0 or more frames:

struct _kvmsg {
// Presence indicators for each frame
int present [KVMSG_FRAMES];
// Corresponding 0MQ message frames, if any
zmq_msg_t frame [KVMSG_FRAMES];
// Key, copied into safe C string
char key [KVMSG_KEY_MAX + 1];
};

// Here are the constructor and destructor for the class:

// Constructor, takes a sequence number for the new kvmsg instance:
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;

self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
kvmsg_set_sequence (self, sequence);
return self;
}

// zhash_free_fn callback helper that does the low level destruction:
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Destroy message frames if any
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);

// Free object itself
free (self);
}
}

// Destructor
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}

// This method reads a key-value message from socket, and returns a new
// kvmsg instance:

kvmsg_t *
kvmsg_recv (void *socket)
{
assert (socket);
kvmsg_t *self = kvmsg_new (0);

// Read all frames off the wire, reject if bogus
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_msg_recv (&self->frame [frame_nbr], socket, 0) == -1) {
kvmsg_destroy (&self);
break;
}
// Verify multipart framing
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsocket_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
return self;
}

// This method sends a multiframe key-value message to a socket:

void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);

int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (&copy);
if (self->present [frame_nbr])
zmq_msg_copy (&copy, &self->frame [frame_nbr]);
zmq_msg_send (&copy, socket,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (&copy);
}
}

// These methods let the caller get and set the message key, as a
// fixed string and as a printf formatted string:

char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}

void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}

void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}

// These two methods let the caller get and set the message sequence number:

int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}

void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);

byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);

self->present [FRAME_SEQ] = 1;
}

// These methods let the caller get and set the message body as a
// fixed string and as a printf formatted string:

byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}

void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}

void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}

// This method returns the body size of the most recently read message,
// if any exists:

size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}

// This method stores the key-value message into a hash map, unless
// the key and value are both null. It nullifies the kvmsg reference
// so that the object is owned by the hash map, not the caller:

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
*self_p = NULL;
}
}

// This method prints the key-value message to stderr for
// debugging and tracing:

void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}

// It's good practice to have a self-test method that tests the class; this
// also shows how it's used in applications:

int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;

printf (" * kvmsg: ");

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);

zhash_t *kvmap = zhash_new ();

// Test send and receive of simple message
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);

// Shutdown and destroy all objects
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

printf ("OK\n");
return 0;
}

Later we'll make a more sophisticated kvmsg class for real work.

  Both the server and the client maintain hash tables, but this first model only works properly if we start all clients before the server, and the clients never crash. That's very artificial.

Getting an Out-of-Band Snapshot

  So now we have our second problem: how to deal with late-joining clients, and clients that crash and then restart.

  To let a late (or recovering) client catch up with the server, it has to get a snapshot of the server's state. Just as we've reduced "message" to mean "a sequenced key-value pair", we can reduce "state" to mean "a hash table". To get the server state, a client opens a DEALER socket and asks for it explicitly.

  To make this work, we have to solve a timing problem. Getting a state snapshot takes a certain time, possibly quite long if the snapshot is large. We need to apply updates to the snapshot correctly, but the server won't know when to start sending us updates. One way would be to start subscribing, get a first update, and then ask for "the state as of update N". That would require the server to store one snapshot for each update, which isn't practical.

[Figure 59: State Replication]

  So we do the synchronization in the client, as follows:

  • The client first subscribes to updates and then makes a state request. This guarantees that the state it receives will be newer than the oldest update it has queued.
  • The client waits for the server to reply with state, and meanwhile queues all updates. It does this simply by not reading them: ZeroMQ keeps them queued on the socket queue.
  • When the client receives its state reply, it begins reading updates again, but it discards any update that is older than the state. So if the state includes updates up to 200, the client discards everything until it reaches update 201.
  • The client then applies updates to its own state snapshot.

  It's a simple model that exploits ZeroMQ's own internal queues. Here's the server:

//  clonesrv2: Clone server, Model Two in C
// Clone server - Model Two

// Lets us build this source without creating a library
#include "kvsimple.c"

static int s_send_single (const char *key, void *data, void *args);
static void state_manager (void *args, zctx_t *ctx, void *pipe);

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");

int64_t sequence = 0;
srandom ((unsigned) time (NULL));

// Start state manager and wait for synchronization signal
void *updates = zthread_fork (ctx, state_manager, NULL);
free (zstr_recv (updates));

while (!zctx_interrupted) {
// Distribute as key-value message
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_send (kvmsg, updates);
kvmsg_destroy (&kvmsg);
}
printf (" Interrupted\n%d messages out\n", (int) sequence);
zctx_destroy (&ctx);
return 0;
}

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}

// The state manager task maintains the state and handles requests from
// clients for snapshots:

static void
state_manager (void *args, zctx_t *ctx, void *pipe)
{
zhash_t *kvmap = zhash_new ();

zstr_send (pipe, "READY");
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");

zmq_pollitem_t items [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
int64_t sequence = 0; // Current snapshot version number
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, -1);
if (rc == -1 && errno == ETERM)
break; // Context has been shut down

// Apply state update from main thread
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (pipe);
if (!kvmsg)
break; // Interrupted
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("Sending state shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
zhash_destroy (&kvmap);
}

  And here is the client:

//  clonecli2: Clone client, Model Two in C
// Clone client - Model Two

// Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");

zhash_t *kvmap = zhash_new ();

// Get state snapshot
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("Received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
// Now apply pending updates, discard out-of-sequence messages
while (!zctx_interrupted) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
else
kvmsg_destroy (&kvmsg);
}
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

  Here are some things to note about these two programs:

  • The server uses two tasks. One thread produces (random) updates and sends them to the main PUB socket, while the other thread handles state requests on a ROUTER socket. The two communicate across PAIR sockets over an inproc:// connection.
  • The client is really simple. In C, it's about fifty lines of code. A lot of the heavy lifting is done in the kvmsg class. Even so, the basic Clone pattern is easier to implement than it first seemed.
  • We don't do anything fancy to serialize the state. The hash table holds a set of kvmsg objects, and the server sends these to a requesting client as a batch of messages. If several clients request state at once, each gets a different snapshot.
  • We assume the client talks to exactly one server, and that the server must be running: we don't try to solve what happens if the server crashes.

  Right now, these two programs don't do anything real, but they synchronize state correctly. It's a neat example of how to mix patterns: PAIR-PAIR, PUB-SUB, and ROUTER-DEALER.

Republishing Updates from Clients

  In our second model, changes to the key-value store came from the server itself. That's a centralized model, useful when, say, we have a central configuration file to distribute, with local caching on each node. A more interesting model takes updates from the clients, not the server. The server then becomes a stateless broker. This gives us some benefits:

  • We're less worried about the server's reliability. If it crashes, we can start a new instance and feed it new values.
  • We can use the key-value store to share knowledge between active peers.

  To send updates from clients back to the server, we could use several socket patterns. The simplest plausible solution is a PUSH-PULL combination.

  Why don't we let clients publish updates directly to each other? That would reduce latency, but it would remove any guarantee of consistency. You can't get consistent shared state if you let the order of updates vary depending on who receives them. Say we have two clients changing two different keys: this works fine. But if the two clients try to change the same key at roughly the same time, they end up with different notions of its value.

  There are a few strategies for maintaining consistency when changes happen in multiple places. We'll use the approach of centralizing all change. No matter when clients make their updates, they all get pushed through the server, which enforces a single sequence according to the order in which it receives them.

[Figure 60: Republishing Updates]

  By mediating all changes, the server can also add a unique sequence number to every update. With unique sequencing, clients can detect the nastier failures, including network congestion and queue overflow. If a client discovers a hole in its incoming stream, it can take action. Having the client contact the server and ask for the missing messages sounds sensible, but in practice it isn't useful: if there are holes, they were caused by network stress, and adding more stress will make things worse. All the client can really do is warn its users that it is "unable to continue", stop, and not restart until someone has manually checked the cause of the problem.
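  A sketch of that "stop on a hole" logic in a client's update loop (the handling shown is an assumption; the example clients below simply discard out-of-sequence messages):

// Sketch: a gap in the sequence means we lost updates; stop rather
// than continue with inconsistent state
if (kvmsg_sequence (kvmsg) > sequence + 1) {
    fprintf (stderr, "E: lost updates %d..%d, cannot continue\n",
        (int) sequence + 1, (int) kvmsg_sequence (kvmsg) - 1);
    exit (EXIT_FAILURE);
}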

  We'll now generate state updates in the client. Here's the server:

//  clonesrv3: Clone server, Model Three in C
// Clone server - Model Three

// Lets us build this source without creating a library
#include "kvsimple.c"

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");

// The body of the main task collects updates from clients and
// publishes them back out to clients:

int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();

zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

// Apply state update sent from client
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // Interrupted
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: publishing update %5d\n", (int) sequence);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("I: sending shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
printf (" Interrupted\n%d messages handled\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

return 0;
}

  And here is the client:

//  clonecli3: Clone client, Model Three in C
// Clone client - Model Three

// Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");

zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));

// We first request a state snapshot:
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
// Now we wait for updates from the server and every so often, we
// send a random key-value update to the server:

int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted

// Discard out-of-sequence kvmsgs, incl. heartbeats
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: received update=%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// If we timed out, generate a random kvmsg
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

  Here are some things to note about this third design:

  • The server has collapsed into a single task. It manages a PULL socket for incoming updates, a ROUTER socket for state requests, and a PUB socket for outgoing updates.
  • The client uses a tickless timer to send a random update to the server once per second. In a real implementation, updates would be driven by application code.

Working with Subtrees

  As we grow the number of clients, the size of the shared store also grows, and at some point it stops making sense to send everything to every client. This is the classic pub-sub story: with very few clients you can send every message to all of them; as the architecture grows, that becomes inefficient. Clients specialize in different areas.

  So even with a shared store, some clients will want to work with only part of it, which we call a subtree. The client must request the subtree when it makes a state request, and it must specify the same subtree when it subscribes to updates.

  There are a couple of common syntaxes for trees. One is the path hierarchy, another is the topic tree. They look like this (a small subscription sketch follows the list):

  • Path hierarchy: /some/list/of/paths
  • Topic tree: some.list.of.topics
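  Because ZeroMQ subscriptions do prefix matching, either syntax maps directly onto SUB socket filters; a quick sketch:

// Path hierarchy: everything under /some/list/
zsocket_set_subscribe (subscriber, "/some/list/");

// Topic tree: everything under some.list.
zsocket_set_subscribe (subscriber, "some.list.");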

  We'll use the path hierarchy, and extend our client and server so that a client can work with a single subtree. Once you see how to work with one subtree, you'll be able to extend this yourself to handle multiple subtrees, if your use case demands it.

  Here's the server implementing subtrees, a small variation on Model Three:

//  clonesrv4: Clone server, Model Four in C
// Clone server - Model Four

// Lets us build this source without creating a library
#include "kvsimple.c"

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
char *subtree; // Client subtree specification
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

// The main task is identical to clonesrv3 except for where it
// handles subtrees.

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");

int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();

zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

// Apply state update sent from client
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // Interrupted
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: publishing update %5d\n", (int) sequence);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (snapshot);
}
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity, subtree };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("I: sending shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
}
}
printf (" Interrupted\n%d messages handled\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

return 0;
}

  And here is the corresponding client:

// clonecli4: Clone client, Model Four in C
// Clone client - Model Four

// Lets us build this source without creating a library
#include "kvsimple.c"

// This client is identical to clonecli3 except for where we
// handles subtrees.
#define SUBTREE "/client/"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");
zsocket_set_subscribe (subscriber, SUBTREE);
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");

zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));

// We first request a state snapshot:
int64_t sequence = 0;
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, SUBTREE);
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted

// Discard out-of-sequence kvmsgs, incl. heartbeats
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: received update=%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// If we timed out, generate a random kvmsg
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

Ephemeral Values

  An ephemeral value is one that expires automatically unless regularly refreshed. If you think of Clone being used for a registration service, ephemeral values let you do dynamic discovery: a node joins the network, publishes its address, and refreshes it regularly. If the node dies, its address eventually gets removed.

  The usual abstraction for ephemeral values is to attach them to a session, and delete them when the session ends. In Clone, sessions would be defined by clients, and would end when the client dies. A simpler alternative is to attach a time to live (TTL) to each ephemeral value, which the server uses to expire values that haven't been refreshed in time.

  A good design principle that I use whenever possible is to not invent concepts that aren't absolutely essential. With a handful of ephemeral values, it's simplest to set a TTL on each one. With masses of ephemeral values, it's more efficient to attach them to sessions and expire them in bulk. That isn't a problem we face at this stage, and may never face, so sessions go out the window.

  So now we'll implement ephemeral values. First, we need a way to encode a TTL in the key-value message. We could add a frame, but the problem with using ZeroMQ frames for properties is that each time we want to add a new property, we have to change the message structure, which breaks compatibility. So let's add a properties frame to the message, and write code that lets us get and put property values.
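  With that properties frame in place, stamping an update with a TTL becomes a one-liner. A sketch of how a client might use it with the kvmsg class below (the 30-second value is arbitrary):

// Sketch: mark this update as ephemeral, with a TTL in seconds
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_set_prop (kvmsg, "ttl", "%d", 30); // expire unless refreshed within 30s
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);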

  Next, we need a way to say "delete this value". Until now, servers and clients have blindly inserted or updated values in their hash tables. We'll say that if the value is empty, it means "delete this key".
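  A standalone sketch of that convention when applying an update (per the text below, the full kvmsg class folds this logic into its store method):

// Sketch: an empty body means "delete this key"; otherwise store it
if (kvmsg_size (kvmsg) == 0) {
    zhash_delete (kvmap, kvmsg_key (kvmsg));
    kvmsg_destroy (&kvmsg);
}
else
    kvmsg_store (&kvmsg, kvmap);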

  Here is a more complete version of the kvmsg class, which implements a properties frame (and adds a UUID frame, which we'll need later). It also handles deleting keys from the hash, when needed:

//  kvmsg: Key-value message class: full in C
// kvmsg class - key-value message class for example applications

#include "kvmsg.h"
#include <uuid/uuid.h>
#include "zlist.h"

// Keys are short strings
#define KVMSG_KEY_MAX 255

// Message is formatted on wire as 5 frames:
// frame 0: key (0MQ string)
// frame 1: sequence (8 bytes, network order)
// frame 2: uuid (blob, 16 bytes)
// frame 3: properties (0MQ string)
// frame 4: body (blob)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_UUID 2
#define FRAME_PROPS 3
#define FRAME_BODY 4
#define KVMSG_FRAMES 5

// Structure of our class
struct _kvmsg {
// Presence indicators for each frame
int present [KVMSG_FRAMES];
// Corresponding 0MQ message frames, if any
zmq_msg_t frame [KVMSG_FRAMES];
// Key, copied into safe C string
char key [KVMSG_KEY_MAX + 1];
// List of properties, as name=value strings
zlist_t *props;
size_t props_size;
};

// These two helpers serialize a list of properties to and from a
// message frame:

static void
s_encode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
if (self->present [FRAME_PROPS])
zmq_msg_close (msg);

zmq_msg_init_size (msg, self->props_size);
char *prop = zlist_first (self->props);
char *dest = (char *) zmq_msg_data (msg);
while (prop) {
strcpy (dest, prop);
dest += strlen (prop);
*dest++ = '\n';
prop = zlist_next (self->props);
}
self->present [FRAME_PROPS] = 1;
}

static void
s_decode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
self->props_size = 0;
while (zlist_size (self->props))
free (zlist_pop (self->props));

size_t remainder = zmq_msg_size (msg);
char *prop = (char *) zmq_msg_data (msg);
char *eoln = memchr (prop, '\n', remainder);
while (eoln) {
*eoln = 0;
zlist_append (self->props, strdup (prop));
self->props_size += strlen (prop) + 1;
remainder -= strlen (prop) + 1;
prop = eoln + 1;
eoln = memchr (prop, '\n', remainder);
}
}

// Here are the constructor and destructor for the class:

// Constructor, takes a sequence number for the new kvmsg instance:
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;

self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
self->props = zlist_new ();
kvmsg_set_sequence (self, sequence);
return self;
}

// zhash_free_fn callback helper that does the low level destruction:
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Destroy message frames if any
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);

// Destroy property list
while (zlist_size (self->props))
free (zlist_pop (self->props));
zlist_destroy (&self->props);

// Free object itself
free (self);
}
}

// Destructor
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}

// This method reads a key-value message from the socket and returns a
// new kvmsg instance:

kvmsg_t *
kvmsg_recv (void *socket)
{
// This method is almost unchanged from kvsimple
assert (socket);
kvmsg_t *self = kvmsg_new (0);

// Read all frames off the wire, reject if bogus
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_msg_recv (&self->frame [frame_nbr], socket, 0) == -1) {
kvmsg_destroy (&self);
break;
}
// Verify multipart framing
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsocket_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
if (self)
s_decode_props (self);
return self;
}

// Send key-value message to socket; any empty frames are sent as such.
void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);

s_encode_props (self);
// The rest of the method is unchanged from kvsimple
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (&copy);
if (self->present [frame_nbr])
zmq_msg_copy (&copy, &self->frame [frame_nbr]);
zmq_msg_send (&copy, socket,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (&copy);
}
}

// This method duplicates a kvmsg instance, returns the new instance:

kvmsg_t *
kvmsg_dup (kvmsg_t *self)
{
kvmsg_t *kvmsg = kvmsg_new (0);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr]) {
zmq_msg_t *src = &self->frame [frame_nbr];
zmq_msg_t *dst = &kvmsg->frame [frame_nbr];
zmq_msg_init_size (dst, zmq_msg_size (src));
memcpy (zmq_msg_data (dst),
zmq_msg_data (src), zmq_msg_size (src));
kvmsg->present [frame_nbr] = 1;
}
}
kvmsg->props_size = zlist_size (self->props);
char *prop = (char *) zlist_first (self->props);
while (prop) {
zlist_append (kvmsg->props, strdup (prop));
prop = (char *) zlist_next (self->props);
}
return kvmsg;
}

// The key, sequence, body, and size methods are the same as in kvsimple.

// Return key from last read message, if any, else NULL
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}

// Set message key as provided
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}

// Set message key using printf format
void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}

// Return sequence nbr from last read message, if any
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}

// Set message sequence number
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);

byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);

self->present [FRAME_SEQ] = 1;
}

// Return body from last read message, if any, else NULL
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}

// Set message body
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}

// Set message body using printf format
void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}

// Return body size from last read message, if any, else zero
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}

// These methods get and set the UUID for the key-value message:

byte *
kvmsg_uuid (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_UUID]
&& zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t))
return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]);
else
return NULL;
}

// Sets the UUID to a randomly generated value
void
kvmsg_set_uuid (kvmsg_t *self)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_UUID];
uuid_t uuid;
uuid_generate (uuid);
if (self->present [FRAME_UUID])
zmq_msg_close (msg);
zmq_msg_init_size (msg, sizeof (uuid));
memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
self->present [FRAME_UUID] = 1;
}

// These methods get and set a specified message property:

// Get message property, return "" if no such property is defined.
char *
kvmsg_get_prop (kvmsg_t *self, char *name)
{
assert (strchr (name, '=') == NULL);
char *prop = zlist_first (self->props);
size_t namelen = strlen (name);
while (prop) {
if (strlen (prop) > namelen
&& memcmp (prop, name, namelen) == 0
&& prop [namelen] == '=')
return prop + namelen + 1;
prop = zlist_next (self->props);
}
return "";
}

// Set message property. Property name cannot contain '='. Max length of
// value is 255 chars.
void
kvmsg_set_prop (kvmsg_t *self, char *name, char *format, ...)
{
assert (strchr (name, '=') == NULL);

char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);

// Allocate name=value string
char *prop = malloc (strlen (name) + strlen (value) + 2);

// Remove existing property if any
sprintf (prop, "%s=", name);
char *existing = zlist_first (self->props);
while (existing) {
if (memcmp (prop, existing, strlen (prop)) == 0) {
self->props_size -= strlen (existing) + 1;
zlist_remove (self->props, existing);
free (existing);
break;
}
existing = zlist_next (self->props);
}
// Add new name=value property string
strcat (prop, value);
zlist_append (self->props, prop);
self->props_size += strlen (prop) + 1;
}

// This method stores the key-value message into a hash map, unless
// the key and value are both null. It nullifies the kvmsg reference
// so that the object is owned by the hash map, not the caller:

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (kvmsg_size (self)) {
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
}
else
zhash_delete (hash, kvmsg_key (self));

*self_p = NULL;
}
}

// This method extends the kvsimple implementation with support for
// message properties:

void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
if (zlist_size (self->props)) {
fprintf (stderr, "[");
char *prop = zlist_first (self->props);
while (prop) {
fprintf (stderr, "%s;", prop);
prop = zlist_next (self->props);
}
fprintf (stderr, "]");
}
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}

// This method is the same as in kvsimple with added support
// for the uuid and property features of kvmsg:

int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;

printf (" * kvmsg: ");

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);

zhash_t *kvmap = zhash_new ();

// Test send and receive of simple message
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);

// Test send and receive of message with properties
kvmsg = kvmsg_new (2);
kvmsg_set_prop (kvmsg, "prop1", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value2");
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_destroy (&kvmsg);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
kvmsg_destroy (&kvmsg);
// Shutdown and destroy all objects
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

printf ("OK\n");
return 0;
}

  The Model Five client is almost identical to the Model Four client. It now uses the full kvmsg class, and sets a randomized ttl property, in seconds, on each message:

kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));
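For context, here is roughly how that one line sits in the client's send loop. This is a sketch in the shape of the earlier client models, not the full clonecli5 listing; SUBTREE and the publisher socket are assumed to be set up as in the earlier clients:

while (!zctx_interrupted) {
// Distribute key-value pair with a random ttl property
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
sleep (1);
}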

Using a Reactor

  Until now, we have used a poll loop in the server. In this next model of the server, we switch to using a reactor. In C, we use CZMQ's zloop class. Using a reactor makes the code more verbose, but easier to understand and build out, because each piece of the server is handled by a separate reactor handler.

  We use a single thread and pass a server object around to the reactor handlers. We could have organized the server as multiple threads, each handling one socket or timer, but that works better when threads don't have to share data. In this case all the work centers around the server's hashmap, so one thread is simpler.

  There are three reactor handlers (a minimal zloop sketch follows this list):

  • One to handle snapshot requests coming on the ROUTER socket;
  • One to handle incoming updates from clients, coming on the PULL socket;
  • One to expire ephemeral values whose TTL has passed.
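  Before the full listing, here is a minimal, self-contained zloop sketch using the same CZMQ v2 API as the server below. The handler name tick_event and the five-tick cutoff are invented, purely for illustration:

#include "czmq.h"

static int
tick_event (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
int *count = (int *) args;
printf ("tick %d\n", ++(*count));
return *count < 5? 0: -1; // returning -1 stops the reactor
}

int main (void)
{
int count = 0;
zloop_t *loop = zloop_new ();
zloop_timer (loop, 1000, 0, tick_event, &count); // every 1000 msecs, forever
zloop_start (loop); // blocks until a handler returns -1
zloop_destroy (&loop);
return 0;
}

  And here is the reactor-based server, Model Five: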
    //  clonesrv5: Clone server, Model Five in C
    // Clone server - Model Five

    // Lets us build this source without creating a library
    #include "kvmsg.c"

    // zloop reactor handlers
    static int s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
    static int s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
    static int s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args);

    // Our server is defined by these properties
    typedef struct {
    zctx_t *ctx; // Context wrapper
    zhash_t *kvmap; // Key-value store
    zloop_t *loop; // zloop reactor
    int port; // Main port we're working on
    int64_t sequence; // How many updates we're at
    void *snapshot; // Handle snapshot requests
    void *publisher; // Publish updates to clients
    void *collector; // Collect updates from clients
    } clonesrv_t;

    int main (void)
    {
    clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
    self->port = 5556;
    self->ctx = zctx_new ();
    self->kvmap = zhash_new ();
    self->loop = zloop_new ();
    zloop_set_verbose (self->loop, false);

    // Set up our clone server sockets
    self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
    zsocket_bind (self->snapshot, "tcp://*:%d", self->port);
    self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
    zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
    self->collector = zsocket_new (self->ctx, ZMQ_PULL);
    zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);

    // Register our handlers with reactor
    zmq_pollitem_t poller = { 0, 0, ZMQ_POLLIN };
    poller.socket = self->snapshot;
    zloop_poller (self->loop, &poller, s_snapshots, self);
    poller.socket = self->collector;
    zloop_poller (self->loop, &poller, s_collector, self);
    zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);

    // Run reactor until process interrupted
    zloop_start (self->loop);

    zloop_destroy (&self->loop);
    zhash_destroy (&self->kvmap);
    zctx_destroy (&self->ctx);
    free (self);
    return 0;
    }

    // We handle ICANHAZ? requests by sending snapshot data to the
    // client that requested it:

    // Routing information for a key-value snapshot
    typedef struct {
    void *socket; // ROUTER socket to send to
    zframe_t *identity; // Identity of peer who requested state
    char *subtree; // Client subtree specification
    } kvroute_t;

    // We call this function for each key-value pair in our hash table
    static int
    s_send_single (const char *key, void *data, void *args)
    {
    kvroute_t *kvroute = (kvroute_t *) args;
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
    && memcmp (kvroute->subtree,
    kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
    zframe_send (&kvroute->identity, // Choose recipient
    kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
    kvmsg_send (kvmsg, kvroute->socket);
    }
    return 0;
    }

    // This is the reactor handler for the snapshot socket; it accepts
    // just the ICANHAZ? request and replies with a state snapshot ending
    // with a KTHXBAI message:

    static int
    s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
    {
    clonesrv_t *self = (clonesrv_t *) args;

    zframe_t *identity = zframe_recv (poller->socket);
    if (identity) {
    // Request is in second frame of message
    char *request = zstr_recv (poller->socket);
    char *subtree = NULL;
    if (streq (request, "ICANHAZ?")) {
    free (request);
    subtree = zstr_recv (poller->socket);
    }
    else
    printf ("E: bad request, aborting\n");

    if (subtree) {
// Send state snapshot to client
    kvroute_t routing = { poller->socket, identity, subtree };
    zhash_foreach (self->kvmap, s_send_single, &routing);

    // Now send END message with sequence number
zclock_log ("I: sending snapshot=%d", (int) self->sequence);
    zframe_send (&identity, poller->socket, ZFRAME_MORE);
    kvmsg_t *kvmsg = kvmsg_new (self->sequence);
    kvmsg_set_key (kvmsg, "KTHXBAI");
    kvmsg_set_body (kvmsg, (byte *) subtree, 0);
    kvmsg_send (kvmsg, poller->socket);
    kvmsg_destroy (&kvmsg);
    free (subtree);
    }
    zframe_destroy(&identity);
    }
    return 0;
    }

    // We store each update with a new sequence number, and if necessary, a
    // time-to-live. We publish updates immediately on our publisher socket:

    static int
    s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
    {
    clonesrv_t *self = (clonesrv_t *) args;

    kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
    if (kvmsg) {
    kvmsg_set_sequence (kvmsg, ++self->sequence);
    kvmsg_send (kvmsg, self->publisher);
    int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
    if (ttl)
    kvmsg_set_prop (kvmsg, "ttl",
    "%" PRId64, zclock_time () + ttl * 1000);
    kvmsg_store (&kvmsg, self->kvmap);
    zclock_log ("I: publishing update=%d", (int) self->sequence);
    }
    return 0;
    }

    // At regular intervals, we flush ephemeral values that have expired. This
    // could be slow on very large data sets:

    // If key-value pair has expired, delete it and publish the
    // fact to listening clients.
    static int
    s_flush_single (const char *key, void *data, void *args)
    {
    clonesrv_t *self = (clonesrv_t *) args;

    kvmsg_t *kvmsg = (kvmsg_t *) data;
    int64_t ttl;
    sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
    if (ttl && zclock_time () >= ttl) {
    kvmsg_set_sequence (kvmsg, ++self->sequence);
    kvmsg_set_body (kvmsg, (byte *) "", 0);
    kvmsg_send (kvmsg, self->publisher);
    kvmsg_store (&kvmsg, self->kvmap);
    zclock_log ("I: publishing delete=%d", (int) self->sequence);
    }
    return 0;
    }

    static int
    s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args)
    {
    clonesrv_t *self = (clonesrv_t *) args;
    if (self->kvmap)
    zhash_foreach (self->kvmap, s_flush_single, args);
    return 0;
    }
Adding the Binary Star Pattern for Reliability

  The Clone models we've explored up to now have been relatively simple. Now we're going to get into unpleasantly complex territory, which has me getting up for another espresso. You should appreciate that making "reliable" messaging is complex enough that you always need to ask, "Do we actually need this?" before jumping into it. If you can get away with unreliable or with "good enough" reliability, you can make a huge win in terms of cost and complexity. Sure, you may lose some data now and then; it is often a good trade-off. Having said that, and because the espresso here is really good, let's jump in.

  As you play with the last model, you'll stop and restart the server. It might look like it recovers, but of course it's applying updates to an empty state store, not the proper current one. Any new client joining the network will get only the latest updates instead of the full history.

  What we want is a way for the server to recover from being interrupted or crashing. We also need a backup for when the server is out of commission for any length of time. When someone asks for "reliability", ask them which failures they actually want to handle. In our case, these are:

  • The server process crashes and is automatically or manually restarted. The process loses its state and has to get it back from somewhere.
  • The server machine dies and is offline for a significant time. Clients have to switch to an alternate server somewhere.
  • The server process or machine gets disconnected from the network, e.g., a switch dies or a data center gets knocked out. It may come back at some point, but in the meantime clients need an alternate server.

  Our first step is to add a second server. We can use the Binary Star pattern from Chapter 4 to organize the two servers as primary and backup. Binary Star is a reactor, so it's useful that we already refactored the last server model into a reactor style.

  We need to ensure that updates are not lost if the primary server crashes. The simplest technique is to send them to both servers. The backup server can then act as a client, keeping its state synchronized by receiving updates as all clients do. It will also get new updates directly from clients. It can't yet store these in its hash table, but it can hold onto them for a while.

  So, Model Six introduces the following changes over Model Five:

  • We use a pub-sub flow instead of a push-pull flow for client updates sent to the servers. This takes care of fanning out the updates to both servers. Otherwise we'd have to use two DEALER sockets.
  • We add heartbeats to server updates (to clients), so that a client can detect when the primary server has died. It can then switch over to the backup server.
  • We connect the two servers using the Binary Star bstar reactor class. Binary Star relies on the clients voting by making an explicit request to the server they consider active. We'll use snapshot requests as the voting mechanism.
  • We make all update messages uniquely identifiable by adding a UUID field. The client generates this, and the server propagates it back on republished updates.
  • The passive server keeps a "pending list" of updates that it has received from clients but not yet from the active server, or updates it has received from the active server but not yet from the clients. The list is ordered from oldest to newest, so that it is easy to remove updates off the head.

Figure 61 - Clone Client Finite State Machine

  It's useful to design the client logic as a finite state machine; the client cycles through three states:

  • The client opens and connects its sockets, and then requests a snapshot from the first server. To avoid request storms, it will ask any given server at most twice. One request might get lost, which would be bad luck; losing both is far less likely.
  • The client waits for a reply (snapshot data) from the current server, and if it gets one, it stores it. If there is no reply within some timeout, it fails over to the next server.
  • When the client has gotten its snapshot, it waits for and processes updates. Again, if it doesn't hear anything from the server within some timeout, it fails over to the next server.

  The client loops forever. It's quite likely that during startup or failover some clients will be trying to talk to the primary server while others are trying to talk to the backup server. The Binary Star state machine handles this, hopefully accurately. It's hard to prove software correct; instead, we hammer it until we can't prove it wrong.

  Failover happens as follows:

  • The client detects that the primary server is no longer sending heartbeats, concludes that it has died, connects to the backup server, and requests a new state snapshot.
  • The backup server receives snapshot requests from clients, detects that the primary server has gone, and takes over as active.
  • The backup server applies its pending list to its own hash table, and then starts to process state snapshot requests.

  When the primary server comes back online, it will:

  • Start up as a passive server, and connect to the backup server as a Clone client.
  • Start to receive updates from clients, via its SUB socket.

  We make a few assumptions:

  • At least one server will keep running. If both servers crash, we lose all server state and there's no way to recover it.
  • Multiple clients do not update the same hash keys at the same time. Client updates will arrive at the two servers in a different order; therefore, the backup server may apply updates from its pending list in a different order than the primary server would or did. Updates from a single client always arrive in the same order on both servers, so that is safe.

  So our high-availability server pair, using the Binary Star pattern, consists of two servers and a set of clients that talk to both of them.

Figure 62 - High-availability Clone Server Pair

  Here is the sixth and final model of the clone server:

//  clonesrv6: Clone server, Model Six in C
// Clone server Model Six

// Lets us build this source without creating a library
#include "bstar.c"
#include "kvmsg.c"

// We define a set of reactor handlers and our server object structure:

// Bstar reactor handlers
static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_new_active (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_new_passive (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args);

// Our server is defined by these properties
typedef struct {
zctx_t *ctx; // Context wrapper
zhash_t *kvmap; // Key-value store
bstar_t *bstar; // Bstar reactor core
int64_t sequence; // How many updates we're at
int port; // Main port we're working on
int peer; // Main port of our peer
void *publisher; // Publish updates and hugz
void *collector; // Collect updates from clients
void *subscriber; // Get updates from peer
zlist_t *pending; // Pending updates from clients
bool primary; // true if we're primary
bool active; // true if we're active
bool passive; // true if we're passive
} clonesrv_t;

// The main task parses the command line to decide whether to start
// as a primary or backup server. We're using the Binary Star pattern
// for reliability. This interconnects the two servers so they can
// agree on which one is primary and which one is backup. To allow the
// two servers to run on the same box, we use different ports for
// primary and backup. Ports 5003/5004 are used to interconnect the
// servers. Ports 5556/5566 are used to receive voting events (snapshot
// requests in the clone pattern). Ports 5557/5567 are used by the
// publisher, and ports 5558/5568 are used by the collector:

int main (int argc, char *argv [])
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
if (argc == 2 && streq (argv [1], "-p")) {
zclock_log ("I: primary active, waiting for backup (passive)");
self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
"tcp://localhost:5004");
bstar_voter (self->bstar, "tcp://*:5556",
ZMQ_ROUTER, s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = true;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
zclock_log ("I: backup passive, waiting for primary (active)");
self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
"tcp://localhost:5003");
bstar_voter (self->bstar, "tcp://*:5566",
ZMQ_ROUTER, s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = false;
}
else {
printf ("Usage: clonesrv6 { -p | -b }\n");
free (self);
exit (0);
}
// Primary server will become first active
if (self->primary)
self->kvmap = zhash_new ();

self->ctx = zctx_new ();
self->pending = zlist_new ();
bstar_set_verbose (self->bstar, true);

// Set up our clone server sockets
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
self->collector = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->collector, "");
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);

// Set up our own clone client interface to peer
self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->subscriber, "");
zsocket_connect (self->subscriber,
"tcp://localhost:%d", self->peer + 1);

// After we've setup our sockets, we register our binary star
// event handlers, and then start the bstar reactor. This finishes
// when the user presses Ctrl-C or when the process receives a SIGINT
// interrupt:

// Register state change handlers
bstar_new_active (self->bstar, s_new_active, self);
bstar_new_passive (self->bstar, s_new_passive, self);

// Register our other handlers with the bstar reactor
zmq_pollitem_t poller = { self->collector, 0, ZMQ_POLLIN };
zloop_poller (bstar_zloop (self->bstar), &poller, s_collector, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);

// Start the bstar reactor
bstar_start (self->bstar);

// Interrupted, so shut down
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_destroy (&kvmsg);
}
zlist_destroy (&self->pending);
bstar_destroy (&self->bstar);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);

return 0;
}

// We handle ICANHAZ? requests exactly as in the clonesrv5 example.

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
char *subtree; // Client subtree specification
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
zframe_send (&kvroute->identity, // Choose recipient
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

zframe_t *identity = zframe_recv (poller->socket);
if (identity) {
// Request is in second frame of message
char *request = zstr_recv (poller->socket);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (poller->socket);
}
else
printf ("E: bad request, aborting\n");

if (subtree) {
// Send state snapshot to client
kvroute_t routing = { poller->socket, identity, subtree };
zhash_foreach (self->kvmap, s_send_single, &routing);

// Now send END message with sequence number
zclock_log ("I: sending snapshot=%d", (int) self->sequence);
zframe_send (&identity, poller->socket, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, poller->socket);
kvmsg_destroy (&kvmsg);
free (subtree);
}
zframe_destroy(&identity);
}
return 0;
}

// The collector is more complex than in the clonesrv5 example because the
// way it processes updates depends on whether we're active or passive.
// The active applies them immediately to its kvmap, whereas the passive
// queues them as pending:

// If message was already on pending list, remove it and return true,
// else return false.
static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
{
kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
while (held) {
if (memcmp (kvmsg_uuid (kvmsg),
kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
zlist_remove (self->pending, held);
return true;
}
held = (kvmsg_t *) zlist_next (self->pending);
}
return false;
}

static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (kvmsg) {
if (self->active) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing update=%d", (int) self->sequence);
}
else {
// If we already got message from active, drop it, else
// hold on pending list
if (s_was_pending (self, kvmsg))
kvmsg_destroy (&kvmsg);
else
zlist_append (self->pending, kvmsg);
}
}
return 0;
}

// We purge ephemeral values using exactly the same code as in
// the previous clonesrv5 example.
// If key-value pair has expired, delete it and publish the
// fact to listening clients.
static int
s_flush_single (const char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing delete=%d", (int) self->sequence);
}
return 0;
}

static int
s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
if (self->kvmap)
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
}

// We send a HUGZ message once a second to all subscribers so that they
// can detect if our server dies. They'll then switch over to the backup
// server, which will become active:

static int
s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "HUGZ");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);

return 0;
}

// When we switch from passive to active, we apply our pending list so that
// our kvmap is up-to-date. When we switch to passive, we wipe our kvmap
// and grab a new snapshot from the active server:

static int
s_new_active (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

self->active = true;
self->passive = false;

// Stop subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
zloop_poller_end (bstar_zloop (self->bstar), &poller);

// Apply pending list to own hash table
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing pending=%d", (int) self->sequence);
}
return 0;
}

static int
s_new_passive (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

zhash_destroy (&self->kvmap);
self->active = false;
self->passive = true;

// Start subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
zloop_poller (bstar_zloop (self->bstar), &poller, s_subscriber, self);

return 0;
}

// When we get an update, we create a new kvmap if necessary, and then
// add our update to our kvmap. We're always passive in this case:

static int
s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
// Get state snapshot if necessary
if (self->kvmap == NULL) {
self->kvmap = zhash_new ();
void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
zclock_log ("I: asking for snapshot from: tcp://localhost:%d",
self->peer);
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, ""); // blank subtree to get all
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, self->kvmap);
}
zclock_log ("I: received snapshot=%d", (int) self->sequence);
zsocket_destroy (self->ctx, snapshot);
}
// Find and remove update off pending list
kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (!kvmsg)
return 0;

if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
if (!s_was_pending (self, kvmsg)) {
// If active update came before client update, flip it
// around, store active update (with sequence) on pending
// list and use to clear client update when it comes later
zlist_append (self->pending, kvmsg_dup (kvmsg));
}
// If update is more recent than our kvmap, apply it
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received update=%d", (int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
else
kvmsg_destroy (&kvmsg);

return 0;
}

  This model is only a few hundred lines of code, but it took quite a while to get working. To be accurate, building Model Six took about a full week of "sweet god, this is just too complex for an example" hacking. We've assembled pretty much everything into this small application: failover, ephemeral values, subtrees, and so on. What surprised me was that the up-front design was fairly accurate; still, writing and debugging so many socket flows was quite a challenge.

  The reactor-based design removes a lot of the grunt work from the code, and what remains is simpler and easier to understand. We reuse the bstar reactor from Chapter 4. The whole server runs as one thread, so there's no inter-thread weirdness to worry about: just a structure pointer (self) passed around to all handlers, which can do their thing happily. One nice side effect of using reactors is that the code, being less tightly bound to one big poll loop, is much easier to reuse; large chunks of Model Six come directly from Model Five.

  I built it piece by piece, getting each piece working properly before going on to the next one. Because there are four or five main socket flows, that meant quite a lot of debugging and testing. I debugged just by dumping messages to the console. Don't use classic debuggers to step through zmq applications; you need to see the message flows to make any sense of what is going on.

  For testing, I always try to use Valgrind, which catches memory leaks and invalid memory accesses. In C, this is a major concern, as you can't delegate memory management to a garbage collector. Using proper and consistent abstractions like kvmsg and CZMQ helps enormously.

The Clustered Hashmap Protocol

  While the server is pretty much a mashup of the previous model plus the Binary Star pattern, the client is rather more complex. But before we get to that, let's look at the final protocol. I've written this up as a specification on the ZeroMQ RFC website, as the Clustered Hashmap Protocol (CHP).

  Roughly, there are two ways to design a complex protocol such as this one. One way is to separate each flow onto its own set of sockets; this is the approach we used here. The advantage is that each flow is simple and clean. The disadvantage is that managing multiple socket flows at once can be quite complex. Using a reactor makes it simpler, but still, it leaves a lot of moving pieces that have to fit together correctly.

  The second way is to use a single socket pair for everything. In this case, I'd have used ROUTER for the server and DEALER for the clients, and then done everything over that one connection. It makes for a more complex protocol, but at least the complexity is all in one place. In Chapter 7 we'll look at an example of a protocol done over a ROUTER-DEALER combination.

  Let's take a look at the CHP specification. Note that "SHOULD", "MUST", and "MAY" are key words we use in protocol specifications to indicate requirement levels.

Goals

  CHP is meant to provide a basis for reliable pub-sub across a cluster of clients connected over a zmq network. It defines a "hashmap" abstraction consisting of key-value pairs. Any client can modify any key-value pair at any time, and changes are propagated to all clients. A client can join the network at any time.

Architecture

  CHP connects a set of client applications and a set of servers. Clients connect to the server; clients do not see each other. Clients can come and go arbitrarily.

Ports and Connections

  The server MUST open three ports:

  • A SNAPSHOT port (zmq ROUTER socket) at port number P.
  • A PUBLISHER port (zmq PUB socket) at port number P + 1.
  • A COLLECTOR port (zmq SUB socket) at port number P + 2.

  The client SHOULD open at least two connections:

  • A SNAPSHOT connection (zmq DEALER socket) to port number P.
  • A SUBSCRIBER connection (zmq SUB socket) to port number P + 1.

  The client MAY open a third connection, if it wants to update the hashmap:

  • A PUBLISHER connection (zmq PUB socket) to port number P + 2.

  Any additional framing (such as the identity frames added by ROUTER sockets) is not shown in the command descriptions below.

State Synchronization

  The client MUST start by sending an ICANHAZ command to its snapshot connection. This command consists of two frames:

ICANHAZ command
-----------------------------------
Frame 0: "ICANHAZ?"
Frame 1: subtree specification

  Both frames are zmq strings. The subtree specification MAY be empty. If not empty, it consists of a slash followed by one or more path segments, ending in a slash, e.g., "/client/".

  The server MUST respond to an ICANHAZ command by sending zero or more KVSYNC commands to its snapshot port, followed by a KTHXBAI command. The server MUST prefix each command with the identity of the client, as provided by the ICANHAZ command. A KVSYNC command specifies a single key-value pair:

KVSYNC command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: value, as blob

  The sequence number is unsigned and MAY be zero.

  The KTHXBAI command takes this form:

KTHXBAI command
-----------------------------------
Frame 0: "KTHXBAI"
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: subtree specification

  The sequence number MUST be the highest sequence number of the KVSYNC commands previously sent.

  When the client has received a KTHXBAI command, it SHOULD start to receive messages from its subscriber connection and apply them.
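  As an illustration (not part of the specification), the synchronization dialog looks roughly like this on the client side, using the kvmsg class from this chapter; snapshot is assumed to be a DEALER socket already connected to port P, and kvmap an empty zhash table:

int64_t sequence = 0;
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, ""); // empty subtree: ask for everything
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg); // highest sequence sent
kvmsg_destroy (&kvmsg);
break; // Synchronized
}
kvmsg_store (&kvmsg, kvmap); // apply snapshot entry
}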

Server-to-Client Updates

  When the server has an update for its hashmap, it MUST broadcast this on its publisher socket as a KVPUB command. The KVPUB command has this form:

KVPUB command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

  The sequence number MUST be strictly increasing. The client MUST discard any KVPUB command whose sequence number is not strictly greater than that of the last KTHXBAI or KVPUB command it received.

  UUIDs are optional, and frame 2 MAY be empty (zero bytes long). The properties field is formatted as zero or more instances of "name=value", each followed by a newline character. If the key-value pair has no properties, the properties field is empty.

  If the value is empty, the client SHOULD delete its key-value entry with the specified key.
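  A minimal sketch of these two client-side rules, using the kvmsg class from this chapter (the helper name s_apply_update is invented):

// Apply one received KVPUB to the client's hashmap; returns new sequence
static int64_t
s_apply_update (kvmsg_t **kvmsg_p, int64_t sequence, zhash_t *kvmap)
{
if (kvmsg_sequence (*kvmsg_p) > sequence) {
sequence = kvmsg_sequence (*kvmsg_p);
// kvmsg_store deletes the hash entry when the body is empty
kvmsg_store (kvmsg_p, kvmap);
}
else
kvmsg_destroy (kvmsg_p); // stale or duplicate: discard
return sequence;
}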

  In the absence of updates, the server SHOULD send a HUGZ command at regular intervals, e.g., once per second. The HUGZ command has this form:

HUGZ command
-----------------------------------
Frame 0: "HUGZ"
Frame 1: 00000000
Frame 2: <empty>
Frame 3: <empty>
Frame 4: <empty>

  The client MAY treat the absence of HUGZ as an indicator that the server has died (see Reliability below).

Client-to-Server Updates

  When the client has an update for its hashmap, it MAY send this to the server via its publisher connection as a KVSET command. The KVSET command has this form:

KVSET command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

  The sequence number is unsigned and MAY be zero. The UUID SHOULD be a universally unique identifier, if a reliable server architecture is used.

  If the value is empty, the server MUST delete its key-value entry with the specified key.

  The server SHOULD accept the following properties:

  • **ttl:** specifies a time-to-live in seconds. If a KVSET command has a ttl property, the server SHOULD delete the key-value pair when the TTL expires, and broadcast a KVPUB with an empty value so that every client deletes the entry as well.

Reliability

  CHP may be used in a dual-server configuration where a backup server takes over if the primary server fails. CHP does not specify the mechanisms used for this failover, but the Binary Star pattern may be helpful.

  To assist server reliability, the client MAY:

  • Set a UUID in every KVSET command.
  • Detect the lack of HUGZ over a time period, and use this as an indication that the current server has failed.
  • Connect to the backup server and re-request a state synchronization.

Scalability and Performance

CHP is designed to scale to large numbers (thousands) of clients, limited only by system resources on the broker. Because all updates pass through a single server, overall throughput is limited to some millions of updates per second at peak, and probably less.

Security

  CHP does not implement any authentication, access control, or encryption mechanisms, and should not be used in any deployment where these are required.

Building a Multithreaded Stack and API

  The client stack we've used so far isn't smart enough to handle this protocol properly. As soon as we start doing heartbeats, we need a client stack that can run in a background thread. In the Freelance pattern at the end of Chapter 4 we used a multithreaded API, but didn't explain it in detail. It turns out that multithreaded APIs are quite useful when you start to make more complex zmq protocols like CHP.

Figure 63 - Multithreaded API

  If you make a nontrivial protocol and you expect applications to implement it properly, most developers will get it wrong most of the time. You're going to be left with a lot of unhappy people complaining that your protocol is too complex, too fragile, and too hard to use. Whereas if you give them a simple API to call, you have some chance of them buying in.

  Our multithreaded API consists of a frontend object and a background agent, connected by two PAIR sockets. Connecting two PAIR sockets like this is so useful that your high-level binding will probably do the same thing CZMQ does, which is to wrap a "create new thread with a pipe that I can talk to" method.

  The multithreaded APIs that we see in this book all take the same form:

  • The constructor for the object (clone_new) creates a context and starts a background thread connected with a pipe. It holds onto one end of the pipe so it can send commands to the background thread.
  • The background thread starts an agent, which is essentially a zmq_poll loop reading from the pipe socket and any other sockets (here, the DEALER and SUB sockets).
  • The main application thread and the background thread now communicate only via ZeroMQ messages. By convention, the frontend sends string commands, so that each method on the class turns into a message sent to the backend agent, like this:
    void
    clone_connect (clone_t *self, char *address, char *service)
    {
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
    }
  • If the method needs a return code, it can wait for a reply message from the agent.
  • If the agent needs to send asynchronous events back to the frontend, we add a recv method to the class, which waits for messages on the frontend pipe.
  • We may want to expose the frontend pipe socket handle, so that the class can be integrated into further poll loops; otherwise any recv method would block the application. (A self-contained sketch of this shape follows the list.)
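  The shape is easiest to see in miniature. Here is a self-contained sketch of the frontend/agent split, using CZMQ's zthread_fork, which connects the two threads with a PAIR pipe; the echo agent and its PING command are invented for illustration:

#include "czmq.h"

// Background agent: reads commands off the pipe and echoes them back
static void
echo_agent (void *args, zctx_t *ctx, void *pipe)
{
while (true) {
char *command = zstr_recv (pipe);
if (!command)
break; // Interrupted
zstr_send (pipe, command); // reply to the frontend
free (command);
}
}

int main (void)
{
zctx_t *ctx = zctx_new ();
void *pipe = zthread_fork (ctx, echo_agent, NULL);
zstr_send (pipe, "PING"); // a method call becomes a message
char *reply = zstr_recv (pipe); // synchronous round-trip
puts (reply);
free (reply);
zctx_destroy (&ctx);
return 0;
}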

  The clone class has the same structure as the flcliapi class from Chapter 4, with the addition of the logic from the last clone client model. Without zmq, this kind of multithreaded API design would be weeks of really hard work; with zmq, it was a day or two of work.

  The actual API methods for the clone class are quite simple:

//  Create a new clone class instance
clone_t *
clone_new (void);

// Destroy a clone class instance
void
clone_destroy (clone_t **self_p);

// Define the subtree, if any, for this clone class
void
clone_subtree (clone_t *self, char *subtree);

// Connect the clone class to one server
void
clone_connect (clone_t *self, char *address, char *service);

// Set a value in the shared hashmap
void
clone_set (clone_t *self, char *key, char *value, int ttl);

// Get a value from the shared hashmap
char *
clone_get (clone_t *self, char *key);

  Here is Model Six of the clone client, which has now become just a thin shell using the clone class:

//  clonecli6: Clone client, Model Six in C
// Clone client Model Six

// Lets us build this source without creating a library
#include "clone.c"
#define SUBTREE "/client/"

int main (void)
{
// Create distributed hash instance
clone_t *clone = clone_new ();

// Specify configuration
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");

// Set random tuples into the distributed hash
while (!zctx_interrupted) {
// Set random value, check it was stored
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
}
clone_destroy (&clone);
return 0;
}

  Note the connect method, which specifies one server endpoint. Under the hood, we're actually talking to three ports. However, as the CHP protocol says, the three ports are on consecutive port numbers:

  • The server state router (ROUTER) is at port P.
  • The server updates publisher (PUB) is at port P + 1.
  • The server updates subscriber (SUB) is at port P + 2.

  So we can fold the three connections into one logical operation, which we implement as three separate zmq connect calls, as the fragment below shows:
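  The fragment below is adapted from the agent code that follows; ctx, address, and port stand in for real values and are not set up here:

// One logical "connect" becomes three zmq connects, one per CHP port
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_connect (snapshot, "%s:%d", address, port); // state ROUTER at P
zsocket_connect (subscriber, "%s:%d", address, port + 1); // updates PUB at P+1
zsocket_connect (publisher, "%s:%d", address, port + 2); // updates SUB at P+2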

  We finish up with the source code for the clone stack. This is a complex piece of code, but it's easier to understand when you break it into the frontend object class and the backend agent. The frontend sends string commands ("SUBTREE", "CONNECT", "SET", "GET") to the agent, which handles these commands and talks to the server(s). Here is the agent's logic:

  1. Start up and get a snapshot from the first server.
  2. When we get a snapshot, switch to reading from the subscriber socket.
  3. If we don't get a snapshot, fail over to the second server.
  4. Poll on the pipe and the subscriber socket.
  5. If we get input on the pipe, handle a control command from the frontend object.
  6. If we get input on the subscriber, store or apply the update.
  7. If we don't get anything at all from the server within a certain time, fail over.
  8. Repeat until the process is interrupted by Ctrl-C.

  And here is the actual clone class implementation:

//  clone: Clone class in C
// clone class - Clone client API stack (multithreaded)

#include "clone.h"
// If no server replies within this time, abandon request
#define GLOBAL_TIMEOUT 4000 // msecs

// =====================================================================
// Synchronous part, works in our application thread

// Structure of our class

struct _clone_t {
zctx_t *ctx; // Our context wrapper
void *pipe; // Pipe through to clone agent
};

// This is the thread that handles our real clone class
static void clone_agent (void *args, zctx_t *ctx, void *pipe);

// Here are the constructor and destructor for the clone class. Note that
// we create a context specifically for the pipe that connects our
// frontend to the backend agent:

clone_t *
clone_new (void)
{
clone_t
*self;

self = (clone_t *) zmalloc (sizeof (clone_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
return self;
}

void
clone_destroy (clone_t **self_p)
{
assert (self_p);
if (*self_p) {
clone_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// Specify subtree for snapshot and updates, which we must do before
// connecting to a server as the subtree specification is sent as the
// first command to the server. Sends a [SUBTREE][subtree] command to
// the agent:

void clone_subtree (clone_t *self, char *subtree)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SUBTREE");
zmsg_addstr (msg, subtree);
zmsg_send (&msg, self->pipe);
}

// Connect to a new server endpoint. We can connect to at most two
// servers. Sends [CONNECT][endpoint][service] to the agent:

void
clone_connect (clone_t *self, char *address, char *service)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, address);
zmsg_addstr (msg, service);
zmsg_send (&msg, self->pipe);
}

// Set a new value in the shared hashmap. Sends a [SET][key][value][ttl]
// command through to the agent which does the actual work:

void
clone_set (clone_t *self, char *key, char *value, int ttl)
{
char ttlstr [10];
sprintf (ttlstr, "%d", ttl);

assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SET");
zmsg_addstr (msg, key);
zmsg_addstr (msg, value);
zmsg_addstr (msg, ttlstr);
zmsg_send (&msg, self->pipe);
}

// Look up value in distributed hash table. Sends [GET][key] to the agent and
// waits for a value response. If there is no value available, will eventually
// return NULL:

char *
clone_get (clone_t *self, char *key)
{
assert (self);
assert (key);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "GET");
zmsg_addstr (msg, key);
zmsg_send (&msg, self->pipe);

zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *value = zmsg_popstr (reply);
zmsg_destroy (&reply);
return value;
}
return NULL;
}

// The backend agent manages a set of servers, which we implement using
// our simple class model:

typedef struct {
char *address; // Server address
int port; // Server port
void *snapshot; // Snapshot socket
void *subscriber; // Incoming updates
uint64_t expiry; // When server expires
uint requests; // How many snapshot requests made?
} server_t;

static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));

zclock_log ("I: adding server %s:%d...", address, port);
self->address = strdup (address);
self->port = port;

self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
zsocket_set_subscribe (self->subscriber, subtree);
zsocket_set_subscribe (self->subscriber, "HUGZ");
return self;
}

static void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->address);
free (self);
*self_p = NULL;
}
}

// Here is the implementation of the backend agent itself:

// Number of servers to which we will talk to
#define SERVER_MAX 2

// Server considered dead if silent for this long
#define SERVER_TTL 5000 // msecs

// States we can be in
#define STATE_INITIAL 0 // Before asking server for state
#define STATE_SYNCING 1 // Getting state from server
#define STATE_ACTIVE 2 // Getting new updates from server

typedef struct {
zctx_t *ctx; // Context wrapper
void *pipe; // Pipe back to application
zhash_t *kvmap; // Actual key/value table
char *subtree; // Subtree specification, if any
server_t *server [SERVER_MAX];
uint nbr_servers; // 0 to SERVER_MAX
uint state; // Current state
uint cur_server; // If active, server 0 or 1
int64_t sequence; // Last kvmsg processed
void *publisher; // Outgoing updates
} agent_t;

static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new ();
self->subtree = strdup ("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
return self;
}

static void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
server_destroy (&self->server [server_nbr]);
zhash_destroy (&self->kvmap);
free (self->subtree);
free (self);
*self_p = NULL;
}
}

// Here we handle the different control messages from the frontend;
// SUBTREE, CONNECT, SET, and GET:

static int
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (command == NULL)
return -1; // Interrupted

if (streq (command, "SUBTREE")) {
free (self->subtree);
self->subtree = zmsg_popstr (msg);
}
else
if (streq (command, "CONNECT")) {
char *address = zmsg_popstr (msg);
char *service = zmsg_popstr (msg);
if (self->nbr_servers < SERVER_MAX) {
self->server [self->nbr_servers++] = server_new (
self->ctx, address, atoi (service), self->subtree);
// We broadcast updates to all known servers
zsocket_connect (self->publisher, "%s:%d",
address, atoi (service) + 2);
}
else
zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
free (address);
free (service);
}
else
// When we set a property, we push the new key-value pair onto
// all our connected servers:
if (streq (command, "SET")) {
char *key = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
char *ttl = zmsg_popstr (msg);
zhash_update (self->kvmap, key, (byte *) value);
zhash_freefn (self->kvmap, key, free);

// Send key-value pair on to server
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_set_key (kvmsg, key);
kvmsg_set_uuid (kvmsg);
kvmsg_fmt_body (kvmsg, "%s", value);
kvmsg_set_prop (kvmsg, "ttl", ttl);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
free (ttl);
free (key); // Value is owned by hash table
}
else
if (streq (command, "GET")) {
char *key = zmsg_popstr (msg);
char *value = zhash_lookup (self->kvmap, key);
if (value)
zstr_send (self->pipe, value);
else
zstr_send (self->pipe, "");
free (key);
// Note: do not free value; it is owned by the hash table
}
free (command);
zmsg_destroy (&msg);
return 0;
}

// The asynchronous agent manages a server pool and handles the
// request-reply dialog when the application asks for it:

static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);

while (true) {
zmq_pollitem_t poll_set [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ 0, 0, ZMQ_POLLIN, 0 }
};
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server [self->cur_server];
switch (self->state) {
case STATE_INITIAL:
// In this state we ask the server for a snapshot,
// if we have a server to talk to...
if (self->nbr_servers > 0) {
zclock_log ("I: waiting for server at %s:%d...",
server->address, server->port);
if (server->requests < 2) {
zstr_sendm (server->snapshot, "ICANHAZ?");
zstr_send (server->snapshot, self->subtree);
server->requests++;
}
server->expiry = zclock_time () + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set [1].socket = server->snapshot;
}
else
poll_size = 1;
break;

case STATE_SYNCING:
// In this state we read from snapshot and we expect
// the server to respond, else we fail over.
poll_set [1].socket = server->snapshot;
break;

case STATE_ACTIVE:
// In this state we read from subscriber and we expect
// the server to give HUGZ, else we fail over.
poll_set [1].socket = server->subscriber;
break;
}
if (server) {
poll_timer = (server->expiry - zclock_time ())
* ZMQ_POLL_MSEC;
if (poll_timer < 0)
poll_timer = 0;
}
// We're ready to process incoming messages; if nothing at all
// comes from our server within the timeout, that means the
// server is dead:

int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // Context has been shut down

if (poll_set [0].revents & ZMQ_POLLIN) {
if (agent_control_message (self))
break; // Interrupted
}
else
if (poll_set [1].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
if (!kvmsg)
break; // Interrupted

// Anything from server resets its expiry time
server->expiry = zclock_time () + SERVER_TTL;
if (self->state == STATE_SYNCING) {
// Store in snapshot until we're finished
server->requests = 0;
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
self->state = STATE_ACTIVE;
zclock_log ("I: received from %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy (&kvmsg);
}
else
kvmsg_store (&kvmsg, self->kvmap);
}
else
if (self->state == STATE_ACTIVE) {
// Discard out-of-sequence updates, incl. HUGZ
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received from %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
}
else {
// Server has died, failover to next
zclock_log ("I: server at %s:%d didn't give HUGZ",
server->address, server->port);
self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
}
}
agent_destroy (&self);
}