## 使用fork创建新进程
Python的OS模块提供了一个fork()函数,该函数的作用在于程序会启动两个进程(一个父进程一个子进程)来执行从os.fork()开始的所有代码,fork()函数不需要参数,会返回一个表明是哪个进程在执行的返回值,如果返回0则表明是fork出来的子进程在执行,如果返回非0,则表明是父进程在执行
该方法仅在UNIX及类UNIX系统上行得通,包括UNIX,Linux和Mac系统
import osprint('父进程(%s)开始执行' % os.getpid())# 开始fork一个子进程# 从这行代码开始,下面代码都会被两个进程执行pid = os.fork()print('进程进入:%s' % os.getpid())# 如果pid为0,表明子进程if pid == 0: print('子进程,其ID为 (%s), 父进程ID为 (%s)' % (os.getpid(), os.getppid()))else: print('我 (%s) 创建的子进程ID为 (%s).' % (os.getpid(), pid))print('进程结束:%s' % os.getpid())
从pid = os.fork()之后的代码程序会分别使用父进程和子进程来执行
## 使用multiprocessing.Process创建新进程
Python在muliprocessing模块下提供了Process来创建新进程,与Thread非常类似,创建新进程有两种方式
-以指定函数作为target,创建Process对象即可创建新进程
-继承Process类,并重写它的run()方法来创建进程类,程序创建Process子类的实例作为进程
同样的Process也具有如下类似的方法和属性
- run():重写该方法可实现进程的执行体
- start():该方法用于启动进程
-join(timeout):该方法类似于线程的join()方法,当前进程必须等待被join的进程执行完毕才能继续执行
-name:该属性用于设置或访问进程的名字
-is_alive():判断进程死活
-daemon:该属性用于判断或设置进程的后台状态
-pid:返回进程ID
-authkey:返回进程的授权key
-terminate():中断该进程
### 以指定函数作为target创建进程
import multiprocessingimport os# 定义一个普通的action函数,该函数准备作为进程执行体def action(max): for i in range(max): print("(%s)子进程(父进程:(%s)):%d" % (os.getpid(), os.getppid(), i))if __name__ == '__main__': # 下面是主程序(也就是主进程) for i in range(100): print("(%s)主进程: %d" % (os.getpid(), i)) if i == 20: # 创建并启动第一个进程 mp1 = multiprocessing.Process(target=action,args=(100,)) mp1.start() # 创建并启动第一个进程 mp2 = multiprocessing.Process(target=action,args=(100,)) mp2.start() mp2.join() print('主进程执行完成!')
### 继承Process类创建子进程
创建步骤:
-定义继承Process的子类,重写其run()方法准备作为进程执行体
-创建Process子类的实例
-调用Process子类的实例的start()方法来启动进程
import multiprocessingimport osclass MyProcess(multiprocessing.Process): def __init__(self, max): self.max = max super().__init__() # 重写run()方法作为进程执行体 def run(self): for i in range(self.max): print("(%s)子进程(父进程:(%s)):%d" % (os.getpid(), os.getppid(), i))if __name__ == '__main__': # 下面是主程序(也就是主进程) for i in range(100): print("(%s)主进程: %d" % (os.getpid(), i)) if i == 20: # 创建并启动第一个进程 mp1 = MyProcess(100) mp1.start() # 创建并启动第一个进程 mp2 = MyProcess(100) mp2.start() mp2.join() print('主进程执行完成!')
## Context和启动进程的方式
Python有三种启动进程的方式
-spawn:父进程会启动一个全新的Python解释器进程,这也是windows平台唯一的方式,在这种方式下,子进程只能继承那些处理run()方法所必须的资源,那些不必要的文件扫描器和handle都不会被继承,这种方式比fork或forkserver方式效率要低
-fork:父进程使用os.fork()来启动一个python解释器进程,该子进程会继承父进程的所有资源,因此它基本等效于父进程
-forkserver:用这种方式启动进程,程序会启动一个服务器进程,之后当程序再次请求启动新进程时,父进程都会链接到该服务器进程,请求由服务器进程来fork新进程,这种方式无需从父进程继承资源,跟fork一样该方式仅在UNIX系统有效
### set_start_method函数
multiprocessing模块提供了set_start_method()函数,用于设置启动继承的方式,该代码必须所有与多进程相关的代码之前
import multiprocessingimport osdef foo(q): print('被启动的新进程: (%s)' % os.getpid()) q.put('Python')if __name__ == '__main__': # 设置使用fork方式启动进程 multiprocessing.set_start_method('spawn') q = multiprocessing.Queue() # 创建进程 mp = multiprocessing.Process(target=foo, args=(q, )) # 启动进程 mp.start() # 获取队列中的消息 print(q.get()) mp.join()
程序的新进程想multiprocess.Queue中放入一个数据`(Python)`主进程取出该Queue中的数据,并输出该数据
### get_context函数
该方法也能设置进程启动方式,利用get_context()方法来获取Context对象,调用该方法可传入spawn、fork、forkserver字符串,Context拥有和multiprocessing相同的API
import multiprocessingimport osdef foo(q): print('被启动的新进程: (%s)' % os.getpid()) q.put('Python')if __name__ == '__main__': # 设置使用fork方式启动进程,并获取Context对象 ctx = multiprocessing.get_context('fork') # 接下来就可用Context对象来代替mutliprocessing模块了 q = ctx.Queue() # 创建进程 mp = ctx.Process(target=foo, args=(q, )) # 启动进程 mp.start() # 获取队列中的消息 print(q.get()) mp.join()
## 使用进程池管理进程
程序可以通过multiprocessing模块的Pool()函数来创建进程池,它实际上是multiprocess.pool.Pool类,它提供了如下常用函数:
-apply(func[, args[, kwds]]):将func函数提交给进程池处理,其中args代表传给func的位置参数,kwds代表传给func的关键字参数,该方法会被阻塞直到func函数执行完成
-apply_async(func[, args[, kwds[, callback[, error_callback]]]]):这是apply()函数的异步版,该方法不会被阻塞,其中callback指定func函数完成后的回调函数,error_callback指定func函数出错后的回调函数
-map(func, iterable[, chunksize]):类似于Python的map()函数,只是此处使用新进程对iterable的每一个元素执行func函数
-map_async(func, iterable[, chunksize[, callback[, error_callback]]]):这是map()方法的异步版,该方法不会被阻塞,其中callback指定func函数完成后的回调函数,error_callback指定func函数出错后的回调函数
- imap(func, iterable[, chunksize]):map()方法的延迟版本
-imap_unrodered(func, iterable[, chunksize]):功能类似于imap()方法,但该方法不能保证所有生成的结果(包含多个元素)与原iterable中的元素顺序一致
-starmap(func, iterable[, chunksize]):类似于map()方法,该方法要求iterable的元素也是iterable对象,程序会将每一个元素解包之后作为func函数的参数
-close():关闭进程池,该方法让进程池不能再接收新任务,但会继续执行当前进程池中所有任务,直到完毕再关闭自己
- terminate():立即中止进程池
- join():等待所有进程完成
import multiprocessingimport timeimport osdef action(name='default'): print('(%s)进程正在执行,参数为: %s' % (os.getpid(), name)) time.sleep(3)if __name__ == '__main__': # 创建包含4条进程的进程池 pool = multiprocessing.Pool(processes=4) # 将action分3次提交给进程池 pool.apply_async(action) pool.apply_async(action, args=('位置参数', )) pool.apply_async(action, kwds={'name': '关键字参数'}) pool.close() pool.join()
import multiprocessingimport timeimport os# 定义一个准备作为进程任务的函数def action(max): my_sum = 0 for i in range(max): print('(%s)进程正在执行: %d' % (os.getpid(), i)) my_sum += i return my_sumif __name__ == '__main__': # 创建一个包含4条进程的进程池 with multiprocessing.Pool(processes=4) as pool: # 使用进程执行map计算 # 后面元组有3个元素,因此程序启动3条进程来执行action函数 results = pool.map(action, (50, 100, 150)) print('--------------') for r in results: print(r)
## 进程通信
Python为进程通信提供了两种机制
-Queue:一个进程向Queue中放入数据,另一个进程从Queue中读取数据
-Pipe:Pipe代表链接两个进程的管道,程序在调用Pipe()函数时会产生两个链接端,分别交给通信的两个进程,然后进程既可以从该链接端读取数据,也可以向该连接端写入数据
### 使用Queue实现进程通信
multiprocess下的Queue和queue下的Queue类似,都提供了qsize()/empty()/full()/put()/put_nowait()/get()/get_nowait()等方法,区别只是一个服务于进程,一个服务于线程
import multiprocessingdef f(q): print('(%s) 进程开始放入数据...' % multiprocessing.current_process().pid) q.put('Python')if __name__ == '__main__': # 创建进程通信的Queue q = multiprocessing.Queue() # 创建子进程 p = multiprocessing.Process(target=f, args=(q,)) # 启动子进程 p.start() print('(%s) 进程开始取出数据...' % multiprocessing.current_process().pid) # 取出数据 print(q.get()) # Python p.join()
### 使用Pipe实现进程通信
程序调用multiprocessing.Pipe()函数来创建一个管道,并返回两个PipeConnection对象,用于连接通信的两个进程,PipeConnection有如下方法:
-send(obj):发送一个obj给管道的另一端,另一端使用recv()方法接收,该obj必须是可拿到的,并且如果该对象系列化后超过32MB则可能会报ValueError异常
-recv():接受另一端通过send()方法发送过来的数据
-fileno():关于连接所使用的文件扫描器
-close():关闭连接
-poll(timeout):返回链接中是否还有数据可以读取
- send_bytes(buffer[, offset[, size]]):发送字节数据,如果没有指定offset、size参数,则默认发送buffer字节串的全部数据;如果指定了offset和size参数,则只发送buffer字节串中从offset开始,长度为size的字节数据,通过该方法发送的数据应该使用recv_bytes()或者recv_bytes_into()方法接收
-recv_bytes([maxlength]):接收通过send_bytes()方法发送的数据,maxlength指定最多接收的字节数,该方法返回接收到的字节数据
-recv_bytes_into(buffer[, offset]):功能与recv_bytes()方法类似,只是该方法将接收到的数据放在buffer中
import multiprocessingdef f(conn): print('(%s) 进程开始发送数据...' % multiprocessing.current_process().pid) # 使用conn发送数据 conn.send('Python')if __name__ == '__main__': # 创建Pipe,该函数返回两个PipeConnection对象 parent_conn, child_conn = multiprocessing.Pipe() # 创建子进程 p = multiprocessing.Process(target=f, args=(child_conn, )) # 启动子进程 p.start() print('(%s) 进程开始接收数据...' % multiprocessing.current_process().pid) # 通过conn读取数据 print(parent_conn.recv()) # Python p.join()
## 多线程与多进程
import multiprocessingimport timeclass Account: # 定义构造器 def __init__(self, account_no, balance, lock): # 封装账户编号、账户余额的两个成员变量 self.account_no = account_no self._balance = balance self.lock = lock # 因为账户余额不允许随便修改,所以只为self._balance提供getter方法 def getBalance(self): return self._balance # 提供一个进程安全的draw()方法来完成取钱操作 def draw(self, draw_amount): # 加锁 self.lock.acquire() try: # 账户余额大于取钱数目 if self._balance >= draw_amount: # 吐出钞票 print(multiprocessing.current_process().name\ + "取钱成功!吐出钞票:" + str(draw_amount)) time.sleep(0.001) # 修改余额 self._balance -= draw_amount print("\t余额为: " + str(self._balance)) else: print(multiprocessing.current_process().name\ + "取钱失败!余额不足!") finally: # 修改完成,释放锁 self.lock.release()# 定义一个函数来模拟取钱操作def draw(account, draw_amount): print(account) # 直接调用account对象的draw()方法来执行取钱操作 account.draw(draw_amount)if __name__ == '__main__': lock = multiprocessing.RLock() # 创建一个账户 acct = Account("1234567" , 900, lock) # 模拟两个进程对同一个账户取钱 multiprocessing.Process(name='甲', target=draw , args=(acct , 800)).start() multiprocessing.Process(name='乙', target=draw , args=(acct , 800)).start() multiprocessing.Process(name='丙', target=draw , args=(acct , 800)).start()
## 更多实例
多进程multiprocessing模块和多线程threading模块的使用方式很类似,在CPU密集型的程序中多线程并不能达到高效运转的效果,为了发挥多核CPU的优势使用多进程更有效果
### 多进程调用接口
# -*- coding: utf-8 -*-import multiprocessingimport requestsfrom time import ctimeimport jsondef syc_email(userID, userName): """syc_email""" print("Start synchronizing %s %s" % ("email", ctime())) parameter = {"userId":userID,"userName":userName,"enterpriseId":"10330","flag":"sended"} request_own = requests.put("https://xxxxxx.leadscloud.com/mail/receiveSendedAndRubbishMail", data=parameter) data = request_own.json() print(json.dumps(data, indent=4, sort_keys=True, ensure_ascii=False) ) print("接口调用已经返回结果,本次同步结束")dicts = {'1263':'13810078954','1294':'13810327625','1223':'18515934978','1295':'13911154792'}threads = []files = range(len(dicts))for userID, userName in dicts.items(): mp = multiprocessing.Process(target = syc_email, args = (userID, userName)) threads.append(mp)if __name__ == '__main__': for p in files: threads[p].start() for p in files: threads[p].join() print('all end: %s' % ctime())
### 执行结果
PS C:\Users\Administrator\Desktop> python .\multipreocess.pyStart synchronizing email Thu Mar 21 13:54:26 2019Start synchronizing email Thu Mar 21 13:54:26 2019Start synchronizing email Thu Mar 21 13:54:26 2019Start synchronizing email Thu Mar 21 13:54:26 2019{ "code": 1, "data": 0, "msg": "成功"}接口调用已经返回结果,本次同步结束{ "code": 1, "data": 0, "msg": "成功"}接口调用已经返回结果,本次同步结束{ "code": 1, "data": 0, "msg": "成功"}接口调用已经返回结果,本次同步结束{ "code": 1, "data": 0, "msg": "成功"}接口调用已经返回结果,本次同步结束all end: Thu Mar 21 14:03:22 2019
### 多进程启动浏览器
# -*- coding: utf-8 -*-from selenium import webdriverfrom time import sleepfrom time import ctimeimport multiprocessingdef start_browser(browser, time): if browser == "chrome": print("starting chrome browser now! %s" % ctime()) # 控制台打印当前时间 chrome_driver = webdriver.Chrome() chrome_driver.get("http://www.baidu.com") sleep(time) chrome_driver.quit() elif browser == "firefox": print("starting firefox browser now! %s" % ctime()) # 控制台打印当前时间 fire_driver = webdriver.Firefox() fire_driver.get("http://www.baidu.com") sleep(time) fire_driver.quit() else: print("starting ie browser now! %s" %ctime()) # 控制台打印当前时间 ie_driver = webdriver.Ie() ie_driver.get("http://www.baidu.com") sleep(time) ie_driver.quit()# 定义字典参数 browser_dict = {"chrome": 3, "firefox": 4}# 定义空List用于存储进程start_browser_processing = []# 循环字典Key-Value,创建进程并加入到List中for browser, time in browser_dict.items(): processing_browser = multiprocessing.Process(target=start_browser, args=(browser, time)) start_browser_processing.append(processing_browser)if __name__ == '__main__': for processing_browser in range(len(browser_dict)): start_browser_processing[processing_browser].start() for processing_browser in range(len(browser_dict)): start_browser_processing[processing_browser].join() print(u"全部结束 %s" % ctime())