分享好友 编程语言首页 频道列表

Python进程间通讯与进程池超详细讲解 python进程池的作用

Python  2023-02-09 10:050

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;
  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;
  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;
  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;
  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;
  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;
  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;
  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;
  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;
  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);
  • terminate():立即终止所有任务;
  • join():等待工作进程完成(必须已close或terminate了);
def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()
原文地址:https://blog.csdn.net/alwaysrun/article/details/127185356

查看更多关于【Python】的文章

展开全文
相关推荐
反对 0
举报 0
评论 0
图文资讯
热门推荐
优选好物
更多热点专题
更多推荐文章
如何在Abaqus的python中调用Matlab程序
目录1. 确定版本信息2. 备份python3. 设置环境变量4. 安装程序5. 调试运行参考资料Abaqus2018操作系统Win10 64位Python版本2.7(路径C:\SIMULIA\CAE\2018\win_b64\tools\SMApy\python2.7)2. 备份python将上述的“python2.7”文件夹复制出来,避免因操作错误

0评论2023-03-16608

sf02_选择排序算法Java Python rust 实现
Java 实现package common;public class SimpleArithmetic {/** * 选择排序 * 输入整形数组:a[n] 【4、5、3、7】 * 1. 取数组编号为i(i属于[0 , n-2])的数组值 a[i],即第一重循环 * 2. 假定a[i]为数组a[k](k属于[i,n-1])中的最小值a[min],即执行初始化 min =i

0评论2023-02-09407

Python vs Ruby: 谁是最好的 web 开发语言?
Python 和 Ruby 都是目前用来开发 websites、web-based apps 和 web services 的流行编程语言之一。 这两种语言在许多方面有相似之处。它们都是高级的面向对象的编程语言,都是交互式脚本语言、都提供标准库且支持持久化。但是,Python 和 Ruby 的解决方法却

0评论2023-02-09819

Python+Sklearn实现异常检测
目录离群检测 与 新奇检测Sklearn 中支持的方法孤立森林 IsolationForestLocal Outlier FactorOneClassSVMElliptic Envelope离群检测 与 新奇检测很多应用场景都需要能够确定样本是否属于与现有的分布,或者应该被视为不同的分布。离群检测(Outlier detectio

0评论2023-02-09736

Python异常与错误处理详细讲解 python的异常
基础知识优先使用异常捕获LBYL(look before you leap): 在执行一个可能出错的操作时,先做一些关键的条件判断,仅当满足条件时才进行操作。EAFP(eaiser to ask for forgiveness than permission): 不做事前检查,直接执行操作。后者更优: 代码简洁,效率更高

0评论2023-02-09962

Python多线程与同步机制浅析
目录线程实现Thread类函数方式继承方式同步机制同步锁Lock条件变量Condition信号量Semaphore事件Event屏障BarrierGIL全局解释器锁线程实现Python中线程有两种方式:函数或者用类来包装线程对象。threading模块中包含了丰富的多线程支持功能:threading.curren

0评论2023-02-09409

python基础之reverse和reversed函数的介绍及使用
目录一、reverse二、reversed附:Python中reverse和reversed反转列表的操作方法总结一、reversereverse()是python中列表的一个内置方法(在字典、字符串和元组中没有这个内置方法),用于列表中数据的反转例子:lista = [1, 2, 3, 4]lista.reverse()print(lista

0评论2023-02-09878

Python多进程并发与同步机制超详细讲解
目录多进程僵尸进程Process类函数方式继承方式同步机制状态管理Managers在《多线程与同步》中介绍了多线程及存在的问题,而通过使用多进程而非线程可有效地绕过全局解释器锁。 因此,通过multiprocessing模块可充分地利用多核CPU的资源。多进程多进程是通过mu

0评论2023-02-09469

Python PyMuPDF实现PDF与图片和PPT相互转换
目录安装与简介MuPDFPyMuPDFPyMuPDF使用元数据页面Page代码示例PDF转图片图片转PDFPDF转PPT文章目录 安装与简介MuPDFPyMuPDF PyMuPDF使用元数据页面Page 代码示例PDF转图片图片转PDFPDF转PPTPyMuPDF提供了PDF及流行图片处理接口。安装与简介安装:pip install

0评论2023-02-09349

更多推荐