并发编程(1) multiprocessing 详解

Posted on Posted in python

multiprocessing 详解

Process 类介绍

格式

  • 需要使用关键字的方式来指定参数
  • args 指定的为传给 target 函数的位置参数,是一个元组形式,必须有逗号
Process([group [, target [, name [, args [, kwargs]]]]])

参数

  • group 参数为使用,值始终为 None
  • target 表示调用对象,即子进程要执行的任务
  • args 表示调用对象位置参数元组,args=(1,2,'abc',)
  • kwargs 表示调用对象的字典,kwargs=['name':'abc',age=20]
  • name 为子进程名字

方法介绍

  • p.start() 启动进程,并调用该子进程中的 p.run()
  • p.run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • p.terminate() 强制终止进程 p, 不会进行任何清理操作,如果 p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果 p 还保存了一个锁,那么也不会释放,进而导致思索
  • p.is_alive() 如果 p 仍然在运行,返回 True
  • p.join([timeout]) 主线程等待 p 终止(强调:是主线程处于等的状态,子进程处于运行的状态)。timeout 可选超时时间

属性介绍

  • p.daemon 默认为 False,如果设置为True,代表p为后台运行的守护进程,当 p 也随之终止,并且设定为 True 后,p 不能创建自己的新进程,必须在 p.start() 之前设置
  • p.name 进程名
  • p.pid 进程的 pid

Process 类的使用

方法一:开启子进程

import time                                                                     
import random                                                                   
from multiprocessing import Process                                             
                                                                                
def shuchu(name):                                                               
    print('%s shuchuing' % name)                                                
    time.sleep(random.randrange(1,5))                                           
    print('%s shuchu end' % name)                                               
                                                                                
if __name__ == '__main__':                                                      
    p1 = Process(target=shuchu,args=('a',)) # 必须加 , 号                       
    p2 = Process(target=shuchu,args=('b',)) # 必须加 , 号                       
                                                                                
    p1.start()                                                                  
    p2.start() 
    print('主')

=====>
b shuchuing
a shuchuing
a shuchu end
b shuchu end

方法二:开启子进程

import time                                                                     
import random                                                                   
from multiprocessing import Process                                             
                                                                                
class Shuchu(Process):                                                          
    def __init__(self,name):                                                    
    ┆   super().__init__()                                                                                                                                        
    ┆   self.name = name                                                        
    def run(self):                                                              
    ┆   print('%s shuchuing' % self.name)                                       
    ┆   time.sleep(random.randrange(1,5))                                       
    ┆   print('%s shuchu end' % self.name)                                      
                                                                                
if __name__ == '__main__':                                                      
    p1 = Shuchu('a')                                                            
    p2 = Shuchu('b')                                                            
                                                                                
    p1.start()                                                                  
    p2.start()                                                                  
    print('主')

join 方法

  • 在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源
  • 如果主进程的任务在执行到某一个阶段,需要子进程执行完毕后才能继续执行,就需要有一种机制让主进程检查子进程是否执行完毕,在子进程执行完毕后才继续执行,否则一直阻塞
import time, os                                                                 
import random                                                                   
from multiprocessing import Process                                             
                                                                                
def task(name):                                                                 
    print('%s is shuchuing,name is %s' % (os.getpid(),name))                    
    time.sleep(random.randrange(1,5))                                           
    print('%s is shuchuing end ,name is %s' % (os.getpid(),name))               
                                                                                
if __name__ == '__main__':                                                      
    p1 = Process(target=task,args=('a',))                                       
    p2 = Process(target=task,args=('b',))                                       
    p3 = Process(target=task,args=('c',))                                       
                                                                                
    p_l = [p1,p2,p3]                                                            
    for i in p_l:                                                               
    ┆   i.start()
         print(i.name,i.pid,)                                                               
    for i in p_l:                                                               
    ┆   i.join() # 等待 p 停止,才执行下一行代码                                                                                                                  
    print('主') 

守护进程

  • 守护进程会在主进程代码执行结束后就终止
  • 守护进程内无法再开启子进程,否则抛出异常。AssertionError
import time, os                                                                 
import random                                                                   
from multiprocessing import Process                                             
                                                                                
def task(name):                                                                 
    print('%s is shuchuing,name is %s' % (os.getpid(),name))                    
    time.sleep(random.randrange(1,5))                                           
    print('%s is shuchuing end ,name is %s' % (os.getpid(),name))               
                                                                                
if __name__ == '__main__':                                                      
    p = Process(target=task,args=('a',))                                        
    p.daemon = True # 一定要设置在 p.start() 前                                 
    p.start()                                                                   
    p.join()                                                                                                                                                      
    print('主')

互斥锁

避免并发数据进程混乱

与join的区别

join 是将任务整体串行,互斥锁是可以将一个任务中的某一段代码串行

"""                                                                             
由并发变成了串行,牺牲了运行效率,但避免了混乱                                                                                                                    
"""                                                                             
import time, os                                                                 
import random                                                                   
from multiprocessing import Process, Lock                                       
                                                                                
def work(Lock):                                                                 
    lock.acquire() # 加锁                                                       
    print('%s is running' % os.getpid())                                        
    time.sleep(2)                                                               
    print('%s is done' % os.getpid())                                           
    lock.release() # 释放锁                                                     
                                                                                
if __name__ == '__main__':                                                      
    lock = Lock()                                                               
    for i in range(3):                                                          
    ┆   p = Process(target=work, args = (lock,))                                
    ┆   p.start() 

模拟抢票

"""
db.txt : {"count":1}
"""
import time, json
from multiprocessing import Process, Lock

def search(name):
    dic = json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩余票数%s\033[0m' % (name, dic['count']))

def get(name):
    dic = json.load(open('db.txt'))
    time.sleep(1) # 模拟读数据网络延迟
    if dic['count'] > 0:
        dic['count']-=1
        time.sleep(1)
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 购票成功\033[0m' % name)

def task(name,lock):
    search(name)
    with lock: # 相当于 lock.acquire(), 执行完代码块自动执行 lock.release()
        get(name)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        name = '<路人%s>' % i
        p = Process(target=task,args=(name,lock))
        p.start()

=====>

<路人1> 查到剩余票数1
<路人0> 查到剩余票数1
<路人3> 查到剩余票数1
<路人4> 查到剩余票数1
<路人6> 查到剩余票数1
<路人7> 查到剩余票数1
<路人2> 查到剩余票数1
<路人5> 查到剩余票数1
<路人8> 查到剩余票数1
<路人9> 查到剩余票数1
<路人1> 购票成功

队列 JoinableQueue

JoinableQueue([maxsize])

maxsize 是队列中允许最大项,省略则无大小限制

JoinableQueue 的实例 p 除了与 Queue 对象相同的方法之外还具有:

  • q.task_done() 使用者使用此方法发出信号,表示 q.get() 的返回项目已被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发 ValueError 异常
  • q.join 生产者调用此方法进行阻塞,知道队列中所有的项目均被处理。阻塞将持续队列中每个项目均调用 q.task_done() 方法为止
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

from multiprocessing import Process, JoinableQueue
import time, random, os

def consumer(q,name):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('\033[43m%s 吃 %s\033[0m' % (name,res))
        q.task_done() # 发送信号给 q.join() , 说明已从队列中取走了一个数据并处理完毕

def productor(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' % (food, i)
        q.put(res)
        print('\033[45m%s 生产了 %s\033[0m' % (name,res))
    q.join() # 等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束

if __name__ == '__main__':
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=productor, args=(q,'a','包子'))
    p2 = Process(target=productor, args=(q,'b','馒头'))
    p3 = Process(target=productor, args=(q,'c','骨头'))

    # 消费者
    c1 = Process(target=consumer, args=(q,'d'))
    c2 = Process(target=consumer, args=(q,'e'))

    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()


    p1.join()
    p2.join()
    p3.join()

    print('主')

=====>

a 生产了 包子0
b 生产了 馒头0
a 生产了 包子1
b 生产了 馒头1
d 吃 馒头0
c 生产了 骨头0
b 生产了 馒头2
c 生产了 骨头1
e 吃 包子0
d 吃 包子1
a 生产了 包子2
e 吃 馒头1
c 生产了 骨头2
e 吃 馒头2
d 吃 骨头0
d 吃 包子2
e 吃 骨头1
d 吃 骨头2
主

» 转载请注明来源:呢喃 » 并发编程(1) multiprocessing 详解

Leave a Reply

Your email address will not be published. Required fields are marked *

1 × 3 =