Concurrent Programming (3): Process Pools and Thread Pools

Posted in python

Process pools and thread pools

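  • The concurrent.futures module provides a high-level interface for executing calls asynchronously
  • ThreadPoolExecutor: a thread pool that runs calls asynchronously
  • ProcessPoolExecutor: a process pool that runs calls asynchronously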
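Both pool classes implement the same abstract concurrent.futures.Executor interface, so code written against that interface can be handed either kind of pool. The following is a minimal sketch; the helper run_all and the function square are hypothetical names used only for illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    return n * n

def run_all(executor, items):
    # Hypothetical helper: it relies only on the Executor interface
    # (submit + Future.result), so any Executor subclass can be passed in
    futures = [executor.submit(square, x) for x in items]
    return [f.result() for f in futures]

if __name__ == '__main__':
    # Both pool classes are subclasses of the abstract Executor base class
    print(issubclass(ThreadPoolExecutor, Executor))    # True
    print(issubclass(ProcessPoolExecutor, Executor))   # True
    with ThreadPoolExecutor(max_workers=3) as pool:
        print(run_all(pool, range(5)))                 # [0, 1, 4, 9, 16]
```

Passing a ProcessPoolExecutor to run_all should work the same way, provided the submitted function is defined at module level and its arguments are picklable, because a process pool sends work to its child processes via pickle.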

Basic methods

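  • submit(fn, *args, **kwargs): submits a task asynchronously and immediately returns a Future object
  • map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit calls and yields the results in input order; chunksize only has an effect with ProcessPoolExecutor
  • shutdown(wait=True)
    Roughly equivalent to pool.close() + pool.join() on a multiprocessing process pool. With wait=True, it blocks until every pending task has finished and the pool's resources have been released; with wait=False, it returns immediately without waiting for the tasks. Whatever the value of wait, the Python program will not exit until all pending tasks have finished. submit and map must be called before shutdown; calling them afterwards raises RuntimeError
  • result(timeout=None): a method of the Future object; returns the task's result, waiting at most timeout seconds (indefinitely if None)
  • add_done_callback(fn): a method of the Future object; registers a callback fn, which is called with the future as its only argument once the task finishes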
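The sketch below exercises each of these methods on a small thread pool, chosen so it runs anywhere without pickling concerns. The functions slow_square, basic_methods_demo and shutdown_nowait_demo are hypothetical, and the sleep durations are arbitrary values picked only to make the timeout and the non-blocking shutdown observable.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_square(n, delay=0.0):
    time.sleep(delay)
    return n * n

def basic_methods_demo():
    log = []
    executor = ThreadPoolExecutor(max_workers=2)

    # submit() schedules the call and returns a Future right away
    fut = executor.submit(slow_square, 3, delay=0.5)

    # result(timeout=...) raises TimeoutError if the result is not ready in time
    try:
        fut.result(timeout=0.01)
    except TimeoutError:
        log.append('timed out')

    # add_done_callback() calls the callback with the finished Future
    fut.add_done_callback(lambda f: log.append(('callback', f.result())))
    log.append(('result', fut.result()))  # no timeout: blocks until the task is done

    # map() acts like a for loop of submit() calls; results come back in input order
    log.append(('map', list(executor.map(slow_square, [1, 2, 3]))))

    executor.shutdown(wait=True)  # wait for every task to finish, then free the threads

    # Once shut down, the executor rejects new work
    try:
        executor.submit(slow_square, 4)
    except RuntimeError:
        log.append('rejected after shutdown')
    return log

def shutdown_nowait_demo():
    executor = ThreadPoolExecutor(max_workers=1)
    fut = executor.submit(slow_square, 5, delay=0.3)
    start = time.perf_counter()
    executor.shutdown(wait=False)          # returns without waiting for the task
    elapsed = time.perf_counter() - start
    return elapsed, fut.result()           # the already-submitted task still completes

if __name__ == '__main__':
    print(basic_methods_demo())
    print(shutdown_nowait_demo())
```

The callback and the main thread race to append to log, so the order of the ('result', 9) and ('callback', 9) entries can vary from run to run; only their presence is guaranteed once shutdown(wait=True) has returned.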

Process pool

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import os, time, random

def task(n):
    print('%s is running' % os.getpid())  # show which worker process runs the task
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)  # at most 3 worker processes
    futures = []
    for i in range(11):
        future = executor.submit(task,i)  # returns a Future immediately
        futures.append(future)
    executor.shutdown(True)  # block until all 11 tasks have finished
    print('+++>')
    for future in futures:
        print(future.result())  # results are read in submission order
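The same program can also be written with the executor as a context manager: leaving the with block calls shutdown(wait=True), so the explicit shutdown call disappears. This is a sketch that assumes shorter random delays (random.uniform(0, 0.3) instead of randint(1, 3)) so it finishes quickly; main is a hypothetical wrapper.

```python
import os
import time
import random
from concurrent.futures import ProcessPoolExecutor

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.uniform(0, 0.3))  # shorter delay than the original, for quick runs
    return n ** 2

def main():
    # Leaving the with block calls executor.shutdown(wait=True)
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(11)]
    print('+++>')
    # Futures are read in submission order, so results stay ordered
    # even though the tasks finish in random order
    return [f.result() for f in futures]

if __name__ == '__main__':
    print(main())
```

The if __name__ == '__main__' guard matters here: on platforms where child processes are started by re-importing the main module, creating the pool at module level would try to start new processes from inside each child.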

Thread pool

  • Replace ProcessPoolExecutor with ThreadPoolExecutor; the rest of the code stays the same
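One visible difference after the swap is that every thread lives in the same process, so os.getpid() prints the same number for every task. This minimal sketch records both the PID and the thread name (whatever names ThreadPoolExecutor assigns by default) to make that visible; task and main are hypothetical helpers modeled on the process pool example.

```python
import os
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(random.uniform(0, 0.2))
    # Record which process and which thread ran this task
    return os.getpid(), threading.current_thread().name, n ** 2

def main():
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(11)]
    return [f.result() for f in futures]

if __name__ == '__main__':
    for pid, thread_name, value in main():
        print(pid, thread_name, value)  # same PID on every line, at most 3 thread names
```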

The map method

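from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import threading, time, random

def task(n):
    # All threads share one process, so print the thread name rather than the PID
    print('%s is running' % threading.current_thread().name)
    time.sleep(random.randint(1,3))
    print(n**2)

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    executor.map(task, range(1,12)) # map replaces the for + submit loop
    executor.shutdown(wait=True)    # wait for all tasks to finish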
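executor.map returns a lazy iterator rather than a list, which has two practical consequences: results come back in the order of the inputs, and an exception raised inside a task is only re-raised when iteration reaches that task's result. The sketch below illustrates both, plus the form with several iterables; square, may_fail and map_demo are hypothetical helpers.

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n ** 2

def may_fail(n):
    # Hypothetical task that fails on one input, to show error propagation
    if n == 3:
        raise ValueError('bad input: %d' % n)
    return n

def map_demo():
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() returns a lazy iterator; list() collects the results in input order
        squares = list(executor.map(square, range(1, 12)))

        # With several iterables, func gets one item from each, like zip()
        powers = list(executor.map(pow, [2, 3, 4], [5, 2, 1]))

        # A task's exception is re-raised when the iterator reaches that result
        collected, error = [], None
        try:
            for value in executor.map(may_fail, range(5)):
                collected.append(value)
        except ValueError as exc:
            error = str(exc)
    return squares, powers, collected, error

if __name__ == '__main__':
    print(map_demo())
```

This is also why the original example, which never iterates over the map result, would silently swallow any exception raised inside task.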

Callback functions

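from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import os

def get_page(url):
    print('<process %s> get %s' % (os.getpid(), url))
    response = requests.get(url, timeout=10)  # avoid hanging forever on a slow site
    if response.status_code == 200:
        return {'url':url, 'text':response.text}
    return None  # non-200 responses produce no result

def parse_page(future):
    # The callback receives the Future object, not the return value;
    # call future.result() to get what get_page returned.
    # It runs in a thread of the parent process, so the PID printed below is the parent's.
    if future.exception() is not None:  # e.g. a network error inside get_page
        print('request failed: %s' % future.exception())
        return
    res = future.result()
    if res is None:
        return
    print('<process %s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls = [
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com',
            'http://www.sina.com.cn'
            ]
    p = ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page)  # parse_page receives a Future
    p.shutdown(wait=True)  # wait for all downloads and their callbacks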
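Because the example above depends on live websites, here is an offline sketch of the same callback pattern. fake_get_page is a hypothetical stand-in for get_page that returns fake page text (and fails for any URL containing 'bad'), https://bad.example is a made-up URL, and the callback checks future.exception() before calling future.result(), so a failed download is recorded instead of raising inside the callback. It uses a thread pool for simplicity; the callback mechanics are the same with a process pool, except that there the callbacks run in a thread of the parent process.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def fake_get_page(url):
    # Hypothetical stand-in for get_page: no network access; fails on 'bad' URLs
    if 'bad' in url:
        raise ValueError('cannot fetch %s' % url)
    return {'url': url, 'text': '<html>%s</html>' % url}

def crawl(urls):
    parsed, errors = [], []
    lock = threading.Lock()

    def parse_page(future):
        # The callback receives the finished Future, not fake_get_page's return value
        if future.exception() is not None:
            with lock:
                errors.append(str(future.exception()))
            return
        res = future.result()
        with lock:  # callbacks can run concurrently in different worker threads
            parsed.append('url:<%s> size:[%s]' % (res['url'], len(res['text'])))

    with ThreadPoolExecutor(max_workers=3) as executor:
        for url in urls:
            executor.submit(fake_get_page, url).add_done_callback(parse_page)
    # Leaving the with block waits for every task and its callback
    return sorted(parsed), errors

if __name__ == '__main__':
    print(crawl(['https://www.python.org', 'https://bad.example']))
```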
» When reposting, please credit the source: 呢喃 » Concurrent Programming (3): Process Pools and Thread Pools