插件窝 干货文章 Python实现cpu并行运算的两种方式

Python实现cpu并行运算的两种方式

进程 函数 multiprocessing 使用 131    来源:    2024-10-28

Python一共有两种并行方式

1. 使用multiprocessing

第一种方式用于单个节点内部的并行,也就是说同时发起的进程数不能超过你单个机器CPU的线程数。
以下是第一种方式的并行程序:

import multiprocessing
import time
import os
import numpy as np
ncore=20

def run(core):
    Your code
    reture 0

if __name__ == '__main__':
    print(time.strftime('%Y-%m-%d %H:%M:%S'))
    param = np.arange(20)
    p = multiprocessing.Pool(ncore)
    p.map(run, param)
    p.close()
    p.join()
    print(time.strftime('%Y-%m-%d %H:%M:%S'))

提交脚本直接:

python your_job_name.py

2. 使用mpi4py

第二种方式用于跨节点的并行,可以发起成千上百个CPU的并行。
以下是第二中方式的并行程序:

from mpi4py import MPI
import time
import os
import numpy as np
ncore=20

def run(core):
    Your code
    reture 0

if __name__ == '__main__':
    print(time.strftime('%Y-%m-%d %H:%M:%S'))
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    run(rank)
    print(time.strftime('%Y-%m-%d %H:%M:%S'))

提交脚本需要用到mpi

mpiexec -n cpu_number python your_job_name.py

知识拓展:python多进程模式实现多核CPU的并行计算

Python中的多进程模式

在Python中,可以使用multiprocessing模块来实现多进程。multiprocessing是Python标准库中的一个模块,用于管理多进程的创建和通信。

在multiprocessing中,可以使用Process类来创建进程,Process类的构造函数可以接受一个函数作为参数。

该函数将在子进程中执行。下面是一个简单的示例:

import multiprocessing  
def worker():  
    print("Worker process started")  
if __name__ == '__main__':  
    p = multiprocessing.Process(target=worker)  
    p.start()  
    p.join()  

在上面的示例中,我们首先定义了一个worker函数,然后使用Process类创建了一个进程,并将worker函数作为参数传递给Process类的构造函数。

最后,我们调用Process类的start方法启动进程,并调用Process类的join方法等待进程结束。

  1. 提高程序执行效率的方法

在Python中使用多进程模式提高程序执行效率,可以通过以下几种方式来实现:

  1. 1 多进程并发执行任务

在多进程模式下,可以将任务分配给多个进程并行执行,从而利用多核CPU的优势。

在Python中,可以使用multiprocessing模块来实现多进程并发执行任务。

下面是一个简单的示例:

import multiprocessing  
def worker(name):  
    print("Worker %s started" % name)  
if __name__ == '__main__':  
    for i in range(5):  
        p = multiprocessing.Process(target=worker, args=(i,))  
        p.start()  

在上面的示例中,我们定义了一个worker函数,该函数接受一个参数name,并在函数体中打印出Worker name started的信息。

然后我们使用for循环创建了5个进程,并将worker函数和对应的参数传递给Process类的构造函数。

最后,我们调用Process类的start方法启动进程。

  1. 2 进程池

对于大量重复的任务,可以使用进程池来维护一定数量的进程,每个进程执行一个任务后返回结果,然后再由进程池分配下一个任务。

这样可以避免频繁地创建和销毁进程,提高效率。在Python中,可以使用multiprocessing模块的Pool类来实现进程池。

下面是一个简单的示例:

import multiprocessing  
def worker(name):  
    print("Worker %s started" % name)  
if __name__ == '__main__':  
    with multiprocessing.Pool(processes=4) as pool:  
        pool.map(worker, range(10))  

在上面的示例中,我们定义了一个worker函数,该函数接受一个参数name,并在函数体中打印出Worker name started的信息。

然后我们使用with语句创建了一个进程池,并指定进程池中的进程数量为4。

最后,我们使用Pool类的map方法将worker函数和对应的参数传递给进程池,进程池会自动分配任务给不同的进程执行。

  1. 3 消息队列

在多进程模式下,不同的进程之间需要进行通信,可以利用消息队列来实现进程间通信。

Python中可以使用Queue模块来实现消息队列。下面是一个简单的示例:

import multiprocessing  
def producer(queue):  
    for i in range(10):  
        queue.put(i)  
def consumer(queue):  
    while not queue.empty():  
        print(queue.get())  
if __name__ == '__main__':  
    queue = multiprocessing.Queue()  
    p1 = multiprocessing.Process(target=producer, args=(queue,))  
    p2 = multiprocessing.Process(target=consumer, args=(queue,))  
    p1.start()  
    p2.start()  
    p1.join()  
    p2.join()  

在上面的示例中,我们定义了一个producer函数和一个consumer函数,producer函数将0~9的数字放入消息队列,consumer函数从消息队列中取出数字并打印出来。

然后我们使用multiprocessing模块的Queue类创建了一个消息队列,并使用Process类创建了两个进程分别执行producer函数和consumer函数。

  1. 4 共享内存

对于需要多个进程共享的数据,可以使用共享内存来避免数据拷贝和进程间通信的开销。

在Python中,可以使用multiprocessing模块的Value和Array类来实现共享内存。

下面是一个简单的示例:

import multiprocessing  
def worker(counter):  
    counter.value += 1  
if __name__ == '__main__':  
    counter = multiprocessing.Value('i', 0)  
    processes = []  
    for i in range(5):  
        p = multiprocessing.Process(target=worker, args=(counter,))  
        processes.append(p)  
        p.start()  
    for p in processes:  
        p.join()  
    print(counter.value)  

在上面的示例中,我们定义了一个worker函数,该函数接受一个参数counter,每次执行时将counter的值加1。

然后我们使用multiprocessing模块的Value类创建了一个整型变量counter,并使用Process类创建了5个进程分别执行worker函数。

最后,我们打印出counter的值。

  1. 5 异步IO

对于I/O密集型任务,可以使用异步IO来提高效率。在Python中,可以使用asyncio模块来实现异步IO。

下面是一个简单的示例:

import asyncio  
async def worker():  
    await asyncio.sleep(1)  
    print("Worker process started")  
loop = asyncio.get_event_loop()  
loop.run_until_complete(worker())  

在上面的示例中,我们定义了一个worker函数,该函数使用asyncio库的异步IO特性。

在函数体中,使用asyncio.sleep函数模拟了一个长时间的I/O操作,并在操作完成后打印了一条消息。

然后我们使用asyncio库的get_event_loop函数创建了一个事件循环,并使用run_until_complete函数启动worker函数。在程序执行过程中,事件循环会负责调度和执行异步IO操作。

  1. 总结

在Python中,使用多进程模式可以实现多核CPU的并行计算,从而提高程序的执行效率。

在本文中,我们介绍了如何使用Python的multiprocessing模块实现多进程并发执行任务、进程池、消息队列、共享内存、异步IO等方式来提高程序执行效率。

实际应用中,需要根据具体的场景选择合适的并行计算方式,并注意避免死锁等常见问题。

到此这篇关于Python实现cpu并行运算的两种方式的文章就介绍到这了,更多相关Python cpu并行运算内容请搜索插件窝以前的文章或继续浏览下面的相关文章希望大家以后多多支持插件窝!