Python 学习笔记 多进程 multiprocessing

Python 解释器有一个全局解释器锁(PIL),导致每个 Python 进程中最多同时运行一个线程,因此 Python 多线程程序并不能改善程序性能,不能发挥多核系统的优势,可以通过这篇文章了解。

但是多进程程序不受此影响, Python 2.6 引入了 multiprocessing 来解决这个问题。这里介绍 multiprocessing 模块下的进程,进程同步,进程间通信和进程管理四个方面的内容。 这里主要讲解多进程的典型使用,multiprocessing 的 API 几乎是完复制了 threading 的API, 因此只需花少量的时间就可以熟悉 threading 编程了。

Process

先来看一段代码

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, current_process
def func():
time.sleep(1)
proc = current_process()
proc.name, proc.pid
sub_proc = Process(target=func, args=())
sub_proc.start()
sub_proc.join()
proc = current_process()
proc.name, proc.pid

这是在主进程中创建子进程,然后启动(start) 子进程,等待(join) 子进程执行完,再继续执行主进程的整个的执行流程。

那么,一个进程应该是用来做什么的,它应该保存一些什么状态,它的生命周期是什么样的呢?

一个进程需要处理一些不同任务,或者处理不同的对象。创建进程需要一个 function 和相关参数,参数可以是dictProcess(target=func, args=(), kwargs = {})name 可以用来标识进程。

控制子进程进入不同阶段的是 start()join()is_alive()terminate()exitcode 方法。这些方法只能在创建子进程的进程中执行。

进程同步

Lock

锁是为了确保数据一致性,比如读写锁,每个进程给一个变量增加 1 ,但是如果在一个进程读取但还没有写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要锁。

1
2
3
4
5
6
7
8
9
def func(lock):
lock.acquire()
# do mysql query select update ...
lock.release()
lock = Lock()
for i in xrange(4):
proc = Process(target=func, args=(lock))
proc.start()

Lock 同时也实现了 ContextManager API, 可以结合 with 语句使用, 关于 ContextManager, 请移步 Python 学习实践笔记 装饰器 与 context 查看。

Semaphore

Semaphore 和 Lock 稍有不同,Semaphore 相当于 N 把锁,获取其中一把就可以执行了。 信号量的总数 N 在构造时传入,s = Semaphore(N)。 和 Lock 一样,如果信号量为0,则进程堵塞,直到信号大于0。

Pipes

Pipe 是在两个进程之间通信的工具,Pipe Constructor 会返回两个端

1
conn1, conn2 = Pipe(True)

如果是全双工的(构造函数参数为True),则双端口都可接收发送,否则前面的端口用于接收,后面的端口用于发送。

1
2
3
4
5
6
7
8
9
10
11
def proc1(pipe):
for i in xrange(10000):
pipe.send(i)
def proc2(pipe):
while True:
print "proc2 rev:", pipe.recv()
pipe = Pipe()
Process(target=proc1, args=(pipe[0],)).start()
Process(target=proc2, args=(pipe[1],)).start()

Pipe 的每个端口同时最多一个进程读写,否则数据会出各种问题

Queues

multiprocessing.Queue 与 Queue.Queue 非常相似。其 API 列表如下

  • qsize()
  • empty()
  • full()
  • put()
  • put_nowait()
  • get()
  • get_nowait()
  • close()
  • join_thread()
  • cancel_join_thread()

当 Queue 为 Queue.Full 状态时,再 put() 会堵塞,当状态为 Queue.Empty 时,再 get() 也是。当 put() 或 get() 设置了超时参数,而超时的时候,会抛出异常。

Queue 主要用于多个进程产生和消费,一般使用情况如下

1
2
3
4
5
6
7
8
9
10
11
12
def producer(q):
for i in xrange(10):
q.put(i)
def consumer(q):
while True:
print "consumer", q.get()
q = Queue(40)
for i in xrange(10):
Process(target=producer, args=(q,)).start()
Process(target=consumer, args=(q,)).start()

十个生产者进程,一个消费者进程,共用同一个队列进行同步。

有一个简化版本的 multiprocessing.queues.SimpleQueue, 只支持3个方法 empty(), get(), put()。

也有一个强化版本的 JoinableQueue, 新增两个方法 task_done() 和 join()。 task_done() 是给消费者使用的,每完成队列中的一个任务,调用一次该方法。当所有的 tasks 都完成之后,交给调用 join() 的进程执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def consumer(q):
while True:
print "consumer", q.get()
q.task_done()
jobs = JoinableQueue()
for i in xrange(10):
jobs.put(i)
for i in xrange(10):
p = Process(target=consumer, args=(jobs,))
p.daemon = True
p.start()
jobs.join()

这个 join 函数等待 JoinableQueue 为空的时候,等待就结束,外面的进程可以继续执行了,但是那10个进程干嘛去了呢,他们还在等待呀,上面是设置了 p.daemon = True, 子进程才随着主进程结束的,如果没有设置,它们还是会一直等待的呢。

Lock、Pipe、Queue 和 Pipe 需要注意的是:尽量避免使用 Process.terminate 来终止程序,否则将会导致很多问题, 详情请移步python 官方文档查看。

进程间数据共享

前一节中, Pipe、Queue 都有一定数据共享的功能,但是他们会堵塞进程, 这里介绍的两种数据共享方式都不会堵塞进程, 而且都是多进程安全的。

共享内存

共享内存有两个结构,一个是 Value, 一个是 Array,这两个结构内部都实现了锁机制,因此是多进程安全的。 用法如下:

1
2
3
4
5
6
7
8
9
10
11
def func(n, a):
n.value = 50
for i in range(len(a)):
a[i] += 10
num = Value('d', 0.0)
ints= Array('i', range(10))
p = Process(target=func, args=(num, ints))
p.start()
p.join()

Value 和 Array 都需要设置其中存放值的类型,d 是 double 类型,i 是 int 类型,具体的对应关系在Python 标准库的 sharedctypes 模块中查看。

服务进程 Manager

上面的共享内存支持两种结构 Value 和 Array, 这些值在主进程中管理,很分散。 Python 中还有一统天下,无所不能的 Server process,专门用来做数据共享。 其支持的类型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array 用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process, Manager
def func(dct, lst):
dct[88] = 88
lst.reverse()
manager = Manager()
dct = manager.dict()
lst = manager.list(range(5,10))
p = Process(target=func, args=(dct, lst))
p.start()
p.join()
print dct, '|', lst
Out: {88: 88} | [9, 8, 7, 6, 5]

一个 Manager 对象是一个服务进程,推荐多进程程序中,数据共享就用一个 manager 管理。

进程管理

如果有50个任务要执行, 但是 CPU 只有4核, 你可以创建50个进程来做这个事情。但是大可不必,徒增管理开销。如果你只想创建4个进程,让他们轮流替你完成任务,不用自己去管理具体的进程的创建销毁,那 Pool 是非常有用的。

Pool 是进程池,进程池能够管理一定的进程,当有空闲进程时,则利用空闲进程完成任务,直到所有任务完成为止,用法如下

1
2
3
4
5
def func(x):
return x*x
pool = Pool(processes=4)
print pool.map(func, range(8))

Pool 进程池创建4个进程,不管有没有任务,都一直在进程池中等候,等到有数据的时候就开始执行。
Pool 的 API 列表如下:

  • apply(func[, args[, kwds]])
  • apply_async(func[, args[, kwds[, callback]]])
  • map(func, iterable[, chunksize])
  • map_async(func, iterable[, chunksize[, callback]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])
  • close()
  • terminate()
  • join()

异步执行

apply_async 和 map_async 执行之后立即返回,然后异步返回结果。 使用方法如下

1
2
3
4
5
6
7
8
9
10
11
12
def func(x):
return x*x
def callback(x):
print x, 'in callback'
pool = Pool(processes=4)
result = pool.map_async(func, range(8), 8, callback)
print result.get(), 'in main'
Out:
[0, 1, 4, 9, 16, 25, 36, 49] in callback
[0, 1, 4, 9, 16, 25, 36, 49] in main

有两个值得提到的,一个是 callback,另外一个是 multiprocessing.pool.AsyncResult。 callback 是在结果返回之前,调用的一个函数,这个函数必须只有一个参数,它会首先接收到结果。callback 不能有耗时操作,因为它会阻塞主线程。

AsyncResult 是获取结果的对象,其 API 如下

  • get([timeout])
  • wait([timeout])
  • ready()
  • successful()

如果设置了 timeout 时间,超时会抛出 multiprocessing.TimeoutError 异常。wait 是等待执行完成。 ready 测试是否已经完成,successful 是在确定已经 ready 的情况下,如果执行中没有抛出异常,则成功,如果没有ready 就调用该函数,会得到一个 AssertionError 异常。

Pool 管理

这里不再继续讲 map 的各种变体了,因为从上面的 API 一看便知。

然后我们来看看 Pool 的执行流程,有三个阶段。第一、一个进程池接收很多任务,然后分开执行任务;第二、不再接收任务了;第三、等所有任务完成了,回家,不干了。

这就是上面的方法,close 停止接收新的任务,如果还有任务来,就会抛出异常。 join 是等待所有任务完成。 join 必须要在 close 之后调用,否则会抛出异常。terminate 非正常终止,内存不够用时,垃圾回收器调用的就是这个方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Django 学习笔记第一课

Django web 框架介绍: MVC框架的核心思想 核心思想:解耦; 好处:可扩展性,向后兼容,低耦合,高内聚; 普通web结构框架MVC框架 M:model 主要用于数据库层次的封装; V:view…

记住要重置线程上下文类加载器

我很难思考与Java 加载有关的东西,而不是与类加载器有关的东西。 在使用应用程序服务器或OSGi的情况下尤其如此,在这些应用程序服务器或OSGi中,经常使用多个类加载器,并且透明地使用类加载器的能力降低了。 我同意OSGI Alliance B…

EntityFramework Code-First—领域类配置之DataAnnotations

本文出自:https://www.cnblogs.com/tang-tang/p/5510574.html 一、摘要 EF Code-First提供了一个可以用在领域类或其属性上的DataAnnotation特性集合,DataAnnotation特性会覆盖默认的EF约定。 DataAnnotation存在于两个命名空间里: System.Co…

Python 调试工具 PDB(Linux 环境下调试)

转载:http://blog.163.com/gjx0619126/blog/static/12740839320114995947700/ 在python中使用pdb模块可以进行调试 import pdb pdb.set_trace() 也可以使用python -m pdb mysqcript.py这样的方式 (Pdb) 会自动停在第一行,等待调试,这时你可以看看 帮助…

Ubuntu 更新源方法

安装完Ubuntu系统之后,面临的最主要的一个问题就是将apt安装源进行更新,因为在国内直接利用Ubuntu默认的安装源下载安装包速度慢,并且有的时候软件版本也比较旧。今天小编对Ubuntu更新源进行介绍:(这里针对阿里源和清华…

PAT Basic 1002

1002 写出这个数 (20 分)读入一个正整数 n,计算其各位数字之和,用汉语拼音写出和的每一位数字。 输入格式: 每个测试输入包含 1 个测试用例,即给出自然数 n 的值。这里保证 n 小于 10​100​​。 输出格式&…

mybatis crud_MyBatis教程– CRUD操作和映射关系–第2部分

mybatis crud为了说明这一点,我们正在考虑以下示例域模型: 会有用户,每个用户可能都有一个博客,每个博客可以包含零个或多个帖子。 这三个表的数据库结构如下: CREATE TABLE user (user_id int(10) unsigned NOT NU…

MATLAB 排序函数(先按第一列排序(主排序)然后再按第二列排序(次排序))

利用 sortrows 函数实现MATLAB 先按第一列排序(主排序)然后再按第二列排序(次排序) A [8,9,6;5,5,2;2,5,8] sortrows(A)A 8 9 65 5 22 5 8ans 2 5 85 5 28 9 6

用PDB库调试Python程序

Python自带的pdb库,发现用pdb来调试程序还是很方便的,当然了,什么远程调试,多线程之类,pdb是搞不定的。用pdb调试有多种方式可选:1. 命令行启动目标程序,加上-m参数,这样调用myscrip…

数据操作

mysql> create table employee(-> id int primary key auto_increment,-> emp_name char(12) not null,-> sex enum(male,female) not null default male, #大部分是男的-> age int(3) unsigned not null default 28,-> hire_date date not null,-> post …

/usr/bin/ld: cannot find -l*** 这里***可以指lapack等

在Linux安装编译过程中有时会出现在如下形式的错误: /usr/bin/ld: cannot find -l***这里表示编译过程中找不到以下库名: lib库名(即***).so会发生这样的原因有以下三种情形: 系统没有安装相对应的lib 相对应的lib版本不对 lib&#xff0…

通过分区在Kafka中实现订单保证人

Kafka最重要的功能之一是实现消息的负载平衡,并保证分布式集群中的排序,否则传统队列中将无法实现。 首先让我们尝试了解问题陈述 让我们假设我们有一个主题,其中发送消息,并且有一个消费者正在使用这些消息。 如果只有一个使用…

破解栅栏密码python脚本

今天遇到一个要破解的栅栏密码,写了个通用的脚本 1 #!/usr/bin/env python2 # -*- coding: gbk -*-3 # -*- coding: utf_8 -*-4 # Author: 蔚蓝行5 # http://www.cnblogs.com/duanv6 e raw_input(请输入要解密的字符串\n)7 elen len(e)8 field[]9 for i in range(…

水稻已知os基因号,利用DAVIA进行GO功能富集分析

第1-5步: 已知水稻的基因(os),进行功能注释 第6步 第七步: 第八步: 第九步: 第十步: 第十一步:

二,八,十,十六进制之间转换的相应方法

int num1 Integer.valueOf(n,16); //16进制转换成10进制 Integer.toHexString(Integer i); //10进制转换成16进制 补充:Integer.toHexString(Integer i);该方法得出的字符默认为小写,如果想得到大写结果,则变为Integer.toHexString(Integer i…

IDF实验室-图片里的英语

原题: 一恒河沙中有三千世界,一张图里也可以有很多东西。 不多说了,答案是这个图片包含的那句英文的所有单词的首字母。 首字母中的首字母要大写,答案格式是wctf{一坨首字母} 加油吧少年!看好你哦~ writeup&#xff…

linux 终端调用MATLAB程序

linux 终端调用MATLAB程序 路径:/A/B/C/ 程序名称:xxx.m linux 终端调用MATLAB函数方法 cd /A/B/C/ matlab -nodisplay -nosplash -nodesktop -r "xxx;exit;"

2018-11-02 在代码中进行中文命名实践的短期目标

对中文命名的意义不再赘述, 请参看之前的对在代码中使用中文命名的质疑与回应. 去年中文命名实践的阻力和应对之后, 在一些小项目中继续实践了中文命名(Java/JS/Python等, 详见之前的专栏文章), 涉及领域不少但尚未形成明确的重点项目. 发现了一些在业务相关代码使用中文命名的…

Wireshark 命令行捕获数据

在 Wireshark 程序目录中,包含两个命令行捕获工具。这两个工具分别是 Dumpcap 和 Tshark。当不能以图形界面方式捕获数据时,可以在命令行使用 dumpcap 或 tshark 程序实施捕获。 一、使用 Dumpcap 捕获数据 执行 dumpcap -h 可以查看参数详情。 1、执行 …

zk ui_高级ZK:异步UI更新和后台处理–第2部分

zk ui介绍 在第1部分中,我展示了如何在ZK应用程序中使用服务器推送和线程来执行后台任务。 但是,这个简单的示例具有一个重大缺陷,这使其对于实际应用程序而言是一种不好的方法:它为每个后台任务启动了一个新线程。 JDK5引入了E…