1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > python3高级编程学习笔记(linux环境)

python3高级编程学习笔记(linux环境)

时间:2022-09-13 21:01:22

相关推荐

python3高级编程学习笔记(linux环境)

提示:记录一些vim编辑环境下常用命令以及pyhon3高级编程

目录

一.python3网络编程

1.udp

1.udp实现从linux发送数据到windows

2.接收数据到ubuntu

3.使用同一个套接字完成收发信息

4.一个udp聊天器案例

2.tcp实现

1.tcp-client发送信息青春版

2.tcp-server青春版

3.循环为多个客户端服务的服务器青春版

4.升级版的循环服务的服务器

5.案例-文件下载器

二.多任务

1.多任务(线程实现)

1.多任务-线程

2.继承Thread类完成多任务

3.函数里修改全局变量

4.互斥锁解决资源竞争问题

5.多任务实现udp聊天器

1.多任务(进程实现)

1.使用进程完成多任务

2.利用队列完成多任务进程之间的通信

3.案例-复制文件jia里所有文件青春版

4.案例-带进度条的文件夹拷贝器

3.多任务(协程实现)

1.迭代器-自己定义的类进行迭代青春版

2.迭代器-自己的类进行迭代

3.迭代器实现多任务(Fibonacci为例子)

4.生成器-实现斐波那契数列

5.生成器的一些知识

6.生成器send()实现多任务

7.通过yield完成多任务

8.greenlet多任务

9.gevent完成多任务

10.gevent多任务-2

11.案例-图片下载器

三.web服务器v3.1

1.正则表达式简介

1.正则判断变量名是否合理

2.正则判断常用邮箱地址

2.http

1.实现简单的http服务器

2.根据客户端需要完成http服务器

3.返回index页面http

4.使用多进程完成http服务器

5.使用多线程完成http

6.协程完成http服务器

7.单进程单线程非堵塞并发完成http服务器

8.单进程单线程非堵塞长连接

9.epoll实现http服务器

一.python3网络编程

1.udp

1.udp实现从linux发送数据到windows

windows编码是("gbk")

linux('utf-8')

import socketdef main():udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)while True:sent_data = input("请输入您需要发送的信息")if sent_data == 'exit':breakudp_socket.sendto(sent_data.encode('utf-8'),("172.21.126.225",8080))udp_socket.close()if __name__ =='__main__':main()

解码用decode

编码用encode

发信息基本步骤:1.创建套接字

2.输入需要发送的信息

3.发送信息

4.关闭套接字

2.接收数据到ubuntu

收信息基本步骤:1.创建套接字

2.绑定端口号

3.调用函数recvfrom() 第一个返回值是信息,第二个是对方的地址

4.关闭套接字

import socketdef main():# 创建套接字udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# 绑定端口号local_addr = ("",4657)udp_socket.bind(local_addr)recv_data = udp_socket.recvfrom(1024)recv_msg = recv_data[0] # 接受的信息recv_addr = recv_data[1] # 发送者的地址信息print(f"{recv_addr}:{recv_msg}")# udp_socket.sendto(b"haha",("172.21.126.225",8080))udp_socket.close()if __name__ =='__main__':main()

3.使用同一个套接字完成收发信息

import socketdef main():# 创建套接字udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# 获取对方的ip/portdest_ip = input("请输入对方的ip:")dest_port = int(input("请输入对方的port:"))# 从键盘获取数据send_data = input("请输入要发送的数据:")# 可以使用套接字收发数据udp_socket.sendto(send_data.encode("utf-8"),(dest_ip,dest_port))# 绑定端口号# local_addr = ("",4657)# udp_socket.bind(local_addr)# 接手数据recv_data = udp_socket.recvfrom(1024)recv_msg = recv_data[0] # 接受的信息recv_addr = recv_data[1] # 发送者的地址信息print(f"{recv_addr}:{recv_msg}")# udp_socket.sendto(b"haha",("192.168.220.1",8080))udp_socket.close()if __name__ =='__main__':main()

4.一个udp聊天器案例

import socketdef send_msg(udp_socket):"""发送消息"""dest_ip = input("请输入对方的ip:")dest_port = int(input("请输入对方的port:"))send_data = input("请输入要发送的数据:")# 可以使用套接字收发数据udp_socket.sendto(send_data.encode("utf-8"),(dest_ip,dest_port))def recv_msg(udp_socket):"""接收数据"""recv_data = udp_socket.recvfrom(1024)recv_msg = recv_data[0].decode("gbk") # 接受的信息recv_addr = recv_data[1] # 发送者的地址信息print(f"{recv_addr}:{recv_msg}")def main():# 创建套接字udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)#绑定信息udp_socket.bind(("",7788))# 获取对方的ip/port# dest_ip = input("请输入对方的ip:")# dest_port = int(input("请输入对方的port:"))# 从键盘获取数据while True:print("-----dup聊天器-----")print("1:发送消息")print("2:接收消息")print("0:退出聊天器")op = input("请输入功能:")if op == "1":#发送消息send_msg(udp_socket)elif op == "2":# 接收数据并显示 recv_msg(udp_socket)elif op == '0':breakelse:print("输入有误")# 绑定端口号# local_addr = ("",4657)# udp_socket.bind(local_addr)# 接收数据# udp_socket.sendto(b"haha",("192.168.220.1",8080))udp_socket.close()if __name__ =='__main__':main() 21,1Top

2.tcp实现

tcp分为客户端和服务器

1.tcp-client发送信息青春版

(1)先创建一个tcp套接字

tcp_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

不同于udp,socket()函数的第二个参数里更改为SOCK_STREAM

(2)然后调用connect函数调用服务器,将服务器的ip地址个port端口以元组形式传进connect里

(3)接着进行收发信息操作:发信息用send函数

(4)最后关闭套接字

import socketdef main():# 1创建tcp套接字tcp_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 2链接服务器server_ip = input("请输入需要连接的服务器的ip:")server_port = int(input("请输入此服务器的port:"))server_addr = (server_ip,server_port)tcp_socket.connect(server_addr)# 3发送/接收数据send_data = input("请输入要发送的信息:")tcp_socket.send(send_data.encode("utf-8"))# 4关闭套接字tcp_socket.close()if __name__ == '__main__':main()

2.tcp-server青春版

(1)创建套接字

(2)绑定本地port,因为是服务器所以要帮定

(3)调用listen(128)函数,是服务器处于被动状态,128可以理解为最多有128个客户端链接

(4)等待客户端的链接,调用accept()函数,会有两个返回值,第一个表示客户端的套接字,第二个是客户端的地址信息

(5)调用recv(1024)接受信息1024是最大字节数

(6)调用send()回信息,告诉客户端链接成功

(7)关闭套接字,先关闭客户端的套接字,然后是服务器的套接字

import socketdef main():# 1 买个手机 (创建套接字)tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 2 插个手机卡 (绑定本地信息 bind)tcp_server_socket.bind(("",7890))# 3 将手机设置为响铃模式(让默认的套接字由主动变为被动listen)tcp_server_socket.listen(128)# 4 等待别人打电话 (等待客户端的链接accept)print("-----1-----")new_client_socket,client_addr = tcp_server_socket.accept()print("-----2-----")print(client_addr)# 接受客户短发送过来的请求recv_data = new_client_socket.recv(1024)print(recv_data)# 回送一部分数据给客户端new_client_socket.send("hhhhhh-----ok------".encode('utf-8'))# 关闭套接字new_client_socket.close()tcp_server_socket.close()if __name__ == "__main__":main()

3.循环为多个客户端服务的服务器青春版

(1)创建套接字

(2)绑定ip和port ip一般用空字符串表示

(3)调用listen

(4)进入循环,accept循环接受客户端的链接,客户端的套接字关闭也要在循环里

(5)关闭服务器的套接字

import socketdef main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 绑定ip 和 porttcp_server_socket.bind(("",4111))# 被动接受数据tcp_server_socket.listen(128)while True:print("-----1-----\n等待一个客户端到来")# 接收数据new_client_socket,client_addr = tcp_server_socket.accept()print("-----2-----\n客户端来喽")print(f"地址为:{client_addr}")# 读取信息recv_data = new_client_socket.recv(1024)# 回送信息给客户端print(f"信息是:{recv_data}")new_client_socket.send("----给我成功----".encode('utf-8'))# 关闭套接字new_client_socket.close()print("服务结束")tcp_server_socket.close()if __name__ == "__main__":main()~

4.升级版的循环服务的服务器

大体步骤和3类似,循环体内又加了一个循环,recv_data = new_cilent_socket.recv(1024)

为空时,表示客户端断开链接,升级版可以一直接收一个客户端的信息。

import socketdef main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 绑定ip 和 porttcp_server_socket.bind(("",4111))# 被动接受数据tcp_server_socket.listen(128)while True:print("-----1-----\n等待一个客户端到来")# 接收数据new_client_socket,client_addr = tcp_server_socket.accept()print("-----2-----\n客户端来喽")print(f"地址为:{client_addr}")while True:# 读取信息recv_data = new_client_socket.recv(1024)# 回送信息给客户端print(f"客户端的请求是:{recv_data}")if recv_data:new_client_socket.send("----给我成功----".encode('utf-8'))else:break# 关闭套接字new_client_socket.close()print("服务结束")tcp_server_socket.close()if __name__ == "__main__":main()

5.案例-文件下载器

先是客户端

(1)创建套接字。。。。。。。。到第四步

(4)获取文件的名字

(5)将文件的名字发送到服务器里

(6)等待服务器响应

(7)移步服务器

import socketdef main():# 1.创建套接字tcp_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 2.获取服务器ip portdest_ip = input("请输入下载服务器的ip:")dest_port = int(input("请输入下载服务器的port:"))# 3.链接服务器tcp_socket.connect((dest_ip,dest_port))# 4.获取下载的文件名称download_file_name = input("请输入要下载的文件名字:")# 5.将文件名字发送到服务器tcp_socket.send(download_file_name.encode("utf-8"))# 6.接收文件中的数据recv_data = tcp_socket.recv(1024) # 1024 * 1024 = 1Mif recv_data:# 7.保存接收到的数据到一个文件中with open("[新]" + download_file_name,'wb') as f:f.write(recv_data)else:print("没有要下载的文件:")# 8.关闭套接字tcp_socket.close()if __name__ == "__main__":main()~ ~

服务器:

循环体内等待客户端链接,接收到一个客户端,send_file_2_client里接收服务器名字,先把文件内容定义为空,try内,打开文件,如果没有要下载的文件就打印结果,最后if 内,如果有文件就将内容发送给客户端,客户端接收到内容,就创建一个文件并将内容写入,否则print("没有要下载的文件"),最后关闭套接字

import socketdef send_file_2_client(new_client_socket,client_addr):# 读取信息# 1.接受客户端需要的文件名file_name = new_client_socket.recv(1024).decode('utf-8')file_content = None# 2.打开这个文件,读取数据try:f = open(file_name,'rb')file_content = f.read()f.close()except Exception as ret:print("没有要下载的文件(%s)" % file_name)# 3.发送文件的数据给客户端if file_content:print(f"客户端{client_addr}需要下载的文件是:{file_name}")new_client_socket.send(file_content)def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 绑定ip 和 porttcp_server_socket.bind(("",4111))# 被动接受数据tcp_server_socket.listen(124)while True:# 接收数据new_client_socket,client_addr = tcp_server_socket.accept()print(f"地址为:{client_addr}")# 调用函数完成服务 send_file_2_client(new_client_socket,client_addr)# 关闭套接字new_client_socket.close()tcp_server_socket.close()if __name__ == "__main__":main()~

二.多任务

1.多任务(线程实现)

1.多任务-线程

定义两个函数sing和dance

通过导入模块threading

里面有个类Thread()有两个参数,第一个表示目标函数,第二个表示函数的参数,用元组表示

创建对象后调用start来实现两个函数并发执行

import timeimport threadingdef sing():'''正在唱歌'''for i in range(5):print("-----正在唱歌-----")time.sleep(1)def dance():'''正在跳舞'''for i in range(10):print('-----正在跳舞-----')time.sleep(1)def main():t1 = threading.Thread(target = sing)t2 = threading.Thread(target = dance)t1.start()t2.start()while True:# 查看线程数 print(threading.enumerate())if len(threading.enumerate()) == 1:breaktime.sleep(1)if __name__ == "__main__":main()~ ~

2.继承Thread类完成多任务

import threadingimport timeclass Mythread(threading.Thread):def run(self):self.login()self.register()for i in range(3):time.sleep(1)msg = "I'm" + self.name + ' @ ' +str(i)print(msg)def login(self):for i in range(5):print("正在等录" + str(i))time.sleep(1)def register(self):for i in range(10):print("正在注册" + str(i))time.sleep(1)if __name__ == "__main__":t = Mythread()t.start()~

3.函数里修改全局变量

调用threading的方法,多线程之间共享全局变量

import threadingimport timenums = [11,22,33]def test1(temp):temp.append(44)print(f"{temp}")def test2(temp):temp.append(44)print(temp)if __name__ == "__main__":t1 = threading.Thread(target = test1,args = (nums,))t2 = threading.Thread(target = test2,args = (nums,))t2.start()t1.start()

4.互斥锁解决资源竞争问题

使用多线程对全局变量进行修改时,如果调用次数很多,会产生资源竞争问题,

可以定义一个互斥锁mutex = threading.Lock()

acquire表示上锁,release表示解锁

上锁到解锁之间只允许一个线程运行

import timeimport threadingg_num = 0mutex = threading.Lock()def test1(num):global g_nummutex.acquire()for i in range(num):g_num += 1mutex.release()print("----------in test1 g_num = %d----------" % g_num)def test2(num):global g_nummutex.acquire()for i in range(num):g_num += 1mutex.release()print("----------in test2 g_num = %d----------" % g_num)def main():t1 = threading.Thread(target = test1,args = (1000000,))t2 = threading.Thread(target = test2,args = (1000000,))t1.start()t2.start()time.sleep(5)print("----------in main g_num = %d----------" % g_num)if __name__ == "__main__":main()

5.多任务实现udp聊天器

定义两个函数,一个接收信息,一个发送信息,之前的聊天器很死板,只能发一次收一次,有了多任务,就可以实现收发一起执行,不一样的是,最后不能关闭套接字,因为不知到什么时候聊天结束。

import socketimport threadingdef recv_meassage(udp_socket):'''接收信息并显示'''while True:recv_data = udp_socket.recvfrom(1024)recv_msg = recv_data[0].decode('utf-8')recv_addr = recv_data[1]print(f"\n{recv_addr}发来了信息:{recv_msg}")def send_msg(udp_socket ,dest_ip, dest_port):'''发送信息'''while True:send_data = input("请输入您要发送的信息:")udp_socket.sendto(send_data.encode('utf-8'), (dest_ip, dest_port))def main():# 创建套接字udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)# 绑定端口udp_socket.bind(('',3555))dest_ip = input("请输入对方的ip地址:")dest_port = int(input("请输入对方ip地址的端口port:"))t_send = threading.Thread(target = send_msg, args = (udp_socket, dest_ip, dest_port))t_recv = threading.Thread(target = recv_meassage, args = (udp_socket,))t_send.start()t_recv.start()# 关闭套接字# udp_socket.close()if __name__ == "__main__":main()————————————————版权声明:本文为CSDN博主「小白来巡山^_^」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。原文链接:/m0_58613430/article/details/124160097

1.多任务(进程实现)

1.使用进程完成多任务

与线程类似,进程实现多任务需要导入模块 multiprocessing

不过,进程之间不共享全局变量,Process这个类和Thread一样,大致流程与线程实现多任务类似。

import timeimport multiprocessingnum = 0def test1(n):global numfor i in range(n):num += 1print(f"1------{i}------")def test2(n):global numfor i in range(n):num += 1print(f"2-----{i}-----")def main():p1 = multiprocessing.Process(target = test1, args = (100,))p2 = multiprocessing.Process(target = test2, args = (200,))p1.start()p2.start()time.sleep(10)print(num)if __name__ == "__main__":main()

2.利用队列完成多任务进程之间的通信

需要创建一个实例对象q = multiprocessing.Queue()

并把q当作参数传进需要完成通信的子进程中去,队列是先进先出,向队列传数据用put,取数据用get,可以用p.empty()判断队列是否为空,p.full()判断队列是否满

import multiprocessingdef download_from_web(q):'''下载数据'''data = [11,22,33,44]#相队列中写入数据for temp in data:q.put(temp)print(f'{data}-----下载器已经下载完了数据并存入到队列中-----')def analysis_data(q):'''数据处理'''# 从队列中获取数据病并模拟处理waitting_analysis_data = list()while True:data = q.get()waitting_analysis_data.append(data)if q.empty():breakprint(waitting_analysis_data)def main():# 1.创建一个队列q = multiprocessing.Queue()p1 = multiprocessing.Process(target = download_from_web, args = (q,))p2 = multiprocessing.Process(target = analysis_data ,args = (q,))p1.start()p2.start()if __name__ =="__main__":main()

3.案例-复制文件jia里所有文件青春版

用到了os模块里的mkdir创建文件夹,参数为文件夹的名字

os.listdir()获取名称为参数名称的文件夹里的所有文件名称,返回值是一个列表

引入新东西,进程池

po = multiprocessing.Pool(5)

参数表示一个进程池里最多可yi容纳五个进程

调用方法po.apply_async(copy_file ,args = (''''''''))向进程池里传入进程

close()关闭

join()让进程池里的进程执行完

函数里面 显示读取文件里的内容,再把内容写进新文件夹里文件里,完成拷贝

import osimport multiprocessingdef copy_file(file_name, new_folder_name, old_folder_name):print(file_name)f1 = open(old_folder_name + '/' + file_name, 'rb')f_content = f1.read()f1.close()f2 = open(new_folder_name + '/' + file_name,'wb')f2.write(f_content)f2.close()def main():# 获取要下载的文件夹的名字old_folder_name = input("请输入要复制的文件夹的名字:")# 创建一个新的文件夹new_folder_name = old_folder_name + "[复件]"try:os.mkdir(new_folder_name)except:pass# 获取文件夹里所有文件的名字file_names = os.listdir(old_folder_name)# 创建进程池po = multiprocessing.Pool(5)# 向进程chi里添加copy文件的任务for file_name in file_names:po.apply_async(copy_file, args = (file_name, new_folder_name, old_folder_name))po.close()po.join()# 复制原文件夹的文件,到新文件夹里去if __name__ == "__main__":main()

4.案例-带进度条的文件夹拷贝器

要想实现进度条,实现主进程和子进程之间的通信,要用到队列,multiprocessing.Manager().Queue(),将队列以参数形式传入函数里,没下载好一个文件就put以下,主进程里,每get一下就输出一个进度,进度值为下载好的文件数除以总的文件数,不用调用join()因为循环一定能保证子进程执行完毕,最后break

mport osimport multiprocessingdef copy_file(q, file_name, new_folder_name, old_folder_name):# print(file_name)f1 = open(old_folder_name + '/' + file_name, 'rb')f_content = f1.read()f1.close()f2 = open(new_folder_name + '/' + file_name, 'wb')f2.write(f_content)f2.close()# 如果拷贝完了队列就像队列中写入消息q.put(file_name)def main():# 获取要下载的文件夹的名字old_folder_name = input("请输入要复制的文件夹的名字:")# 创建一个新的文件夹new_folder_name = old_folder_name + "[复件]"try:os.mkdir(new_folder_name)except:pass# 获取文件夹里所有文件的名字file_names = os.listdir(old_folder_name)# 创建进程池po = multiprocessing.Pool(5)# 创建队列q = multiprocessing.Manager().Queue()# 向进程chi里添加copy文件的任务for file_name in file_names:po.apply_async(copy_file, args = (q, file_name, new_folder_name, old_folder_name))po.close()# po.join()all_file_num = len(file_names)now_file_num = 0while True:q.get()now_file_num += 1print('拷贝进度:%.2f%%' % (now_file_num * 100 / all_file_num))if now_file_num >= all_file_num:break# 复制原文件夹的文件,到新文件夹里去if __name__ == "__main__":main()

3.多任务(协程实现)

1.迭代器-自己定义的类进行迭代青春版

from collections import Iterable,Iteratorimport timeclass Classmate(object):def __init__(self):self.names = list()def add(self,name):self.names.append(name)def __iter__(self):return ClassIterator(self)class ClassIterator():def __init__(self,obj):self.obj = objdef __iter__(self):passdef __next__(self):return self.obj.names[0]classmate = Classmate()classmate.add('张三')classmate.add('李四')classmate.add('王五')# print("判断classmate是否是可以迭代的对象:", isinstance(classmate, Iterable))# classmate_iterator = iter(classmate)# print("判断classmate_iterator是否是迭代器:", isinstance(classmate_iterator, Iterator))# print(next(classmate_iterator))for temp in classmate:print(temp)time.sleep(1)~

2.迭代器-自己的类进行迭代

将Classmate类里添加 __iter__ 和 __next__类 完成迭代器

from collections.abc import Iterable,Iteratorimport timeclass Classmate(object):def __init__(self):self.names = list()self.current_num = 0def add(self,name):self.names.append(name)def __iter__(self):return selfdef __next__(self):if self.current_num < len(self.names):ret = self.names[self.current_num]self.current_num += 1return retelse:raise StopIterationclassmate = Classmate()classmate.add('张三')classmate.add('李四')classmate.add('王五')# print("判断classmate是否是可以迭代的对象:", isinstance(classmate, Iterable))# classmate_iterator = iter(classmate)# print("判断classmate_iterator是否是迭代器:", isinstance(classmate_iterator, Iterator))# print(next(classmate_iterator))for temp in classmate:print(temp)time.sleep(1)~ ~

3.迭代器实现多任务(Fibonacci为例子)

迭代器的优点:相对于list()创建列表,迭代器占用的内存非常小

class Fibonacci():def __init__(self,all_num):self.all_num = all_numself.a = 0self.b = 1self.current_num = 0def __iter__(self):return selfdef __next__(self):if self.all_num > self.current_num:ret = self.aself.a ,self.b = self.b, self.a + self.bself.current_num += 1return retelse:raise StopIterationf = Fibonacci(10)for num in f:print(num)

4.生成器-实现斐波那契数列

在函数里只要有yield就不是函数了,是生成器,创建对象obj,调用next(obj)调取a的值

def create_num(all_num):print('------1------')a, b = 0, 1current_num = 0while current_num < all_num:print('------2------')yield a# print(a)print('------3------')a, b = b, a + bcurrent_num += 1# 如果函数有一个yield语句,这个函数就不是函数了,成为了一个生成器的模板,调用create_num时,生成器是一种特殊的迭代器,创建了一个生成器的对象obj = create_num(10)ret = next(obj)print(ret)ret = next(obj)print(ret)# for i in obj:#print(i)~ ~

5.生成器的一些知识

循环读取数据,当读不出来是,抛出一场 e.value为生成器的返回值

def create_num(all_num):print('------1------')a, b = 0, 1current_num = 0while current_num < all_num:print('------2------')yield a# print(a)print('------3------')a, b = b, a + bcurrent_num += 1return 'ok------ok'# 如果函数有一个yield语句,这个函数就不是函数了,成为了一个生成器的模板,调用create_num时,生成器是一种特殊的迭代器,创建了一个生成器的对象obj = create_num(10)while True:try:ret = next(obj)print(ret)except Exception as e:print(e.value)break# for i in obj:#print(i)

6.生成器send()实现多任务

send和next差不多,但是 通过向send传参数, ret = yield a可以收到参数值

def create_num(all_num):print('------1------')a, b = 0, 1current_num = 0while current_num < all_num:print('------2------')ret = yield aprint("ret:",ret)# print(a)print('------3------')a, b = b, a + bcurrent_num += 1return 'ok------ok'# 如果函数有一个yield语句,这个函数就不是函数了,成为了一个生成器的模板,调用create_num时,生成器是一种特殊的迭代器,创建了一个生成器的对象obj = create_num(10)ret = next(obj)print(ret)ret = obj.send('wocaowocaowocao')print(ret)# for i in obj:#print(i)~ ~

7.通过yield完成多任务

看看就能懂

import timedef test1():while True:print("-----1-----")time.sleep(0.1)yielddef test2():while True:print("-----2-----")time.sleep(0.1)yielddef main():t1 = test1()t2 = test2()while True:next(t1)next(t2)if __name__ == "__main__":main()~ ~

8.greenlet多任务

调用实例对象后,在两个函数里switch()完成多任务,并发执行

from greenlet import greenletimport timedef test1():while True:print("-----1-----")gr2.switch()time.sleep(0.5)def test2():while True:print("-----2-----")gr1.switch()time.sleep(0.5)gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()

9.gevent完成多任务

这个不常用

import geventimport timedef f(n):for i in range(n):print(gevent.getcurrent(),i)# time.sleep(0.5)gevent.sleep(0.5)# ( 函数名,参数)g1 = gevent.spawn(f,5)g2 = gevent.spawn(f,5)g3 = gevent.spawn(f,5)# join遇到了延时,等待g1运行完,延时时还能执行其他的g1.join()g2.join()g3.join()

10.gevent多任务-2

这个常用

import geventimport timefrom gevent import monkey# 打补丁:把当前代码所有耗时的代码换成geventmonkey.patch_all()def f(n):for i in range(n):print(gevent.getcurrent(),i)time.sleep(0.5)# gevent.sleep(0.5)# ( 函数名,参数)# g1 = gevent.spawn(f,5)# g2 = gevent.spawn(f,5)# g3 = gevent.spawn(f,5)# join遇到了延时,等待g1运行完,延时时还能执行其他的# g1.join()# g2.join()# g3.join()# joinall更常用gevent.joinall([gevent.spawn(f,5),gevent.spawn(f,5),gevent.spawn(f,5)])

11.案例-图片下载器

看看就懂

import urllib.requestimport geventfrom gevent import monkeymonkey.patch_all()def downloader(img_name, img_url):req = urllib.request.urlopen(img_url)img_content = req.read()with open(img_name, "wb") as f:f.write(img_content)def main():gevent.joinall([gevent.spawn(downloader, '3.jpg', '/live-cover/roomCover//01/14/39e88aa4a2576eb6f0b64b71aa7730ef_big.png/dy1'),gevent.spawn(downloader, '4.jpg', '/bfs/archive/9cbd84d68b1ebe28e39229cbe02ce7f54e8c2365.jpg@672w_378h_1c')])if __name__ == "__main__":main()

三.web服务器v3.1

1.正则表达式简介

直接上案例

1.正则判断变量名是否合理

import redef main():while True:name = input("请输入变量名称:(输入quit退出)")# match自动判断开头# ret = re.match(r"[a-zA-Z_][a-zA-Z_0-9]*$",name)ret = re.match(r"^[a-zA-Z_][a-zA-Z_0-9]*$",name)if name == 'quit':breakelif ret:print("成功%s" %ret.group())else:print("失败喽!")if __name__ == "__main__":main()

2.正则判断常用邮箱地址

import redef main():while True:email_name = input("请输入变量名称:(输入quit退出)")# match自动判断开头,\使需要的特殊字符转义ret = re.match(r"^[a-zA-Z_0-9]{4,20}@(163|126|qq|gmail)\.com$",email_name)if email_name == 'quit':breakelif ret:print("成功%s" %ret.group())else:print("失败喽!")if __name__ == "__main__":main()

2.http

1.实现简单的http服务器

浏览器相当于客户端,向服务器提出请求

服务器回送浏览器需要的数据

import socketdef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024)print(request)# 返回http格式的数据给浏览器# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"# 准备发送给浏览器的数据 -----bodyresponse += "<h1>this is a test.</h1>"new_socket.send(response.encode("utf-8"))#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 绑定tcp_server_socket.bind(("",7890))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

2.根据客户端需要完成http服务器

import socketdef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024)print(request)print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"with open("./html/index.html",'rb') as f:f_content = f.read()new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7890))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

3.返回index页面http

import socketimport redef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024).decode('utf-8')# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器request_lines = request.splitlines()print("")print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)# print("*" * 50, file_name)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:with open("./html" + file_name, 'rb') as f:f_content = f.read()except:response = "HTTP/1.1 404 NOT FOUND\r\n"response += "\r\n"response += "-----file not found-----"new_socket.send(response.encode('utf-8'))else:# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7890))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

4.使用多进程完成http服务器

import multiprocessingimport socketimport redef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024).decode('utf-8')# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器request_lines = request.splitlines()print("")print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)# print("*" * 50, file_name)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:with open("./html" + file_name, 'rb') as f:f_content = f.read()except:response = "HTTP/1.1 404 NOT FOUND\r\n"response += "\r\n"response += "-----file not found-----"new_socket.send(response.encode('utf-8'))else:# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7777))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务p = multiprocessing.Process(target = service_client, args = (new_socket,))p.start()new_socket.close()# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

5.使用多线程完成http

import threadingimport socketimport redef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024).decode('utf-8')# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器request_lines = request.splitlines()print("")print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)# print("*" * 50, file_name)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:with open("./html" + file_name, 'rb') as f:f_content = f.read()except:response = "HTTP/1.1 404 NOT FOUND\r\n"response += "\r\n"response += "-----file not found-----"new_socket.send(response.encode('utf-8'))else:# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7777))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务t = threading.Thread(target = service_client, args = (new_socket,))t.start()# 多线程不需要所有的套接字都关闭# new_socket.close()# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

6.协程完成http服务器

import geventfrom gevent import monkeyimport socketimport remonkey.patch_all()def service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024).decode('utf-8')# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器request_lines = request.splitlines()print("")print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)# print("*" * 50, file_name)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:with open("./html" + file_name, 'rb') as f:f_content = f.read()except:response = "HTTP/1.1 404 NOT FOUND\r\n"response += "\r\n"response += "-----file not found-----"new_socket.send(response.encode('utf-8'))else:# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7777))# 变为监听套接字tcp_server_socket.listen(128)while True:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()# 服务gevent.spawn(service_client, new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

7.单进程单线程非堵塞并发完成http服务器

import socketdef service_client(new_socket):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)request = new_socket.recv(1024)print(request)print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行response = "HTTP/1.1 200 OK\r\n"response += "\r\n"# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"with open("./html/index.html",'rb') as f:f_content = f.read()new_socket.send(response.encode("utf-8"))new_socket.send(f_content)#关闭new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7890))# 变为监听套接字tcp_server_socket.listen(128)# 设置套接字为非堵塞的方式tcp_server_socket.setblocking(False)# 创建一个套接字列表new_socket_list = list()while True:try:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()except Exception as ret:print("-----没有客户端到来-----")else:print("---客户端到来---")new_socket.setblocking(False)new_socket_list.append(new_socket)for new_socket in new_socket_list:try:recv_data = new_socket.recv(1024)except Exception as ret:print("客户端没有发送数据")else:if recv_data:# 对方发送了数据print("接收到数据")else:# 对方调用了close()new_socket_list.remove(new_socket)new_socket.close()# 服务# service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

8.单进程单线程非堵塞长连接

import socketimport redef service_client(new_socket, request):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)# request = new_socket.recv(1024)# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行request_lines = request.splitlines()print('')print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:f = open("./html" + file_name, 'rb')except:response_body = "sorry,there are no file that you wanna."response_header = "HTTP/1.1 404 NOT FOUND\r\n"response_header += f'Content-Length:{len(response_body)}'response_header += '\r\n'response = response_header + response_bodynew_socket.send(response.encode('utf-8'))else:f_content = f.read()f.close()response_body = f_contentresponse_header = "HTTP/1.1 200 OK\r\n"response_header += 'Content-Length:%d\r\n' % len(response_body)response_header += '\r\n'response = response_header.encode('utf-8') + response_bodynew_socket.send(response)# 关闭# new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7890))# 变为监听套接字tcp_server_socket.listen(128)# 非堵塞tcp_server_socket.setblocking(False)new_socket_list = list()while True:try:# 等待客户链接new_socket, client_addr = tcp_server_socket.accept()except Exception as ret:passelse:new_socket.setblocking(False)new_socket_list.append(new_socket)for client_socket in new_socket_list:try:recv_data = client_socket.recv(1024).decode('utf-8')except Exception as ret:passelse:if recv_data:service_client(client_socket, recv_data)else:client_socket.close()new_socket_list.remove(client_socket)# 服务# service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

9.epoll实现http服务器

import socketimport selectimport redef service_client(new_socket, request):'''为这个客户端返回数据'''# 接受浏览器发送的请求(http请求)# request = new_socket.recv(1024)# print(request)# print(">>>>>>>>>>" * 4) # 返回http格式的数据给浏览器# 准备发送给浏览器的数据 -----header \r\n表示浏览器可以识别的换行request_lines = request.splitlines()print('')print(">" * 20)print(request_lines)file_name = ''ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])if ret:file_name = ret.group(1)if file_name == '/':file_name = '/index.html'# 准备发送给浏览器的数据 -----body# response += "<h1>this is a test.</h1>"try:f = open("./html" + file_name, 'rb')except:response_body = "sorry,there are no file that you wanna."response_header = "HTTP/1.1 404 NOT FOUND\r\n"response_header += f'Content-Length:{len(response_body)}'response_header += '\r\n'response = response_header + response_bodynew_socket.send(response.encode('utf-8'))else:f_content = f.read()f.close()response_body = f_contentresponse_header = "HTTP/1.1 200 OK\r\n"response_header += 'Content-Length:%d\r\n' % len(response_body)response_header += '\r\n'response = response_header.encode('utf-8') + response_bodynew_socket.send(response)# 关闭# new_socket.close()def main():# 创建套接字tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 重复利用同一个端口tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定tcp_server_socket.bind(("",7777))# 变为监听套接字tcp_server_socket.listen(128)# 非堵塞tcp_server_socket.setblocking(False)# 创建一个epoll对象epl = select.epoll()# 将监听套接字对应的fd注册到epoll中epl.register(tcp_server_socket.fileno(), select.EPOLLIN)fd_event_dict = dict()while True:fd_event_list = epl.poll() # 默认会堵塞,直到 os监测到数据到来 通过事件通知方式 告诉这个程序,此时才会解堵塞# [(df, event), (套接字对应的文件描述符,描述符到底是什么时间例如调用recv接收等)]for fd, event in fd_event_list:# 等待新客户端的链接if fd == tcp_server_socket.fileno():new_socket, client_addr = tcp_server_socket.accept()epl.register(new_socket.fileno(), select.EPOLLIN)fd_event_dict[new_socket.fileno()] = new_socketelif event == select.EPOLLIN:# 判断已经链接的客户端是否有数据发送过来recv_data = fd_event_dict[fd].recv(1024).decode('utf-8')if recv_data:service_client(fd_event_dict[fd], recv_data)else:fd_event_dict[fd].close()epl.unregister(fd)del fd_event_dict[fd]# 服务# service_client(new_socket)# 关闭tcp_server_socket.close()if __name__ == "__main__":main()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。