python多进程
基本概念
Python中的多进程是通过multiprocessing
包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象
这个进程对象的方法和线程对象的方法差不多也有start(),run(),join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
与多线程的共享式内存不同,由于各个进程都是相互独立的,因此进程间通信再多进程中扮演这非常重要的角色,Python中我们可以使用multiprocessing模块中的pipe
、queue
、Array
、Value
等等工具来实现进程间通讯和数据共享,但是在编写起来仍然具有很大的不灵活性
任务类型
同步与异步
同步 就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息 那么这个进程将会一直等待 下去,直到收到返回信息才继续执行下去
异步 是指进程不需要一直等 下去,而是继续执行下面的操作,不管其他进程的状态 当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率
IO密集和计算密集
对于IO密集型任务 : python的多线程能够节省时间
对于计算(CPU)密集型任务 : Python的多线程并没有用处,建议使用多进程
其他组合搭配
python使用多核,即开多个进程
方法一: 协程+多进程,使用方法简单,效率还可以,一般使用该方法
协程yield是你自己写的,是自己定义什么时候切换进程
方法二:IO多路复用,使用复杂,但效率很高,不常用
多进程相关模块
复制 # 创建管理进程模块:
Process (用于创建进程): 通过创建一个Process对象然后调用它的start()方法来生成进程。Process遵循threading . Thread的API。
Pool (用于创建进程管理池) :可以创建一个进程池,该进程将执行与Pool该类一起提交给它的任务,当子进程较多需要管理时使用。
Queue(用于进程通信,资源共享):进程间通信,保证进程安全。
Value,Array(用于进程通信,资源共享):
Pipe(用于管道通信):管道操作。
Manager(用于资源共享):创建进程间共享的数据,包括在不同机器上运行的进程之间的网络共享。
# 同步子进程模块:
Condition
Event:用来实现进程间同步通信。
Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
RLock
Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。
python多线程低效原因
GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个
拿不到通行证的线程,就不允许进入 CPU 执行
目前 Python 的解释器有多种,例如:
CPython :CPython 是用C语言实现的 Python 解释器,作为官方实现,它是最广泛使用的 Python 解释器
PyPy :PyPy 是用RPython实现的解释器。RPython 是 Python 的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy 旨在提高性能,同时保持最大兼容性(参考 CPython 的实现)
Jython :Jython 是一个将 Python 代码编译成 Java 字节码的实现,运行在JVM(Java Virtual Machine)上。另外,它可以像是用 Python 模块一样,导入 并使用任何Java类
IronPython :IronPython 是一个针对 .NET 框架的 Python 实现。它 可以用 Python 和 .NET framewor k的库,也能将 Python 代码暴露给 .NET 框架中的其他语言
GIL 只在 CPython 中才有,而在 PyPy 和 Jython 中是没有 GIL 的
注意: 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源
这就导致打印线程执行时长,会发现耗时更长的原因
并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因
多进程实现方式
Process
普通Process
复制 from multiprocessing import Process
def func ( name ):
print ( '测试 %s 多进程' % name)
if __name__ == '__main__' :
process_list = []
for i in range ( 5 ): #开启5个子进程执行fun1函数
p = Process (target = func, args = ( 'Python' ,)) #实例化进程对象
p . start ()
process_list . append (p)
for i in process_list :
p . join ()
print ( '结束测试' )
复制 测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试
Process finished with exit code 0
上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。
我们知道进程是python中最小的资源分配单元 ,也就是进程中间的数据,内存是不共享的 ,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大 了,所以在实际中使用多进程,要根据服务器的配置来设定。
继承Process
复制 from multiprocessing import Process
class MyProcess ( Process ): #继承Process类
def __init__ ( self , name ):
super (MyProcess,self). __init__ ()
self . name = name
def run ( self ):
print ( '测试 %s 多进程' % self.name)
if __name__ == '__main__' :
process_list = []
for i in range ( 5 ): #开启5个子进程执行fun1函数
p = MyProcess ( 'Python' ) #实例化进程对象
p . start ()
process_list . append (p)
for i in process_list :
p . join ()
print ( '结束测试' )
复制 测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试
Process finished with exit code 0
通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的,效果和第一种方式一样
Process类的其他方法
复制 构造方法:
Process ([ group [ , target [ , name [ , args [ , kwargs ]]]]])
group: 线程组
target: 要执行的方法
name: 进程名
args / kwargs: 要传入方法的参数
实例方法:
is_alive():返回进程是否在运行 , bool类型。
join([ timeout ]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字
pid:进程号
进程池
复制 # apply_async:异步
from multiprocessing import Pool , cpu_count
import os , time , random
def fun1 ( name ):
print ( 'Run task %s ( %s )...' % (name, os. getpid ()))
start = time . time ()
time . sleep (random. random () * 3 )
end = time . time ()
print ( 'Task %s runs %0.2f seconds.' % (name, (end - start)))
return f ' { name } : { os . getpid () } '
if __name__ == '__main__' :
results = []
pool = Pool ( cpu_count () - 1 )
for i in range ( 4 ):
results . append (pool. apply_async (func = fun1, args = (i,)))
pool . close ()
pool . join ()
print ()
for result in results :
print (result. get ())
print ( 'All Done!!!' )
print ( '结束测试' )
复制 Run task 0 ( 30716 )...
Run task 1 ( 15020 )...
Run task 2 ( 23200 )...
Run task 3 ( 5884 )...
Task 0 runs 1.34 seconds.
Task 2 runs 1.53 seconds.
Task 1 runs 1.88 seconds.
Task 3 runs 2.48 seconds.
0 : 30716
1 : 15020
2 : 23200
3 : 5884
All Done !!!
结束测试
Process finished with exit code 0
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
复制 # map_async:异步
from multiprocessing import Pool , cpu_count , Manager
from functools import partial
def job ( data , mgrDicTask , lock ):
res = f 'a+b = { data [ 0 ] + data [ 1 ] } '
lock . acquire ()
# Manager对象无法监测到它引用的可变对象值的修改,需要通过触发__setitem__方法来让它获得通知
tempDic = list (mgrDicTask[ 'result' ])
tempDic . append (res)
mgrDicTask [ 'result' ] = tempDic
lock . release ()
return res
if __name__ == "__main__" :
data = [[ 2 , 3 ] , [ 3 , 4 ] , [ 2 , 5 ]]
pool = Pool (processes = cpu_count () - 1 )
mgr = Manager ()
lock = mgr . Lock ()
mgrDicTask = mgr . dict ()
mgrDicTask [ 'result' ] = []
fun = partial (job, mgrDicTask = mgrDicTask, lock = lock)
pool . map_async (fun, data)
pool . close ()
pool . join ()
print (mgrDicTask[ 'result' ])
print ( 'All Done!!!' )
复制 [ 'a+b = 7' , 'a+b = 7' , 'a+b = 5' ]
All Done !!!
Process finished with exit code 0
其他进程池
复制 # 进程池的另外一种创建方式,跟线程池的创建方式一样。其方法等也相同。
def process_pool_test ( url_list ):
book_list = []
# 创建进程池
pool = ProcessPoolExecutor (max_workers = 20 )
start = time . time ()
for url in url_list :
time . sleep ( 0.5 )
result = pool . submit (get_book_info, url)
book_list . append (result)
pool . shutdown ()
print ( 'time: ' , time. time () - start)
book_name_list = []
author_list = []
author_info_list = []
print ( 'book_list: ' , len (book_list))
for future in book_list :
book_name_list . extend (future. result ()[ 'name' ])
author_list . extend (future. result ()[ 'author' ])
author_info_list . extend (future. result ()[ 'info' ])
ExcelUtils . write_data_to_excel ( 'bookInfo' , book_name_list, author_list, author_info_list)
if __name__ == '__main__' :
sys . setrecursionlimit ( 10000 )
url_list = [ 'https://www.edge.org/library' ]
for i in range ( 1 , 52 ):
url_list . append ( 'https://www.edge.org/library?page= %s ' % i)
thread_pool_test (url_list)
多进程通信
内容提取神器 beautiful Soup 的用法
进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的 ,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别
但是难道Python多进程中间难道就是孤立的吗?
当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)
进程队列Queue
复制 #!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""
from multiprocessing import Process , Queue , Manager
from multiprocessing import cpu_count
import os
import time
class Task :
def __init__ ( self , task_name : str , data : list , ** kwargs ):
self . task_name = task_name
self . data = data
def __repr__ ( self ):
return f 'task_name: { self . task_name } data: { self . data } '
class MultiProcessingQueue :
def __init__ ( self ):
# 进程数
self . num_of_worker = cpu_count ()
# 进程队列大小,根据不同的任务需求
self . size_of_queue = 10
def start_work ( self ):
print ( "start_work 开始" )
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue (self.size_of_queue)
# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager (). list ()
# 一个生产者
sent = Process (target = self.productor, args = (work_queue, dealed_sample_lst,))
sent . start ()
process_list . append (sent)
# 多个消费者
for _ in range (self.num_of_worker - 1 ):
process = Process (target = self.consumer, args = (work_queue, dealed_sample_lst,))
process . start ()
process_list . append (process)
[process . join () for process in process_list]
print ( "start_work 结束" )
return dealed_sample_lst
def productor ( self , work_queue : Queue , dealed_sample_lst ):
print ( "生产者开始工作" )
for ii in range ( 100 ):
work_queue . put ( Task (task_name = f ' {str (os. getpid ()) } - {str (ii) } ' , data = [ii for _ in range ( 2 )]))
if ii % 30 == 0 :
time . sleep ( 1 )
print ( "生产者休息ing" )
'''
JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
task_done()是用在get()后,发送通知说我get完了
join()是说Queue里所有的task都已处理。
'''
# 这里需要加入结束标识,还有就是JoinableQueue的方式
for _ in range (self.num_of_worker - 1 ):
work_queue . put ( None )
print ( "生产者工作结束" )
def consumer ( self , work_queue : Queue , dealed_sample_lst ):
while True :
task : Task = work_queue . get ()
if task is None :
break
# 处理数据
task . data = [ii * 2 for ii in task . data]
dealed_sample_lst . append (task)
print (task)
print ( f '进程 { os. getpid () } 处理结束' )
def multiprocessing_queue_test ():
multiprocessing_queue = MultiProcessingQueue ()
dealed_sample_lst = multiprocessing_queue . start_work ()
# for sample in dealed_sample_lst:
# print(sample)
print ( "测试结束" )
if __name__ == '__main__' :
multiprocessing_queue_test ()
复制 start_work 开始
生产者开始工作
task_name: 28868 - 0 data :[ 0 , 0 ]
生产者休息ing
task_name: 28868 - 1 data :[ 2 , 2 ]
...
task_name: 28868 - 6 data :[ 12 , 12 ]
生产者休息ing
task_name: 28868 - 31 data :[ 62 , 62 ]
...
task_name: 28868 - 58 data :[ 116 , 116 ]
生产者休息ing
task_name: 28868 - 61 data :[ 122 , 122 ]
...
task_name: 28868 - 64 data :[ 128 , 128 ]
...
生产者休息ing
task_name: 28868 - 91 data :[ 182 , 182 ]
...
task_name: 28868 - 96 data :[ 192 , 192 ]
生产者工作结束
进程29208 处理结束
task_name: 28868 - 97 data :[ 194 , 194 ]
进程20632 处理结束
进程28496 处理结束
task_name: 28868 - 98 data :[ 196 , 196 ]
进程30200 处理结束
进程26512 处理结束
进程29776 处理结束
task_name: 28868 - 99 data :[ 198 , 198 ]
进程30072 处理结束
start_work 结束
测试结束
上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信
JoinableQueue队列
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理
通知进程是使用共享的信号和条件变量来实现的
参数介绍:
maxsize : 是队列中允许最大项数,省略则无大小限制
方法介绍:
q.task_done() :使用者使用此方法发出信号,表示q.get()的返回项目已经被处理 如果调用此方法的次数大于从队列中删除项目的数量将引发ValueError异常
q.join() :生产者调用此方法进行阻塞,直到队列中所有的项目均被处理 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
示例代码
复制 #!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""
from multiprocessing import Process , Queue , JoinableQueue , Manager
from multiprocessing import cpu_count
import os , time
class Task :
def __init__ ( self , task_name : str , data : list , ** kwargs ):
self . task_name = task_name
self . data = data
def __repr__ ( self ):
return f 'task_name: { self . task_name } data: { self . data } '
class MultiProcessingJoinableQueue :
def __init__ ( self ):
# 进程数
self . num_of_worker = cpu_count ()
# 进程队列大小,根据不同的任务需求
self . size_of_queue = 10
def start_work ( self ):
print ( "start_work 开始" )
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = JoinableQueue (self.size_of_queue)
# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager (). list ()
# 一个生产者
sent = Process (target = self.productor, args = (work_queue, dealed_sample_lst,))
process_list . append (sent)
# 多个消费者
for _ in range (self.num_of_worker - 1 ):
process = Process (target = self.consumer, args = (work_queue, dealed_sample_lst,))
process . daemon = True
process_list . append (process)
[process . start () for process in process_list]
# 这里需要注意的一点是,这里join只需要调用生产者(别调消费者的join,否则无法正常退出)
# 消费者不需要,个人感觉应该是生产者那边已经调用了work_queue.join()的方法
# 消费者结束后,整个程序退出
[process . join () for process in process_list [: 1 ] ]
print ( "start_work 结束" )
return dealed_sample_lst
def productor ( self , work_queue : Queue , dealed_sample_lst ):
print ( "生产者开始工作" )
for ii in range ( 100 ):
work_queue . put ( Task (task_name = f ' {str (os. getpid ()) } - {str (ii) } ' , data = [ii for _ in range ( 2 )]))
if ii % 30 == 0 :
time . sleep ( 1 )
print ( "生产者休息ing" )
print ( "生产者工作结束" )
work_queue . join ()
def consumer ( self , work_queue : Queue , dealed_sample_lst ):
while True :
task : Task = work_queue . get ()
if task is None :
break
# 处理数据
task . data = [ii * 2 for ii in task . data]
dealed_sample_lst . append (task)
print (task)
work_queue . task_done ()
print ( f '进程 { os. getpid () } 处理结束' )
def multiprocessing_joinablequeue_test ():
multiprocessing_joinablequeue = MultiProcessingJoinableQueue ()
dealed_sample_lst = multiprocessing_joinablequeue . start_work ()
# for sample in dealed_sample_lst:
# print(sample)
print ( "测试结束" )
if __name__ == '__main__' :
multiprocessing_joinablequeue_test ()
结果输出
复制 start_work 开始
生产者开始工作
task_name: 14608-0 data:[ 0 , 0 ]
生产者休息ing
task_name: 14608-1 data:[ 2 , 2 ]
...
task_name: 14608-7 data:[ 14 , 14 ]
生产者休息ing
task_name: 14608-31 data:[ 62 , 62 ]
...
task_name: 14608-60 data:[ 120 , 120 ]
生产者休息ing
task_name: 14608-61 data:[ 122 , 122
...
task_name: 14608-90 data:[ 180 , 180 ]
生产者休息ing
生产者工作结束
task_name: 14608-91 data:[ 182 , 182 ]
...
task_name: 14608-93 data:[ 186 , 186 ]
start_work 结束
测试结束
管道Pipe
Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像
pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数
如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据
管道是数据不安全的,多个进程同时收发数据可道引起数据异常,这时候就应该配合锁使用
复制 from multiprocessing import Process , Pipe
def fun1 ( conn ):
print ( '子进程发送消息:' )
conn . send ( '你好主进程' )
print ( '子进程接受消息:' )
print (conn. recv ())
conn . close ()
if __name__ == '__main__' :
conn1 , conn2 = Pipe () #关键点,pipe实例化生成一个双向管
p = Process (target = fun1, args = (conn2,)) #conn2传给子进程
p . start ()
print ( '主进程接受消息:' )
print (conn1. recv ())
print ( '主进程发送消息:' )
conn1 . send ( "你好子进程" )
p . join ()
print ( '结束测试' )
复制 主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程
主进程发送消息:
你好子进程
结束测试
Process finished with exit code 0
上面可以看到主进程和子进程可以相互发送消息
Managers
Queue和Pipe只是实现了数据交互,并没实现数据共享 ,即一个进程去更改另一个进程的数据,那么就要用到Managers
复制 from multiprocessing import Process , Manager
def fun1 ( dic , lis , index ):
dic [ index ] = 'a'
dic [ '2' ] = 'b'
lis . append (index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == '__main__' :
with Manager () as manager :
dic = manager . dict () #注意字典的声明方式,不能直接通过{}来定义
l = manager . list ( range ( 5 )) #[0,1,2,3,4]
process_list = []
for i in range ( 10 ):
p = Process (target = fun1, args = (dic,l,i))
p . start ()
process_list . append (p)
for res in process_list :
res . join ()
print (dic)
print (l)
复制 { 0 : 'a' , '2' : 'b' , 3 : 'a' , 1 : 'a' , 2 : 'a' , 4 : 'a' , 5 : 'a' , 7 : 'a' , 6 : 'a' , 8 : 'a' , 9 : 'a' }
[ 0 , 1 , 2 , 3 , 4 , 0 , 3 , 1 , 2 , 4 , 5 , 7 , 6 , 8 , 9 ]
可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容 在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据
注意事项
无法调用多层生成器(待验证)
复制 #!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v0.1
@author: narutohyc
@file: text.py
@Description:
@time: 2020/5/29 19:51
"""
from multiprocessing import Process , Queue , JoinableQueue , Manager
from multiprocessing import cpu_count
import os , time
from abc import (ABC ,
abstractmethod ,
ABCMeta)
from comm . logger . logger_config import logger
class SampleIterator (ABC , metaclass = ABCMeta ):
def __init__ ( self ):
pass
@abstractmethod
def __iter__ ( self ):
'''
样本处理并返回
'''
pass
class DataSource1 ( SampleIterator ):
def __init__ ( self ):
super (DataSource1, self). __init__ ()
def __iter__ ( self ):
for ii in range ( 10 ):
yield ii
class DataSource2 ( SampleIterator ):
def __init__ ( self , data_source ):
super (DataSource2, self). __init__ ()
self . data_source = data_source
def __iter__ ( self ):
for ii in self . data_source :
yield ii
class DataSource3 ( SampleIterator ):
def __init__ ( self , data_source ):
super (DataSource3, self). __init__ ()
self . data_source = data_source
def __iter__ ( self ):
for ii in self . data_source :
yield ii
class DataSource4 ( SampleIterator ):
def __init__ ( self , data_source ):
super (DataSource4, self). __init__ ()
self . data_source = data_source
def __iter__ ( self ):
for ii in self . data_source :
yield ii
class HUCY ():
def __init__ ( self , data_source = None ):
self . num_of_worker = cpu_count ()
self . size_of_queue = 2
self . data_source = data_source
def start_work ( self ):
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue (self.size_of_queue)
# 一个生产者
produce_num = 1
for _ in range (produce_num):
sent = Process (target = self.productor, args = (work_queue,))
sent . start ()
process_list . append (sent)
# 多个消费者
for _ in range (self.num_of_worker - produce_num):
process = Process (target = self.consumer, args = (work_queue,))
process . start ()
process_list . append (process)
# 这里需要加入结束标识,还有就是JoinableQueue的方式
[process . join () for process in process_list [: produce_num ] ]
for _ in range (self.num_of_worker - produce_num):
work_queue . put ( None )
[process . join () for process in process_list [ produce_num :] ]
print ( "start_work 结束" )
def productor ( self , work_queue ):
[work_queue . put (ii) for ii in self . data_source]
logger . info ( "生产者结束" )
def consumer ( self , work_queue ):
while True :
data = work_queue . get ()
if data is None :
break
logger . info ( f "数据: { data } " )
logger . info ( "消费者结束" )
def hyc_test ():
da1 = DataSource2 ( DataSource3 ( DataSource4 ( DataSource1 ())))
da2 = DataSource2 ( DataSource3 ( DataSource4 (da1)))
da3 = DataSource2 ( DataSource3 ( DataSource4 (da2)))
da4 = DataSource2 ( DataSource3 ( DataSource4 (da3)))
da5 = DataSource2 ( DataSource3 ( DataSource4 (da4)))
da6 = DataSource2 ( DataSource3 ( DataSource4 ( DataSource2 (da5))))
hucy = HUCY (da6)
hucy . start_work ()
if __name__ == '__main__' :
hyc_test ()
几个问题:
以上代码 若有多个生产者 就会各自拥有自己的数据生成器,导致数据重复