1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > python3 异步 非阻塞 IO多路复用 select poll epoll 使用

python3 异步 非阻塞 IO多路复用 select poll epoll 使用

时间:2019-07-27 04:47:53

相关推荐

python3 异步 非阻塞 IO多路复用 select poll epoll 使用

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。

下面记录下分别基于Select/Poll/Epoll的echo server实现。

Python Select Server,可监控事件数量有限制:

#!/usr/bin/python# -*- coding: utf-8 -*-import selectimport socketimport Queueserver = socket.socket(socket.AF_INET,socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)server_address= ('192.168.1.5',8080)server.bind(server_address)server.listen(10)#select轮询等待读socket集合inputs = [server]#select轮询等待写socket集合outputs = []message_queues = {}#select超时时间timeout = 20while True:print "等待活动连接......"readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)if not (readable or writable or exceptional) :print "select超时无活动连接,重新select...... "continue; #循环可读事件for s in readable :#如果是server监听的socketif s is server:#同意连接connection, client_address = s.accept()print "新连接: ", client_addressconnection.setblocking(0)#将连接加入到select可读事件队列inputs.append(connection)#新建连接为key的字典,写回读取到的消息message_queues[connection] = Queue.Queue()else:#不是本机监听就是客户端发来的消息data = s.recv(1024)if data :print "收到数据:" , data , "客户端:",s.getpeername()message_queues[s].put(data)if s not in outputs:#将读取到的socket加入到可写事件队列outputs.append(s)else:#空白消息,关闭连接print "关闭连接:", client_addressif s in outputs :outputs.remove(s)inputs.remove(s)s.close()del message_queues[s]for s in writable:try:msg = message_queues[s].get_nowait()except Queue.Empty:print "连接:" , s.getpeername() , '消息队列为空'outputs.remove(s)else:print "发送数据:" , msg , "到", s.getpeername()s.send(msg)for s in exceptional:print "异常连接:", s.getpeername()inputs.remove(s)if s in outputs:outputs.remove(s)s.close()del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python# -*- coding: utf-8 -*-import socketimport selectimport Queueserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)server.bind(server_address)server.listen(5)print "服务器启动成功,监听IP:" , server_addressmessage_queues = {}#超时,毫秒timeout = 5000#监听哪些事件READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)READ_WRITE = (READ_ONLY|select.POLLOUT)#新建轮询事件对象poller = select.poll()#注册本机监听socket到等待可读事件事件集合poller.register(server,READ_ONLY)#文件描述符到socket映射fd_to_socket = {server.fileno():server,}while True:print "等待活动连接......"#轮询注册的事件集合events = poller.poll(timeout)if not events:print "poll超时,无活动连接,重新poll......"continueprint "有" , len(events), "个新事件,开始处理......"for fd ,flag in events:s = fd_to_socket[fd]#可读事件if flag & (select.POLLIN | select.POLLPRI) :if s is server :#如果socket是监听的server代表有新连接connection , client_address = s.accept()print "新连接:" , client_addressconnection.setblocking(False)fd_to_socket[connection.fileno()] = connection#加入到等待读事件集合poller.register(connection,READ_ONLY)message_queues[connection] = Queue.Queue()else :#接收客户端发送的数据data = s.recv(1024)if data:print "收到数据:" , data , "客户端:" , s.getpeername()message_queues[s].put(data)#修改读取到消息的连接到等待写事件集合poller.modify(s,READ_WRITE)else :# Close the connectionprint " closing" , s.getpeername()# Stop listening for input on the connectionpoller.unregister(s)s.close()del message_queues[s]#连接关闭事件elif flag & select.POLLHUP :print " Closing ", s.getpeername() ,"(HUP)"poller.unregister(s)s.close()#可写事件elif flag & select.POLLOUT :try:msg = message_queues[s].get_nowait()except Queue.Empty:print s.getpeername() , " queue empty"poller.modify(s,READ_ONLY)else :print "发送数据:" , data , "客户端:" , s.getpeername()s.send(msg)#异常事件elif flag & select.POLLERR:print " exception on" , s.getpeername()poller.unregister(s)s.close()del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:

#!/usr/bin/python# -*- coding: utf-8 -*-import socket, selectimport Queueserversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)serversocket.bind(server_address)serversocket.listen(1)print "服务器启动成功,监听IP:" , server_addressserversocket.setblocking(0)timeout = 10#新建epoll事件对象,后续要监控的事件添加到其中epoll = select.epoll()#添加服务器监听fd到等待读事件集合epoll.register(serversocket.fileno(), select.EPOLLIN)message_queues = {}fd_to_socket = {serversocket.fileno():serversocket,}while True:print "等待活动连接......"#轮询注册的事件集合events = epoll.poll(timeout)if not events:print "epoll超时无活动连接,重新轮询......"continueprint "有" , len(events), "个新事件,开始处理......"for fd, event in events:socket = fd_to_socket[fd]#可读事件if event & select.EPOLLIN:#如果活动socket为服务器所监听,有新连接if socket == serversocket:connection, address = serversocket.accept()print "新连接:" , addressconnection.setblocking(0)#注册新连接fd到待读事件集合epoll.register(connection.fileno(), select.EPOLLIN)fd_to_socket[connection.fileno()] = connectionmessage_queues[connection] = Queue.Queue()#否则为客户端发送的数据else:data = socket.recv(1024)if data:print "收到数据:" , data , "客户端:" , socket.getpeername()message_queues[socket].put(data)#修改读取到消息的连接到等待写事件集合epoll.modify(fd, select.EPOLLOUT)#可写事件elif event & select.EPOLLOUT:try:msg = message_queues[socket].get_nowait()except Queue.Empty:print socket.getpeername() , " queue empty"epoll.modify(fd, select.EPOLLIN)else :print "发送数据:" , data , "客户端:" , socket.getpeername()socket.send(msg)#关闭事件elif event & select.EPOLLHUP:epoll.unregister(fd)fd_to_socket[fd].close()del fd_to_socket[fd]epoll.unregister(serversocket.fileno())epoll.close()serversocket.close()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。