多进程
Python标准库原本有threading和multiprocessing模块编写相应的多线程/多进程代码。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。多进程我们介绍futures的ProcessPoolExecutor
ProcessPoolExecutor类是Executor类的子类,实例化ProcessPoolExecutor类以创建进程池,在实例化的过程中应指定同时运行的最大进程数
1 | from concurrent.futures import ProcessPoolExecutor |
该类有两种方法对进程池提交任务建立进程(函数及一组参数构成一个任务),分别是submit()和map(),如果单纯想多开进程别无他想,用哪个都行,但submit()会有更灵活的用法
map(fn,*iterables)
- fn:函数
- *iterables:函数每个参数的集合,N个参数就接N个集合
可以理解这是python自带map()的多进程版,他返回的是一个迭代器,包含每个任务对应的返回值(有序的),下面用例子来分析
1 | from concurrent.futures import ProcessPoolExecutor |
输出
1 | 2s |
分析(下面以参数代替某个进程):
带s的是函数输出的,进程池最大允许4个进程同时运行,所以参数 2,3,10,5 首先一起进去。2最快完成,马上让给6进去,2+6 < 10 ,所以后进6完成得比10快,最后输出顺序就是 2s,3s,5s,6s,10s
不带s的是for循环打印迭代器中的结果,由输出可见,i的值分配是会等待进程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,由于10完成前5和6早就完成了,所以返回10后紧接着返回5和6,最后输出顺序为2,3,10,5,6,是有序的,对应各任务的返回值
在爬虫中,上面代码中的时间阻塞会对应着网络I/O阻塞,任务中往往包含着网络请求。比如你有很多个图片链接,就写一个下载图片的函数(接收一个图片链接的参数),把函数和图片链接的集合喂给map()就实现多进程了加速了。
submit(fn, *arg)
- fn:函数
- *arg:函数的参数
该方法是往进程池中提交可回调的任务,并返回一个future实例。提交多个任务可用循环实现,返回的future实例用列表存起来,每个future代表一个进程。关于future对象有许多方法:
- future.running():判断某个future(进程)是否运行中
- future.done():判断某个future(进程)是否正常结束
- future.cancel():终止某个future(进程),终止失败返回False,成功返回True
- future.result():获取future对应任务返回的结果。如果future还没完成就会去等待
- future.add_done_callback(fn):接收函数fn,将fn绑定到future对象上。当future对象被终止或完成时,fn将会被调用并接受该future对象
- as_completed(fs):接收futures列表,futures列表中一旦有某个future(进程)完成就将该future对象yield回来,是个迭代器
1 | from concurrent.futures import ProcessPoolExecutor,as_completed |
多线程
建议小心使用,虽然多线程能实现高并发,但由于线程资源共享的特性,某个线程操作这些共享的资源时可能操到一半就停止让给另一个线程操作,导致错乱的发生。为避免此情况发生对某些操作需要加锁,所以这里介绍对锁有支持的threading模块,python自带直接导入。
如果你确信这些操作不会发生错乱,可以直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的一样
线程
创建线程有两种方法:
- 实例化 threading.Thread 类,target接收函数,arg以可迭代形式接收参数。这种方法最简单
1 | import threading |
- 继承threading.Thread 类,重写run方法,把函数及参数接收写进自己写的多线程类中。这种方法更灵活,threading.Thread 类并没有供获取线程调用函数返回值的方法,如果需要函数返回值就需要继承该类自己实现
1 | import threading |
线程相关方法和属性:
- Thread.start():启动线程
- Thread.join():等待线程的结束,没有join的话会接着运行join下面的代码
- Thread.is_alive():判断线程是否在运行,线程未开启/结束时返回 False
- Thread.name:返回线程的名字,默认线程名是Thread-N,N指第N个开启的线程
- Thread.setName(str):给线程命名
- Thread.setDaemon(True/False):设置子线程是否会随主线程结束而结束,原本所有子线程默认是不会随主线程结束而结束的
锁
线程间资源共享,如果多个线程共同对某个数据修改,可能会出现错误,为了保证数据的正确性,需要对多个线程进行同步。这时就需要引入锁了(利用GIL),锁只有一个,一个线程在持有锁的状态下对某些数据进行操作,其他线程就无法对该数据进行操作,直至该线程释放锁让其他线程抢,谁抢到谁就有权修改。
threading提供Lock和RLock两类锁,前者一个线程只能获取获取一次锁,后者允许一个线程能重复获取锁。如果某个线程对全局数据的操作是割裂的(分块的),那就使用RLock。
- acquire():获取锁
- release():释放锁
- 有数据操作放在acquire 和 release 之间,就不会出现多个线程修改同一个数据的风险了
- acquire 和 release 必须成对存在,如果一个线程只拿不放,其他线程没有锁能抢就只能永远阻塞(停止)
- 一个错乱的例子及锁的使用:
1 | import time, threading |
在不加锁的情况下多跑几次,你会的到不同的结果。但是加了锁之后,+n,-n两个操作完整执行,不会中途中断,结果永0。
限制同时运行线程数
使用 threading.Semaphore 类就行,Semaphore 在内部管理着一个计数器。调用 acquire() 会使这个计数器减1,release() 则是加1。计数器的值永远不会小于 0。当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用release(),这样就限制了同时运行线程的数量。
使用上非常简单,实例化Semaphore并指定线程数后,给函数的头加个acquire(),尾加个release()就行。
1 | import threading, time |
关于threading的其他高级用法本文并未提及,以上都是些常用的用法,如果有更高级的需要,可以参考https://www.cnblogs.com/chengd/articles/7770898.html
应用在爬虫上
如果爬虫需要重复进行某个操作(如下载一张图片,爬取一张网页的源码,破解一次加密【加密耗cpu最好多进程】),那把这个操作抽象成一个接收相应参数的函数,把函数喂给进程/线程即可。
多线程下载豆瓣Top250电影图片
本次爬虫项目将爬取豆瓣Top250电影的图片,其网址为:https://movie.douban.com/top250
分别不使用多线程和使用多线程来完成,通过两者的对比,显示出多线程在爬虫项目中的巨大优势。
不使用多线程
首先,我们不使用多线程来下载豆瓣Top250电影图片
1 | import time |
1 | ************************************************** |
在不使用多线程的情况下,这个爬虫总共耗时约80s,完成了豆瓣Top250电影图片的下载。
urllib.request.urlretrieve(url, filename=None, reporthook=None, data=None)
将URL表示的网络对象复制到本地文件。
https://blog.csdn.net/pursuit_zhangyu/article/details/80556275
使用多线程
接下来,我们使用多线程来下载豆瓣Top250电影图片
1 | import time |
1 | ************************************************** |
同样是下载豆瓣Top250电影,10个网页中的图片,在没有使用多线程的情况下,总共耗时约80s,而在使用多线程(10个线程)的情况下,总共耗时约9.5秒,效率整整提高了约8倍。