最近在写爬虫程序爬取亚马逊上的评论信息,因此也自学了很多爬虫相关的知识,其实网络上已经有很多基于Python的入门爬虫程序了,所以学习起来比较方便,唯独那个多线程爬虫一直都学的不是很明白,所以就写下这篇blog记录一下学到的一些东西(主要是对自己所学的一些东西进行整理和总结)。
一、Python的多线程
Python多线程网上的介绍很多了,但是一直都听说Python的多线程很鸡肋,为什么呢?为什么有人说 Python 的多线程是鸡肋呢?里面的多位大佬已经做出了解释,其实就是因为Python多线程用到了全局解释器锁(GIL锁),这里引用一位大佬的回答 @DarrenChan陈驰:
Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,但同一时间只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行。
1.设置GIL。
2.切换到一个线程去执行。
3.运行。
4.把线程设置为睡眠状态。
5.解锁GIL。
6.再次重复以上步骤。
对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。
关于python的锁机制,还有一篇博文 Python并发编程之谈谈线程中的“锁机制”(三) 值得大家去看看,里面详细介绍了常用的python锁,同时也附了一些代码,挺不错。
二、线程安全的队列 Queue
队列这种东西大家应该都知道,就是一个先进先出的数据结构,而Python的标准库中提供了一个线程安全的队列,也就是说该模块是适用于多线程编程的先进先出(first-in,first-out,FIFO)数据结构,可以用来在生产者消费者线程之间安全地传递消息或其他数据。它会为调用者处理锁定,使用多个线程可以安全地处理同一个 Queue 实例。Queue 的大小(其中包含的元素个数)可能要受限,以限制内存使用或处理。
在Python 3中要引入Queue和Python 2中引入Queue是不同,引入方式如下:
#python 2
import Queue
# python 3
from queue import Queue
因为是线程安全的,很自然就可以利用Queue来实现一个多线程爬虫咯,而Queue的一些常见操作如下:
# 实例化一个队列,可以在指定队列大小
q = Queue.Queue()
q_50 = Queue.Queue(50) # 指定一个长度为50的队列
# 入队一个数据data
q.put(data)
# 出队并赋值给item
item = q.get()
# 判断队列是否为空,是否满
if q.empty():
print('队列为空')
if q.full():
print('队列满')
除了普通队列,标准库中还有优先队列和后进先出队列这两个队列,分别为LifoQueue和PriorityQueue,其引用方式与Queue类似,使用方法参考 [Python标准库]Queue——线程安全的 FIFO 实现
三、基于多线程爬虫爬取糗事百科的段子
下面进入实战的一个代码,代码的理解也相对简单,相信经过这个代码,大家也可以自行写出一个多线程爬虫。
'''整体的思路:1、构造任务队列pageQueue ,存放所有要爬取的页面url2、用多线程爬虫从糗事百科上抓取糗事,然后将抓取的页面内容存放到data_queue中3、用多线程程序对data_queue中的页面内容进行解析,分别提取 糗事的图片url,糗事的题目和糗事内容,然后存放到的json文件中(一个时间点只有一个线程可以写文件IO,注意到Python的多线程机制使用了GIL锁)'''
import requests
from lxml import etree
from queue import Queue
import threading
import json
'''Queue.qsize(队列名) #返回队列的大小Queue.empty(队列名) # 队列为空返回true,否则为falseQueue.full(队列名) # 队列满返回trueQueue.get(队列名,值) # 出队Queue.put(队列名,值) # 入队FIFO 先进先出'''
class Crawl_thread(threading.Thread):
'''抓取线程类,注意需要继承线程类Thread'''
def __init__(self,thread_id,queue):
threading.Thread.__init__(self) # 需要对父类的构造函数进行初始化
self.thread_id = thread_id
self.queue = queue # 任务队列
def run(self):
'''线程在调用过程中就会调用对应的run方法:return:'''
print('启动线程:',self.thread_id)
self.crawl_spider()
print('退出了该线程:',self.thread_id)
def crawl_spider(self):
while True:
if self.queue.empty(): #如果队列为空,则跳出
break
else:
page = self.queue.get()
print('当前工作的线程为:',self.thread_id," 正在采集:",page)
url = '/Shr/page/{}/'.format(str(page))
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3371.0 Safari/537.36'
}
try:
content = requests.get(url,headers=headers)
data_queue.put(content.text) # 将采集的结果放入data_queue中
except Exception as e:
print('采集线程错误',e)
class Parser_thread(threading.Thread):
'''解析网页的类,就是对采集结果进行解析,也是多线程方式进行解析'''
def __init__(self,thread_id,queue,file):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.queue = queue
self.file = file
def run(self):
print('启动线程:', self.thread_id)
while not flag:
try:
item = self.queue.get(False) # get参数为false时队列为空,会抛出异常
if not item:
pass
self.parse_data(item)
self.queue.task_done() # 每当发出一次get操作,就会提示是否堵塞
except Exception as e:
pass
print('退出了该线程:', self.thread_id)
def parse_data(self,item):
'''解析网页内容的函数:param item::return:'''
try:
html = etree.HTML(item)
result = html.xpath('//div[contains(@id,"qiushi_tag")]') # 匹配所有段子内容
for site in result:
try:
img_url = site.xpath('.//img/@src')[0] # 糗事图片
title = site.xpath('.//h2')[0].text # 糗事题目
content = site.xpath('.//div[@class="content"]/span')[0].text.strip() # 糗事内容
response={
'img_url':img_url,
'title':title,
'content':content
} #构造json
json.dump(response,fp=self.file,ensure_ascii=False) # 存放json文件
except Exception as e:
print('parse 2: ', e)
except Exception as e:
print('parse 1: ',e)
data_queue = Queue() # 存放解析数据的queue
flag = False
def main():
output = open('qiushi.json','a',encoding='utf-8') # 将结果保存到一个json文件中
pageQueue = Queue(50) # 任务队列,存放网页的队列
for page in range(1,11):
pageQueue.put(page) # 构造任务队列
# 初始化采集线程
crawl_threads = []
crawl_name_list = ['crawl_1','crawl_2','crawl_3'] # 总共构造3个爬虫线程
for thread_id in crawl_name_list:
thread = Crawl_thread(thread_id,pageQueue) # 启动爬虫线程
thread.start() # 启动线程
crawl_threads.append(thread)
# 初始化解析线程
parse_thread = []
parser_name_list = ['parse_1','parse_2','parse_3']
for thread_id in parser_name_list: #
thread = Parser_thread(thread_id,data_queue,output)
thread.start() # 启动线程
parse_thread.append(thread)
# 等待队列情况,先进行网页的抓取
while not pageQueue.empty(): # 判断是否为空
pass # 不为空,则继续阻塞
# 等待所有线程结束
for t in crawl_threads:
t.join()
# 等待队列情况,对采集的页面队列中的页面进行解析,等待所有页面解析完成
while not data_queue.empty():
pass
# 通知线程退出
global flag
flag = True
for t in parse_thread:
t.join() # 等待所有线程执行到此处再继续往下执行
print('退出主线程')
output.close()
if __name__ == '__main__':
main()
参考博客/教程[Python标准库]Queue——线程安全的 FIFO 实现
一个多线程爬虫的教程: