ZeroMQ is a high-performance, cross-language networking framework. Many people point out that the "MQ" in its name is misleading: ZeroMQ has outgrown the message-queue category and is really a high-level socket abstraction. The core library is written in C++ (exposing a C API) and has since been wrapped for dozens of languages, including C++, Java, Go, Python and Rust.

ZeroMQ was started in 2007 by the Belgian software designer Pieter Hintjens and his company's team, with the goal of making messaging middleware both easier to use and faster. Within a few years it attracted wide community attention and found its way into many large trading systems. In 2016 Hintjens, terminally ill with cancer, chose euthanasia in Belgium, yet his work is still alive and actively maintained a decade later. Respect.

ZeroMQ's design is remarkably elegant; even though many of its techniques have become common idioms when seen from today, they are laid out thoroughly in the English Zguide: https://zguide.zeromq.org (no recent Chinese translation seems to exist).

This article follows the Zguide examples and walks through the basic C++ samples, covering only the fundamental designs of the first five chapters; for the deeper architectural philosophy see the original text. The original example repository is https://github.com/imatix/zguide.git, but all the code these notes depend on is reproduced here, so cloning it is optional. Although ZeroMQ is cross-platform, reproducing the code on Windows is not recommended unless you have to (support for some messaging patterns there is poor).

This article covers:

  1. An introduction to the three basic messaging patterns: request-reply, publish-subscribe and push-pull;

  2. The advanced request-reply patterns, which are the hard part of the framework: the Router and Dealer sockets, load balancing, fair queuing, and how to build reliable request-reply communication;

  3. Advanced publish-subscribe: the basic xpub/xsub features. As the original text makes clear, reliability and message persistence are not ZeroMQ design goals, so this part also contains a fair amount of discussion about implementing reliability.

Request-Reply Pattern

zmq's simplest messaging pattern is request-reply. It imposes a strict ordering on both sides: the server must receive a request before it may send, and the client must send a request before it may receive. A client is not allowed to skip the request and receive directly, or to send two requests in a row; likewise a server may not send before receiving, or send twice, and so on. Within one exchange, request and reply are strictly one-to-one. The flow is enforced internally by a state machine, and violating the rules may throw an exception:

terminate called after throwing an instance of 'zmq::error_t'
what(): Operation cannot be accomplished in current state
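
To make the state machine concrete, here is a minimal sketch (not taken from the guide; the endpoint is illustrative): calling send twice in a row on a req socket, without a recv in between, is exactly the kind of violation that produces the error above.

//req_state_violation.cpp
#include <iostream>
#include <cstring>
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t req(context, zmq::socket_type::req);
req.connect("tcp://localhost:5555");

zmq::message_t first(5);
memcpy(first.data(), "hello", 5);
req.send(first, zmq::send_flags::none); //first send: allowed by the state machine

try{
zmq::message_t second(5);
memcpy(second.data(), "again", 5);
req.send(second, zmq::send_flags::none); //second send without a recv: rejected
}
catch(const zmq::error_t& e){
std::cerr << e.what() << std::endl; //"Operation cannot be accomplished in current state"
}
return 0;
}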

Basic interfaces

Not very different from socket APIs you may have seen before:

  • zmq::context_t context(threadNum): threadNum is the number of I/O threads; the context is the object that schedules socket I/O. The official advice is to create a single context per process and share it among all sockets;

  • zmq::socket_type: the socket type. Here the request-reply types are used; later sections use the publish-subscribe types, and the push-pull types suited to distributed pipelines;

  • zmq::message_t: everything zmq sends and receives is wrapped in a message_t, which exposes the data pointer, the size and so on. Note that a message_t may carry binary rather than text data, so printing it directly is not always meaningful;

  • zmq::send_flags / zmq::recv_flags: the send and receive modes. The most common are the defaults, zmq::send_flags::none and zmq::recv_flags::none, which block when the send or receive queue is full. The non-blocking variants, zmq::send_flags::dontwait and zmq::recv_flags::dontwait, return an error instead of blocking when the queue is full. Sending also supports multipart messages with zmq::send_flags::sndmore, meaning "this frame is done but more frames follow"; the receiver calls message_t::more() to decide whether another frame of the same message is coming. This flag also plays a central role in ZeroMQ's message envelopes later on (see the sketch below).
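
The following is a minimal sketch of these pieces that is not part of the original example (it uses a pair of PAIR sockets over inproc purely for illustration): a non-blocking receive with dontwait, a two-frame message sent with sndmore, and the receive loop driven by message_t::more().

//flags_demo.cpp
#include <iostream>
#include <string>
#include <cstring>
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t a(context, zmq::socket_type::pair);
zmq::socket_t b(context, zmq::socket_type::pair);
a.bind("inproc://flags-demo");
b.connect("inproc://flags-demo");

zmq::message_t probe;
//non-blocking receive: nothing has been sent yet, so recv returns an empty result instead of blocking
if(!b.recv(probe, zmq::recv_flags::dontwait))
std::cout << "nothing to read yet" << std::endl;

//multipart send: every frame except the last one carries sndmore
const char header[] = "header";
const char body[] = "body";
zmq::message_t frame1(strlen(header));
memcpy(frame1.data(), header, strlen(header));
a.send(frame1, zmq::send_flags::sndmore);
zmq::message_t frame2(strlen(body));
memcpy(frame2.data(), body, strlen(body));
a.send(frame2, zmq::send_flags::none);

//drain one logical message; more() says whether another frame follows
while(true){
zmq::message_t frame;
b.recv(frame, zmq::recv_flags::none);
std::cout << std::string(static_cast<const char*>(frame.data()), frame.size()) << std::endl;
if(!frame.more())
break;
}
return 0;
}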

A complete request-reply example:

zmq's request-reply pattern is surprisingly capable: first, the server can accept thousands of client connections and process their requests in turn; second, start order does not matter, so the server may be started after the clients. One weakness it does not cover: if the server crashes and restarts, the replies owed to clients are not recovered, because restoring that state is always a complicated affair.

//server.cpp
#include <iostream>
#include <zmq.hpp>
#ifndef _WIN32
#include <unistd.h> //linux
#else
#include <windows.h> //windows
#endif
#include <thread>

using namespace std;

int main(){
static const int threadNum = 2;
zmq::context_t context(threadNum);
zmq::socket_t socket(context, zmq::socket_type::rep); //应答者socket
socket.bind("tcp://*:5555");
while(true){
zmq::message_t requestMsg;

auto result = socket.recv(requestMsg, zmq::recv_flags::none);
assert(result.value_or(0) != 0);
//只有文本数据才可以尝试这样做:
std::string_view str{reinterpret_cast<const char*>(requestMsg.data()),requestMsg.size()};
cout << "Receive Msg:" <<str <<endl;

std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟服务器响应时间

constexpr std::string_view replyWord = "Hello Client";
zmq::message_t reply(replyWord.length());
memcpy(reply.data(),replyWord.data(), replyWord.length());
socket.send(reply, zmq::send_flags::none);
}
return 0;
}

//client.cpp
#include <iostream>
#include <zmq.hpp>

using namespace std;
int main(){
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::req); //请求者socket
socket.connect("tcp://localhost:5555");
for(int i=0; i<10; i++){
std::string_view str("I want Hello");
zmq::message_t request(str.length());
memcpy(request.data(), str.data(), str.length());
cout << i <<"begin send request:" << str <<endl;
socket.send(request, zmq::send_flags::none);

zmq::message_t reply;
socket.recv(reply, zmq::recv_flags::none);
//只有文本数据才可以尝试这样做:
std::string_view replystr(reinterpret_cast<const char*>(reply.data()), reply.size());
cout << i <<"Got reply:" << replystr << endl;
}
return 0;
}
cmake_minimum_required(VERSION 3.11)
project(zmqTest)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(cppzmq REQUIRED)
#find_package(Boost REQUIRED COMPONENTS filesystem log_setup log json program_options serialization thread)
#find_package(OpenCV REQUIRED)

set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)

add_executable(server)
target_sources(server PRIVATE server.cpp)
target_link_libraries(server PRIVATE
# ${Boost_LIBRARIES}
# ${OpenCV_LIBRARIES}
cppzmq
)

add_executable(client)
target_sources(client PRIVATE client.cpp)
target_link_libraries(client PRIVATE
# ${Boost_LIBRARIES}
# ${OpenCV_LIBRARIES}
cppzmq
)

Publish-Subscribe Pattern

The publisher behaves like a radio station, broadcasting messages endlessly; subscribers subscribe to messages by prefix and can cancel a subscription with zmq::sockopt::unsubscribe. A few characteristics to note:

  1. Pub-sub does not dictate which side connects and which binds, but a pub socket may not call zmq.recv and a sub socket may not send. There are exceptions, for example an xsub can have messages forwarded into it through a proxy and an xpub can receive subscription frames; see the advanced publish-subscribe section.

  2. A subscriber cannot know exactly when it will start receiving. Even if the subscriber is started before the publisher, it may still miss the publisher's first messages. More extreme: start the subscriber, then start a publisher that sends 1000 messages and exits; the subscriber may miss all of them. Pub-sub is built on asynchronous I/O: the subscriber might need about 5 ms to finish its TCP connection, while the publisher needs only about 1 ms to send 1000 messages. Later sections discuss how to synchronize publisher and subscriber behaviour.

  3. A publisher can publish several different messages in a row, and a subscriber can subscribe to several topics. Be careful, though: after publisher.send(msg, zmq::send_flags::none), ownership of the zmq::message_t's buffer has been handed over to send, so that msg object must not be used or touched directly any more; either construct a new message or rebuild() the existing zmq::message_t, as sketched below.
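
A minimal sketch of the second option, not taken from the guide (the port is illustrative): rebuild() gives the moved-from message a fresh buffer so the same object can be reused for the next send.

//reuse_message.cpp
#include <cstring>
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t publisher(context, zmq::socket_type::pub);
publisher.bind("tcp://*:5559");

zmq::message_t msg(5);
memcpy(msg.data(), "first", 5);
publisher.send(msg, zmq::send_flags::none); //msg has now been handed over to send

msg.rebuild(6); //allocate a fresh 6-byte buffer before reusing the object
memcpy(msg.data(), "second", 6);
publisher.send(msg, zmq::send_flags::none);
return 0;
}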

A simple publish-subscribe example follows; it models a weather-report feed:

//server.cpp
#include <iostream>
#include <zmq.hpp>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#endif
#include <thread>

using namespace std;

int main(){
zmq::context_t context(1);
zmq::socket_t publisher(context, zmq::socket_type::pub);
publisher.bind("tcp://*:5555");

srand((unsigned)time(NULL));
while(true){
#define within(num) (unsigned int)((float)num*random()/(RAND_MAX+1.0))
const size_t msgSize = 100;
int zipcode = 10001; //within(100000);
int temperature = within(215)- 80;
int humidity = within(50) + 10;
zmq::message_t msg(msgSize);
snprintf(reinterpret_cast<char*>(msg.data()), msgSize, "%05d temperature=%02d, humidity=%02d",\
zipcode, temperature, humidity);
// cout << std::string_view(reinterpret_cast<const char*>(msg.data()), msg.size()) << endl;
publisher.send(msg, zmq::send_flags::none);

zmq::message_t msg1(msgSize);
memset(reinterpret_cast<char*>(msg1.data()), 0, msgSize);
int n = snprintf(reinterpret_cast<char*>(msg1.data()), msgSize, "%05d second_send", zipcode+1);
// cout << std::string_view(reinterpret_cast<const char*>(msg1.data()), n) << endl;
publisher.send(msg1, zmq::send_flags::none);
}
return 0;
}

//client.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>

using namespace std;
int main(int argc, char* argv[]){
zmq::context_t context(1);
zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5555");

//仅接收10001 、10002开头的信息
const char* topic = (argc > 1) ? argv[1] : "10001 ";
subscriber.set(zmq::sockopt::subscribe, topic);
subscriber.set(zmq::sockopt::subscribe, "10002 ");

int cnt = 0;
while(true){
zmq::message_t recvMsg;
subscriber.recv(recvMsg, zmq::recv_flags::none);
std::string_view msg(reinterpret_cast<const char*>(recvMsg.data()), recvMsg.size());
std::this_thread::sleep_for(std::chrono::seconds(1));
cout << "Got Subscribe Msg : " << msg <<endl;
if(++cnt > 10){ //after 10 messages, cancel the first topic subscription
subscriber.set(zmq::sockopt::unsubscribe, topic);
}
}
}
return 0;
}

The CMake file is the same as before.

Push-Pull Pattern (Pipeline)

The following example consists of a ventilator, several workers and a sink: the ventilator sends each worker a duration, the worker sleeps for that long and then signals the sink when it is done. Synchronization comes from a getchar() in the ventilator: only when all worker processes are ready does the ventilator start handing out durations, and it also sends a start signal to the sink so the sink can time the run. With a single worker the run takes roughly 8 s; as the number of workers grows, the time measured by the sink drops accordingly, and the count printed by each worker is roughly the average share of the tasks.

In other words, the ventilator spreads the tasks evenly across the PULL ends and each PULL end does an even share of the work, which is called load balancing; the sink, in turn, receives results evenly from all workers (rather than draining one worker at a time), which is called fair queuing.

//ventilator.cpp
#include <iostream>
#include <zmq.hpp>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#endif
#include <thread>

using namespace std;

int main(){
zmq::context_t context(1);
zmq::socket_t sender(context, zmq::socket_type::push);
sender.bind("tcp://*:5557");

zmq::socket_t sender5558(context, zmq::socket_type::push);
sender5558.connect("tcp://localhost:5558");
zmq::message_t msg(2);
memcpy(reinterpret_cast<char*>(msg.data()), "0", 1);
sender5558.send(msg, zmq::send_flags::none);

int task_num = 100;
int total_time = 0;
srandom((unsigned)time(NULL));
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
std::cout << "Press Enter when the workers are ready: " << std::endl;

getchar(); //同步开始信号,非常重要

for(int i=0; i<task_num; i++){
int randomTime = within(100) + 1;
msg.rebuild(10); //复用msg,新分配空间10的指针
memset(reinterpret_cast<char*>(msg.data()), '\0', 10);
snprintf(reinterpret_cast<char*>(msg.data()),10,"%d", randomTime);
sender.send(msg, zmq::send_flags::none);
total_time += randomTime;
}
cout << "total_time:" << total_time <<endl; //统计发送时间
return 0;
}

//worker.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>
#include <assert.h>

using namespace std;
int main(){
zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull);
receiver.connect("tcp://localhost:5557");

zmq::socket_t sender(context, zmq::socket_type::push);
sender.connect("tcp://localhost:5558");
int cnt = 0;
while(true){
zmq::message_t msg;
auto result = receiver.recv(msg, zmq::recv_flags::none);
assert(result.value_or(0) != 0);
std::string str(reinterpret_cast<const char*>(msg.data()), msg.size());

try{
std::this_thread::sleep_for(std::chrono::milliseconds(stoi(str))); //执行睡眠任务
}
catch(std::exception& e){
cout <<e.what()<<endl;
return -1;
}

msg.rebuild();
sender.send(msg, zmq::send_flags::none);
cout << cnt <<endl;
cnt++;
}
return 0;
}
//sink.cpp
#include <iostream>
#include <zmq.hpp>
#include <chrono>
using namespace std;

int main(){
zmq::context_t context;
zmq::socket_t sink(context, zmq::socket_type::pull);
sink.bind("tcp://*:5558");

zmq::message_t msg;
sink.recv(msg, zmq::recv_flags::none);

auto startTime = std::chrono::steady_clock::now();

int task_num = 100;
int cnt = 0;
for(int i=0; i<task_num; i++){
msg.rebuild();
sink.recv(msg, zmq::recv_flags::none);
cout << cnt <<endl;
cnt++;
}

auto endTime = std::chrono::steady_clock::now();

int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime-startTime).count(); //统计任务完成时间
cout << "Total time:" << duration <<endl;
return 0;
}
cmake_minimum_required(VERSION 3.11)
project(zmqTest)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(cppzmq REQUIRED)

set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)

add_executable(ventilator)
target_sources(ventilator PRIVATE ventilator.cpp)
target_link_libraries(ventilator PRIVATE
cppzmq
)

add_executable(worker)
target_sources(worker PRIVATE worker.cpp)
target_link_libraries(worker PRIVATE
cppzmq
)

add_executable(sink)
target_sources(sink PRIVATE sink.cpp)
target_link_libraries(sink PRIVATE
cppzmq
)

Extended Request-Reply Pattern

The simple request-reply pattern has only two synchronous socket types, Req and Rep. When one Req is connected to several Reps, it distributes its requests among them one at a time, waiting for each reply before sending the next; conversely, when one Rep is connected to several Reqs, it reads requests from them fairly and routes each reply back to the Req that made the request.

The extended request-reply pattern inserts a broker in the middle, giving the full chain Req-Router-Dealer-Rep. A Router is an asynchronous socket that can see identity frames. In the plain Req-Rep exchange a message consists only of an empty delimiter frame plus the data frames; a Router adds an identity frame in front of these, and that frame uniquely identifies each connection. Internally the Router keeps a hash table recording which identity belongs to which connection, so every time a message arrives it knows which peer it came from and where a reply should go.

A Dealer behaves much like the push-pull sockets: it distributes outgoing messages evenly across its connections and receives evenly from them, and it is also asynchronous. A chain may contain any number of Routers and Dealers, but a Dealer is an identity-less socket type whose sends do not even insert the empty delimiter frame, so when you use a Dealer you have to handle the message envelope yourself.

Note that Req-Router-Dealer-Rep only describes how these socket types naturally compose; it is not the only allowed wiring (a minimal broker sketch follows the list below). Keep in mind that Router exists to serve Req and Dealer exists to serve Rep: combinations such as Req-Router-Req or Req-Router-Dealer (used below) are all fine, but you will never see Req-Dealer or Router-Rep connected directly.

Altogether, the extended request-reply pattern buys at least the following:

  1. Asynchronous servers and asynchronous clients: when Req requests are sent to a Router instead of a Rep, the Router acts like an asynchronous server that can handle many Req clients in parallel; likewise, when Rep replies are sent to a Dealer, the Dealer acts like an asynchronous client that can absorb any number of replies. With the full Req-Router-Dealer-Rep chain, both sides can have any number of requests or replies in flight instead of the blocking one-to-one exchange.

  2. Dealer-to-Dealer or Router-to-Router (barely touched in this article): these combinations are also allowed. Dealer-to-Dealer is identity-less communication (you manage the envelope yourself, since the envelope ZeroMQ adds automatically is stripped), effectively two asynchronous message queues that only care about when to send and receive, not to whom. Router-to-Router is multi-hop routing, the most complex combination; avoid it unless you are genuinely fluent in ZeroMQ.
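
To make the Req-Router-Dealer-Rep chain concrete, here is a minimal broker sketch that is not part of the guide's examples (the ports are made up): a Router front end faces any number of Req clients, a Dealer back end faces any number of Rep workers, and zmq::proxy shuttles the frames between them while preserving the identity envelope the Router adds.

//simple_broker.cpp
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t frontend(context, zmq::socket_type::router); //clients connect their req sockets here
zmq::socket_t backend(context, zmq::socket_type::dealer); //workers connect their rep sockets here
frontend.bind("tcp://*:5570");
backend.bind("tcp://*:5571");

//blocks forever, forwarding every multipart message in both directions
zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
return 0;
}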

Router identities

The following compares the behaviour of an anonymous req and of a req whose identity was set with identified.set(zmq::sockopt::routing_id, "PEER2"), both connected to the same Router. The printed output shows that the Router assigns the anonymous req a random 5-byte binary identity.

Note the technical details of the Router in the code:

  1. A Router receives messages as multipart by default, even though we used send rather than sndmore. It generally receives three frames: the identity frame, the empty delimiter frame and the data frame, so the two dumps are:

    ----------------------------------------
    [005]006b8b4567
    [000]
    [025]anonymous socket uses random identity
    ----------------------------------------
    [005]PEER2
    [000]
    [01c]identified socket uses PEER2
    When dumping multipart messages, the more check in the s_dump helper must not be left out, otherwise the loop blocks forever;

  2. anonymous and identified are both req sockets; in this code they only ever send to the Router, the Router never replies and their recv is never called, so neither of them may call send again to issue a new request;

  3. Of all the socket types, only Req, Dealer or another Router may connect to a Router (Rep, push/pull and the rest are not allowed, which also means those types have no reason, and no way, to set an identity with zmq::sockopt::routing_id). Here Req is used as the example:

    #include <iostream>
    #include <iomanip> //std::setw / std::setfill used by s_dump
    #include <zmq.hpp>

    using namespace std;

    //ZeroMQ工具函数:打印socket收到的信息
    inline void s_dump(zmq::socket_t& socket){
    std::cout << "----------------------------------------" << std::endl;

    while (1) {
    // Process all parts of the message
    zmq::message_t message;
    zmq::recv_result_t rc = socket.recv(message);
    if (!rc) {
    throw std::runtime_error("recv error");
    }

    // Dump the message as text or binary
    size_t size = message.size();
    std::string data(static_cast<char*>(message.data()), size);

    bool is_text = true;

    size_t char_nbr;
    unsigned char byte;
    for (char_nbr = 0; char_nbr < size; char_nbr++) {
    byte = data [char_nbr];
    if (byte < 32 || byte > 127) //非ASCII码字符
    is_text = false;
    }
    std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
    for (char_nbr = 0; char_nbr < size; char_nbr++) {
    if (is_text)
    std::cout << (char)data [char_nbr];
    else
    std::cout << std::setfill('0') << std::setw(2)
    << std::hex << static_cast<int>(static_cast<unsigned char>(data[char_nbr]));
    }
    std::cout << std::endl;

    int more = 0; // Multipart detection
    more = socket.get(zmq::sockopt::rcvmore);
    if (!more)
    break; // Last message part
    }
    }

    int main(){
    zmq::context_t context(1);
    zmq::socket_t sink(context, zmq::socket_type::router);
    sink.bind("inproc://example");

    zmq::socket_t anonymous(context, zmq::socket_type::req);
    anonymous.connect("inproc://example");
    std::string_view msg("anonymous socket uses random identity");
    zmq::message_t zmsg(msg.size());
    memcpy(zmsg.data(), msg.data(), msg.size());
    anonymous.send(zmsg, zmq::send_flags::none);
    s_dump(sink);

    zmq::socket_t identified(context, zmq::socket_type::req);
    identified.set(zmq::sockopt::routing_id, "PEER2");
    identified.connect("inproc://example");
    std::string_view msg1("identified socket uses PEER2");
    zmsg.rebuild(msg1.size());
    memcpy(zmsg.data(), msg1.data(), msg1.size());
    identified.send(zmsg, zmq::send_flags::none);
    s_dump(sink);

    return 0;
    }

Load balancing with the Router

Req to Router

The Router's load balancing shows up in the following example: a pool of 10 threads runs random-length sleep tasks, and each task's start signal comes from the Router. In general, if you want every thread's workload to be roughly even, two conditions have to hold:

1. the threads receive their start signals at roughly the same time;

2. the per-task running times do not differ too much.

Load balancing means that even when these two conditions are not met, the Router still keeps each thread's workload roughly even. Notice that the two conditions compensate for each other: if your task takes longer, your next start signal arrives later, in other words you handle fewer tasks in the same amount of time. This kind of load balancing works like queueing at the post office: even if one or two counters are slow, the remaining counters keep serving whoever is next in line, first come first served.

So in the code, before starting a task each thread announces that it is idle by sending a request to the Router; the Router takes the Req identity out of that request and starts the corresponding thread's task. That is how the load balancing is achieved.

#include <iostream>
#include <iomanip>
#include <sstream>
#include <vector>
#include <zmq.hpp>
#include <thread>

using namespace std;

#define within(num) (int)((float)num*random()/(RAND_MAX+1.0))

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& msg, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(msg.size());
memcpy(zmsg.data(), msg.data(), msg.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

void workerThread(void* arg){
zmq::context_t context(1);
zmq::socket_t worker(context, zmq::socket_type::req);
int ptr = intptr_t(arg);
stringstream ss;
ss << std::hex << std::uppercase //最短4位、全大写输出
<< std::setw(4) << std::setfill('0') << ptr;
worker.set(zmq::sockopt::routing_id, ss.str());
worker.connect("ipc://routing.ipc");
int total = 0;
while(true){
s_send(worker, std::string("Hi Boss"));

std::string rmsg = s_recv(worker);
if(rmsg == "Fired"){
cout<< "Process Tasks:" << total <<endl;
break;
}
total++;

//模拟随机任务
std::this_thread::sleep_for(std::chrono::milliseconds(within(500)+1));
}
}

int main(){
zmq::context_t context(1);
zmq::socket_t router(context, zmq::socket_type::router);
router.bind("ipc://routing.ipc");
vector<std::thread> threadPool;
const int threadNum = 10;
for(int i=0; i<threadNum; i++){
threadPool.push_back(std::thread(workerThread, (void*)i));
}
auto startTime = std::chrono::steady_clock::now();
int fireCnt = 0;
while(true){
string identity = s_recv(router); //身份帧
s_recv(router); //空间隔帧
s_recv(router); //数据帧

//路由分发需要指定Req身份,以便Router向特定Req发送信号
s_send(router,identity, zmq::send_flags::sndmore);
s_send(router,string(""), zmq::send_flags::sndmore);
auto endTime = std::chrono::steady_clock::now();
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime-startTime).count();
if(duration<5000){ //五秒后路由停止分配工作
s_send(router, string("Work Hard"));
}
else{
s_send(router, string("Fired"));
if(++fireCnt == threadNum)
break;
}
}

for(auto& pos : threadPool){
pos.join();
}
return 0;
}

Router to Dealer

Anywhere a Req is used, a Dealer can be used instead, but two main differences must be kept in mind:

  1. Req and Rep both insert an empty delimiter frame automatically before the data frames; when a Rep receives a request from a Req it uses that frame as the boundary to strip the envelope before handing the data to the application, and the same happens in the other direction. A Dealer does not insert it, so if the message is going to a Rep you must insert the delimiter yourself (if the network is Dealer-only, you simply add it before sending and drop it after receiving);

  2. A Dealer is asynchronous whereas a Req is synchronous: a Req may not call send again before its recv has been called, although this synchronous/asynchronous difference does not affect the load-balancing example here.

The Dealer version of the load balancing needs only three changes to the workerThread function above:

zmq::socket_t worker(context, zmq::socket_type::dealer); //change the socket type to dealer

s_send(worker, std::string(""), zmq::send_flags::sndmore); //insert the empty delimiter frame (sndmore) before sending
s_send(worker, std::string("Hi Boss"));

s_recv(worker); //swallow the delimiter frame when receiving
std::string rmsg = s_recv(worker);

A complete Req-Router-Router load-balancing example

The load-balancing examples above are incomplete because the interaction with the client side is missing. The following example fills that gap: it accepts requests from Req clients and forwards them to Req workers, still using the Req-Router scheme for load balancing.

The confusing part here is how the messages are enveloped at each hop. Specifically:

  1. Initially, clients send requests to the Router and workers send a ready signal; on the Router's receiving side they look like:

    [client_id][""][request] # a client request
    [worker_id][""]["READY"] # a worker announcing it is ready

  2. The Router forwards the request; at the Router's output and on the worker's receiving side it looks like:

    [worker_id][""][client_id][""][request]
    [client_id][""][request] # what the worker receives

  3. The worker's reply, and the same reply as it arrives back at the Router:

    [client_id][""]["OK"]

    [worker_id][""][client_id][""]["OK"]

  4. The Router forwards the reply, and it reaches the client as:

    ["OK"]
    Once this wrapping and stripping of the envelope is clear, the code more or less writes itself;

The example also uses zmq's poll to watch for client requests and worker replies, where:

zmq::pollitem_t items[] = {
{backend, 0, ZMQ_POLLIN, 0},
{frontend, 0, ZMQ_POLLIN, 0}
};
zmq::pollitem_t is a struct defined as:
typedef struct {
void *socket; // the ZMQ socket to watch
int fd; // a file descriptor (only for watching raw sockets, e.g. ZMQ_STREAM; set to 0 when a ZMQ socket is given)
short events; // the event types to watch for (e.g. ZMQ_POLLIN)
short revents; // the events that actually fired (filled in after the poll call)
} zmq::pollitem_t;
In addition:
int zmq_poll(zmq_pollitem_t *items, // array of zmq_pollitem_t entries
int nitems, // number of entries to watch
long timeout // how long to wait in ms; -1 means block indefinitely
)
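
As a self-contained illustration of this call (not part of the guide's code; the port is made up), the sketch below polls a single rep socket with a one-second timeout and only calls recv when ZMQ_POLLIN is reported, so the recv is guaranteed not to block:

//poll_demo.cpp
#include <iostream>
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t rep(context, zmq::socket_type::rep);
rep.bind("tcp://*:5599");

zmq::pollitem_t items[] = {
{static_cast<void*>(rep), 0, ZMQ_POLLIN, 0} //watch this socket for readable messages
};
zmq::poll(items, 1, 1000); //wait at most 1000 ms
if(items[0].revents & ZMQ_POLLIN){
zmq::message_t msg;
rep.recv(msg, zmq::recv_flags::none); //will not block: poll said a message is waiting
std::cout << "got a request of " << msg.size() << " bytes" << std::endl;
}
else{
std::cout << "no request within 1 s" << std::endl;
}
return 0;
}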

The full code:

#include <iostream>
#include <iomanip>
#include <sstream>
#include <zmq.hpp>
#include <thread>
#include <queue>

using namespace std;

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& msg, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(msg.size());
memcpy(zmsg.data(), msg.data(), msg.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

void workerThread(void* arg){
zmq::context_t context(1);
zmq::socket_t worker(context, zmq::socket_type::req);

//设置id
int ptr = intptr_t(arg);
stringstream ss;
ss << std::hex << std::uppercase //最短4位、全大写输出
<< std::setw(4) << std::setfill('0') << ptr;
worker.set(zmq::sockopt::routing_id, ss.str());

worker.connect("ipc://backend.ipc");
s_send(worker, std::string("READY"));
while(true){
string client_identity = s_recv(worker);
s_recv(worker);
string data = s_recv(worker);
cout << "Worker Get Request:" << data << endl;

s_send(worker, client_identity, zmq::send_flags::sndmore);
s_send(worker, std::string(""), zmq::send_flags::sndmore);
s_send(worker, "OK");
}
}

void clientThread(void* arg){
zmq::context_t context(1);
zmq::socket_t client(context, zmq::socket_type::req);
int ptr = (intptr_t)arg;
stringstream ss;
ss<< std::hex << std:: uppercase <<
std::setw(4) << std::setfill('0') << ptr;
client.set(zmq::sockopt::routing_id, ss.str());
client.connect("ipc://frontend.ipc");

s_send(client, std::string("Hello"));
std::string reply = s_recv(client);
cout << "Client Got Reply:" << reply <<endl;

}

int main(){
zmq::context_t context(1);
zmq::socket_t frontend(context, zmq::socket_type::router);
zmq::socket_t backend(context, zmq::socket_type::router);

frontend.bind("ipc://frontend.ipc");
backend.bind("ipc://backend.ipc");

int clientNum = 10;
const int workerNum = 3;

for(int i=0; i<clientNum; i++){
std::thread t(clientThread, (void*)i);
t.detach();
}

for(int i=0; i<workerNum; i++){
std::thread t(workerThread, (void*)i);
t.detach();
}
std::queue<std::string> workerAddr_queue; //就绪worker的identity

while(true){
//epoll监听
zmq::pollitem_t items[] = {
{backend, 0, ZMQ_POLLIN, 0},
{frontend, 0, ZMQ_POLLIN, 0}
};
if(workerAddr_queue.size()){
zmq::poll(items, 2, -1);
}else { //当没有空闲worker时,不监听前端请求
zmq::poll(items, 1, -1);
}

if(items[0].revents & ZMQ_POLLIN){ //读取worker发来的数据
string workerAddr = s_recv(backend);
workerAddr_queue.push(workerAddr);
s_recv(backend);
string clientAddr = s_recv(backend); //此项可能是首次的Ready数据,也可能是后续处理完的client地址数据
if(clientAddr.compare("READY") != 0){ //不是ready数据
s_recv(backend);
string retMsg = s_recv(backend);

s_send(frontend, clientAddr, zmq::send_flags::sndmore);
s_send(frontend, std::string(""), zmq::send_flags::sndmore);
s_send(frontend, retMsg);
if(--clientNum == 0){ //客户请求都处理完,退出监听
break;
}
}
}

if(items[1].revents & ZMQ_POLLIN){ //读取来自client的数据
string clientAddr = s_recv(frontend);
s_recv(frontend);
string request = s_recv(frontend);

//向空闲worker转发请求
string workerAddr = workerAddr_queue.front();
workerAddr_queue.pop();
s_send(backend, workerAddr, zmq::send_flags::sndmore);
s_send(backend, std::string(), zmq::send_flags::sndmore);
s_send(backend, clientAddr, zmq::send_flags::sndmore);
s_send(backend, std::string(), zmq::send_flags::sndmore);
s_send(backend, request);
}
}
return 0;
}

Router-Dealer asynchronous clients and servers

An asynchronous client is one that sends several requests without waiting for the server's responses; an asynchronous server is one that can send several responses without waiting for new requests. In the example below each client sends a request once a second and, within that second, polls for replies every 10 ms; the server forwards incoming requests straight to worker threads via zmq::proxy, and each worker sends back a random number of replies, demonstrating the asynchronous behaviour on both sides:

#include <iostream>
#include <iomanip>
#include <cstdio>
#include <cstring>
#include <functional>
#include <vector>
#include <zmq.hpp>
#include <thread>

using namespace std;

//ZeroMQ工具函数:打印socket收到的信息
inline static void s_dump(zmq::socket_t& socket){
std::cout << "----------------------------------------" << std::endl;

while (1) {
// Process all parts of the message
zmq::message_t message;
zmq::recv_result_t rc = socket.recv(message);
if (!rc) {
throw std::runtime_error("recv error");
}

// Dump the message as text or binary
size_t size = message.size();
std::string data(static_cast<char*>(message.data()), size);

bool is_text = true;

size_t char_nbr;
unsigned char byte;
for (char_nbr = 0; char_nbr < size; char_nbr++) {
byte = data [char_nbr];
if (byte < 32 || byte > 127) //非ASCII码字符
is_text = false;
}
std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
for (char_nbr = 0; char_nbr < size; char_nbr++) {
if (is_text)
std::cout << (char)data [char_nbr];
else
std::cout << std::setfill('0') << std::setw(2)
<< std::hex << static_cast<int>(static_cast<unsigned char>(data[char_nbr]));
}
std::cout << std::endl;

int more = 0; // Multipart detection
more = socket.get(zmq::sockopt::rcvmore);
if (!more)
break; // Last message part
}
}

//客户端发送请求、监听响应
class ClientTask{
public:
ClientTask():context(1), client(context, zmq::socket_type::dealer){}
void start(){
try{
#define within(num) (int)((float)num* random()/(RAND_MAX+1.0))
char randomId[10];
sprintf(randomId, "%04X-%04X", within(0x10000), within(0x10000));
printf("%s\n", randomId);
client.set(zmq::sockopt::routing_id, randomId);
client.connect("ipc://frontend.ipc");

zmq::pollitem_t items[] = {
{client, 0, ZMQ_POLLIN, 0}
};
int requestCnt = 0;
while(true){
//1s内监听和打印响应
for(int i=0; i<100; i++){ //100次尝试
zmq::poll(items, 1, 10); //10ms超时等待
if(items[0].revents & ZMQ_POLLIN){
printf("\n%s Request Done:", randomId);
s_dump(client);
}
}
char request_string[16];
sprintf(request_string, "request #%d", ++requestCnt);
client.send(request_string, strlen(request_string)); //循环,每秒发送一次请求信息
}
}
catch(std::exception&e){
cout << "ClientTask " <<e.what() <<endl;
}
}
private:
zmq::context_t context;
zmq::socket_t client;
};

//服务器处理线程类
class ServerWorker{
public:
ServerWorker(zmq::context_t& context, zmq::socket_type socket_type):context(context),worker(context,socket_type){}

void work(){
worker.connect("inproc://backend");
try{
while(true){
zmq::message_t clientAddr;
zmq::message_t request;
//Dealer对象不会插入空格,Router在request前插入身份帧就转发了
worker.recv(&clientAddr);
worker.recv(&request);
zmq::message_t clientAddrBak;
zmq::message_t requestBak;

int replies = within(5);
for(int i=0; i<replies; i++){ //随机回复若干响应
std::this_thread::sleep_for(std::chrono::milliseconds(within(1000)+1));
clientAddrBak.copy(&clientAddr);
requestBak.copy(&request);
worker.send(clientAddrBak, zmq::send_flags::sndmore);
worker.send(requestBak, zmq::send_flags::none);
}
}
}catch(std::exception& e){
cout << "ServerWorker" <<e.what() <<endl;
}
}

private:
zmq::context_t& context;
zmq::socket_t worker;

};

//服务器创建工作线程,转发客户端请求
class ServerTask{
public:
ServerTask():context(1),frontend(context, zmq::socket_type::router), backend(context, zmq::socket_type::dealer){}
enum {MAXTHREADNUM = 5}; //worker处理线程数量
void run(){
frontend.bind("ipc://frontend.ipc");
backend.bind("inproc://backend");

std::vector<ServerWorker*>workers;
std::vector<std::thread*> workers_thread;
for(int i=0; i<MAXTHREADNUM; i++){
workers.push_back(new ServerWorker(context, zmq::socket_type::dealer));
workers_thread.push_back(new std::thread(std::bind(&ServerWorker::work, workers[i])));
workers_thread[i]->detach();
}

try{
//阻塞函数,当frontend和backend均未关闭时,直接自动互相转发消息
zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
}catch(std::exception&e){
cout << "ServerTask" <<e.what() <<endl;
}

//proxy阻塞退出,说明socket被关闭,执行析构
for(int i=0; i<MAXTHREADNUM; i++){
delete workers[i];
delete workers_thread[i];
}
}

private:
zmq::context_t context;
zmq::socket_t frontend;
zmq::socket_t backend;
};

int main(){
ClientTask cl1,cl2,cl3;
ServerTask s1;
std::thread t1(std::bind(&ClientTask::start, &cl1));
std::thread t2(std::bind(&ClientTask::start, &cl2));
std::thread t3(std::bind(&ClientTask::start, &cl3));
std::thread t4(std::bind(&ServerTask::run, &s1));

t1.join(); //对象在主线程调用,必须保证子线程退出后才销毁主线程
t2.join();
t3.join();
t4.join();

getchar();
return 0;
}

Worked example: inter-broker routing

Skipped for now.

Reliable Request-Reply Patterns

This section covers several schemes the author layers on top of ZeroMQ to obtain a reliable request-reply pattern (Reliable Request-Reply, RRR). "Reliable" here means being able to handle a set of clearly defined failures; the following five causes cover most failure scenarios:

  1. Crashes caused by system code or by flaws in application-level business logic, for example exhausting memory while queueing messages for slow clients.

  2. Message queue overflow. System code written to cope with slow clients typically starts dropping messages once the queues fill up, and those messages never reach the client.

  3. Network faults that break the connection; ZeroMQ reconnects automatically, but messages that were in flight may be lost.

  4. Hardware failures that take the process down.

  5. Exotic network failures, such as a dead switch port or an unreachable network segment.

Reliability patterns almost never address the last two, hardware-level, failures, because handling such low-probability faults is extremely expensive (the kind of budget that buys the author a house by the sea).

Lazy Pirate Pattern

The Lazy Pirate pattern adds some error handling on top of Req-Rep:

  1. Use poll to wait for the reply, and only read the socket once a message has actually arrived;

  2. Add a timeout: if the reply does not arrive in time (the server may have crashed, or something went wrong on the client), resend the request. The approach taken here is rather blunt: the old client socket is simply closed and a fresh connection is opened before retrying;

  3. Count the retries: once the request has been resent a certain number of times, give up on it and exit;

One extra point: when a connection drops, the ZeroMQ client reconnects automatically, so as long as the server has not gone away for good (or has been restarted), the conversation can continue.

//server.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>

using namespace std;

#define within(num) (int)((float)num* random()/(RAND_MAX + 1.0))

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
zmq::context_t context(1);
zmq::socket_t server(context, zmq::socket_type::rep);
server.bind("tcp://*:5555");

int cnt = 0;
while(true){
std::string request = s_recv(server);
cnt++;
if(cnt > 3 && within(3)==0){
cout << "I simulate crash" <<endl;
break;
}
else if(cnt > 3 && within(3)==0){
cout << "I simulate CPU overload!" <<endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
cout << "I simulate Normal responce" <<endl;
s_send(server, request);
}
return 0;
}

//client.cpp
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>

using namespace std;

#define TIMEOUT 2500 //发送请求后监听时间
#define RETRYTIMES 3 //重试次数

zmq::socket_t* getClientSocket(zmq::context_t& context){
cout << "I connect to Server" <<endl;
zmq::socket_t* client = new zmq::socket_t(context, zmq::socket_type::req);
//0代表socket不等待信息发送完就立刻关闭socket,-1代表无限等待发送完成才关闭,其余正数表示等待n毫秒发送信息才关闭socket
//因为客户端重试前要重建,应该立刻释放
client->set(zmq::sockopt::linger, 0);
client->connect("tcp://localhost:5555");
return client;
}

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
zmq::context_t context(1);
zmq::socket_t* client = getClientSocket(context);
int retry = RETRYTIMES;
int requestCnt = 0;
while(retry){
char request[16];
sprintf(request, "request #%d", requestCnt++);
s_send(*client, std::string(request, strlen(request)));
cout << " Send Request: " << request << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
while(true){
zmq::pollitem_t items[] = {
{*client, 0, ZMQ_POLLIN, 0}
};
zmq::poll(items, 1, TIMEOUT);
if(items[0].revents & ZMQ_POLLIN){
string rmsg = s_recv(*client);
if(rmsg.compare(request) == 0){ //服务器发回响应信息
cout << rmsg <<" Got Reply!" << endl;
retry = RETRYTIMES;
break;
}
else{
cout << "UnKnown Format!" <<endl;
}
}
else if(--retry == 0){
cout << "Server maybe offline, Stop!"<<endl;
break;
}
else{
cout << "Retry to sned request!"<<endl;
delete client;
client = getClientSocket(context);
s_send(*client, std::string(request, strlen(request)));
}
}
}
return 0;
}

Simple Pirate Pattern (basic reliable queuing)

The basic reliable queue is essentially the extended pattern with a Router playing the broker role: the READY signal each worker sends as a Req, together with the Router's load balancing, effectively maintains a queue of available workers to hand tasks to. It amounts to the Lazy Pirate pattern with a Router broker node added in the middle. When testing, try running several clients and workers; this is an N-to-N network.

//broker.cpp clients与workers间负载均衡
#include <iostream>
#include <zmq.hpp>
#include <queue>
using namespace std;

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
zmq::context_t context(1);
zmq::socket_t frontend(context, zmq::socket_type::router);
zmq::socket_t backend(context, zmq::socket_type::router);
frontend.bind("tcp://*:5555");
backend.bind("tcp://*:5556");

std::queue<std::string> workerAddr_queue;

while(true){
zmq::pollitem_t items[]={
{backend, 0, ZMQ_POLLIN, 0},
{frontend, 0, ZMQ_POLLIN, 0},
};

if(workerAddr_queue.size()){
zmq::poll(items, 2, -1);
}else{
zmq::poll(items,1, -1);
}

if(items[0].revents & ZMQ_POLLIN){
string workerAddr = s_recv(backend);
workerAddr_queue.push(workerAddr);
s_recv(backend);
string clientAddr = s_recv(backend);
if(clientAddr.compare("READY") != 0){
s_recv(backend);
string reply = s_recv(backend);

s_send(frontend, clientAddr, zmq::send_flags::sndmore);
s_send(frontend, string(""), zmq::send_flags::sndmore);
s_send(frontend, reply, zmq::send_flags::none);
}
}

if(items[1].revents & ZMQ_POLLIN){
string clientAddr = s_recv(frontend);
s_recv(frontend);
string request = s_recv(frontend);

s_send(backend, workerAddr_queue.front(),zmq::send_flags::sndmore);
workerAddr_queue.pop();
s_send(backend, string(""),zmq::send_flags::sndmore);
s_send(backend, clientAddr, zmq::send_flags::sndmore);
s_send(backend, string(""),zmq::send_flags::sndmore);
s_send(backend, request);
}
}

return 0;
}

The client and worker need only minor changes (for example setting an identity, and changing the server's rep socket into a req):

#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>

using namespace std;

#define TIMEOUT 2500 //发送请求后监听时间
#define RETRYTIMES 3 //重试次数

#define within(num) (int)((float)num* random()/(RAND_MAX + 1.0))

zmq::socket_t* getClientSocket(zmq::context_t& context, const char* randomId){
cout << "I connect to Server" <<endl;
zmq::socket_t* client = new zmq::socket_t(context, zmq::socket_type::req);
//0代表socket不等待信息发送完就立刻关闭socket,-1代表无限等待发送完成才关闭,其余正数表示等待n毫秒发送信息才关闭socket
client->set(zmq::sockopt::linger, 0);
client->set(zmq::sockopt::routing_id, randomId);
client->connect("tcp://localhost:5555");
return client;
}

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
char randomId[10];
sprintf(randomId, "%04X-%04X", within(0x10000), within(0x10000));
zmq::socket_t* client = getClientSocket(context, randomId);
int retry = RETRYTIMES;
int requestCnt = 0;
while(retry){
char request[16];
sprintf(request, "request #%d", requestCnt++);
s_send(*client, std::string(request, strlen(request)));
cout << "I ("<< randomId <<") Send Request: " << request << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
while(true){
zmq::pollitem_t items[] = {
{*client, 0, ZMQ_POLLIN, 0}
};
zmq::poll(items, 1, TIMEOUT);
if(items[0].revents & ZMQ_POLLIN){
string rmsg = s_recv(*client);
if(rmsg.compare(request) == 0){ //服务器发回响应信息
cout << "I ("<< randomId << ")" << rmsg <<" Got Reply!" << endl;
retry = RETRYTIMES;
break;
}
else{
cout << "UnKnown Format!" <<endl;
}
}
else if(--retry == 0){
cout << "I ("<< randomId <<") Server maybe offline, Stop!"<<endl;
break;
}
else{
cout << "I ("<< randomId <<") Retry to send :" << request <<endl;
delete client;
client = getClientSocket(context, randomId);
s_send(*client, std::string(request, strlen(request)));
}
}
}
return 0;
}

//worker.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>

using namespace std;

#define within(num) (int)((float)num* random()/(RAND_MAX + 1.0))

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
zmq::socket_t server(context, zmq::socket_type::req);
char randomId[10];
sprintf(randomId, "%04X-%04X", within(0x10000), within(0x10000));
server.set(zmq::sockopt::routing_id, randomId);
server.connect("tcp://localhost:5556");

s_send(server, "READY");

int cnt = 0;
while(true){
std::string clientAddr = s_recv(server);
s_recv(server);
std::string request = s_recv(server);
cnt++;
if(cnt > 3 && within(3)==0){
cout << "I ("<< randomId <<") simulate crash" <<endl;
break;
}
else if(cnt > 3 && within(3)==0){
cout << "I("<< randomId <<")simulate CPU overload!" <<endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
cout << "I ("<< randomId <<") simulate Normal responce" <<endl;
s_send(server, clientAddr, zmq::send_flags::sndmore);
s_send(server, string(""), zmq::send_flags::sndmore);
s_send(server, request);
}
return 0;
}

Paranoid Pirate Pattern (robust reliable queuing)

The Simple Pirate pattern, with a Router thrown in for queue-based load balancing, seems to work well enough, but several problems remain unsolved:

  1. If the Router crashes, the clients recover (thanks to ZeroMQ's automatic reconnection). The workers also reconnect, but they never resend their READY signal to the new queue (that signal is sent outside the loop), so the Router will never dispatch requests to them again; in effect a Router crash leaves the worker side broken.

  2. The Router cannot detect worker state. If a worker itself crashes, requests that clients have sent may be lost (the pirate-style retry on the client eventually masks this, but it is clearly not a healthy direction).

So a heartbeat from the workers to the router has to be maintained.
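
Before the full code, here is a condensed sketch of the worker-side heartbeat logic (a simplification, not the code below: frame contents, ports and constants are illustrative, and it assumes the broker answers any traffic within the interval): send a PING every interval, treat any incoming traffic as proof of life, and reconnect with exponential backoff after too many silent intervals.

//heartbeat_sketch.cpp
#include <iostream>
#include <cstring>
#include <chrono>
#include <thread>
#include <zmq.hpp>

int main(){
const int HEARTBEAT_INTERVAL = 1000; //ms between heartbeats
const int HEARTBEAT_RETRY = 3; //silent intervals tolerated before reconnecting
int backoff = 1000; //reconnect backoff in ms, doubled up to a cap

zmq::context_t context(1);
auto connectWorker = [&](){
auto* s = new zmq::socket_t(context, zmq::socket_type::dealer);
s->set(zmq::sockopt::linger, 0);
s->connect("tcp://localhost:5556");
return s;
};
zmq::socket_t* worker = connectWorker();

int liveness = HEARTBEAT_RETRY;
while(true){
zmq::pollitem_t items[] = {{static_cast<void*>(*worker), 0, ZMQ_POLLIN, 0}};
zmq::poll(items, 1, HEARTBEAT_INTERVAL);
if(items[0].revents & ZMQ_POLLIN){
zmq::message_t msg;
worker->recv(msg, zmq::recv_flags::none); //any traffic (PONG or work) proves the broker is alive
liveness = HEARTBEAT_RETRY;
backoff = 1000;
}
else if(--liveness == 0){ //too many silent intervals: assume the broker is gone
std::cout << "broker unreachable, reconnecting" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(backoff));
if(backoff < 32000) backoff *= 2; //exponential backoff before reconnecting
delete worker;
worker = connectWorker();
liveness = HEARTBEAT_RETRY;
}
zmq::message_t ping(4);
memcpy(ping.data(), "PING", 4);
worker->send(ping, zmq::send_flags::none); //heartbeat (the full code rate-limits this to one per interval)
}
}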

This example builds something close to a production-grade failure-handling setup:

  1. If the Router broker crashes, forwarding stops; workers keep retrying the connection indefinitely, while clients give up after several retries. This is the most realistic behaviour: once the Router comes back, the workers reconnect automatically, and a client only has to reissue its request to get a reply.

  2. It forms an N-to-N cluster: with several workers and clients, if a worker goes offline the Router finds out promptly, and clients reconnect automatically and get served by the remaining workers;

  3. It is asynchronous and fast, since it uses Router and Dealer sockets;

  4. It load-balances with fair queuing, using the post-office style Req-Router-Req scheme described earlier.

The code:

//broker.cpp clients与workers间负载均衡
#include <iostream>
#include <iomanip>
#include <zmq.hpp>
#include <queue>
#include <vector>
#include <algorithm>
#include <chrono>

using namespace std;

#define HEARTBEAT_RETRY 3 //超过三次等待无心跳,重启worker
#define HEARTBEAT_INTERVAL 1000 //心跳间隔

typedef struct{
std::string identity;
std::chrono::steady_clock::time_point expiry;
} worker_t;

struct WorkerQueue{
std::vector<worker_t> workerArray;

void addWorker(const string& identity){
auto expiry = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL*HEARTBEAT_RETRY);
for(int i=0; i<workerArray.size(); i++){
if(workerArray[i].identity == identity){
workerArray[i].expiry = expiry;
return;
}
}
worker_t worker;
worker.identity = identity;
worker.expiry = expiry;
workerArray.push_back(worker);
}

void removeExpiryWorker(){
if(workerArray.empty()){
return;
}
auto now = std::chrono::steady_clock::now();
int size = workerArray.size();
workerArray.erase(std::remove_if(workerArray.begin(), workerArray.end(), [now](const worker_t& w){
return w.expiry <= now; //a worker whose expiry time has passed is considered dead
}), workerArray.end());

int after_size = workerArray.size();
if(after_size != size)
cout << "Remove Expiry Worker Num:" << size - after_size << endl;
}

void refreshWorker(const string& identity){
for(auto& pos : workerArray){
if(pos.identity == identity){
pos.expiry = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL*HEARTBEAT_RETRY);
return; //身份唯一
}
}
cout << "Error: No Such Identity: " << identity;
}

std::string pop_front(){
if(workerArray.empty()){
cout << "pop_front failed!" << endl;
return {};
}
string identity = workerArray.front().identity;
workerArray.erase(workerArray.begin());
return identity;
}
};

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

//如果s_recvAll前调用s_recv未接受完数据,s_recvAll会从头重新接收
vector<string> s_recvAll(zmq::socket_t& socket){
vector<string> temp;
while(true){
zmq::message_t rzmsg;
try{
if(!socket.recv(rzmsg, zmq::recv_flags::none))
return {};
string rmsg(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
temp.push_back(rmsg);
}
catch(zmq::error_t& e){
cout << "s_recvAll Error:" <<e.what() <<endl;
return {};
}
if(!rzmsg.more())
break;
}
return temp;
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

//ZeroMQ工具函数:打印socket收到的信息
inline static void s_print(zmq::message_t message){
std::cout << "----------------------------------------" << std::endl;

// Dump the message as text or binary
size_t size = message.size();
std::string data(static_cast<char*>(message.data()), size);

bool is_text = true;

size_t char_nbr;
unsigned char byte;
for (char_nbr = 0; char_nbr < size; char_nbr++) {
byte = data [char_nbr];
if (byte < 32 || byte > 127) //非ASCII码字符
is_text = false;
}
std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
for (char_nbr = 0; char_nbr < size; char_nbr++) {
if (is_text)
std::cout << (char)data [char_nbr];
else
std::cout << std::setfill('0') << std::setw(2) \
<< std::hex << static_cast<int>(static_cast<unsigned char>(data[char_nbr]));
}
std::cout << std::endl;
}


int main(){
zmq::context_t context(1);
zmq::socket_t frontend(context, zmq::socket_type::router);
zmq::socket_t backend(context, zmq::socket_type::router);
frontend.bind("tcp://*:5555");
backend.bind("tcp://*:5556");

WorkerQueue workerQueue;
auto heartBeatTimePoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL);

while(true){
zmq::pollitem_t items[]={
{backend, 0, ZMQ_POLLIN, 0},
{frontend, 0, ZMQ_POLLIN, 0},
};

if(workerQueue.workerArray.size()){
zmq::poll(items, 2, HEARTBEAT_INTERVAL);
}
else{
zmq::poll(items,1, HEARTBEAT_INTERVAL);
}

if(items[0].revents & ZMQ_POLLIN){ //来自woker信息
vector<string> rmsgArray = s_recvAll(backend);
// cout << "Broker Receive Msg from worker, Msg size: " << rmsgArray.size() <<endl;
// cout << "{ " <<endl;
// for(const auto& pos : rmsgArray){
// cout << pos <<endl;
// }
// cout << "} endddd " <<endl;
std::string workerAddr = rmsgArray[0];
if(rmsgArray.size() == 2){
if(rmsgArray[1].compare("READY") == 0){
cout << "workerAddr: " << workerAddr << " is READY " <<endl;
workerQueue.addWorker(workerAddr);
}
else if(rmsgArray[1].compare("TOPIC_HEARTBEAT_PING")==0){
//cout << "workerAddr: " << workerAddr << " send PING " <<endl;
workerQueue.refreshWorker(workerAddr);
}
}
else if(rmsgArray.size() == 3){ //来自dealer server的处理请求信息
workerQueue.addWorker(workerAddr);
string clientAddr = rmsgArray[1];
string request_res = rmsgArray[2];
cout << "workerAddr Done Request: " << request_res<<endl;
s_send(frontend, clientAddr, zmq::send_flags::sndmore);
s_send(frontend, string(""), zmq::send_flags::sndmore);
s_send(frontend, request_res);
}
}

if(items[1].revents & ZMQ_POLLIN){ //来自req客户端信息,转发至dealer
vector<string> rmsgArray = s_recvAll(frontend);
cout<< "Receive from req Client: Msg size: " << rmsgArray.size() << endl;
std::string clientAddr = rmsgArray[0];
string request = rmsgArray[2];
//当dealer不是连接rep时,空分割符不是必须的
s_send(backend, workerQueue.pop_front(),zmq::send_flags::sndmore);
s_send(backend, string(""),zmq::send_flags::sndmore);
s_send(backend, clientAddr, zmq::send_flags::sndmore);
s_send(backend, string(""),zmq::send_flags::sndmore);
s_send(backend, request);
}

if(std::chrono::steady_clock::now() >= heartBeatTimePoint){
for(const auto& pos : workerQueue.workerArray){
s_send(backend, pos.identity, zmq::send_flags::sndmore);
s_send(backend, string("TOPIC_HEARTBEAT_PONG"));
}
heartBeatTimePoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL);
}
//workerQueue.removeExpiryWorker();
}

return 0;
}

//worker.cpp
#include <iostream>
#include <cstring>
#include <cstdio>
#include <chrono>
#include <vector>
#include <zmq.hpp>
#include <thread>

using namespace std;

#define within(num) (int)((float)num* random()/(RAND_MAX + 1.0))
#define HEARTBEAT_RETRY 3 //超过三次等待无心跳,重启worker
#define HEARTBEAT_INTERVAL 1000 //心跳间隔
#define HEARTBEAT_INITWAIT 1000 //初次等待
#define HEARTBEAT_WAITMAX 32000 //最长等待时间

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

vector<string> s_recvAll(zmq::socket_t& socket){
vector<string> temp;
while(true){
zmq::message_t rzmsg;
try{
if(!socket.recv(rzmsg, zmq::recv_flags::none))
return {};
string rmsg(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
temp.push_back(rmsg);
}
catch(zmq::error_t& e){
cout << "s_recvAll Error:" <<e.what() <<endl;
return {};
}
if(!rzmsg.more())
break;
}
return temp;
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

zmq::socket_t* getWorkerSocket(zmq::context_t& context, const char* randomId){
zmq::socket_t* worker = new zmq::socket_t(context, zmq::socket_type::dealer);
worker->set(zmq::sockopt::routing_id, randomId);
worker->set(zmq::sockopt::linger, 0);
worker->connect("tcp://localhost:5556");

cout << "I (" << randomId << ") Worker is READY" <<endl;
s_send(*worker, "READY");
return worker;
}

int main(){
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
char randomId[10];
sprintf(randomId, "%04X-%04X", within(0x10000), within(0X10000));
zmq::socket_t* worker = getWorkerSocket(context, randomId);
int cnt = 0;
auto heartBeatTimePoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL);
int initWait = HEARTBEAT_INITWAIT;
int heartBeatFail = HEARTBEAT_RETRY;

int requestDoneCnt = 0;
while(true){
zmq::pollitem_t items[] = {
{*worker, 0, ZMQ_POLLIN, 0}
};
zmq::poll(items, 1, HEARTBEAT_INTERVAL);

if(items[0].revents & ZMQ_POLLIN){
vector<string> rmsgArray = s_recvAll(*worker);
// cout << "I(" << randomId << ") receive Msg size:"<< rmsgArray.size() <<endl;
// cout << "{ " <<endl;
// for(const auto& pos : rmsgArray){
// cout << pos <<endl;
// }
// cout << "} endddd " <<endl;
if(rmsgArray.size() == 4){
std::string clientAddr = rmsgArray[1];
std::string request = rmsgArray[3];
#if 0 //故障模拟
cnt++;
if(cnt > 3 && within(20)==0){
cout << "I ("<< randomId <<") simulate crash" <<endl;
break;
}
else if(cnt > 3 && within(3)==0){
cout << "I("<< randomId <<")simulate CPU overload!" <<endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
#endif
std::this_thread::sleep_for(std::chrono::seconds(1)); //模拟处理请求
requestDoneCnt ++;
cout << "I ("<< randomId <<") simulate Normal responce" <<endl;
s_send(*worker, clientAddr, zmq::send_flags::sndmore);
s_send(*worker, request);
}
else if(rmsgArray.size() == 1){ //心跳
if(rmsgArray[0].compare("TOPIC_HEARTBEAT_PONG") == 0){
auto now = std::chrono::system_clock::now();
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
//cout << "Server Got HeartBeat" << timestamp <<endl;
heartBeatFail = HEARTBEAT_RETRY;
}
else{
cerr << "Invalid format!" <<endl;
}
}else{
////...未预料的信息处理
}
initWait = HEARTBEAT_INITWAIT;
}
else if(--heartBeatFail == 0){
cout << "Worker:(" << randomId << ") heartBeat ReTry Failed, ReStart..." << endl;
delete worker;
std::this_thread::sleep_for(std::chrono::milliseconds(initWait));
//重复的连续重启前,指数退避睡眠
if(initWait < HEARTBEAT_WAITMAX)
initWait *= 2;

worker = getWorkerSocket(context,randomId);
heartBeatFail = HEARTBEAT_RETRY;
}
if(requestDoneCnt && requestDoneCnt % 10 == 0)
cout << "Success Handle Request Num: " <<requestDoneCnt <<endl;

if(std::chrono::steady_clock::now() >= heartBeatTimePoint){
s_send(*worker, std::string("TOPIC_HEARTBEAT_PING"));
heartBeatTimePoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(HEARTBEAT_INTERVAL);
}
}
return 0;
}

The client code is essentially the Simple Pirate client shown earlier:

//client.cpp 客户端代码,发起请求
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>

using namespace std;

#define TIMEOUT 2500 //发送请求后监听时间
#define RETRYTIMES 3 //重试次数

#define within(num) (int)((float)num* random()/(RAND_MAX + 1.0))

zmq::socket_t* getClientSocket(zmq::context_t& context, const char* randomId){
cout << "I connect to Server" <<endl;
zmq::socket_t* client = new zmq::socket_t(context, zmq::socket_type::req);
//0代表socket不等待信息发送完就立刻关闭socket,-1代表无限等待发送完成才关闭,其余正数表示等待n毫秒发送信息才关闭socket
client->set(zmq::sockopt::linger, 0);
client->set(zmq::sockopt::routing_id, randomId);
client->connect("tcp://localhost:5555");
return client;
}

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
char randomId[10];
sprintf(randomId, "%04X-%04X", within(0x10000), within(0x10000));
zmq::socket_t* client = getClientSocket(context, randomId);
int retry = RETRYTIMES;
int requestCnt = 0;
while(retry){
char request[16];
sprintf(request, "request #%d", requestCnt++);
s_send(*client, std::string(request, strlen(request)));
cout << "I ("<< randomId <<") Send Request: " << request << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
while(true){
zmq::pollitem_t items[] = {
{*client, 0, ZMQ_POLLIN, 0}
};
zmq::poll(items, 1, TIMEOUT);
if(items[0].revents & ZMQ_POLLIN){
string rmsg = s_recv(*client);
if(rmsg.compare(request) == 0){ //服务器发回响应信息
cout << "I ("<< randomId << ")" << rmsg <<" Got Reply!" << endl;
retry = RETRYTIMES;
break;
}
else{
cout << "UnKnown Format!" <<endl;
}
}
else if(--retry == 0){
cout << "I ("<< randomId <<") Server maybe offline, Stop!"<<endl;
break;
}
else{
cout << "I ("<< randomId <<") Retry to send :" << request <<endl;
delete client;
client = getClientSocket(context, randomId);
s_send(*client, std::string(request, strlen(request)));
}
}
}
return 0;
}

Advanced Publish-Subscribe Patterns

Design philosophy

ZeroMQ's publish-subscribe pattern is aimed at cluster-level scalability: it is meant to push huge numbers of messages, quickly, to thousands of nodes. To do that it uses the same trick as the push-pull pattern: get rid of back-chatter, meaning publishers and subscribers do not talk to each other directly. The one exception is that subscribers do send their subscriptions upstream to the publisher, but that traffic is infrequent and anonymous.

Dropping back-chatter buys a simple API and a simple information flow, but it also means that the publisher cannot know when subscribers connect or whether they are still alive, and subscribers cannot throttle the publisher's send rate. In most cases the resulting multicast quality is fine, but be clear that publish-subscribe cannot provide reliable multicast: once publisher and subscriber fall out of sync, messages are dropped arbitrarily;

Pub-Sub tracing (the Espresso Pattern)

Pub-Sub tracing, also known as the Espresso Pattern: a listener thread's zmq::socket_type::pair (pair1) connects to a pair socket in the main thread (pair2), and pair1 prints everything pair2 receives. What pair2 receives is a copy of the traffic flowing through the two pub-sub legs (xpub-sub and pub-xsub): zmq::proxy mirrors every message passing through it into pair2. Concretely:

  1. the publisher publishes a message, the xsub receives it and the proxy forwards it to the xpub, and at the same time the listener captures a copy;

  2. when the xpub accepts the subscriber's connection, it also receives the subscriber's subscription control frames (captured by the listener as well); from then on the xpub forwards to the subscriber the messages it subscribed to from the xsub side. After receiving five messages the subscriber is destroyed, which automatically sends unsubscribe frames (also captured by the listener), and the exchange is complete.

The code:

#include <iostream>
#include <zmq.hpp>
#include <thread>

using namespace std;

void subscriber(zmq::context_t& context){
zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:6001");
subscriber.set(zmq::sockopt::subscribe, "A");
subscriber.set(zmq::sockopt::subscribe, "B");

int cnt = 0;
while(cnt<5){
zmq::message_t rzmsg;
if(subscriber.recv(rzmsg, zmq::recv_flags::none)){
string rmsg(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
cout << "receive msg:" << rmsg <<endl;
cnt++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

void publisher(zmq::context_t& context){
zmq::socket_t publisher(context, zmq::socket_type::pub);
publisher.bind("tcp://*:6000");

while(true){
char msg[10];
sprintf(msg, "%c-%05d", 'A'+random()%10, random()%100000);
publisher.send(msg, strlen(msg));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

void listener(zmq::context_t& context){
zmq::socket_t listener(context, zmq::socket_type::pair);
listener.connect("inproc://proxy");
zmq::message_t rzmsg;
while(true){
listener.recv(rzmsg, zmq::recv_flags::none);
string rmsg(static_cast<const char*>(rzmsg.data()),rzmsg.size());
if(rmsg[0]==0 || rmsg[0]==1){ //订阅控制帧, 0取消订阅,1订阅
cout << static_cast<int>(rmsg[0]) << ": " << rmsg[1] <<endl;
}
else{ //普通数据
cout << "listener Print: " <<rmsg <<endl;
}
}
}

int main(){
zmq::context_t context(1);
zmq::socket_t xpub(context, zmq::socket_type::xpub);
xpub.bind("tcp://*:6001");
zmq::socket_t xsub(context, zmq::socket_type::xsub);
xsub.connect("tcp://localhost:6000");
zmq::socket_t proxy_(context, zmq::socket_type::pair);
proxy_.bind("inproc://proxy");

//启动线程
std::thread subscriberThread(subscriber, std::ref(context)); //context不可复制,需使用ref
std::thread publishThread(publisher, std::ref(context));
std::thread listenerThread(listener, std::ref(context));

//this zmq::proxy overload takes raw void* handles; the third argument is the capture socket
zmq::proxy(xsub, xpub, static_cast<void*>(proxy_));
//equivalent C API call:
//zmq_proxy(xsub, xpub, proxy_);
//note: zmq::proxy_steerable() is not a drop-in equivalent, it additionally expects a control socket

subscriberThread.join();
publishThread.join();
listenerThread.join();

return 0;
}

Last Value Caching (LVC)

Last value caching means the publisher caches the last message published on each topic and resends it whenever a new subscriber joins, so the subscriber gets data immediately instead of waiting for the next update. Doing this directly at the publisher is impractical in a very large pub-sub network because there are simply too many subscribers; at that scale you would typically need a protocol such as PGM, which pushes the fan-out into the Ethernet switch hardware rather than having the publisher do TCP unicast to every subscriber, and that approach brings its own problems of unfair message distribution and network congestion.

As the Espresso pattern showed, an XPUB socket can see the subscription messages coming from downstream. For a small pub-sub network with only a few dozen nodes this is enough, and ZeroMQ can build an LVC network on top of it.

The following simulates a scenario where LVC is needed: the publisher starts by publishing messages on 1000 topics, then settles into slow random updates; a subscriber that joins afterwards may wait a long time before its topic comes around again:

//publisher.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>
#include <sstream>
#include <iomanip> // setw/setfill
#include <cstdlib> // random/srandom
#include <ctime>
#include <cstring> // memcpy

using namespace std;

#define within(num) (int)((float)num * random()/(RAND_MAX+1.0))

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
srandom((unsigned)time(NULL)); //seed random(), which the within() macro uses
zmq::context_t context(1);
zmq::socket_t publisher(context, zmq::socket_type::pub);
publisher.bind("tcp://*:5556");

std::this_thread::sleep_for(std::chrono::seconds(2)); //give subscribers time to connect

for(int i=0; i<1000; i++){
stringstream topic;
topic << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') <<i;
s_send(publisher, topic.str(), zmq::send_flags::sndmore);
s_send(publisher, string("Save Roger"));
}

while(true){
std::this_thread::sleep_for(std::chrono::seconds(1)); //update one random topic per second
stringstream topic;
topic << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') << within(1000);
s_send(publisher, topic.str(), zmq::send_flags::sndmore);
s_send(publisher, string("Off His Head"));
}

return 0;
}

//subscriber.cpp
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <sstream>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <thread>

using namespace std;

#define within(num) (int)((float)num * random()/(RAND_MAX+1.0))

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

int main(){
srandom((unsigned)time(NULL)); //seed random() for within()
zmq::context_t context(1);
zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5557"); //the cache's XPUB port; use tcp://localhost:5556 to talk to the publisher directly

stringstream topic;
topic << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') << within(1000);
subscriber.set(zmq::sockopt::subscribe, topic.str());

while(true){
string topic_ = s_recv(subscriber); //first frame is the topic, second frame is the payload
if(topic_ != topic.str())
break;
string data = s_recv(subscriber);
cout << "subscriber Print:" << data << endl;
}
return 0;
}

Now add a cache process in between to implement LVC: every new subscriber that joins immediately receives the cached "Save Roger" message for its topic (start the publisher first, then the cache, then any number of subscribers against port 5557):

//cache.cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <cstring>
#include <zmq.hpp>
#include <thread>

using namespace std;

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
zmq::context_t context(1);
//the frontend receives and caches published topics; the backend sees subscription frames and re-sends cached values
zmq::socket_t frontend(context, zmq::socket_type::sub);
zmq::socket_t backend(context, zmq::socket_type::xpub);

frontend.connect("tcp://localhost:5556"); //must match the publisher's bind port
frontend.set(zmq::sockopt::subscribe, ""); //subscribe to everything

//xpub_verbose would also pass through duplicate subscriptions to already-known topics
//backend.set(zmq::sockopt::xpub_verbose, 1);
backend.bind("tcp://*:5557");

zmq::pollitem_t items[] = {
{frontend, 0, ZMQ_POLLIN, 0},
{backend,0, ZMQ_POLLIN, 0}
};

std::unordered_map<string, string> cacheMap;

while(true){
if(zmq::poll(items, 2, 1000) == -1)
break;

if(items[0].revents & ZMQ_POLLIN){
string topic = s_recv(frontend);
string data = s_recv(frontend);
if(topic.empty())
break;
cacheMap[topic] = data;
s_send(backend, topic, zmq::send_flags::sndmore);
s_send(backend, data);
}

if(items[1].revents & ZMQ_POLLIN){
zmq::message_t rzmsg;
if(!backend.recv(rzmsg, zmq::recv_flags::none) || rzmsg.size() == 0)
break;

uint8_t *event = (uint8_t *)rzmsg.data();

if(event[0] == 1){ //first byte 0x01 marks a subscribe frame from a new subscriber
string topic((char*)event + 1, rzmsg.size()-1);
cout << "New subscription: " << topic << endl;
if(cacheMap.count(topic)){
s_send(backend, topic, zmq::send_flags::sndmore);
s_send(backend, cacheMap[topic]);
}
}
}
}
return 0;
}

Slow Subscriber Detection (the Suicidal Snail Pattern)

Slow Subscriber Detection, also known as the Suicidal Snail Pattern, deals with subscribers that cannot keep up. First, recall the classic options:

  1. Queue on the publisher: a flood of messages can exhaust the publisher's memory, and with many subscribers it may not be able to spill to disk fast enough;

  2. Queue on the subscriber: this is more reasonable; if memory runs out it is at least the subscriber's own memory, and when it falls behind it only blocks itself;

  3. Set high-water marks (HWM): cap the queue sizes on the sockets; once a queue overflows, further messages are either dropped or block the sender, depending on the socket type (a minimal configuration sketch follows this list);

  4. Disconnect the slow subscriber. ZeroMQ cannot do this for you, because subscribers are invisible to the publisher.
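
Option 3 can be expressed in a few lines with the cppzmq sockopt API. The following is a minimal sketch, not taken from the original; the port and the limit of 1000 (which is also ZeroMQ's default) are illustrative assumptions.

//hwm_sketch.cpp: option 3, capping queue sizes (port and limits are illustrative)
#include <zmq.hpp>

int main(){
zmq::context_t context(1);

zmq::socket_t publisher(context, zmq::socket_type::pub);
publisher.set(zmq::sockopt::sndhwm, 1000); //per-subscriber send queue limit; 0 means unlimited
publisher.bind("tcp://*:5565");

zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.set(zmq::sockopt::rcvhwm, 1000); //receive queue limit on the subscriber side
subscriber.set(zmq::sockopt::subscribe, "");
subscriber.connect("tcp://localhost:5565");

//once the send queue for a slow subscriber is full, a PUB socket drops
//further messages for that subscriber until it catches up
return 0;
}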

The approach the guide takes is to persuade the slow subscriber to kill itself: for example, the publisher stamps every message with the current time, the subscriber checks the stamp on receipt, and if it is already running more than about one second behind, it simply exits.

The original chapter develops this idea further; only a sketch of the subscriber-side check is given here.
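
A minimal sketch of the subscriber side, not the zguide example itself. It assumes, purely for illustration, that the publisher binds tcp://*:5559 and sends the wall-clock send time (milliseconds since the epoch, as decimal text) as the message body.

//snail_subscriber.cpp: subscriber side of the Suicidal Snail idea (sketch)
#include <iostream>
#include <string>
#include <chrono>
#include <zmq.hpp>

using namespace std;

int main(){
zmq::context_t context(1);
zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5559"); //assumed publisher endpoint
subscriber.set(zmq::sockopt::subscribe, "");

const long long MAX_DELAY_MS = 1000; //give up once we lag by more than one second

while(true){
zmq::message_t rzmsg;
if(!subscriber.recv(rzmsg, zmq::recv_flags::none))
continue;
//assumed format: body is the send time in ms since the epoch, as decimal text
long long sentAt = stoll(string(static_cast<const char*>(rzmsg.data()), rzmsg.size()));
long long now = chrono::duration_cast<chrono::milliseconds>(
chrono::system_clock::now().time_since_epoch()).count();
if(now - sentAt > MAX_DELAY_MS){
cerr << "E: subscriber cannot keep up, aborting" << endl;
break; //the "suicide": exit instead of piling up stale messages
}
//...process the message...
}
return 0;
}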

Supplementary Notes

zmq::poller_t

The polling used in the C++ examples so far, zmq::pollitem_t, is still C-style. Since ZeroMQ embraced C++11, the recommended approach is zmq::poller_t<> together with wait_all to collect events (even newer cppzmq versions add a callback style where you register a handler per socket; it is less common and is only sketched at the end of this section). Its main advantages over the C style are:

  1. The set of watched items is managed dynamically with RAII, which is safer to use, including across threads;

  2. Plain file descriptors can be watched alongside ZeroMQ sockets;

  3. Watched events can be added and removed at runtime with add and remove;

  4. It implements internal thread-safety mechanisms, so calls such as add/remove/wait_all can be made from different threads.

The fourth point deserves care. C++ folklore tends to equate "thread safe" with "lock free", but the two must be kept apart: thread safety does not mean the object is entirely lock free. Some zmq::poller_t operations are thread safe, for example you can add and remove watched events without taking a lock yourself, but the objects the poller manages are not thread safe. If two threads share one poller and simultaneously wait on it and handle the resulting I/O, you will get data races.

In fact, the only ZeroMQ object that is truly lock free to share is zmq::context_t. Different contexts are completely isolated from each other, and their sockets cannot even talk over fast in-process transports such as inproc. Even so, the official advice is not to create many contexts in one process: constructing a context spins up I/O threads inside ZeroMQ, and multiple contexts make it awkward to exchange data between, or poll across, their sockets.

Basic usage of zmq::poller_t is shown below. A single poller can watch several sockets, but you rarely watch both read and write (pollin and pollout) on the same socket, so reserving one slot per socket in the event vector up front is enough. The following code watches two sockets at once:

//subscriber.cpp
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <sstream>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <thread>

using namespace std;

#define within(num) (int)((float)num * random()/(RAND_MAX+1.0))

std::string s_recv(zmq::socket_t& socket, zmq::recv_flags flag = zmq::recv_flags::none){
zmq::message_t rzmsg;
socket.recv(rzmsg, flag);
return std::string(reinterpret_cast<const char*>(rzmsg.data()), rzmsg.size());
}

int main(){
srand((unsigned)time(NULL));
zmq::context_t context(1);
zmq::socket_t subscriber_1(context, zmq::socket_type::sub);
subscriber_1.connect("tcp://localhost:5555");

zmq::socket_t subscriber_2(context, zmq::socket_type::sub);
subscriber_2.connect("tcp://localhost:5556");

stringstream topic_1;
topic_1 << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') << 999;
subscriber_1.set(zmq::sockopt::subscribe, topic_1.str());

stringstream topic_2;
topic_2 << std::dec <<std::setw(3) << std::setfill('0') << 999;
subscriber_2.set(zmq::sockopt::subscribe, topic_2.str());

zmq::poller_t<> pollers;
pollers.add(subscriber_1, zmq::event_flags::pollin);
pollers.add(subscriber_2, zmq::event_flags::pollin);

std::vector<zmq::poller_event<>> pevents; //event buffer filled by wait_all
pevents.resize(2); //must be sized in advance, one slot per watched socket
int cnt = 0;
while(true){
auto count = pollers.wait_all(pevents, std::chrono::milliseconds(1000));
if(!count){} //timed out, nothing to handle
else{ //handle the returned events
for(size_t i=0; i<count; i++){
if(i >= pevents.size())
break;
zmq::poller_event<> pevent = pevents[i];
if(static_cast<int>(pevent.events) & static_cast<int>(zmq::event_flags::pollin)){
if(pevent.socket == subscriber_1){
string topic = s_recv(subscriber_1);
string data = s_recv(subscriber_1);
cout << "subscriber_1 Got:" << topic << "-" <<data <<endl;
}
else if(pevent.socket == subscriber_2){
if(cnt == 5){
pollers.remove(subscriber_2); //after five messages, stop watching subscriber_2
}
string topic = s_recv(subscriber_2);
string data = s_recv(subscriber_2);
cout << "subscriber_2 Got:" << topic << "-" <<data <<endl;
cnt++;
}
else{
cout << "Wrong" << endl;
}
}
}
}
}
return 0;
}

The publisher side:

//publisher.cpp
#include <iostream>
#include <zmq.hpp>
#include <thread>
#include <sstream>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <cstring>

using namespace std;

#define within(num) (int)((float)num * random()/(RAND_MAX+1.0))

bool s_send(zmq::socket_t& socket, const std::string& data, zmq::send_flags flag = zmq::send_flags::none){
zmq::message_t zmsg(data.size());
memcpy(zmsg.data(), data.data(), data.size());
auto ret = socket.send(zmsg, flag);
return (bool)ret;
}

int main(){
srand((unsigned)time(NULL));
zmq::context_t context(1);
zmq::socket_t publisher_1(context, zmq::socket_type::pub);
publisher_1.bind("tcp://*:5555");

zmq::socket_t publisher_2(context, zmq::socket_type::pub);
publisher_2.bind("tcp://*:5556");

std::this_thread::sleep_for(std::chrono::seconds(2)); //give subscribers time to connect

// for(int i=0; i<1000; i++){
// stringstream topic;
// topic << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') <<i;
// s_send(publisher_1, topic.str(), zmq::send_flags::sndmore);
// s_send(publisher_1, string("Save Roger"));
// }

// for(int i=0; i<1000; i++){
// stringstream topic;
// topic << std::dec <<std::setw(3) << std::setfill('0') <<i;
// s_send(publisher_2, topic.str(), zmq::send_flags::sndmore);
// s_send(publisher_2, string("Save Lucy"));
// }

while(true){
std::this_thread::sleep_for(std::chrono::seconds(1)); //publish the hex topic once per second
stringstream topic_1;
topic_1 << std::hex << std::uppercase <<std::setw(3) << std::setfill('0') << 999;
cout << topic_1.str() <<endl;
s_send(publisher_1, topic_1.str(), zmq::send_flags::sndmore);
s_send(publisher_1, string("Off His Head"));

std::this_thread::sleep_for(std::chrono::seconds(1)); //publish the decimal topic once per second
stringstream topic_2;
topic_2 << std::dec <<std::setw(3) << std::setfill('0') << 999;
cout << topic_2.str() <<endl;
s_send(publisher_2, topic_2.str(), zmq::send_flags::sndmore);
s_send(publisher_2, string("Off Her Head"));
}
return 0;
}
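
Finally, the add-plus-callback style mentioned at the start of these notes corresponds to zmq::active_poller_t in recent cppzmq releases: instead of scanning a vector of events, you register a handler per socket and call wait. A minimal sketch, assuming your cppzmq/libzmq build is new enough to expose active_poller_t; the endpoint is illustrative.

//active_poller_sketch.cpp: handler-based polling (sketch, requires a recent cppzmq)
#include <iostream>
#include <string>
#include <chrono>
#include <zmq.hpp>

int main(){
zmq::context_t context(1);
zmq::socket_t subscriber(context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5555"); //illustrative endpoint
subscriber.set(zmq::sockopt::subscribe, "");

zmq::active_poller_t poller;
//the lambda runs inside wait() whenever the socket becomes readable
poller.add(subscriber, zmq::event_flags::pollin,
[&subscriber](zmq::event_flags){
zmq::message_t rzmsg;
if(subscriber.recv(rzmsg, zmq::recv_flags::none)){
std::string rmsg(static_cast<const char*>(rzmsg.data()), rzmsg.size());
std::cout << "Got: " << rmsg << std::endl;
}
});

while(true){
//returns the number of handlers invoked; 0 means the one-second timeout elapsed
poller.wait(std::chrono::milliseconds(1000));
}
return 0;
}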