1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

时间:2019-07-06 03:11:18

相关推荐

C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

一,多线程通信1,全局变量2,自定义消息3,std::promise与std::future(c++11)4,IO完成端口 二,生产者-消费者模式三,代码实现1,使用场景介绍2,.h声明3,cpp实现

一,多线程通信

一般而言,应用程序中的一个某一个线程总是为另一个线程执行特定的任务,这样,这两个线程之间必定有一个信息传递的渠道。这种线程间的通信不但是难以避免的,而且在多线程编程中也是复杂和频繁的,下面将进行说明。

1,全局变量

通过全局变量来访问共同的数据段,这个全局变量可以是类中的成员变量(线程函数是类的成员函数),也可以是类外的实际全局变量(整个程序中所有地方都可以访问到)。多个线程访问同一个数据,那么就会涉及到一个问题:线程互斥,当一个线程在读,一个线程在写的时候,会有冲突,这个时候就需要用到线程互斥的处理方式了:锁(此处不做过多介绍,本文主要讲解多线程通信)

2,自定义消息

借助于windows程序的消息通信机制,当两个线程之间至少有一个为UI线程,那么就可以直接通过SendMessage或者PostMessage来发送消息到指定线程进行响应。这种方法涉及到线程的切换,如果SendMessage/PostMessage指定的窗口是由调用线程创建,那么就是一个普通的子程序;如果指定窗口由另一个线程创建,也即UI线程,那么系统会挂起当前工作线程,切换到ui线程,并调用合适的窗口过程(PostMessage则直接进消息队列)。

3,std::promise与std::future(c++11)

c++11新新特性中提出了std::promise和std::future,它们可以搭配使用来达到单次数据交换的目的

#include <iostream> // std::cout#include <functional>// std::ref#include <thread> // std::thread#include <future> // std::promise, std::futurevoid print_int(std::future<int>& fut) {int x = fut.get(); // 获取共享状态的值.std::cout << "value: " << x << '\n'; // 打印 value: 10.}int main (){std::promise<int> prom; // 生成一个 std::promise<int> 对象.std::future<int> fut = prom.get_future(); // 和 future 关联.std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.t.join();return 0;}

4,IO完成端口

见我另外一篇博客IO完成端口的使用

二,生产者-消费者模式

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

生产者是一个线程,用来产生数据,消费者是一个线程,用来消费数据。生产线程源源不断地生产出数据, 消费线程也源源不断地取出数据,很明显,它们之间所共享的数据应该使用队列结构来实现(FIFO),生产者-消费者模式的实现有两个难点:1,共享数据的保护,避免多线程读写冲突;2,当队列为空时,消费者线程应该在等待生产者线程的通知,再去进行下一步操作。

三,代码实现

1,使用场景介绍

RTSP点播是一种实时视频流推送协议,当我们连接上RTSP源后,会接收到rtsp数据,然后需要对这些rtsp数据进行解码操作,转换为RGB或者YUV数据,然后显示到我们的窗口上。这个场景就涉及到三个线程:

1,RTSP流接收线程:接收到来自RTSP源推送的RTSP数据

2,解码线程:将RTSP线程接收到的RTSP数据解码得到RGB/YUV

3,UI线程:将解码得到的RGB/YUV数据绘制位图显示到窗口上

2,.h声明

首先定义一个RTSP工作类,因为本文只是描述一个整体的架构设计,所以功能代码的细节就都省略掉。

class RtspWork{public:RtspWork(PullStreamHandler* handler = nullptr){handler_ = handler;}public:void DoWork(){//rtsp的一些流程//假设注册的回调为OnGetFrame//....do{//....int nLength = 255;byte *pByte = new byte[nLength];OnGetFrame(pByte, nLength);} while (1);}void OnGetFrame(byte* pByte, int nLength){if (handler_){handler_->NoticeStream(pByte, nLength);}}private:PullStreamHandler* handler_;};

PullStreamHandler是一个纯虚类,用于两个类之间的交互(避免单纯的使用类对象或者类指针)。

//假设这个是rtsp拉流后的回调接口class PullStreamHandler{public:virtual void NoticeStream(byte* pByte, int nLength) = 0;};

然后再就是整个播放管理类,线程的创建,RTSP流的解码等操作都在这里进行

class PlayManager: public PullStreamHandler{public:PlayManager(const std::string &strUrl, HWND hWnd);~PlayManager();private://点播rtsp流的线程,接收到的rtsp流的数据加到队列中void PullStreamThread();//解码rtsp流的线程,从队列中取rtsp流数据,进行解码操作,然后把解码后得到的RGB数据传到UI线程进行界面渲染void DecodeThread();//解码操作void DoDecode(StreamInfo & info);private://栈的多线程操作void PushData(const StreamInfo& info);bool PopData(StreamInfo& info);bool IsDequeEmpty();public:virtual void NoticeStream(byte* pByte, int nLength);private:HWND hwnd_;//渲染窗口句柄std::deque<StreamInfo> s_deque;//保存视频流信息std::mutex s_mutex; //用于两个大线程std::mutex s_mutex2;//用于队列(主要是因为用到empty()函数,也可能存在多线程互斥的问题)//无效的锁,造成性能损耗//比如DecodeThread一直在等待,每次等待都会锁定,但是这个时候如果没数据到来,刚好在它锁的时候,//PullStreamThread数据到来了,会造成延时,所以这个地方考虑到用condition_variablestd::condition_variable m_ConVar;//通过条件变量来通知其他线程(配合std::unique_lock使用)std::thread m_TrackerThread;std::thread m_DetectorThread;};

3,cpp实现

PlayManage的构造函数主要实现线程的启动,当然肯定也可以不这么做,另外提供接口也是一样的,此处为了简单,就放在构造函数里了

PlayManager::PlayManager(const std::string &strUrl, HWND hWnd){if (!strUrl.empty()){hwnd_ = hWnd;m_TrackerThread = std::thread(std::bind(&PlayManager::PullStreamThread, this));m_DetectorThread = std::thread(std::bind(&PlayManager::DecodeThread, this));m_TrackerThread.join();m_DetectorThread.join();}}PlayManager::~PlayManager(){}

拉流线程PullStreamThread很简单,因为主要的工作都在RtspWork里实现了

void PlayManager::PullStreamThread(){//在这里处理rtsp点播的一系列流程//然后通过PullStreamHandler来上报结果//DoWork()是个阻塞函数RtspWork rtspWork(this);rtspWork.DoWork();}

在RtspWork.DoWork()中,拉到视频流后,通知到PlayManage,因为我们整个线程的创建,锁等变量也都是在该类中实现的,所以最后还是切换到PlayManage中来(通过纯虚类实现)

void PlayManager::NoticeStream(byte* pByte, int nLength){if (pByte == NULL || nLength <= 0)return;std::unique_lock<std::mutex> lk(s_mutex);StreamInfo info;info.nLength = nLength;info.pByte = new byte[nLength];memcpy(info.pByte, pByte, nLength);PushData(info);//这一个看情况是外面释放,还是里面释放delete[]pByte;// 通过条件变量通知其它等待的线程 m_ConVar.notify_all();lk.unlock();}

大家在这里看到了m_ConVar.notify_all()这一行代码可能还不明白,接着往下看,解码线程的工作。

void PlayManager::DecodeThread(){do{std::unique_lock<std::mutex> lk(s_mutex);while (IsDequeEmpty())m_ConVar.wait(lk); if (!s_deque.empty()){StreamInfo info;PopData(info);//对info.pByte进行解码,假设结果还是存在info里;DoDecode(info);//第一种多线程通信方式:SendMessage/PostMessage//对stream进行操作,最简单的就是发消息到HWND上,然后在里面绘图SendMessage(hwnd_, WM_USER + 1, (WPARAM)info.pByte, (LPARAM)info.nLength);if (info.pByte){delete[] info.pByte;info.pByte = nullptr;}}lk.unlock();} while (1);}

解码线程是一个循环,不停地读取数据并进行解码操作,当第一次线程启动时,这两个线程启动的线程顺序是不定的,我们需要在拉流线程(生产者)生产出了数据,然后再去通知解码线程(消费者)去进行解码操作,避免解码线程在作无效的工作(甚至于后面的每次队列为空的情况都是这样的)。至此,生产者-消费者模式实现的两个难点也都解决了。

本程序中用到了两个锁,

1,s_mutex是两个线程的大锁,用于和std::condition_variable m_ConVar搭配使用来实现线程的通知(生产者生产出了数据,告诉消费者,避免消费者傻傻地循环)

2,s_mutex2,主要用于队列数据的保护,此处用的是STL中的deque结构,而STL并没有对这些结构做多线程保护,为了安全起见,所以给它加了一层。

附上完整代码链接:Demo下载

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。