1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Python多线程——队列(Queue)

Python多线程——队列(Queue)

时间:2022-04-17 04:15:37

相关推荐

Python多线程——队列(Queue)

一、 Queue作用:主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。

1、首先我们组要明白为什么要使用队列,队列的性质,

多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题,为了解决线程之间数据共享问题,PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。

1.1 FIFO

Queue.Queue(maxsize=0)FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者 等于0,队列大小没有限制。

1.2 LIFO

Queue.LifoQueue(maxsize=0)LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上priorityclass Queue.PriorityQueue(maxsize=0)构造一个优先队列。maxsize用法同上。

1.3 基本方法:

Queue.Queue(maxsize=0) #FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,Queue.LifoQueue(maxsize=0) #LIFO, 如果maxsize小于1就表示队列长度无限Queue.qsize() #返回队列的大小Queue.empty() #如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息Queue.full() # 如果队列满了,返回True,反之False,给生产者提醒Queue.get([block[, timeout]]) 读队列,timeout等待时间Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间Queue.queue.clear() 清空队列task_done()#意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。join()#阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

二、应用案例

2.1 队列需求一(爬虫的请求地址)

Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。

对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:

import threadingfrom queue import Queueclass ThreadCrawl(threading.Thread):def __init__(self, threadName, idQueue):# 继承父类的方法super(ThreadCrawl, self).__init__()self.threadName = threadName# 线程名字def run(self):print('启动' + self.threadName)while not self.idQueue.empty():try:id = self.idQueue.get(False) # False 如果队列为空,抛出异常time.sleep(1)print("~"*300)self.get_con(id)except Exception as e:print('队列为空。。。。。', e)passprint('#'*300)def get_con(self): #自己封装的请求自定义passdef get_id(m, n):conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')cur = conn.cursor()sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)cur.execute(sql1)data = cur.fetchall()mit()return datadef main():n = 60while True:m = 20# m是固定值,一次去20条, n是第几条开始print('开始采集n的值为', n)if n == 200000:break# id的队列idQueue = Queue(20)if idQueue.empty():data = get_id(m, n)for i in data:idQueue.put(i[0])#采集线程的数量crawlList = []for id in range(1, 2):name = '采集线程{}'.format(id)crawlList.append(name)# 存储采集线程的列表集合threadcrawl = []for threadName in crawlList:thread = ThreadCrawl(threadName, idQueue)thread.start()threadcrawl.append(thread)for thread in threadcrawl:thread.join()n = n + mprint("主线程退出..............")if __name__ == '__main__':main()

以上代码是作者从数据库中取数据,间隔性取,让后拼装到url,进行请求

2.2 队列需求二(生产者、消费者模型)

import threadingimport timefrom queue import Queuedef put_id():i = 0while True:i = i + 1print("添加数据", i, id_queue.qsize())time.sleep(0.1)id_queue.put(i)def get_id(m):while True:i = id_queue.get()print("线程", m, '取值', i)if __name__ == "__main__":id_queue = Queue(20)Th1 = threading.Thread(target=put_id, )Th2 = threading.Thread(target=get_id, args=(2, ))Th3 = threading.Thread(target=get_id, args=(3, ))Th5 = threading.Thread(target=get_id, args=(4, ))Th4 = threading.Thread(target=get_id, args=(5, ))Th2.start()Th1.start()Th3.start()Th4.start()Th5.start()

作者:白白_嫩嫩

链接:/p/e30d302ebdeb

来源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。