2024-11-06
Python
00

目录

Python多进程编程详解:一个实用的任务分发与处理示例
引言
代码结构概览
详细代码解析
1. 导入模块
2. 定义工作进程函数 process_task
3. 定义任务分发函数 distribute_tasks
4. 主函数 main
a. 初始化参数
b. 确定进程数量和任务队列大小
c. 配置日志
d. 创建任务队列和工作进程
e. 分发任务
f. 等待任务完成并关闭进程
g. 输出总耗时
5. 运行主函数
总结
实际应用建议
整体程序

Python多进程编程详解:一个实用的任务分发与处理示例

在现代计算中,利用多核CPU的能力来提高程序的性能已成为必不可少的技能。Python的multiprocessing模块提供了简便的接口来实现多进程并行计算。本文将通过一个实际的代码示例,详细介绍如何使用Python的多进程来分发和处理任务。

引言

在处理大量任务时,单线程或单进程往往无法充分利用多核CPU的性能。通过多进程并行处理,可以显著提高程序的执行效率。下面,我们将介绍一个完整的代码示例,演示如何使用multiprocessing模块实现任务的并行处理。

代码结构概览

首先,让我们先看一下代码的整体结构:

python
import logging import time from multiprocessing import cpu_count, get_context def process_task(task_queue, enable_trigger): # 处理任务的工作进程函数 pass def distribute_tasks(task_queue, batch_size, incremmental, task_size): # 分发任务到任务队列 pass def main(): # 主函数,设置多进程环境并启动任务处理 pass if __name__ == '__main__': main()

代码主要包含以下几个部分:

  1. 导入必要的模块:包括loggingtimemultiprocessing
  2. 定义工作进程函数process_task,用于处理从任务队列中获取的任务。
  3. 定义任务分发函数distribute_tasks,用于将任务放入任务队列。
  4. 主函数main,设置多进程环境,启动工作进程,并分发任务。

详细代码解析

接下来,我们将逐步解析代码的每个部分。

1. 导入模块

python
import logging import time from multiprocessing import cpu_count, get_context
  • logging:用于记录日志信息,方便调试和监控程序运行。
  • time:用于记录时间,计算程序运行耗时。
  • multiprocessing:核心模块,提供创建多进程所需的功能。

2. 定义工作进程函数 process_task

python
def process_task(task_queue, enable_trigger): while True: task = task_queue.get() if task is None: # 接收到退出信号,结束循环 task_queue.task_done() break # 处理任务(此处为占位符,实际逻辑需自行实现) logging.info(f"Processing task: {task}") time.sleep(0.1) # 模拟处理时间 task_queue.task_done()

功能说明

  • 不断从task_queue中获取任务。
  • 如果接收到None,表示需要结束进程,跳出循环。
  • 否则,处理任务(此处用time.sleep(0.1)模拟处理时间)。
  • 调用task_queue.task_done(),表示任务已处理完毕。

关键点

  • 使用while True循环,使进程持续运行,直到接收到退出信号。
  • 使用task_queue.get()从队列中获取任务,队列是进程间通信的主要方式。
  • 通过task_queue.task_done()通知队列,任务已完成。

3. 定义任务分发函数 distribute_tasks

python
def distribute_tasks(task_queue, batch_size, incremmental, task_size): # 将任务分发到任务队列中 for i in range(batch_size): task_queue.put(f"Task {i}") logging.info(f"Distributed {batch_size} tasks.")

功能说明

  • 将一批任务放入task_queue中,供工作进程消费。
  • batch_size决定了任务的数量。

关键点

  • 使用task_queue.put()将任务放入队列。
  • 任务可以是任何可序列化的对象,这里使用字符串表示。

4. 主函数 main

python
def main(): start = time.time() max_workers = 0 enable_trigger = False incremmental = True # 假设这是一个布尔标志 num_processor = int(max_workers) or cpu_count() task_size = num_processor * 3 ctx = get_context('spawn') # 使用'spawn'以兼容Windows batch_size = 1000 logging.basicConfig(level=logging.INFO) logging.info( f"[start update candidate resume lang]: " f"use processors {num_processor}, " f"task_length {task_size}" ) task_queue = ctx.JoinableQueue(maxsize=task_size) # 创建工作进程 workers = [] for _ in range(num_processor): p = ctx.Process(target=process_task, args=(task_queue, enable_trigger)) p.start() workers.append(p) # 分发任务 distribute_tasks(task_queue, batch_size, incremmental, task_size) # 等待所有任务完成 task_queue.join() # 发送信号以停止工作进程 for _ in workers: task_queue.put(None) # 等待所有工作进程结束 for p in workers: p.join() logging.info(f"Finished processing in {time.time() - start} seconds.")

功能说明

  • 设置多进程环境,创建工作进程。
  • 分发任务到任务队列。
  • 等待任务完成,发送结束信号,关闭进程。

关键点详解

a. 初始化参数

python
start = time.time() max_workers = 0 enable_trigger = False incremmental = True # 假设这是一个布尔标志
  • start:记录开始时间,用于计算总耗时。
  • max_workers:最大工作进程数,默认为0。
  • enable_triggerincremmental:控制流程的标志变量。

b. 确定进程数量和任务队列大小

python
num_processor = int(max_workers) or cpu_count() task_size = num_processor * 3 ctx = get_context('spawn') # 使用'spawn'以兼容Windows batch_size = 1000
  • num_processor:如果max_workers为0,则使用cpu_count()获取CPU核心数。
  • task_size:任务队列的最大大小,设置为进程数的3倍。
  • ctx:获取上下文,这里使用spawn以兼容Windows操作系统。
  • batch_size:一次分发的任务数量。

c. 配置日志

python
logging.basicConfig(level=logging.INFO) logging.info( f"[start update candidate resume lang]: " f"use processors {num_processor}, " f"task_length {task_size}" )
  • 配置日志级别为INFO,用于输出信息。
  • 记录使用的处理器数量和任务长度。

d. 创建任务队列和工作进程

python
task_queue = ctx.JoinableQueue(maxsize=task_size) # 创建工作进程 workers = [] for _ in range(num_processor): p = ctx.Process(target=process_task, args=(task_queue, enable_trigger)) p.start() workers.append(p)
  • 创建一个可加入的任务队列JoinableQueue,指定最大大小maxsize
  • 使用Process创建工作进程,目标函数为process_task,并传入参数。
  • 启动进程并将其添加到workers列表中。

e. 分发任务

python
distribute_tasks(task_queue, batch_size, incremmental, task_size)
  • 调用distribute_tasks函数,将任务放入任务队列中。

f. 等待任务完成并关闭进程

python
# 等待所有任务完成 task_queue.join() # 发送信号以停止工作进程 for _ in workers: task_queue.put(None) # 等待所有工作进程结束 for p in workers: p.join()
  • task_queue.join():阻塞主进程,直到队列中的所有任务都被处理完毕。
  • 通过向每个工作进程发送None,通知它们退出循环,结束进程。
  • 使用p.join()等待所有进程结束,确保资源被正确释放。

g. 输出总耗时

python
logging.info(f"Finished processing in {time.time() - start} seconds.")
  • 计算并输出程序的总运行时间。

5. 运行主函数

python
if __name__ == '__main__': main()
  • 确保只有在直接运行脚本时才执行main()函数。
  • 这是Python的惯用法,防止在被导入时意外执行代码。

总结

通过上述代码,我们实现了一个简单而实用的多进程任务处理程序。关键点包括:

  • 使用multiprocessing模块的ProcessJoinableQueue来实现进程间通信和任务分发。
  • 使用get_context('spawn')来兼容不同的操作系统,特别是Windows。
  • 通过发送特殊的退出信号(如None),优雅地结束工作进程。
  • 使用task_queue.join()task_queue.task_done()来同步任务的完成情况。

实际应用建议

  • 任务设计:在实际应用中,任务可能是需要大量计算或IO操作的函数。应将实际的处理逻辑替换占位符部分。
  • 异常处理:在生产环境中,需添加异常处理机制,确保进程异常退出时不会导致死锁或资源泄漏。
  • 资源管理:注意进程和队列等资源的正确关闭和释放,防止内存泄漏。
  • 性能调优:根据任务的复杂度和系统资源,适当调整num_processortask_size等参数。

整体程序

python
import logging import time from multiprocessing import cpu_count, get_context def process_task(task_queue, enable_trigger): while True: task = task_queue.get() if task is None: # Signal to exit task_queue.task_done() break # Process the task (placeholder for actual processing logic) logging.info(f"Processing task: {task}") time.sleep(0.1) # Simulate processing time task_queue.task_done() def distribute_tasks(task_queue, batch_size, incremmental, task_size): # Distribute tasks into the task_queue for i in range(batch_size): task_queue.put(f"Task {i}") logging.info(f"Distributed {batch_size} tasks.") def main(): start = time.time() max_workers = 0 enable_trigger = False incremmental = True # Assuming this is a boolean flag num_processor = int(max_workers) or cpu_count() task_size = num_processor * 3 ctx = get_context('spawn') # Use 'spawn' for Windows compatibility batch_size = 1000 logging.basicConfig(level=logging.INFO) logging.info( f"[start update candidate resume lang]: " f"use processors {num_processor}, " f"task_length {task_size}" ) task_queue = ctx.JoinableQueue(maxsize=task_size) # Create worker processes workers = [] for _ in range(num_processor): p = ctx.Process(target=process_task, args=(task_queue, enable_trigger)) p.start() workers.append(p) # Corrected function call with missing comma distribute_tasks(task_queue, batch_size, incremmental, task_size) # Wait for all tasks to be processed task_queue.join() # Send a signal to stop the workers for _ in workers: task_queue.put(None) # Properly close the pool for p in workers: p.join() logging.info(f"Finished processing in {time.time() - start} seconds.") if __name__ == '__main__': main()
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!