1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Linux 编程 —— 进程间的通信 之 消息队列(zmq socket 学习笔记)

Linux 编程 —— 进程间的通信 之 消息队列(zmq socket 学习笔记)

时间:2022-12-21 21:55:50

相关推荐

Linux 编程 —— 进程间的通信 之 消息队列(zmq socket 学习笔记)

文档声明:

以下资料均属于本人在学习过程中产出的学习笔记,如果错误或者遗漏之处,请多多指正。并且该文档在后期会随着学习的深入不断补充完善。感谢各位的参考查看。

笔记资料仅供学习交流使用,转载请标明出处,谢谢配合。

如果存在相关知识点的遗漏,可以在评论区留言,看到后将在第一时间更新。

作者:Aliven888

1、ZMQ Socket

  Thread safety: ZeroMQ 的 socket 是非线程安全的,并且 ZeroMQ 本身不建议在多个线程中传递同一个 Socket,即使保证了线程同步。

  详细介绍,可以参考下这篇文章 /dvwei/p/3608119.html

  Socket types: ZeroMQ一共具有12种类型的socket,5种消息模式。

请求/应答模式:ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER 发布/订阅模式:ZMQ_PUB、ZMQ_SUB、ZMQ_XPUB、ZMQ_XSUB 管道模式:ZMQ_PUSH、ZMQ_PULL 配对模式:ZMQ_PAIR 本地模式:ZMQ_STREAM

Socket types:

  下面简单介绍下每一种 Socket types 的含义。

Socket option:

  概要:通过zmq_setsockoptzmq_getsockopt函数来设置/读取指定选项。

//设置参数,有两个接口(1)int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);(2)zmp::socket_t::setsockopt(int option_name, const void *option_value);//获取设置参数,有两个接口(1)int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len);(2)zmp::socket_t::setsockopt(int option_name, void *option_value);//下面举例只有 设置 的, 获取的用法一样。eg:int iTimeOut = 1;zmq::context_t ctx(1);void *sock = zmq_socket(ctx, ZMQ_DEALER);assert(qskt);zmq_setsockopt(sock, ZMQ_SNDTIMEO, (const void *)iTimeOut, sizeof(int));// orzmq::socket_t sock(ctx, ZMQ_DEALER);int iTimeOut = 1;sock.setsockopt(ZMQ_SNDTIMEO, (const void *)iTimeOut);

  ZMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。

&emsap; ZMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。它有自己的模式,不同于更底层的点对点通讯模式。

  它有比 tcp 协议更高一级的协议。(当然 ZeroMQ 不一定基于 TCP 协议,它也可以用于进程间和进程内通讯)它改变了通讯都基于一对一的连接这个假设。

2、常用的通讯模式只有三类:

2.1、请求回应模型:

  由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是 1:N 的模型。通常把 1 认为是 server ,N 认为是 Client 。ZeroMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为 N:M (只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。

2.1.1、ZMQ_REQ

  一般用于客户端发送请求消息,此类型的 Socket 必须严格要求先发送后接收的顺序。如果发生异常或者当前没有可用的服务(连接),Socket 会阻塞,直到有可用的服务(新的连接到来),再把这个消息发送出去。ZMQ_REQ 类型的 Socket 不会丢弃数据消息的。

  ZMQ_REP 发送消息时会自动在消息顶部插入一个空帧。

特点总结:

可兼容的 Socket types: ZMQ_REP, ZMQ_ROUTER数据传输: 双向发送/接收模式: 发送 --> 接收 --> 发送 …发送路由策略: Round-robin(循环队列)接收路由策略: Last peer进入mute状态后: 阻塞

2.1.2、ZMQ_REP

  一般用于服务端接收消息,此类型的 Socket 必须严格遵循先接收后发送的顺序,即:从客户端接收请求消息使用了公平队列,回应客户端时,所有的 reply 都会被路由到最后下达请求的客户端。如果发生异常或者当前没有可用的客户端连接,所有消息都会毫无提示的被丢弃,不会发生阻塞。

特点总结:

可兼容的 Socket types : ZMQ_REQ, ZMQ_DEALER数据传输: 双向发送/接收模式: 接收 --> 发送 --> 接收…发送路由策略: Last peer接收路由策略: Fair-queued(公平队列)

2.1.3、ZMQ_DEALER

  ZMQ_DEALER 是一种用于请求/答应模式的更高级的扩展 Socket,它可以自由的收发消息,没有 ZMQ_REP/ZMQ_REQ 那样的限制。对于每一个连接,接收消息也是使用了公平队列,发送使用了循环队列(RR)。ZMQ_DEALER 受 ZMQ_RCVHW 和 ZMQ_SNDHW 两个阀值影响(可通过zmq_setsockopt 函数设置),一旦发送或接收队列达到阀值,Socket 就会进入 mute 状态,此时对 DEALER 的任何 zmq_send 操作都会阻塞,直到 mute 状态结束。

  如果当前没有有效的连接,zmq_send操作也会阻塞,直到有新的连接到来为止。DEALER发生阻塞并不会丢弃消息。

注意:如果ZMQ_DEALER连接到ZMQ_REP,每一个消息包必须包含一个空帧,然后再紧跟着数据包体。

特点总结:

可兼容的 Socket types: ZMQ_ROUTER, ZMQ_REP, ZMQ_DEALER数据传输: 双向发送/接收模式: 无限制发送路由策略: Round-robin(循环队列)接收路由策略: Fair-queued(公平队列)进入mute状态后: 阻塞

2.1.4、ZMQ_ROUTER

  ZMQ_ROUTER 是一种用于请求/答应模式的更高级的扩展 Socket,它可以自由的收发消息。当 ZMQ_ROUTER 接收到消息时,会自动在消息顶部加入来源地址标识符,接收消息使用了公平队列。当发送消息时,ZMQ_ROUTER 又会自动去掉这个标识符,并且根据这个标识符路由到相应的端点。

 如果此地址标识的端点不存在,默认会毫无征兆的丢弃消息,除非 ZMQ_ROUTER_MANDATORY 选项设置为1。当队列达到阀值时,Socket 就会进入 mute 状态,此时所有后续发送到此Socket的消息都会被丢弃,直到 mute 状态结束。同样的,如果对端的接收队列达到了阀值,消息也会被丢弃。

  如果 ZMQ_REQ 连接到 ZMQ_ROUTER,从 ZMQ_ROUTER 接收到 ZMQ_REQ 的消息时,除了会在消息前加上来源地址标识符之外,还会加上一个空帧与原消息分隔,因此消息可以包含多个地址标识符和多个数据包体,如:地址和数据体之间必须用空帧分隔;

发送回应消息给ZMQ_REQ时,必须至少包含一个空帧;

发送消息时,ZMQ_ROUTER会根据第一个地址标识符路由到对应的端点;

特点总结:

可兼容的 Socket types: ZMQ_DEALER, ZMQ_REQ, ZMQ_ROUTER数据传输: 双向发送/接收模式: 无限制接收路由策略: Fair-queued(公平队列)进入mute状态后: 丢弃消息

2.1.5、实伪代码:

/************** Clinet 实现伪代码 ***************************/zmq::context_t ctx(1);zmq::socket_t s(ctx, ZMQ_DEALER);s.connect("tcp://127.0.0.1:80001");zmq::message_t msg(1024); // 定义变量并申请空间memset (msg.data (), 0, 1024); // 初始化空间memcpy((void *)msg.data(), "hello", 5); //赋值//先发送,后回复while(true){s.send(msg)s.recv(&msg)}/************** Server 实现伪代码 ***************************/zmq::context_t ctx(1);zmq::socket_t s(ctx, ZMQ_DEALER);s.bind ("tcp://127.0.0.1:80001"); //绑定端口zmq::message_t msg(1024);// 定义变量并申请空间memset (msg.data (), 0, 1024); 初始化空间//先接收,后回复while(true){s.recv(&msg)s.send(msg)}

请求应答模式案例:

Client 端:

// zmq_send.cpp#include <stdio.h>#include <zmq.hpp>#include <thread>#include <unistd.h>#include <string.h>#include <iostream>using namespace std;#define NET_INFO_LEN 30 //网络信息#define SEND_DATA_LEN 1024 //雷达数据长度void zmqSend(){zmq::context_t context;context.setctxopt(ZMQ_IO_THREADS, 1); // 设置参数zmq::socket_t sock(context, ZMQ_DEALER);sock.setsockopt(ZMQ_SNDTIMEO, 1000); // 设置超时等待时间sock.setsockopt(ZMQ_SNDBUF, 1024); //设置消息缓存空间sock.setsockopt(ZMQ_SNDHWM, 50);//设置最大消息缓存sock.connect("tcp://127.0.0.1:20001"); // 设置网络信息while(true){if(!sock.connected()) //判断连接是否建立成功{std::cout << "connect err." << std::endl;break;}// 定义变量, 发送数据zmq::message_t message(SEND_DATA_LEN);memset(message.data(), 'A', SEND_DATA_LEN);zmq::detail::recv_result_t result = sock.send(message, zmq::send_flags::none);std::cout << "send size = " << static_cast<std::uint32_t>(result.value()) << std::endl;sleep(1); // 休眠等待//因为不关心回复结果,这里省略//sock.recv(&message, zmq::recv_flags::none);}//关闭连接context.close();sock.close();}int main(int argc, char *argv[]){// 开启工作线程thread th(zmqSend);th.join();return 0;}

Server 端:

// zmq_recv.cpp#include <stdio.h>#include <zmq.hpp>#include <thread>#include <unistd.h>#include <string.h>#include <iostream>using namespace std;#define NET_INFO_LEN 30 //网络信息#define SEND_DATA_LEN 483200 //雷达数据长度void zmqRecv(){zmq::context_t context;context.setctxopt(ZMQ_IO_THREADS, 1); // 设置参数zmq::socket_t sock(context, ZMQ_DEALER);sock.setsockopt(ZMQ_RCVTIMEO, 1000); // 设置超时等待时间sock.bind("tcp://*:20001"); // 设置监听端口while(true){if(!sock.connected()) // 判断连接状态{std::cout << "connect err." << std::endl;break;}// 定义变量,接收数据zmq::message_t message;zmq::detail::recv_result_t result = sock.recv(message, zmq::recv_flags::none);std::cout << "recv size = " << static_cast<std::uint32_t>(result.value()) << std::endl;//因为不关心结果回复,这里省略//sock.send(message, zmq::send_flags::none);}//关闭连接context.close();sock.close();}int main(int argc, char *argv[]){// 开启工作线程thread th(zmqRecv);th.join();return 0;}

编译脚本

#!/bin/bash# 编译 client 端g++ -std=c++11 -g zmq_send.cpp -o zmqSend -lzmq -lpthread# 编译 server 端g++ -std=c++11 -g zmq_recv.cpp -o zmqRecv -lzmq -lpthread

2.2、发布订阅模型:

  这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的 socket 采用请求回应模型满足这个需求。

  ZeroMQ的订阅发布模式是一种单向的数据发布,当客户端向服务端订阅消息之后,服务端便会将产生的消息源源不断的推送给订阅者。

参考文章:/mysky007/p/12288729.html

特点:

1.一个发布者,多个订阅者的关系,1:n;

2.当发布者数据变化时发布数据,所有订阅者均能够接收到数据并处理,这就是发布/订阅模式。

注意事项:

使用SUB设置一个订阅时,必须使用zmq_setsockopt()对消息进行过滤zmq_setsockopt (zmq_socket, ZMQ_SUBSCRIBE, filter<订阅信息>, strlen (filter))

//这里是需要设置channel,也就是发送的数据的开头,这里设置为nullptr,//长度为 0 ,意思是对接收的消息不过滤socket.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); //只接收以AB开头的消息,2为AB的长度。socket.setsockopt(ZMQ_SUBSCRIBE, "AB", 2)

发布者使用PUB套接字将消息发送到队列中,订阅者使用SUB套接字从队列中源源不断的接收消息。新的订阅者可以随时加入,但之前的消息是无法接收到的;已有的订阅者可以随时退出;订阅者还可以添加“过滤器”用来有选择性的接收消息。

2.2.1、ZMQ_PUB

  ZMQ_PUB 类型的 Socket 以发布者的身份向订阅者分发消息,消息以扇出的形式发送给所有订阅者连接。

  ZMQ_PUB 类型的 Socket 没有实现 zmq_recv 函数,所以不能对其调用 zmq_recv 函数!

  当 ZMQ_PUB 的 Socket 达到阀值时进入 mute 状态,此时后续发送的消息会被丢弃,知道 mute 状态结束。

  对 ZMQ_PUB 的 Socket 调用 zmq_send 永远不会发生阻塞。

特点总结:

可兼容的Socket types: ZMQ_SUB, ZMQ_XSUB数据传输: 单向发送/接收模式: 只能发送接收路由策略: Fan out(扇出)进入mute状态后: 丢弃消息

2.2.2、ZMQ_SUB

  ZMQ_SUB 类型的 Socket 以订阅者的身份接收消息。初始的 ZMQ_SUB 的 Socket 没有订阅任何消息,可以通过设置 ZMQ_SUBSRIBE 选项来指定需要订阅的消息。

  ZMQ_SUB 的 Socket 没有实现 zmq_send 函数,所以不能对其调用 zmq_send 函数!

特点总结:

可兼容的Socket types: ZMQ_PUB, ZMQ_XPUB数据传输: 单向发送/接收模式: 只能接收接收路由策略: Fair-queued(公平队列)

2.2.3、实现伪代码:

/************** 发布端 实现伪代码 ***************************/void pubImage(){zmq::context_t context(1);zmq::socket_t socket(context, ZMQ_PUB);socket.bind("tcp://192.168.0.100:5555"); // 这里是绑定地址while (true) {zmq::message_t msg(data_encode.size());memcpy(msg.data(), data_encode.data(), data_encode.size());socket.send(msg); // 发送数据}}/************** 订阅端 实现伪代码 ***************************/void subscribe(){zmq::context_t ctx(1);zmq::socket_t socket(ctx, ZMQ_SUB);socket.connect("tcp://192.168.0.100:5555");这里是需要设置channel,也就是发送的数据的开头,这里设置为nullptr,长度为0 //意思是对接收的消息不过滤,如果设置为 socket.setsockopt(ZMQ_SUBSCRIBE, "AB", 2) //只接收以AB开头的消息,2为AB的长度。socket.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); while (true) {std::cout <<"subscribe ..." << std::endl;zmq::message_t request;socket.recv(&request);}}

2.3、管道模型:

  这个模型里,管道是单向的,从 PUSH 端单向的向 PULL 端单向的推送数据流。因为这个模型几乎不怎么使用了,我就没有特意去了解,后面等有时间学习了这方面的知识后在做更新。

3、Socket options(部分)

  通过 zmq_setsockopt 和 zmq_getsockopt 函数来设置/读取指定选项。

3.1、ZMQ_SNDHWM

设置指定 Socket 的发送消息队列的高水位标识(阀值),ZMQ 会严格限制发送队列的上限数。0 表示无限制。

如果达到了这个限制,Socket 就会进入异常状态,ZMQ 此时会采取适当的措施——阻塞或丢弃消息,这取决于 Socket 的类型。

注意:ZMQ 不保证 Socket 一定能接受 ZMQ_SNDHWM 这么多的消息,甚至可能会低60%-70%,这取决于socket上的信息流。

3.2、ZMQ_RCVHWM

设置指定 Socket 的接受消息队列的高水位标识(阀值),ZMQ 会严格限制接受队列的上限数。0表示无限制。

如果达到了这个限制,socket 就会进入异常状态,ZMQ 此时会采取适当的措施——阻塞或丢弃消息,这取决于 socket 的类型。

3.3、ZMQ_SUBSCRIBE

ZMQ_SUBSCRIBE 选项会在 ZMQ_SUB 的 Socket 上建立一个消息过滤器。初始的 ZMQ_SUB 的 Socket 会过滤掉所有的消息,因此必须设置这个选项,否则将收不到任何消息。

如果设置一个 0 长度(option_value)的空值(option_value),ZMQ_SUB 的 Socket 会接受所有的消息。设置一个非空值将接受指定的消息。可以在同一个 ZMQ_SUB 的 Socket 上设置多个过滤器,它将会接受至少一个匹配的消息。

3.4、ZMQ_UNSUBSCRIBE

此选项用来删除 ZMQ_SUB 的 Socket 上通过 ZMQ_SUBSCRIBE 设置过的消息过滤器。如果 Socket 有多个实例有相同的过滤器,只删除其中一个。

3.5、ZMQ_IDENTITY

此选项用来设置 Socket 的身份标识,只能用于请求/答应模式。ROUTER 的 Socket 可以根据这个身份标识来路由信息。

身份标识的长度规定在 1-255 bytes. 由二进制零开头的标识符为 ZMQ 保留使用。

如果两个身份标识相同的Socket连接到同一个对端(ROUTER),结果行为是未定义的。

3.6、ZMQ_RCVTIMEO

设置 Socket 的 receive 操作的超时。

如果为0,则 zmq_recv 会立即返回,如果没有接收到消息,会返回一个 EAGAIN 错误;

如果为-1,Socket 会阻塞到有可用消息为止;

如果为其他值,Socket 要么阻塞达到指定的时间还没接收到可用的消息,返回一个 EAGAIN 错误,要么在指定时间前接收到可用消息。

3.7、ZMQ_SNDTIMEO

设置 Socket 的 Send 操作的超时。

如果为 0,则 zmq_send 会立即返回,如果消息没有发送成功,会返回一个EAGAIN错误;

如果为 -1,Socket 会一直阻塞到消息消息发送完毕;

如果为其他值,Socket 要么阻塞达到指定的时间还没发送完成,返回一个EAGAIN错误,要么在指定时间前发送完消息。

参考文章

/mysunshinexia01/article/details/80871696

/dvwei/p/3608119.html

/

/mysky007/p/12288729.html

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。