1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 使用asio搭建商用服务器

使用asio搭建商用服务器

时间:2021-01-21 22:50:23

相关推荐

使用asio搭建商用服务器

1. 背景介绍

1.1 什么是asio

从5月份开始我主持了webyy服务器项目(/webyy.html),项目中没有按照惯例使用公司既有的基于epoll的网络框架,而是尝试了C++ tr2标准中的实验网络库asio,无论从开发效率、程序性能、稳定性上来说,都是一次成功的尝试。虽然是商业项目,但使用了linux、asio、protobuf等大量开源项目,开发过程共也借鉴了其他一些开源项目,因此我决定把与公司无关的部分剥离一下,分享出来,尽到使用自由软件的义务。

asio由Christopher M. Kohlhoff大牛从着手开发,申请加入C++ tr1,3月份加入boost1.35.0,按照boost与C++标准库的发展惯例,预测很快会加入C++标准库中。其中的async调用方式已经作为非常重要的新特性,加入到C++0x标准库。

1.2 asio的相关资料

asio官方提供了及其详细的文档、例子、教程,没有必要再累赘地将其转述一遍。如果有朋友对英文有些吃力,网上也早有很多翻译版。这里提供一些官方的文档资料:

非boost版本的asio ——http://think-/

与boost::asio的主要区别就是名字空间是boost::asio还是asio。

boost::asio ——/doc/libs/1520/doc/html/boost_asio.html

申请加入tr2时的申报材料 —— 猛击此处下载高清pdf

proactor模式的首篇论文 —— 猛击此处下载高清pdf

使用asio的大型开源网络项目sip ——/rep/resiprocate/main

2. 源码参考

由于代码使用了一点其他的工具,所以并没有想让读者能够编译通过。但是对从头开始搭建服务器的朋友来说,一定是一份非常有价值的参考。

2.1 作为Client的模块

这部分供作为Client去连接其他服务器时使用。给出的源码中有三个类:TcpConnection, BizConnection, Client. 其中

TcpConnection提供了与协议无关的tcp连接,异步操作的结果以虚函数方式供派生类使用

BizConnection继承自TcpConnection,使用具体的协议解析报文

Client使用BizConnection,并提供了等待具体某条消息的wait_for、心跳、延迟等功能

2.1.1tcpconnection.h

#ifndefTCPCONNECTION_H#defineTCPCONNECTION_H/***@authoryurunsun@*/#include<asio.hpp>#include<asio/deadline_timer.hpp>#include<boost/shared_ptr.hpp>#include<boost/enable_shared_from_this.hpp>#include<boost/timer.hpp>#include<sstream>#include"safehandler.h"classTcpConnection:publicboost::enable_shared_from_this<TcpConnection>,privateboost::noncopyable{public:typedefstd::vector<uint8_t>DataBuffer;typedefboost::shared_ptr<TcpConnection>TcpPtr;staticTcpPtrcreate(asio::io_service&io_service,conststring&name){returnTcpPtr(newTcpConnection(io_service,name));}virtual~TcpConnection();voidstart(conststring&ip,conststring&port);voidstart(unsignedip,uint16_tport);voidstart(conststring&ip,uint16_tport);voidstop();boolisConnected(){returnm_socket.is_open();}///GettersandSettersvoidsetName(conststring&name){m_name=name;}conststring&getName(){returnm_name;}voidsetHeadLength(uint32_tsize){m_headLength=size;}uint32_tgetHeadLength(){returnm_headLength;}voidsetConnectTimeoutSec(uint32_tsec){m_connectTimeoutSec=sec;}uint32_tgetConnectTimeoutSec(){returnm_connectTimeoutSec;}conststring&getip(){returnm_ip;}uint16_tgetport(){returnm_port;}stringgetFarpointInfo(){stringstreamss;ss<<m_name<<""<<m_ip<<":"<<m_port<<"";returnss.str();}protected:explicitTcpConnection(asio::io_service&io_service,conststring&name);///Provideforderivedclassvoidconnect(asio::ip::tcp::endpointendpoint);voidreceiveHead();voidreceiveBody(uint32_tbodyLength);voidsend(constvoid*data,uint32_tlength);///ClassoverridecallbacksvirtualvoidonConnectSuccess(){assert(false);}virtualvoidonConnectFailure(constasio::error_code&e){(void)e;assert(false);}virtualvoidonReceiveHeadSuccess(DataBuffer&data){(void)data;assert(false);}virtualvoidonReceiveBodySuccess(DataBuffer&data){(void)data;assert(false);}virtualvoidonReceiveFailure(constasio::error_code&e){(void)e;assert(false);}virtualvoidonSendSuccess(){assert(false);}virtualvoidonSendFailure(constasio::error_code&e){(void)e;assert(false);}virtualvoidonTimeoutFailure(constasio::error_code&e){(void)e;assert(false);}virtualvoidonCommonError(uint32_tec,conststring&em){(void)ec;(void)em;assert(false);}private:voidcheckDeadline(constasio::error_code&e);voidhandleConnect(constasio::error_code&e);voidhandleReceiveHead(constasio::error_code&e);voidhandleReceiveBody(constasio::error_code&e);voidhandleSend(constasio::error_code&e);typedefTcpConnectionthis_type;asio::ip::tcp::socketm_socket;asio::deadline_timerm_deadline;boolm_stopped;DataBufferm_readBuf;stringm_name;boost::shared_ptr<Probe>m_probe;uint32_tm_headLength;uint32_tm_connectTimeoutSec;stringm_ip;uint16_tm_port;};#endif//TCPCONNECTION_H

2.1.2tcpconnection.cpp

#include"stdafx.h"#include"tcpconnection.h"usingasio::ip::tcp;TcpConnection::TcpConnection(asio::io_service&io_service,conststring&name):m_socket(io_service),m_deadline(io_service),m_stopped(false),m_name(name),m_probe(newProbe),m_headLength(10),m_connectTimeoutSec(5),m_ip(""),m_port(0){}TcpConnection::~TcpConnection(){stop();}voidTcpConnection::start(conststring&ip,conststring&port){start(ip,atoi(port.c_str()));}voidTcpConnection::start(unsignedip,uint16_tport){connect(tcp::endpoint(asio::ip::address_v4(ip),port));}voidTcpConnection::start(conststring&ip,uint16_tport){asio::ip::address_v4addr_v4=asio::ip::address_v4::from_string(ip);connect(tcp::endpoint(addr_v4,port));}voidTcpConnection::stop(){if(!m_stopped){m_stopped=true;try{m_readBuf.clear();asio::error_codeignored;m_socket.shutdown(tcp::socket::shutdown_both,ignored);m_socket.close(ignored);m_deadline.cancel();}catch(constasio::system_error&err){FATAL("asio::system_errorem%s",err.what());}}}voidTcpConnection::connect(tcp::endpointendpoint){m_stopped=false;m_ip=endpoint.address().to_string();m_port=endpoint.port();INFO("Tryingconnect%s:%u...%s",STR(m_ip),m_port,STR(m_name));m_deadline.expires_from_now(boost::posix_time::seconds(m_connectTimeoutSec));m_socket.async_connect(endpoint,boost::bind(&TcpConnection::handleConnect,shared_from_this(),asio::placeholders::error));m_deadline.async_wait(SafeHandler1<this_type,constasio::error_code&>(&this_type::checkDeadline,this,m_probe));}voidTcpConnection::receiveHead(){m_readBuf.resize(m_headLength);asio::async_read(m_socket,asio::buffer(&m_readBuf[0],m_headLength),boost::bind(&TcpConnection::handleReceiveHead,shared_from_this(),asio::placeholders::error));}voidTcpConnection::receiveBody(uint32_tbodyLength){if(!m_stopped){if((bodyLength<=MAX_BUFFER_SIZE)&&(bodyLength>0)){m_readBuf.resize(bodyLength+m_headLength);asio::async_read(m_socket,asio::buffer(&m_readBuf[m_headLength],bodyLength),boost::bind(&TcpConnection::handleReceiveBody,shared_from_this(),asio::placeholders::error));}else{onCommonError(S_FATAL,"illegalbodyLengthtocallreceiveBody");}}else{onCommonError(S_ERROR,"illegaltocallreceiveBodywhiletcpisnotconnected");}}voidTcpConnection::send(constvoid*data,uint32_tlength){if(!m_stopped){if(length<=MAX_BUFFER_SIZE){asio::async_write(m_socket,asio::const_buffers_1(data,length),boost::bind(&TcpConnection::handleSend,shared_from_this(),asio::placeholders::error));}else{onCommonError(S_ERROR,"toobiglengthtocallsend");}}else{onCommonError(S_ERROR,"illegaltocallsendwhiletcpisnotconnected");}}/***@briefTcpConnection::checkDeadline*@parame*case1:m_stopped==truewhichmeansusercanceled*case2:m_deadline.expires_at()<=asio::deadline_timer::traits_type::now()*Checkwhetherthedeadlinehaspassed.Wecomparethedeadlineagainstthecurrenttimesinceanewasynchronousoperationmayhavemovedthedeadlinebeforethisactorhadachancetorun.*/voidTcpConnection::checkDeadline(constasio::error_code&e){if(!m_stopped){if(m_deadline.expires_at()<=asio::deadline_timer::traits_type::now()){onTimeoutFailure(e);}}}voidTcpConnection::handleConnect(constasio::error_code&e){if(!m_stopped){if(!e){m_deadline.cancel();onConnectSuccess();}else{onConnectFailure(e);}}else{INFO("%s%s%uuser'scanceledbystop()",STR(m_name),STR(m_ip),m_port);}}voidTcpConnection::handleReceiveHead(constasio::error_code&e){if(!m_stopped){if(!e){onReceiveHeadSuccess(m_readBuf);}elseif(isConnected()){onReceiveFailure(e);}}else{INFO("%s%s%uuser'scanceledbystop()",STR(m_name),STR(m_ip),m_port);}}voidTcpConnection::handleReceiveBody(constasio::error_code&e){if(!m_stopped){if(!e){onReceiveBodySuccess(m_readBuf);}elseif(isConnected()){onReceiveFailure(e);}}else{INFO("%s%s%uuser'scanceledbystop()",STR(m_name),STR(m_ip),m_port);}}voidTcpConnection::handleSend(constasio::error_code&e){if(!m_stopped){if(!e){//onSendSuccess();}elseif(isConnected()){onSendFailure(e);}}else{INFO("%s%s%uuser'scanceledbystop()",STR(m_name),STR(m_ip),m_port);}}

2.1.3bizconnection.h

#ifndefBIZCONNECTION_H#defineBIZCONNECTION_H/***@authoryurunsun@*/#include<asio.hpp>#include<cstdio>#include<stdexcept>#include"sigslot/sigslot.h"#include"tcpconnection.h"classBizConnection:publicTcpConnection{public:typedefboost::shared_ptr<BizConnection>BizPtr;staticBizPtrcreate(asio::io_service&io_service,conststring&name=string("")){returnBizPtr(newBizConnection(io_service,name));}voidsendBizMsg(uint32_turi,constBizPackage&pkg);sigslot::signal0<>BizConnected;sigslot::signal2<uint32_t,conststring&>BizError;sigslot::signal0<>BizClosed;sigslot::signal1<BizPackage&>BizMsgArrived;protected:explicitBizConnection(asio::io_service&io_service,conststring&name);///ImplementcallbacksinbaseclassvirtualvoidonConnectSuccess();virtualvoidonConnectFailure(constasio::error_code&e);virtualvoidonReceiveHeadSuccess(DataBuffer&data);virtualvoidonReceiveBodySuccess(DataBuffer&data);virtualvoidonReceiveFailure(constasio::error_code&e);virtualvoidonSendSuccess();virtualvoidonSendFailure(constasio::error_code&e);virtualvoidonTimeoutFailure(constasio::error_code&e);virtualvoidonCommonError(uint32_tec,conststring&em);staticvoidinitNeedErrorSet();staticstd::set<uint32_t>m_needError;private:inlineboolpeekLength(void*data,uint32_tlength,uint32_t&outputi32);voidhandleError(conststring&from,uint32_tec,constasio::error_code&e=asio::error_code());};inlineboolBizConnection::peekLength(void*data,uint32_tlength,uint32_t&outputi32){if(length>=4){memcpy(&outputi32,data,sizeof(uint32_t));returntrue;}else{returnfalse;}}#endif//BIZCONNECTION_H

2.1.4bizconnection.cpp

#include"stdafx.h"#include"bizconnection.h"#include"tcpconnection.h"std::set<uint32_t>BizConnection::m_needError;BizConnection::BizConnection(asio::io_service&io_service,conststring&name):TcpConnection(io_service,name){}voidBizConnection::sendBizMsg(unsigneduri,constBizPackage&pkg){///TODO:usuallythisBizPackagecontainsthebufferofstreamdatatobesenttofarpoint///YoushouldimplementthisbyretrivingbufferinBizPackagethencallTcpConnection::send();}voidBizConnection::onConnectSuccess(){receiveHead();BizConnected.emit();}voidBizConnection::onConnectFailure(constasio::error_code&e){if(e==asio::error::operation_aborted){INFO("%s%s%uoperationaborted...%s",STR(getName()),STR(getip()),getport(),STR(e.message()));}elseif((e==asio::error::already_connected)||(e==asio::error::already_open)||(e==asio::error::already_started)){WARN("%s%s%ualreadconnected...%s",STR(getName()),STR(getip()),getport(),STR(e.message()));}else{handleError("onConnectFailure",S_FATAL,e);}}voidBizConnection::onReceiveHeadSuccess(TcpConnection::DataBuffer&data){uint32_tpkglen=0;if(peekLength(data.data(),data.size(),pkglen)){receiveBody(pkglen-getHeadLength());}else{handleError("peekLength",S_FATAL);}}voidBizConnection::onReceiveBodySuccess(TcpConnection::DataBuffer&data){///Thisissimplyanexample,actuallyit'suser'sdutytounmarshalbuffertopackage.BizPackagemsg;BizPackage.unserializeFrom(data);BizMsgArrived.emit(msg);receiveHead();}voidBizConnection::onReceiveFailure(constasio::error_code&e){if((e==asio::error::operation_aborted)){INFO("%soperation_aborted...%s",STR(getFarpointInfo()),STR(e.message()));}else{handleError("onReceiveFailure",S_FATAL,e);}}voidBizConnection::onSendSuccess(){///Leaveitblank}voidBizConnection::onSendFailure(constasio::error_code&e){if((e==asio::error::operation_aborted)){INFO("%soperation_aborted...%s",STR(getFarpointInfo()),STR(e.message()));}else{handleError("onSendFailure",S_FATAL,e);}}voidBizConnection::onTimeoutFailure(constasio::error_code&e){if((e==asio::error::operation_aborted)){INFO("%soperation_aborted...%s",STR(getFarpointInfo()),STR(e.message()));}else{handleError("onTimeoutFailure",S_FATAL);}}voidBizConnection::onCommonError(uint32_tec,conststring&em){handleError(em,ec);}voidBizConnection::handleError(conststring&from,uint32_tec,constasio::error_code&e/*=asio::error_code()*/){stringstreamss;ss<<getFarpointInfo()<<""<<from;if(e){ss<<"asio"<<e.value()<<""<<e.message();}BizError.emit(ec,ss.str());}

2.1.5client.h

#ifndefCLIENT_H#defineCLIENT_H/***@authoryurunsun@*/#include"bizconnection.h"#include"handler.h"#include"safehandler.h"#include<asio.hpp>#include<boost/timer.hpp>classClient:publicsigslot::has_slots<>{protected:BizConnection::BizPtrm_pBizConnection;public:typedefvoid(Client::*RequestPtr)(BizPackage&);typedefstd::map<uint32_t,RequestPtr>RequestMap;typedefvoid(Client::*NotifyPtr)(BizPackage&);typedefstd::map<uint32_t,NotifyPtr>NotifyMap;explicitClient(conststring&name=string(""));///继承类需要实现的提供外部的方法virtualvoidstartServer()=0;virtualboolsendToServer(YProto&proto)=0;virtualvoidstopServer(){clearWaitforTimer();m_pBizConnection->stop();}protected:///继承类需要实现的初始化函数virtualvoidinitRequestMap(){assert(false);}virtualvoidinitNotifyMap(){assert(false);}virtualvoidinitSignal();///继承类需要实现的钩子函数,用于处理网络事件virtualvoidonBizMsgArrived(core::Request&msg)=0;virtualvoidonBizError(uint32_tec,conststring&em);virtualvoidonBizConnected()=0;///继承类可以使用的工具方法///1.心跳类voidsetKeepAliveSec(uint32_tsec){m_keepAliveSec=sec;}uint32_tgetKeepAliveSec(){returnm_keepAliveSec;}voidstartKeepAlive();voidkeepAlive(constasio::error_code&e);virtualvoidonKeepAlive(){assert(false);}///2.登陆状态类voidsetHasLogin(boolb){m_hasLogin=b;}boolgetHasLogin(){returnm_hasLogin;}boolisOnline(){return(m_pBizConnection->isConnected()&&m_hasLogin);}///3.消息保存类template<typenameHandler>boolsavePendingCommand(Handlerhandler){if(m_pendingCmd.size()<MaxPendingCommandCount){m_pendingCmd.push_back(Command(handler));returntrue;}returnfalse;}voidsendPendingCommand(){if(isOnline()){vector<Command>::iteratorit=m_pendingCmd.begin();for(;it!=m_pendingCmd.end();++it){(*it)();}m_pendingCmd.clear();}}///4.延迟处理类typedefvoid(Client::*HoldonCallback)();voidholdonSeconds(uint32_tsec,HoldonCallbackfunc);voidholdonHandler(HoldonCallbackfunc,constasio::error_code&e);///5.waitfor工具处理异步消息超时typedefboost::shared_ptr<asio::deadline_timer>SharedTimerPtr;typedefboost::scoped_ptr<asio::deadline_timer>ScopedTimerPtr;typedefboost::shared_ptr<Probe>SharedProbe;typedefmap<uint32_t,SharedTimerPtr>Uri2Timer;///等待收到的包uri-->这个时间timervoidwaitfor(uint32_turi,uint32_tsec);///在发送req的时候调用,sec秒数uri等待收到的urivoidwaitforTimeout(uint32_turi,constasio::error_code&e);///所有waitfor超时都会自动回调这个函数virtualvoidonWaitforTimeout(uint32_turi){(void)uri;assert(false);}///继承类覆盖这个钩子函数来进行错误处理voidwaitforReceived(uint32_turi);///当响应函数handler被回调时,记得调用waitforReceived做清理工作voideraseWaitforTimer(uint32_turi);voidclearWaitforTimer();///继承类可以使用的工具成员:心跳探针请求阻塞typedefstd::set<uint32_t>BlockReq;BlockReqm_block;SharedProbem_probe;ScopedTimerPtrm_timer;uint32_tm_keepAliveSec;boolm_hasLogin;vector<Command>m_pendingCmd;staticconstuint32_tMaxPendingCommandCount=20;ScopedTimerPtrm_holdonTimer;Uri2Timerm_uri2timer;};#defineBIND_REQ(m,uri,callback)\m[static_cast<uint32_t>(uri)]=static_cast<RequestPtr>(callback);#defineBIND_NOTIFY(m,uri,callback)\m[static_cast<uint32_t>(uri)]=static_cast<NotifyPtr>(callback);#endif//CLIENT_H

2.1.6client.cpp

#include"stdafx.h"#include"client.h"Client::Client(conststring&name):m_pBizConnection(BizConnection::create(ioService::instance(),name)),m_probe(newProbe),m_keepAliveSec(10),m_hasLogin(false){}voidClient::initSignal(){m_pBizConnection->BizError.connect(this,&Client::onBizError);m_pBizConnection->BizMsgArrived.connect(this,&Client::onBizMsgArrived);m_pBizConnection->BizConnected.connect(this,&Client::onBizConnected);}voidClient::onBizError(uint32_tec,conststring&em){m_facade.serverError.emit(ec,em);}voidClient::startKeepAlive(){m_timer.reset(newasio::deadline_timer(m_facade.io_service_ref));m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec));m_timer->async_wait(SafeHandler1<Client,constasio::error_code&>(&Client::keepAlive,this,m_probe));}voidClient::keepAlive(constasio::error_code&e){if(e!=asio::error::operation_aborted){FINE("%usendpingto%s%s:%u",m_facade.m_pInfo->uid,STR(m_pBizConnection->getName()),STR(m_pBizConnection->getip()),m_pBizConnection->getport());onKeepAlive();m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec));m_timer->async_wait(SafeHandler1<Client,constasio::error_code&>(&Client::keepAlive,this,m_probe));}}voidClient::holdonSeconds(uint32_tsec,HoldonCallbackfunc){m_holdonTimer.reset(newasio::deadline_timer(m_facade.io_service_ref));m_holdonTimer->expires_from_now(boost::posix_time::seconds(sec));SafeHandler1Bind1<Client,HoldonCallback,constasio::error_code&>h(&Client::holdonHandler,this,func,m_probe);m_holdonTimer->async_wait(h);}voidClient::holdonHandler(HoldonCallbackfunc,constasio::error_code&e){if(!e){if(m_holdonTimer!=NULL)m_holdonTimer->cancel();(this->*func)();}else{WARN("error:%s",STR(e.message()));}}voidClient::waitfor(uint32_turi,uint32_tsec){SharedTimerPtrt(newasio::deadline_timer(m_facade.io_service_ref));t->expires_from_now(boost::posix_time::seconds(sec));t->async_wait(SafeHandler1Bind1<Client,uint32_t,constasio::error_code&>(&Client::waitforTimeout,this,uri,m_probe));m_uri2timer[uri]=t;}voidClient::waitforTimeout(uint32_turi,constasio::error_code&e){if(e!=asio::error::operation_aborted){FATAL("%swaitforuri%utimeout",STR(m_pBizConnection->getName()),uri);eraseWaitforTimer(uri);onWaitforTimeout(uri);}}voidClient::waitforReceived(uint32_turi){eraseWaitforTimer(uri);}voidClient::eraseWaitforTimer(uint32_turi){Uri2Timer::iteratorit=m_uri2timer.find(uri);if(it!=m_uri2timer.end()){SharedTimerPtr&t=it->second;if(t){asio::error_codee;t->cancel(e);t.reset();}m_uri2timer.erase(it);}}voidClient::clearWaitforTimer(){Uri2Timer::iteratorit=m_uri2timer.begin();for(;it!=m_uri2timer.end();++it){SharedTimerPtr&t=it->second;if(t){asio::error_codee;t->cancel(e);t.reset();}}m_uri2timer.clear();}

2.2 作为server模块

作为server模块由于涉及公司的业务比较多,这里剥离出一个作为crossdomain服务器的部分,功能很简单:flash客户端通过socket请求crossdomain配置文件,server返回给定的字符串。这里使用了比较著名的pimpl模式,将实现完全隐藏在cpp文件中。

2.2.1crossdomain.h

#ifndefCROSSDOMAIN_H#defineCROSSDOMAIN_H#include<string>#include<boost/shared_ptr.hpp>#include<asio.hpp>/***@authoryurunsun@*/classCrossDomain{private:structServer;boost::shared_ptr<Server>m_pserver;CrossDomain(asio::io_service&io_service,conststd::string&local_port);staticCrossDomain*s_instance;public:staticvoidcreate(asio::io_service&io_service,conststd::string&local_port){s_instance=newCrossDomain(io_service,local_port);}staticCrossDomain*instance();voidstart_server();};#endif//CROSSDOMAIN_H

2.2.2crossdomain.cpp

#include"stdafx.h"#include"crossdomain.h"usingasio::ip::tcp;usingboost::uint8_t;CrossDomain*CrossDomain::s_instance=NULL;structCrossDomainImpl:publicboost::enable_shared_from_this<CrossDomainImpl>{public:staticconstunsignedMaxReadSize=22;typedefboost::shared_ptr<CrossDomainImpl>CrossDomainImplPtr;staticCrossDomainImplPtrcreate(asio::io_service&io_service){returnCrossDomainImplPtr(newCrossDomainImpl(io_service));}tcp::socket&get_socket(){returnm_socket;}voidstart(){start_read_some();}~CrossDomainImpl(){close();}voidclose(){if(m_socket.is_open()){m_socket.close();}}private:CrossDomainImpl(asio::io_service&io_service):m_socket(io_service){}voidstart_read_some(){m_socket.async_read_some(asio::buffer(m_readbuf,MaxReadSize),boost::bind(&CrossDomainImpl::handle_read_some,shared_from_this(),asio::placeholders::error()));}voidhandle_read_some(constasio::error_code&err){if(!err){stringstr(m_readbuf);stringreply("invalid");if(str=="<policy-file-request/>"){reply="anythingyouwannasendbacktoclient...";}asio::async_write(m_socket,asio::buffer(ref),boost::bind(&CrossDomainImpl::handle_write,shared_from_this(),asio::placeholders::error));}}voidhandle_write(constasio::error_code&error){FINE("CrossDomainhandle_write,gonnaclose");close();}tcp::socketm_socket;charm_readbuf[MaxReadSize];};structCrossDomain::Server{private:CrossDomain*m_facade;tcp::acceptorm_acceptor;boolm_listened;stringm_local_port;public:Server(asio::io_service&io_service,conststring&local_port):m_acceptor(io_service),m_listened(false),m_local_port(local_port){//intendtoleaveitblank}~Server(){if(m_acceptor.is_open()){INFO("closeserveracceptor");m_acceptor.close();}}voidstart_server(){FINE("CrossDomainstart_server....");if(!m_listened){FINE("Trytolisten...");try{tcp::endpointep(tcp::endpoint(tcp::v4(),atoi(m_local_port.c_str())));m_acceptor.open(ep.protocol());m_acceptor.bind(ep);m_acceptor.listen();}catch(constasio::system_error&ec){WARN("Port%salreadyinuse!Failtolisten...",STR(m_local_port));return;}catch(...){WARN("Unknownerrorwhiletryingtolisten...");return;}m_listened=true;FINE("Listenport%ssuccesfully!",STR(m_local_port));}CrossDomainImpl::CrossDomainImplPtrnew_server_impl=CrossDomainImpl::create(m_acceptor.get_io_service());m_acceptor.async_accept(new_server_impl->get_socket(),boost::bind(&Server::handle_accept,this,new_server_impl,asio::placeholders::error));}private:voidhandle_accept(CrossDomainImpl::CrossDomainImplPtrpserver_impl,constasio::error_code&err){FINE("CrossDomainhandle_accpet....");if(!err){FINE("CrossDomaineverythingok,start...");pserver_impl->start();//startthisserverstart_server();//waitingforanotherTunaConnection}else{pserver_impl->close();}}};CrossDomain::CrossDomain(asio::io_service&io_service,conststd::string&local_port):m_pserver(newServer(io_service,local_port)){}CrossDomain*CrossDomain::instance(){if(!s_instance){returnNULL;}returns_instance;}voidCrossDomain::start_server(){m_pserver->start_server();}

3. 使用asio的陷阱

上边代码其实有几点漏洞:

3.1std::vector<uint_8>不适合作为buffer

vector<uint8_t>不适合做buffer的原因是,sgi的内存分配器会以2倍的形式增长vector的内存,例如这个buffer要求100K,但当前vector的capability只有90K,那么sgi默认内存分配器会将vector的capability增长到180K。注意capability与size的区别。这就导致vector的内存占用依赖最大buffer的size,这是很危险的。

推荐使用boost的circular_buffer作为buffer,能有效避免内存碎片、隐式内存泄露等问题。

3.2asio::const_buffer拷贝构造函数没有深拷贝

const_buffer系列静态buffer只能从mutable_buffermerge过来,但是从const_buffer的拷贝构造函数源码能看到,他并不对buffer做深拷贝。所以试图将其放到队列或者容器中,期待产生buffer的拷贝,是错误的。

3.3.async_write可能会拆包发送

例如先调用async_write发送一个100K的大包,再马上调用async_write发送一个8字节的ping包,非常可能出现问题。async_write函数的实现是循环调用async_write_some,对于大包会将其拆分成几个小报文。如果此时收到用户一个新的async_write调用,非常可能将小包夹在大包的几个部分中间发送,导致接收端出现异常。

解决的办法可以直接操作async_write_some,代替async_write。但更方便的办法是创建一个发送队列。实际上asio会准确地将发送成功的通知发送给用户,例如刚刚100K的打包,直到所有100K全部发送完成,才会调用handle回调。因此可以在发送时将报文入队列,回调函数里将报文出队列,发送下一个小报时判断队列是否为空,如果非空说明100K的包还没有发完。示例代码如下:

///发送时入队列voidTcpConnection::send(constvoid*data,uint32_tlength){if(!m_stopped){if(length<=MAX_BUFFER_SIZE){constchar*begin=(constchar*)data;vector<char>vec(begin,begin+length);boolisLastComplete=m_bufQueue.empty();m_bufQueue.push_back(vec);///如果没有残余的包,就直接发送if(isLastComplete){vector<char>&b(m_bufQueue.front());send(b);}}else{onCommonError(S_ERROR,"toobiglengthtocallsend");}}else{onCommonError(S_ERROR,"illegaltocallsendwhiletcpisnotconnected");}}voidTcpConnection::send(conststd::vector<char>&vec){if(!m_stopped){asio::async_write(m_socket,asio::buffer(&vec[0],vec.size()),asio::transfer_all(),boost::bind(&TcpConnection::handleSend,shared_from_this(),asio::placeholders::error));}else{onCommonError(S_ERROR,"illegaltocallsendwhiletcpisnotconnected");}}///回调函数将之前的buffer出队列,同时检查是否有后来的包voidTcpConnection::handleSend(constasio::error_code&e){if(!m_stopped){if(!e){m_bufQueue.pop_front();if(!m_bufQueue.empty()){std::vector<char>&b(m_bufQueue.front());send(b);}//onSendSuccess();}elseif(isConnected()){onSendFailure(e);}}else{INFO("%s%s%uuser'scanceledbystop()",STR(m_name),STR(m_ip),m_port);}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。