在现代计算中,利用多核CPU的能力来提高程序的性能已成为必不可少的技能。Python的multiprocessing
模块提供了简便的接口来实现多进程并行计算。本文将通过一个实际的代码示例,详细介绍如何使用Python的多进程来分发和处理任务。
在处理大量任务时,单线程或单进程往往无法充分利用多核CPU的性能。通过多进程并行处理,可以显著提高程序的执行效率。下面,我们将介绍一个完整的代码示例,演示如何使用multiprocessing
模块实现任务的并行处理。
首先,让我们先看一下代码的整体结构:
pythonimport logging
import time
from multiprocessing import cpu_count, get_context
def process_task(task_queue, enable_trigger):
# 处理任务的工作进程函数
pass
def distribute_tasks(task_queue, batch_size, incremmental, task_size):
# 分发任务到任务队列
pass
def main():
# 主函数,设置多进程环境并启动任务处理
pass
if __name__ == '__main__':
main()
代码主要包含以下几个部分:
logging
、time
和multiprocessing
。process_task
,用于处理从任务队列中获取的任务。distribute_tasks
,用于将任务放入任务队列。main
,设置多进程环境,启动工作进程,并分发任务。接下来,我们将逐步解析代码的每个部分。
pythonimport logging
import time
from multiprocessing import cpu_count, get_context
logging
:用于记录日志信息,方便调试和监控程序运行。time
:用于记录时间,计算程序运行耗时。multiprocessing
:核心模块,提供创建多进程所需的功能。process_task
pythondef process_task(task_queue, enable_trigger):
while True:
task = task_queue.get()
if task is None:
# 接收到退出信号,结束循环
task_queue.task_done()
break
# 处理任务(此处为占位符,实际逻辑需自行实现)
logging.info(f"Processing task: {task}")
time.sleep(0.1) # 模拟处理时间
task_queue.task_done()
功能说明:
task_queue
中获取任务。None
,表示需要结束进程,跳出循环。time.sleep(0.1)
模拟处理时间)。task_queue.task_done()
,表示任务已处理完毕。关键点:
while True
循环,使进程持续运行,直到接收到退出信号。task_queue.get()
从队列中获取任务,队列是进程间通信的主要方式。task_queue.task_done()
通知队列,任务已完成。distribute_tasks
pythondef distribute_tasks(task_queue, batch_size, incremmental, task_size):
# 将任务分发到任务队列中
for i in range(batch_size):
task_queue.put(f"Task {i}")
logging.info(f"Distributed {batch_size} tasks.")
功能说明:
task_queue
中,供工作进程消费。batch_size
决定了任务的数量。关键点:
task_queue.put()
将任务放入队列。main
pythondef main():
start = time.time()
max_workers = 0
enable_trigger = False
incremmental = True # 假设这是一个布尔标志
num_processor = int(max_workers) or cpu_count()
task_size = num_processor * 3
ctx = get_context('spawn') # 使用'spawn'以兼容Windows
batch_size = 1000
logging.basicConfig(level=logging.INFO)
logging.info(
f"[start update candidate resume lang]: "
f"use processors {num_processor}, "
f"task_length {task_size}"
)
task_queue = ctx.JoinableQueue(maxsize=task_size)
# 创建工作进程
workers = []
for _ in range(num_processor):
p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
p.start()
workers.append(p)
# 分发任务
distribute_tasks(task_queue, batch_size, incremmental, task_size)
# 等待所有任务完成
task_queue.join()
# 发送信号以停止工作进程
for _ in workers:
task_queue.put(None)
# 等待所有工作进程结束
for p in workers:
p.join()
logging.info(f"Finished processing in {time.time() - start} seconds.")
功能说明:
关键点详解:
pythonstart = time.time()
max_workers = 0
enable_trigger = False
incremmental = True # 假设这是一个布尔标志
start
:记录开始时间,用于计算总耗时。max_workers
:最大工作进程数,默认为0。enable_trigger
和incremmental
:控制流程的标志变量。pythonnum_processor = int(max_workers) or cpu_count()
task_size = num_processor * 3
ctx = get_context('spawn') # 使用'spawn'以兼容Windows
batch_size = 1000
num_processor
:如果max_workers
为0,则使用cpu_count()
获取CPU核心数。task_size
:任务队列的最大大小,设置为进程数的3倍。ctx
:获取上下文,这里使用spawn
以兼容Windows操作系统。batch_size
:一次分发的任务数量。pythonlogging.basicConfig(level=logging.INFO)
logging.info(
f"[start update candidate resume lang]: "
f"use processors {num_processor}, "
f"task_length {task_size}"
)
INFO
,用于输出信息。pythontask_queue = ctx.JoinableQueue(maxsize=task_size)
# 创建工作进程
workers = []
for _ in range(num_processor):
p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
p.start()
workers.append(p)
JoinableQueue
,指定最大大小maxsize
。Process
创建工作进程,目标函数为process_task
,并传入参数。workers
列表中。pythondistribute_tasks(task_queue, batch_size, incremmental, task_size)
distribute_tasks
函数,将任务放入任务队列中。python# 等待所有任务完成
task_queue.join()
# 发送信号以停止工作进程
for _ in workers:
task_queue.put(None)
# 等待所有工作进程结束
for p in workers:
p.join()
task_queue.join()
:阻塞主进程,直到队列中的所有任务都被处理完毕。None
,通知它们退出循环,结束进程。p.join()
等待所有进程结束,确保资源被正确释放。pythonlogging.info(f"Finished processing in {time.time() - start} seconds.")
pythonif __name__ == '__main__':
main()
main()
函数。通过上述代码,我们实现了一个简单而实用的多进程任务处理程序。关键点包括:
multiprocessing
模块的Process
和JoinableQueue
来实现进程间通信和任务分发。get_context('spawn')
来兼容不同的操作系统,特别是Windows。None
),优雅地结束工作进程。task_queue.join()
和task_queue.task_done()
来同步任务的完成情况。num_processor
和task_size
等参数。pythonimport logging
import time
from multiprocessing import cpu_count, get_context
def process_task(task_queue, enable_trigger):
while True:
task = task_queue.get()
if task is None:
# Signal to exit
task_queue.task_done()
break
# Process the task (placeholder for actual processing logic)
logging.info(f"Processing task: {task}")
time.sleep(0.1) # Simulate processing time
task_queue.task_done()
def distribute_tasks(task_queue, batch_size, incremmental, task_size):
# Distribute tasks into the task_queue
for i in range(batch_size):
task_queue.put(f"Task {i}")
logging.info(f"Distributed {batch_size} tasks.")
def main():
start = time.time()
max_workers = 0
enable_trigger = False
incremmental = True # Assuming this is a boolean flag
num_processor = int(max_workers) or cpu_count()
task_size = num_processor * 3
ctx = get_context('spawn') # Use 'spawn' for Windows compatibility
batch_size = 1000
logging.basicConfig(level=logging.INFO)
logging.info(
f"[start update candidate resume lang]: "
f"use processors {num_processor}, "
f"task_length {task_size}"
)
task_queue = ctx.JoinableQueue(maxsize=task_size)
# Create worker processes
workers = []
for _ in range(num_processor):
p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
p.start()
workers.append(p)
# Corrected function call with missing comma
distribute_tasks(task_queue, batch_size, incremmental, task_size)
# Wait for all tasks to be processed
task_queue.join()
# Send a signal to stop the workers
for _ in workers:
task_queue.put(None)
# Properly close the pool
for p in workers:
p.join()
logging.info(f"Finished processing in {time.time() - start} seconds.")
if __name__ == '__main__':
main()
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!