学习爬虫Part9-给爬虫加速:多线程,多进程

多进程

Python标准库原本有threading和multiprocessing模块编写相应的多线程/多进程代码。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。多进程我们介绍futures的ProcessPoolExecutor

ProcessPoolExecutor类是Executor类的子类,实例化ProcessPoolExecutor类以创建进程池,在实例化的过程中应指定同时运行的最大进程数

1
2
3
4
5
6
7
8
9
10
11
from concurrent.futures import  ProcessPoolExecutor
pool = ProcessPoolExecutor(max_workers=4) # 运行最大进程数4
#进程池的操作...
pool.shutdown(wait=True) # 关闭进程池,默认等待所有进程的完成。
print('Deep') # 有shutdown的情况下所有进程完成后才会运行下面的print,没有的话会马上运行

'''
创建进程也可用with,这时会自带shutdown功能
with ProcessPoolExecutor(4) as pool:
#进程池的操作...
'''

该类有两种方法对进程池提交任务建立进程(函数及一组参数构成一个任务),分别是submit()和map(),如果单纯想多开进程别无他想,用哪个都行,但submit()会有更灵活的用法

map(fn,*iterables)

  • fn:函数
  • *iterables:函数每个参数的集合,N个参数就接N个集合
    可以理解这是python自带map()的多进程版,他返回的是一个迭代器,包含每个任务对应的返回值(有序的),下面用例子来分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import  ProcessPoolExecutor
import time


def test(x):
time.sleep(x) # 时间阻塞
print(str(x)+'s')
return x

if __name__ == '__main__':
with ProcessPoolExecutor(4) as pool:
p = pool.map(test,[2,3,10,5,6])
for i in p:
print(i)

输出

1
2
3
4
5
6
7
8
9
10
2s
2
3s
3
5s
6s
10s
10
5
6

分析(下面以参数代替某个进程):

  • 带s的是函数输出的,进程池最大允许4个进程同时运行,所以参数 2,3,10,5 首先一起进去。2最快完成,马上让给6进去,2+6 < 10 ,所以后进6完成得比10快,最后输出顺序就是 2s,3s,5s,6s,10s

  • 不带s的是for循环打印迭代器中的结果,由输出可见,i的值分配是会等待进程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,由于10完成前5和6早就完成了,所以返回10后紧接着返回5和6,最后输出顺序为2,3,10,5,6,是有序的,对应各任务的返回值

在爬虫中,上面代码中的时间阻塞会对应着网络I/O阻塞,任务中往往包含着网络请求。比如你有很多个图片链接,就写一个下载图片的函数(接收一个图片链接的参数),把函数和图片链接的集合喂给map()就实现多进程了加速了。

submit(fn, *arg)

  • fn:函数
  • *arg:函数的参数

该方法是往进程池中提交可回调的任务,并返回一个future实例。提交多个任务可用循环实现,返回的future实例用列表存起来,每个future代表一个进程。关于future对象有许多方法:

  • future.running():判断某个future(进程)是否运行中
  • future.done():判断某个future(进程)是否正常结束
  • future.cancel():终止某个future(进程),终止失败返回False,成功返回True
  • future.result():获取future对应任务返回的结果。如果future还没完成就会去等待
  • future.add_done_callback(fn):接收函数fn,将fn绑定到future对象上。当future对象被终止或完成时,fn将会被调用并接受该future对象
  • as_completed(fs):接收futures列表,futures列表中一旦有某个future(进程)完成就将该future对象yield回来,是个迭代器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from concurrent.futures import ProcessPoolExecutor,as_completed
import time

def test(x):
time.sleep(x)
print(str(x)+'s')
return x

if __name__ == '__main__':
with ProcessPoolExecutor(4) as pool:
futures = [pool.submit(test,i) for i in [2,3,10,5,6]]

'''
for j in futures:
print(j.result()) # 对应接收参数有序输出,输出2,3,10,5,6
'''
for j in as_completed(futures):
print(j.result()) # 对应进程完成顺序输出,输出2,3,5,6,10

多线程

建议小心使用,虽然多线程能实现高并发,但由于线程资源共享的特性,某个线程操作这些共享的资源时可能操到一半就停止让给另一个线程操作,导致错乱的发生。为避免此情况发生对某些操作需要加锁,所以这里介绍对锁有支持的threading模块,python自带直接导入。
如果你确信这些操作不会发生错乱,可以直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的一样

线程

创建线程有两种方法:

  1. 实例化 threading.Thread 类,target接收函数,arg以可迭代形式接收参数。这种方法最简单
1
2
3
4
5
6
7
8
9
10
11
12
import threading
import time

def test(x):
time.sleep(x)
print(str(x)+'s')
return x

t1 = threading.Thread(target=test, args=(1,)) # 创建线程
t2 = threading.Thread(target=test, args=(3,))
t1.start() # 启动线程
t2.start()
  1. 继承threading.Thread 类,重写run方法,把函数及参数接收写进自己写的多线程类中。这种方法更灵活,threading.Thread 类并没有供获取线程调用函数返回值的方法,如果需要函数返回值就需要继承该类自己实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

class TestThread(threading.Thread):
def __init__(self,x):
threading.Thread.__init__(self)
self.x = x # 参数接收

def run(self):
time.sleep(self.x) # 原来的函数写到run中
print(str(self.x)+'s')

def result(self): # 实现获取调用函数的返回值的方法
return self.x

t1 = TestThread(1) #创建线程
t2 = TestThread(3)
t1.start() # 启动线程
t2.start()
t1.join() # 等待线程结束
t2.join()
print(t1.result(),t2.result())

线程相关方法和属性:

  • Thread.start():启动线程
  • Thread.join():等待线程的结束,没有join的话会接着运行join下面的代码
  • Thread.is_alive():判断线程是否在运行,线程未开启/结束时返回 False
  • Thread.name:返回线程的名字,默认线程名是Thread-N,N指第N个开启的线程
  • Thread.setName(str):给线程命名
  • Thread.setDaemon(True/False):设置子线程是否会随主线程结束而结束,原本所有子线程默认是不会随主线程结束而结束的

线程间资源共享,如果多个线程共同对某个数据修改,可能会出现错误,为了保证数据的正确性,需要对多个线程进行同步。这时就需要引入锁了(利用GIL),锁只有一个,一个线程在持有锁的状态下对某些数据进行操作,其他线程就无法对该数据进行操作,直至该线程释放锁让其他线程抢,谁抢到谁就有权修改。

threading提供Lock和RLock两类锁,前者一个线程只能获取获取一次锁,后者允许一个线程能重复获取锁。如果某个线程对全局数据的操作是割裂的(分块的),那就使用RLock。

  • acquire():获取锁
  • release():释放锁
  • 有数据操作放在acquire 和 release 之间,就不会出现多个线程修改同一个数据的风险了
  • acquire 和 release 必须成对存在,如果一个线程只拿不放,其他线程没有锁能抢就只能永远阻塞(停止)
  • 一个错乱的例子及锁的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time, threading

lock = threading.Lock() # rlock = threading.RLock()
balance = [0]

def test(n):
for i in range(100000): # 理想的情况是执行了+n,-n操作后才让另一个线程处理,结果永0
#lock.acquire()
balance[0] = balance[0] + n # 某个线程可能处理到这里就终止让给另一个线程处理了,循环一大,结果可能错乱不为0
balance[0] = balance[0] - n
#lock.release()
t1 = threading.Thread(target=test, args=(5,))
t2 = threading.Thread(target=test, args=(8.0,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance[0])

在不加锁的情况下多跑几次,你会的到不同的结果。但是加了锁之后,+n,-n两个操作完整执行,不会中途中断,结果永0。

限制同时运行线程数

使用 threading.Semaphore 类就行,Semaphore 在内部管理着一个计数器。调用 acquire() 会使这个计数器减1,release() 则是加1。计数器的值永远不会小于 0。当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用release(),这样就限制了同时运行线程的数量。

使用上非常简单,实例化Semaphore并指定线程数后,给函数的头加个acquire(),尾加个release()就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading, time

def test(x):
semaphore.acquire()
time.sleep(x)
print(x)
semaphore.release()

semaphore = threading.Semaphore(4) # 最大4个线程同时进行
ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]]
[t.start() for t in ts]

'输出:2,3,5,6,10 (原理和上面多进程的那个差不多)'

关于threading的其他高级用法本文并未提及,以上都是些常用的用法,如果有更高级的需要,可以参考https://www.cnblogs.com/chengd/articles/7770898.html

应用在爬虫上

如果爬虫需要重复进行某个操作(如下载一张图片,爬取一张网页的源码,破解一次加密【加密耗cpu最好多进程】),那把这个操作抽象成一个接收相应参数的函数,把函数喂给进程/线程即可。

多线程下载豆瓣Top250电影图片

本次爬虫项目将爬取豆瓣Top250电影的图片,其网址为:https://movie.douban.com/top250

分别不使用多线程和使用多线程来完成,通过两者的对比,显示出多线程在爬虫项目中的巨大优势。

不使用多线程

  首先,我们不使用多线程来下载豆瓣Top250电影图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import requests
import urllib.request
from bs4 import BeautifulSoup

# 该函数用于下载图片
# 传入函数: 网页的网址url
def download_picture(url):

# 获取网页的源代码
r = requests.get(url)
# 利用BeautifulSoup将获取到的文本解析成HTML
soup = BeautifulSoup(r.text, "lxml")
# 获取网页中的电影图片
content = soup.find('div', class_='article')
images = content.find_all('img')
# 获取电影图片的名称和下载地址
picture_name_list = [image['alt'] for image in images]
picture_link_list = [image['src'] for image in images]

# 利用urllib.request..urlretrieve正式下载图片
for picture_name, picture_link in zip(picture_name_list, picture_link_list):
urllib.request.urlretrieve(picture_link, 'E://douban/%s.jpg' % picture_name)


def main():

# 全部10个网页
start_urls = ["https://movie.douban.com/top250"]
for i in range(1, 10):
start_urls.append("https://movie.douban.com/top250?start=%d&filter=" % (25 * i))

# 统计该爬虫的消耗时间
t1 = time.time()
print('*' * 50)

for url in start_urls:
download_picture(url)
t2 = time.time()

print('不使用多线程,总共耗时:%s'%(t2-t1))
print('*' * 50)

main()
1
2
3
**************************************************
不使用多线程,总共耗时:79.93260931968689
**************************************************

在不使用多线程的情况下,这个爬虫总共耗时约80s,完成了豆瓣Top250电影图片的下载。

urllib.request.urlretrieve(url, filename=None, reporthook=None, data=None)
将URL表示的网络对象复制到本地文件。
https://blog.csdn.net/pursuit_zhangyu/article/details/80556275

使用多线程

  接下来,我们使用多线程来下载豆瓣Top250电影图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import time
import requests
import urllib.request
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

# 该函数用于下载图片
# 传入函数: 网页的网址url
def download_picture(url):

# 获取网页的源代码
r = requests.get(url)
# 利用BeautifulSoup将获取到的文本解析成HTML
soup = BeautifulSoup(r.text, "lxml")
# 获取网页中的电影图片
content = soup.find('div', class_='article')
images = content.find_all('img')
# 获取电影图片的名称和下载地址
picture_name_list = [image['alt'] for image in images]
picture_link_list = [image['src'] for image in images]

# 利用urllib.request..urlretrieve正式下载图片
for picture_name, picture_link in zip(picture_name_list, picture_link_list):
urllib.request.urlretrieve(picture_link, 'E://douban/%s.jpg' % picture_name)


def main():

# 全部10个网页
start_urls = ["https://movie.douban.com/top250"]
for i in range(1, 10):
start_urls.append("https://movie.douban.com/top250?start=%d&filter=" % (25 * i))

# 统计该爬虫的消耗时间
print('*' * 50)
t3 = time.time()

# 利用并发下载电影图片
executor = ThreadPoolExecutor(max_workers=10) # 可以自己调整max_workers,即线程的个数
# submit()的参数: 第一个为函数, 之后为该函数的传入参数,允许有多个
future_tasks = [executor.submit(download_picture, url) for url in start_urls]
# 等待所有的线程完成,才进入后续的执行
wait(future_tasks, return_when=ALL_COMPLETED)

t4 = time.time()
print('使用多线程,总共耗时:%s' % (t4 - t3))
print('*' * 50)

main()
1
2
3
**************************************************
使用多线程,总共耗时:9.361606121063232
**************************************************

同样是下载豆瓣Top250电影,10个网页中的图片,在没有使用多线程的情况下,总共耗时约80s,而在使用多线程(10个线程)的情况下,总共耗时约9.5秒,效率整整提高了约8倍。

Donate? comment?