1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 基于线程池+Epoll(IO多路复用)的高并发服务器(附C++源码)

基于线程池+Epoll(IO多路复用)的高并发服务器(附C++源码)

时间:2023-04-26 03:19:19

相关推荐

基于线程池+Epoll(IO多路复用)的高并发服务器(附C++源码)

文章目录

前言一、编译环境二、使用步骤1、编译2、启动运行服务器端启动方式:客户端启动方式: 3、运行结果:客户端终端页面:服务器端终端页面: 二、项目源码线程池 thread_pool.h通用头文件 general.h服务器 epserver.cpp 总结

前言

简介:这是一个基于线程池+Epoll的IO多路复用、非阻塞IO技术,的高并发服务器框架。称它为框架是因为服务器功能非常简单且无意义,使用时应该把这个功能替换掉。 但它可以直接拿来使用,很方便,所以也算半个项目。服务器的默认功能:客户端输入什么内容,服务器就返回什么内容。后续想实现什么功能,修改功能函数就可以。语言标准:C++11。尤其是线程池,用了好多C++11的精髓

一、编译环境

CentOS 7.6g++ 4.8

二、使用步骤

1、编译

g++ -std=c++11 -pthread epserver.cpp -o epserver# -std=c++11 按照c++11的标准编译# -pthread 引入外部线程库# EpollServer简称epserver,main主函数在这里面

2、启动运行
服务器端启动方式:

./epserver 9090# 启动服务器,指定监听的端口,我这里使用 9090 端口

客户端启动方式:

telnet 127.0.0.1 9090# telnet是系统自带的通讯协议/远程登陆工具# 127.0.0.1 是本机的IP地址,9090 是服务器监听的端口,

3、运行结果:
客户端终端页面:

[root@centOS output]# telnet 127.0.0.1 9090Trying 127.0.0.1...Connected to 127.0.0.1.Escape character is '^]'.jl g jlfdg // 这一句自己输入的jl g jlfdg // 这一句是服务器返回来的告诉对方 梵蒂冈的 jlk // 这一句自己输入的告诉对方 梵蒂冈的 jlk // 这一句是服务器返回来的^]// 键入Ctrl+]telnet> quit // 关闭telnet客户端Connection closed.[root@centOS output]#

服务器端终端页面:

[root@centOS HTTPserver]# ./httpserver 9090server main start ...[INFO][24:55:15][LISTEN_FD:[3] IP:[0.0.0.0]:[9090]][httpserver.cpp][39][INFO][24:55:21][COMM_FD:[5] IP:[127.0.0.1]:[37266] join][httpserver.cpp][185]IP: [127.0.0.1]:[37266]# jl g jlfdg // 客户端发送的IP: [127.0.0.1]:[37266]# 告诉对方 梵蒂冈的 jlk // 客户端发送的[INFO][24:55:51][IP:[127.0.0.1]:[37266] left][httpserver.cpp][202]

二、项目源码

线程池 thread_pool.h

这是基于C++11来编写的线程池,唯独不支持使用类中的非static函数创建线程,但支持其他任何函数或者类似函数的东西。使用起来非常方便。CSDN\Github中有很多线程池,借一个来用。

// Linux 线程池#pragma once#ifndef THREAD_POOL_H#define THREAD_POOL_H#include <iostream>#include <stdlib.h>#include <queue>#include <mutex> // 锁#include <thread> // 线程#include <condition_variable> // 条件变量#include <functional> // 接受所有类似函数的东西#include <future> // 来自未来的值#include <atomic> // 原子操作#include <memory> // 智能指针#include <stdexcept> // C++异常static const size_t THREAD_SIZE = 5; // 默认线程数量// 线程池class ThreadPool{public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&...args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:// need to keep track of threads so we can join themstd::vector<std::thread> workers;// the task queuestd::queue<std::function<void()>> tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;std::atomic<bool> stop;};// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(size_t threads = THREAD_SIZE): stop(false){threads = threads < 1 ? 1 : threads;for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});}// add new work item to the pooltemplate <class F, class... Args>auto ThreadPool::enqueue(F &&f, Args &&...args)-> std::future<typename std::result_of<F(Args...)>::type>{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){(*task)(); });}condition.notify_one();return res;}// the destructor joins all threadsinline ThreadPool::~ThreadPool(){this->stop.store(true);condition.notify_all();for (std::thread &worker : workers)worker.join();}#endif

线程池的使用方法:

// 线程池使用方法#include <需要用到的系统头文件>#include "thread_pool.h"using namespace std;int myadd(int a, int b){return a + b;}int main(){// 初始化 5 个线程ThreadPool th_pool(5);// 把需要运行的任务函数的指针、函数的参数,提交进任务队列future<int> f1 = th_pool.enqueue(myadd, 6, 3);/*提交进任务队列后任务会自动执行,如果想获取任务函数的返回值,则使用future 中的get()方法 注意:在主线程中调用get方法,如果线程没有执行完成,主线程会在这里阻塞,直到线程执行完成,所以要权衡利弊合理使用。*/int ret = f1.get();cout << ret << endl; // 终端会输出数字 9return 0;}

通用头文件 general.h

进行网络编程需要用到很多头文件,为了图方便,索性把可能用到的头文件都放在这里了。虽然有一些用不到,也有可能会缺少某些头文件,到时候再修改就是了。

其中两个我经常使用的函数: LOG 和 sys_error

// general 通用的东西都放在这里#pragma once#ifndef GENERAL_H#define GENERAL_H// 网络编程头文件// Windows环境下头文件#ifdef _WIN32#include <WinSock2.h>#include <Windows.h>#pragma comment(lib,"ws2_32.lib")// Linux环境下头文件#else#include <string.h>#include <sys/wait.h>// Linux进程等待#include <errno.h>// 错误处理#include <signal.h>// 信号#include <sys/types.h> // 提供各种数据类型#include <sys/socket.h> // 套结字函数和结构#include <netinet/in.h> // sockaddr_in 结构#include <arpa/inet.h> // IP 地址转换 htons#include <netdb.h>// 域名转换 inet_ntop#include <sys/ioctl.h> // I/O控制的函数#include <sys/poll.h> // socket 等待测试机制#include <unistd.h>// 标准IO,进程等等#include <pwd.h> // 对系统某些文件的访问#include <sys/epoll.h>// epoll IO多路复用#include <sys/stat.h>// 获取文件信息#include <fcntl.h>// open 等关于文件的函数#include <sys/sendfile.h>// 零拷贝,在两个内核之间传输数据#endif// 跨平台通用头文件#include <iostream>#include <sstream>#include <fstream>#include <vector>#include <string>#include <cassert>#include <stdio.h>#include <memory>#include <ctime>#include <atomic>#include <mutex>#include <algorithm>#include <unordered_map>const size_t LISTENQ = 1024;const size_t MAXLINE = 256; // 一行的最大长度/*INFO: 表示正常的日志输出,一切按预期运行。WARNING: 表示警告,该事件不影响服务器运行,但存在风险。ERROR: 表示发生了某种错误,但该事件不影响服务器继续运行。FATAL: 表示发生了致命的错误,该事件将导致服务器停止运行。*/#define INFO 1#define WARNING 2#define ERROR 3#define FATAL 4// 输出事件日志 --参数顺序:日志等级/日志信息// 输出顺序:cout--事件级别--事件发生的时间--事件信息--事件所在的文件--所在的行号--endl;#define LOG(level, message) Log(#level, message, __FILE__, __LINE__)// 自己的错误处理包装函数,只有发生严重错误时才调用它// 先打印错误信息,然后 exit(-1)void sys_error(const char* ptr){perror(ptr);exit(-1);}// 打印日志 --参数顺序:/日志等级/错误信息/文件名/行号/static void Log(std::string level, std::string message, std::string file_name, int line){time_t curr_t = time(nullptr);// 获取时间戳struct tm* curr_tm = gmtime(&curr_t);// 把时间戳转换为 时区为 0 的时间结构curr_tm->tm_hour += 8;// 小时 +8 才是东八区北京//const char* myformat = "%Y-%m-%d %H:%M:%S";// 自己的输出格式const char* myformat = "%H:%M:%S";// 定义自己的输出格式char mytime[25] = {0 };// 储存时间strftime(mytime, 25, myformat, curr_tm);// 把时间结构按照自己的输出格式转换// 打印顺序:事件级别--事件发生的时间--事件信息--事件所在的文件--所在的行号--\nstd::cout << "[" << level << "][" << mytime << "][" << message << "][" << file_name << "][" << line << "]" << std::endl;}#endif

服务器 epserver.cpp

封装了一个EpollServer类,是一个单例模式,对外只提供三个接口:

#接口一:单例模式的访问接口,访问这个类时必须调用这个接口EpollServer::get_server();#接口二:EpollServer类的初始化,使用之前必须初始化#,port:监听端口;thread_size:线程池中线程的数量(默认是5)epoll_server_init(int port, size_t thread_size = 5);# 接口三:运行接口,不需要参数也没有返回值(因为出现严重错误服务器自己会挂的hhh)void epoll_run();# 如何修改服务器的功能?1# 编写自己的功能函数,例如我编写的是 _server_io() 函数2# 把自己编写的函数,提交到私有函数 _epoll_run() 中的进线程池,例如:...code...size_t comm_fd = revts[i].data.fd;_th_pool->enqueue(_server_io, comm_fd);...code...

// Epoll 服务器端// 编译方法# g++ -std=c++11 -pthread httpserver.cpp -o httpserver#include <unordered_map>#include "general.h"#include "thread_pool.h"#include "http_general.hpp"using namespace std;static const size_t SER_PORT = 9090; /* 默认端口号 */static const size_t EPOLL_SIZE = 1024;static const size_t REVTS_SIZE = 128; /* 可执行事件容器的大小 */// 处理HTTP事务static void httpserver_io(size_t comm_fd);// Epoll + 线程池 高并发服务器class EpollServer{private:int _listen_fd;/* 监听套接字 */string _ipaddr;/* 点分十进制 IP 地址 */int _port;/* 监听端口号 */int _epoll_fd;/* epoll 模型 */shared_ptr<ThreadPool> _th_pool;/* 线程池 */public:// key: accept()产生的服务套接字;value:first: 客户端IP地址,second: 客户端的端口号unordered_map<size_t, pair<string, size_t>> _comm_fd_map;mutex _map_mutex;/* 在线程中操作映射的时候需要加锁 */public:// 单例模式 全局访问接口static EpollServer& get_server(){static EpollServer _epoll_server;return _epoll_server;}// 初始化服务器void epoll_server_init(int port, size_t thread_size = 5){_listen_fd = -1;_port = port;_epoll_fd = -1;_th_pool = make_shared<ThreadPool>(thread_size);// 初始化 套接字_socket_bind_listen();LOG(INFO, "LISTEN_FD:[" + to_string(_listen_fd) + "] IP:[" + _ipaddr + "]:[" + to_string(_port) + "]");}EpollServer(EpollServer&) = delete;EpollServer(EpollServer&&) = delete;EpollServer& operator=(EpollServer&) = delete;EpollServer& operator=(EpollServer&&) = delete;// 开始运行服务器void epoll_run(){// 初始化 epoll 模型_epoll_fd = epoll_create(EPOLL_SIZE);if (-1 == _epoll_fd){LOG(FATAL, "epoll_create error");sys_error("epoll_create: ");}// 在 epoll 中添加监听套接字_epoll_add(_listen_fd, EPOLLIN | EPOLLET);while (true){// epoll_waitepoll_event ep_revts[REVTS_SIZE]; /* 可以执行的集合 */int ret_wait = epoll_wait(_epoll_fd, ep_revts, REVTS_SIZE, -1);if (-1 == ret_wait){LOG(FATAL, "epoll_wail error");sys_error("epoll_wait: ");}else if (0 == ret_wait){LOG(WARNING, "epoll_wait timeout");}else{// 开始正常处理 epoll请求_epoll_run(ep_revts, ret_wait);}}}private:EpollServer() = default;~EpollServer(){if (_listen_fd >= 0){close(_listen_fd);}}// 初始化套接字void _socket_bind_listen(){_listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == _listen_fd){LOG(FATAL, "socket error");sys_error("socket: ");}// 设置端口复用int opt = 1;setsockopt(_listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));sockaddr_in ser_addr;socklen_t ser_len = sizeof(ser_addr);memset(&ser_addr, 0, ser_len);ser_addr.sin_family = AF_INET;ser_addr.sin_addr.s_addr = INADDR_ANY;_ipaddr = inet_ntoa(ser_addr.sin_addr);// ser_addr.sin_addr.s_addr = inet_addr(m_ip.c_str());ser_addr.sin_port = htons(_port);if (0 != bind(_listen_fd, (sockaddr *)&ser_addr, ser_len)){LOG(FATAL, "bind error");sys_error("bind: ");}if (0 != listen(_listen_fd, LISTENQ)){LOG(FATAL, "listen error");sys_error("listen: ");}}// 往 epoll 模型中添加任务void _epoll_add(int fd, uint32_t __events){epoll_event ep_ev;ep_ev.data.fd = fd;ep_ev.events = __events;epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ep_ev);}// 在 epoll 模型中删除任务void _epoll_del(int fd){epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);}// 正常处理 epoll 请求void _epoll_run(epoll_event revts[], size_t ret_wait){for (int i = 0; i < ret_wait; i++){if (revts[i].events & EPOLLIN){if (revts[i].data.fd == _listen_fd){// 等待客户端的接入int comm_fd = _server_accept();}else{// 与客户端进行读写数据操作size_t comm_fd = revts[i].data.fd;_th_pool->enqueue(httpserver_io, comm_fd);}}}}// 等待客户端的接入, 如果接入成功,就建立映射void _server_accept(){sockaddr_in cli_addr;socklen_t cli_len = sizeof(cli_addr);memset(&cli_addr, 0, cli_len);int comm_fd = accept(_listen_fd, (sockaddr *)&cli_addr, &cli_len);if (comm_fd == -1){LOG(WARNING, "accept error");return;}// 接入成功,就建立映射char cli_ip[41] = "";inet_ntop(AF_INET, &cli_addr.sin_addr, cli_ip, 41);size_t cli_port = ntohs(cli_addr.sin_port);string str(cli_ip);LOG(INFO, "COMM_FD:[" + to_string(comm_fd) + "] IP:[" + str + "]:[" + to_string(cli_port) + "] join");_comm_fd_map[comm_fd] = make_pair(str, cli_port);// 有客户端接入成功,在 epoll 中添加这个套接字服务_epoll_add(comm_fd, EPOLLIN | EPOLLET);}};// EpollServer 的全局访问引用EpollServer& EP_PTR = EpollServer::get_server();// 开始与已连接的客户端进行读写, static void _server_io(size_t comm_fd){char buff[MAXLINE] = "";int size = read(comm_fd, buff, MAXLINE);auto& _map = EP_PTR._comm_fd_map[comm_fd];if (size > 0){// 读入正常,开始处理数据cout << "IP: [" << _map.first << "]:[" << _map.second << "]# " << buff << endl;write(comm_fd, buff, size);}else{// 读入失败,关闭连接if (size == -1){LOG(ERROR, "COMM_FD:[" + to_string(comm_fd) + "] IP:[" + _map.first+ "]:[" + to_string(_map.second) + "] read error");}close(comm_fd);// 关闭服务套接字LOG(ERROR, "COMM_FD:[" + to_string(comm_fd) + "] IP:[" + _map.first+ "]:[" + to_string(_map.second) + "] left");{lock_guard<mutex> lock(EP_PTR._map_mutex);EP_PTR._comm_fd_map.erase(comm_fd);// 删除映射_epoll_del(comm_fd); /* 处理完成,在epoll中删除这个套接字服务 */}}}int main(int argc, char **args){cout << "server main start ...\n";if (argc != 2){cout << "Please cin$$ ./httpserver port" << endl;cout << "For example$ ./httpserver 9090" << endl;return -1;}EpollServer& ptr = EpollServer::get_server();ptr.epoll_server_init(stoi(args[1]), 5);ptr.epoll_run();cout << "server main end ...\n";return 0;}

总结

epoll聊天室烂大街hhhhh

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。