prepare context, frontend and backend sockets
while true:
    poll on both sockets
    if frontend had input:
        read all frames from frontend
        send to backend
    if backend had input:
        read all frames from backend
        send to frontend
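A minimal C sketch of this loop follows; the socket types, endpoints, and the s_forward helper are assumptions made only for this example, not part of the original listing:

#include <zmq.h>

// Forward all frames of one message from one socket to another
static void
s_forward (void *from, void *to)
{
    while (1) {
        zmq_msg_t frame;
        zmq_msg_init (&frame);
        zmq_msg_recv (&frame, from, 0);
        int more = zmq_msg_more (&frame);
        zmq_msg_send (&frame, to, more? ZMQ_SNDMORE: 0);
        if (!more)
            break;
    }
}

int main (void)
{
    void *context = zmq_ctx_new ();
    void *frontend = zmq_socket (context, ZMQ_ROUTER);  // assumed socket types
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");                // assumed endpoints
    zmq_bind (backend, "tcp://*:5560");

    while (1) {
        zmq_pollitem_t items [] = {
            { frontend, 0, ZMQ_POLLIN, 0 },
            { backend, 0, ZMQ_POLLIN, 0 }
        };
        if (zmq_poll (items, 2, -1) == -1)
            break;              // Interrupted
        if (items [0].revents & ZMQ_POLLIN)
            s_forward (frontend, backend);
        if (items [1].revents & ZMQ_POLLIN)
            s_forward (backend, frontend);
    }
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}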
    int total = 0;
    while (1) {
        // Tell the broker we're ready for work
        s_send (worker, "Hi Boss");

        // Get workload from broker, until finished
        char *workload = s_recv (worker);
        int finished = (strcmp (workload, "Fired!") == 0);
        free (workload);
        if (finished) {
            printf ("Completed: %d tasks\n", total);
            break;
        }
        total++;

        // Do some random work
        s_sleep (randof (500) + 1);
    }
    zmq_close (worker);
    zmq_ctx_destroy (context);
    return NULL;
}
// While this example runs in a single process, that is only to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.

int main (void)
{
    void *context = zmq_ctx_new ();
    void *broker = zmq_socket (context, ZMQ_ROUTER);

    zmq_bind (broker, "tcp://*:5671");
    srandom ((unsigned) time (NULL));

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_task, NULL);
    }
    // Run for five seconds and then tell workers to end
    int64_t end_time = s_clock () + 5000;
    int workers_fired = 0;
    while (1) {
        // Next message gives us least recently used worker
        char *identity = s_recv (broker);
        s_sendmore (broker, identity);
        free (identity);
        free (s_recv (broker));     // Envelope delimiter
        free (s_recv (broker));     // Response from worker
        s_sendmore (broker, "");

        // Encourage workers until it's time to fire them
        if (s_clock () < end_time)
            s_send (broker, "Work harder");
        else {
            s_send (broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }
    zmq_close (broker);
    zmq_ctx_destroy (context);
    return 0;
}
    int total = 0;
    while (1) {
        // Tell the broker we're ready for work
        s_sendmore (worker, "");
        s_send (worker, "Hi Boss");

        // Get workload from broker, until finished
        free (s_recv (worker));     // Envelope delimiter
        char *workload = s_recv (worker);
        int finished = (strcmp (workload, "Fired!") == 0);
        free (workload);
        if (finished) {
            printf ("Completed: %d tasks\n", total);
            break;
        }
        total++;

        // Do some random work
        s_sleep (randof (500) + 1);
    }
    zmq_close (worker);
    zmq_ctx_destroy (context);
    return NULL;
}
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.

int main (void)
{
    void *context = zmq_ctx_new ();
    void *broker = zmq_socket (context, ZMQ_ROUTER);

    zmq_bind (broker, "tcp://*:5671");
    srandom ((unsigned) time (NULL));

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_task, NULL);
    }
    // Run for five seconds and then tell workers to end
    int64_t end_time = s_clock () + 5000;
    int workers_fired = 0;
    while (1) {
        // Next message gives us least recently used worker
        char *identity = s_recv (broker);
        s_sendmore (broker, identity);
        free (identity);
        free (s_recv (broker));     // Envelope delimiter
        free (s_recv (broker));     // Response from worker
        s_sendmore (broker, "");

        // Encourage workers until it's time to fire them
        if (s_clock () < end_time)
            s_send (broker, "Work harder");
        else {
            s_send (broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }
    zmq_close (broker);
    zmq_ctx_destroy (context);
    return 0;
}
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.

// This is the worker task, using a REQ socket to do load-balancing.
// Because s_send and s_recv can't handle 0MQ binary identities, we
// set a printable text identity to allow routing.
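The opening of this worker task is not shown here; a minimal sketch of how such a printable identity might be set before connecting follows (the ipc endpoint name and the random hex format are assumptions for this sketch):

static void *
worker_task (void *args)
{
    void *context = zmq_ctx_new ();
    void *worker = zmq_socket (context, ZMQ_REQ);

    // Assumed approach: a short random hex identity that s_send/s_recv can print
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
    zmq_connect (worker, "ipc://backend.ipc");      // assumed endpoint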
// Tell broker we're ready for work s_send (worker, "READY");
while (1) { // Read and save all frames until we get an empty frame // In this example there is only 1, but there could be more char *identity = s_recv (worker); char *empty = s_recv (worker); assert (*empty == 0); free (empty);
// This is the main task. It starts the clients and workers, and then
// routes requests between the two layers. Workers signal READY when
// they start; after that we treat them as ready when they reply with
// a response back to a client. The load-balancing data structure is
// just a queue of next available workers.
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
        pthread_t client;
        pthread_create (&client, NULL, client_task, NULL);
    }
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_task, NULL);
    }
    // Here is the main loop for the least-recently-used queue. It has two
    // sockets; a frontend for clients and a backend for workers. It polls
    // the backend in all cases, and polls the frontend only when there are
    // one or more workers ready. This is a neat way to use 0MQ's own queues
    // to hold messages we're not ready to process yet. When we get a client
    // request, we pop the next available worker and send the request to it,
    // including the originating client identity. When a worker replies, we
    // requeue that worker and forward the reply to the original client
    // using the reply envelope.
    // Queue of available workers
    int available_workers = 0;
    char *worker_queue [10];

    while (1) {
        zmq_pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        // Poll frontend only if we have available workers
        int rc = zmq_poll (items, available_workers ? 2 : 1, -1);
        if (rc == -1)
            break;              // Interrupted
        while (true) {
            // Get one address frame and empty delimiter
            char address [255];
            int address_size = zmq_recv (worker, address, 255, 0);
            if (address_size == -1)
                break;
// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");

    // Tell broker we're ready for work
    zframe_t *frame = zframe_new (WORKER_READY, 1);
    zframe_send (&frame, worker, 0);

    // Process messages as they arrive
    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              // Interrupted
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
// Now we come to the main task. This has the identical functionality to
// the previous lbbroker broker example, but uses CZMQ to start child
// threads, to hold the list of workers, and to read and send messages:

    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Queue of available workers
    zlist_t *workers = zlist_new ();

    // Here is the main loop for the load balancer. It works the same way
    // as the previous example, but is a lot shorter because CZMQ gives
    // us an API that does more with fewer calls:
    while (true) {
        zmq_pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        // Poll frontend only if we have available workers
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break;              // Interrupted

        // Handle worker activity on backend
        if (items [0].revents & ZMQ_POLLIN) {
            // Use worker identity for load-balancing
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          // Interrupted
            zframe_t *identity = zmsg_unwrap (msg);
            zlist_append (workers, identity);

            // Forward message to client if it's not a READY
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            // Get client request, route to first available worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {
                zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    // When we're done, clean up properly
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}
// lbbroker3: Load balancing broker using zloop in C

// Load-balancing broker
// Demonstrates use of the CZMQ API and reactor style
//
// The client and worker tasks are identical to the previous example.
// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");

    // Tell broker we're ready for work
    zframe_t *frame = zframe_new (WORKER_READY, 1);
    zframe_send (&frame, worker, 0);

    // Process messages as they arrive
    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              // Interrupted
        //zframe_print (zmsg_last (msg), "Worker: ");
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
// Our load-balancer structure, passed to reactor handlers
typedef struct {
    void *frontend;             // Listen to clients
    void *backend;              // Listen to workers
    zlist_t *workers;           // List of ready workers
} lbbroker_t;
// In the reactor design, each time a message arrives on a socket, the
// reactor passes it to a handler function. We have two handlers; one
// for the frontend, one for the backend:
// Handle input from client, on frontend
int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    lbbroker_t *self = (lbbroker_t *) arg;
    zmsg_t *msg = zmsg_recv (self->frontend);
    if (msg) {
        zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
        zmsg_send (&msg, self->backend);

        // Cancel reader on frontend if we went from 1 to 0 workers
        if (zlist_size (self->workers) == 0) {
            zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
            zloop_poller_end (loop, &poller);
        }
    }
    return 0;
}
// Handle input from worker, on backend
int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    // Use worker identity for load-balancing
    lbbroker_t *self = (lbbroker_t *) arg;
    zmsg_t *msg = zmsg_recv (self->backend);
    if (msg) {
        zframe_t *identity = zmsg_unwrap (msg);
        zlist_append (self->workers, identity);

        // Enable reader on frontend if we went from 0 to 1 workers
        if (zlist_size (self->workers) == 1) {
            zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
            zloop_poller (loop, &poller, s_handle_frontend, self);
        }
        // Forward message to client if it's not a READY
        zframe_t *frame = zmsg_first (msg);
        if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
            zmsg_destroy (&msg);
        else
            zmsg_send (&msg, self->frontend);
    }
    return 0;
}
// And the main task now sets up child tasks, then starts its reactor.
// If you press Ctrl-C, the reactor exits and the main task shuts down.
// Because the reactor is a CZMQ class, this example may not translate
// into all languages equally well.
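The main task itself is not reproduced above; the following is a minimal sketch of how it could look with the CZMQ v2 API used in these listings, assuming the NBR_CLIENTS/NBR_WORKERS constants, client_task, and the ipc endpoints from the earlier lbbroker examples:

int main (void)
{
    zctx_t *ctx = zctx_new ();
    lbbroker_t lbbroker = { 0 };
    lbbroker_t *self = &lbbroker;
    self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
    self->backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (self->frontend, "ipc://frontend.ipc");    // assumed endpoint
    zsocket_bind (self->backend, "ipc://backend.ipc");      // matches the worker above

    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Queue of available workers
    self->workers = zlist_new ();

    // Prepare the reactor: we start with a reader on the backend only; the
    // frontend reader is added by s_handle_backend once a worker is ready
    zloop_t *reactor = zloop_new ();
    zmq_pollitem_t poller = { self->backend, 0, ZMQ_POLLIN };
    zloop_poller (reactor, &poller, s_handle_backend, self);
    zloop_start (reactor);
    zloop_destroy (&reactor);

    // When we're done, clean up properly
    while (zlist_size (self->workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&self->workers);
    zctx_destroy (&ctx);
    return 0;
}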
// asyncsrv: Asynchronous client/server in C

// Asynchronous client-to-server (DEALER to ROUTER)
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.
#include "czmq.h"
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
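The socket setup for the client is not shown above; a minimal sketch of that opening, using a DEALER socket as described in the header comment:

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_DEALER);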
    // Set random identity to make tracing easier
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zsocket_set_identity (client, identity);
    zsocket_connect (client, "tcp://localhost:5570");

    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    while (true) {
        // Tick once per second, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 100; centitick++) {
            zmq_poll (items, 1, 10 * ZMQ_POLL_MSEC);
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (client);
                zframe_print (zmsg_last (msg), identity);
                zmsg_destroy (&msg);
            }
        }
        zstr_send (client, "request #%d", ++request_nbr);
    }
    zctx_destroy (&ctx);
    return NULL;
}
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
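The frontend setup is not shown above; a minimal sketch, binding a ROUTER socket on the port the client connects to:

static void *
server_task (void *args)
{
    // Frontend socket talks to clients over TCP
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5570");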
    // Backend socket talks to workers over inproc
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "inproc://backend");

    // Launch pool of worker threads, precise number is not critical
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++)
        zthread_fork (ctx, server_worker, NULL);

    // Connect backend to frontend via a proxy
    zmq_proxy (frontend, backend, NULL);

    zctx_destroy (&ctx);
    return NULL;
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
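The worker code itself is not reproduced here; the sketch below shows one way to implement that description with CZMQ (the frame handling and the 0..4 reply count with up-to-one-second delays are assumptions for this sketch):

static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "inproc://backend");

    while (true) {
        // The DEALER socket gives us the reply envelope and message
        zmsg_t *msg = zmsg_recv (worker);
        zframe_t *identity = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        zmsg_destroy (&msg);

        // Send 0..4 replies back, with random delays between them
        int reply, replies = randof (5);
        for (reply = 0; reply < replies; reply++) {
            zclock_sleep (randof (1000) + 1);
            zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
            zframe_send (&content, worker, ZFRAME_REUSE);
        }
        zframe_destroy (&identity);
        zframe_destroy (&content);
    }
}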
Note that we're doing a DEALER-to-ROUTER dialog between client and server, but between the server's main thread and its workers we use DEALER-to-DEALER. If the workers were strictly synchronous, we'd use REP. However, because we want to send multiple replies, we need an asynchronous socket. We do not want to route replies: they always go back to the single server thread that sent us the request.
This gives us simple logic on both sides of the broker and a reasonably good mechanism: when we have spare workers, we tell the other broker we're "ready", and then accept one job from it. The problem is that it is also too simple. A federated broker can handle only one task at a time. If the broker emulates a lock-step client and worker, it is by definition lock-step itself, and even if it has many idle workers, they will never be used. Our brokers need to be connected in a fully asynchronous fashion.
// peering1: Prototype state flow in C

// Broker peering simulation (part 1)
// Prototypes the state flow
#include "czmq.h"
int main (int argc, char *argv [])
{
    // First argument is this broker's name
    // Other arguments are our peers' names
    //
    if (argc < 2) {
        printf ("syntax: peering1 me {you}…\n");
        return 0;
    }
    char *self = argv [1];
    printf ("I: preparing broker at %s…\n", self);
    srandom ((unsigned) time (NULL));

    zctx_t *ctx = zctx_new ();

    // Bind state backend to endpoint
    void *statebe = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (statebe, "ipc://%s-state.ipc", self);

    // Connect statefe to all peers
    void *statefe = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_subscribe (statefe, "");
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to state backend at '%s'\n", peer);
        zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
    }
    // The main loop sends out status messages to peers, and collects
    // status messages back from peers. The zmq_poll timeout defines
    // our own heartbeat:

    while (true) {
        // Poll for activity, or 1 second timeout
        zmq_pollitem_t items [] = { { statefe, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted
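The remainder of this loop is not shown above; a minimal sketch of how it could handle incoming status messages and publish our own status (the two-frame message format and the random availability value are assumptions for this sketch):

        // Handle incoming status messages
        if (items [0].revents & ZMQ_POLLIN) {
            char *peer_name = zstr_recv (statefe);
            char *available = zstr_recv (statefe);
            printf ("%s - %s workers free\n", peer_name, available);
            free (peer_name);
            free (available);
        }
        else {
            // Send random values for worker availability
            zstr_sendm (statebe, self);
            zstr_send (statebe, "%d", randof (10));
        }
    }
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}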
peering1 DC1 DC2 DC3    #  Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3    #  Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2    #  Start DC3 and connect to DC1 and DC2
// The main task begins by setting up its frontend and backend sockets
// and then starting its client and worker tasks:
int main (int argc, char *argv [])
{
    // First argument is this broker's name
    // Other arguments are our peers' names
    //
    if (argc < 2) {
        printf ("syntax: peering2 me {you}…\n");
        return 0;
    }
    self = argv [1];
    printf ("I: preparing broker at %s…\n", self);
    srandom ((unsigned) time (NULL));
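The context and cloud frontend setup are not shown above; a minimal sketch, mirroring the endpoint naming used elsewhere in this listing:

    zctx_t *ctx = zctx_new ();

    // Bind cloud frontend to endpoint
    void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_set_identity (cloudfe, self);
    zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);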
    // Connect cloud backend to all peers
    void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_set_identity (cloudbe, self);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to cloud frontend at '%s'\n", peer);
        zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
    }
    // Prepare local frontend and backend
    void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
    void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

    // Get user to tell us when we can start…
    printf ("Press Enter when all brokers are started: ");
    getchar ();

    // Start local workers
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Start local clients
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);

    // Here, we handle the request-reply flow. We're using load-balancing
    // to poll workers at all times, and clients only when there are one
    // or more workers available.

    // Least recently used queue of available workers
    int capacity = 0;
    zlist_t *workers = zlist_new ();

    while (true) {
        // First, route any waiting replies from workers
        zmq_pollitem_t backends [] = {
            { localbe, 0, ZMQ_POLLIN, 0 },
            { cloudbe, 0, ZMQ_POLLIN, 0 }
        };
        // If we have no workers, wait indefinitely
        int rc = zmq_poll (backends, 2, capacity? 1000 * ZMQ_POLL_MSEC: -1);
        if (rc == -1)
            break;              // Interrupted

        // Handle reply from local worker
        zmsg_t *msg = NULL;
        if (backends [0].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (localbe);
            if (!msg)
                break;          // Interrupted
            zframe_t *identity = zmsg_unwrap (msg);
            zlist_append (workers, identity);
            capacity++;

            // If it's READY, don't route the message any further
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
                zmsg_destroy (&msg);
        }
        // Or handle reply from peer broker
        else if (backends [1].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (cloudbe);
            if (!msg)
                break;          // Interrupted
            // We don't use peer broker identity for anything
            zframe_t *identity = zmsg_unwrap (msg);
            zframe_destroy (&identity);
        }
        // Route reply to cloud if it's addressed to a broker
        for (argn = 2; msg && argn < argc; argn++) {
            char *data = (char *) zframe_data (zmsg_first (msg));
            size_t size = zframe_size (zmsg_first (msg));
            if (size == strlen (argv [argn])
            &&  memcmp (data, argv [argn], size) == 0)
                zmsg_send (&msg, cloudfe);
        }
        // Route reply to client if we still need to
        if (msg)
            zmsg_send (&msg, localfe);

        // Now we route as many client requests as we have worker capacity
        // for. We may reroute requests from our local frontend, but not from
        // the cloud frontend. We reroute randomly now, just to test things
        // out. In the next version, we'll do this properly by calculating
        // cloud capacity:

        while (capacity) {
            zmq_pollitem_t frontends [] = {
                { localfe, 0, ZMQ_POLLIN, 0 },
                { cloudfe, 0, ZMQ_POLLIN, 0 }
            };
            rc = zmq_poll (frontends, 2, 0);
            assert (rc >= 0);
            int reroutable = 0;
            // We'll do peer brokers first, to prevent starvation
            if (frontends [1].revents & ZMQ_POLLIN) {
                msg = zmsg_recv (cloudfe);
                reroutable = 0;
            }
            else if (frontends [0].revents & ZMQ_POLLIN) {
                msg = zmsg_recv (localfe);
                reroutable = 1;
            }
            else
                break;          // No work, go back to backends

            // If reroutable, send to cloud 20% of the time
            // Here we'd normally use cloud status information
            //
            if (reroutable && argc > 2 && randof (5) == 0) {
                // Route to random broker peer
                int peer = randof (argc - 2) + 2;
                zmsg_pushmem (msg, argv [peer], strlen (argv [peer]));
                zmsg_send (&msg, cloudbe);
            }
            else {
                zframe_t *frame = (zframe_t *) zlist_pop (workers);
                zmsg_wrap (msg, frame);
                zmsg_send (&msg, localbe);
                capacity--;
            }
        }
    }
    // When we're done, clean up properly
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}
// Our own name; in practice, this would be configured per node
static char *self;
// This is the client task. It issues a burst of requests and then
// sleeps for a few seconds. This simulates sporadic activity; when
// a number of clients are active at once, the local workers should
// be overloaded. The client uses a REQ socket for requests and also
// pushes statistics to the monitor socket:
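The socket setup for this client is not shown above; a minimal sketch, assuming the ipc endpoint names bound by the main task:

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, "ipc://%s-localfe.ipc", self);
    void *monitor = zsocket_new (ctx, ZMQ_PUSH);
    zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);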
    while (true) {
        sleep (randof (5));
        int burst = randof (15);
        while (burst--) {
            char task_id [5];
            sprintf (task_id, "%04X", randof (0x10000));

            // Send request with random hex ID
            zstr_send (client, task_id);

            // Wait max ten seconds for a reply, then complain
            zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          // Interrupted

            if (pollset [0].revents & ZMQ_POLLIN) {
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      // Interrupted
                // Worker is supposed to answer us with our task id
                assert (streq (reply, task_id));
                zstr_send (monitor, "%s", reply);
                free (reply);
            }
            else {
                zstr_send (monitor, "E: CLIENT EXIT - lost task %s", task_id);
                return NULL;
            }
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
// This is the worker task, which uses a REQ socket to plug into the
// load-balancer. It's the same stub worker task that you've seen in
// other examples:
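The socket setup for the worker is likewise not shown; a minimal sketch, again assuming the ipc endpoint names bound by the main task:

static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://%s-localbe.ipc", self);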
    // Tell broker we're ready for work
    zframe_t *frame = zframe_new (WORKER_READY, 1);
    zframe_send (&frame, worker, 0);

    // Process messages as they arrive
    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              // Interrupted

        // Workers are busy for 0/1 seconds
        sleep (randof (2));
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
// The main task begins by setting up all its sockets. The local frontend
// talks to clients, and our local backend talks to workers. The cloud
// frontend talks to peer brokers as if they were clients, and the cloud
// backend talks to peer brokers as if they were workers. The state
// backend publishes regular state messages, and the state frontend
// subscribes to all state backends to collect these messages. Finally,
// we use a PULL monitor socket to collect printable messages from tasks:
int main (int argc, char *argv [])
{
    // First argument is this broker's name
    // Other arguments are our peers' names
    if (argc < 2) {
        printf ("syntax: peering3 me {you}…\n");
        return 0;
    }
    self = argv [1];
    printf ("I: preparing broker at %s…\n", self);
    srandom ((unsigned) time (NULL));
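The context and the local/cloud frontend sockets are not shown above; a minimal sketch of that setup, following the socket names used later in this listing and the endpoint naming of the peering2 example:

    zctx_t *ctx = zctx_new ();

    // Prepare local frontend and backend
    void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
    void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

    // Bind cloud frontend to endpoint
    void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_set_identity (cloudfe, self);
    zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);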
    // Connect cloud backend to all peers
    void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_set_identity (cloudbe, self);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to cloud frontend at '%s'\n", peer);
        zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
    }
    // Bind state backend to endpoint
    void *statebe = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (statebe, "ipc://%s-state.ipc", self);

    // Connect state frontend to all peers
    void *statefe = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_subscribe (statefe, "");
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to state backend at '%s'\n", peer);
        zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
    }
    // Prepare monitor socket
    void *monitor = zsocket_new (ctx, ZMQ_PULL);
    zsocket_bind (monitor, "ipc://%s-monitor.ipc", self);
    // After binding and connecting all our sockets, we start our child
    // tasks - workers and clients:
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Start local clients
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);

    // Queue of available workers
    int local_capacity = 0;
    int cloud_capacity = 0;
    zlist_t *workers = zlist_new ();
    // The main loop has two parts. First, we poll workers and our two service
    // sockets (statefe and monitor), in any case. If we have no ready workers,
    // then there's no point in looking at incoming requests. These can remain
    // on their internal 0MQ queues:

    while (true) {
        zmq_pollitem_t primary [] = {
            { localbe, 0, ZMQ_POLLIN, 0 },
            { cloudbe, 0, ZMQ_POLLIN, 0 },
            { statefe, 0, ZMQ_POLLIN, 0 },
            { monitor, 0, ZMQ_POLLIN, 0 }
        };
        // If we have no workers ready, wait indefinitely
        int rc = zmq_poll (primary, 4,
            local_capacity? 1000 * ZMQ_POLL_MSEC: -1);
        if (rc == -1)
            break;              // Interrupted

        // Track if capacity changes during this iteration
        int previous = local_capacity;
        zmsg_t *msg = NULL;     // Reply from local worker
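The branch that actually receives the local worker's reply is not shown above; a minimal sketch of that step, mirroring the peering2 code earlier:

        if (primary [0].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (localbe);
            if (!msg)
                break;          // Interrupted
            zframe_t *identity = zmsg_unwrap (msg);
            zlist_append (workers, identity);
            local_capacity++;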
            // If it's READY, don't route the message any further
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
                zmsg_destroy (&msg);
        }
        // Or handle reply from peer broker
        else if (primary [1].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (cloudbe);
            if (!msg)
                break;          // Interrupted
            // We don't use peer broker identity for anything
            zframe_t *identity = zmsg_unwrap (msg);
            zframe_destroy (&identity);
        }
        // Route reply to cloud if it's addressed to a broker
        for (argn = 2; msg && argn < argc; argn++) {
            char *data = (char *) zframe_data (zmsg_first (msg));
            size_t size = zframe_size (zmsg_first (msg));
            if (size == strlen (argv [argn])
            &&  memcmp (data, argv [argn], size) == 0)
                zmsg_send (&msg, cloudfe);
        }
        // Route reply to client if we still need to
        if (msg)
            zmsg_send (&msg, localfe);
        // If we have input messages on our statefe or monitor sockets, we
        // can process these immediately:
if (primary [2].revents & ZMQ_POLLIN) { char *peer = zstr_recv (statefe); char *status = zstr_recv (statefe); cloud_capacity = atoi (status); free (peer); free (status); } if (primary [3].revents & ZMQ_POLLIN) { char *status = zstr_recv (monitor); printf ("%s\n", status); free (status); } // Now route as many clients requests as we can handle. If we have // local capacity, we poll both localfe and cloudfe. If we have cloud // capacity only, we poll just localfe. We route any request locally // if we can, else we route to the cloud.
if (secondary [0].revents & ZMQ_POLLIN) msg = zmsg_recv (localfe); else if (secondary [1].revents & ZMQ_POLLIN) msg = zmsg_recv (cloudfe); else break; // No work, go back to primary
if (local_capacity) { zframe_t *frame = (zframe_t *) zlist_pop (workers); zmsg_wrap (msg, frame); zmsg_send (&msg, localbe); local_capacity--; } else { // Route to random broker peer int peer = randof (argc - 2) + 2; zmsg_pushmem (msg, argv [peer], strlen (argv [peer])); zmsg_send (&msg, cloudbe); } } // We broadcast capacity messages to other peers; to reduce chatter, // we do this only if our capacity changed.
if (local_capacity != previous) { // We stick our own identity onto the envelope zstr_sendm (statebe, self); // Broadcast new capacity zstr_send (statebe, "%d", local_capacity); } } // When we're done, clean up properly while (zlist_size (workers)) { zframe_t *frame = (zframe_t *) zlist_pop (workers); zframe_destroy (&frame); } zlist_destroy (&workers); zctx_destroy (&ctx); return EXIT_SUCCESS; }