多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading

当涉及到操作系统的时候,免不了要使用os模块,有时还要用到sys模块。

设计到并行程序,一般开单独的进程,而不是线程,原因是python解释器的全局解释器锁GIL(global interpreter lock),本文最后会讲到。使用进程可以实现完全并行,无GIL的限制,可充分利用多cpu多核的环境。

 

os/sys模块

1、os模块

os.system() 函数可以启动一个进程,执行完之后返回状态码。

os.fork() 复制一个进程,如果是子进程返回0,如果是父进程返回子进程的pid,使用这个函数的时候,建议你学习一下linux编程的知识。

os.popen 以管道的方式创建进程。

os.spawnl 也可以创建进程,并能指定环境变量。

os.kill(pid, sig) 关闭一个进程,pid是进程号,sig是信号。与fork配合使用,例如你刚才用fork创建了一个子进程,它的pid是11990, 那么调用

os.kill( 11990, signal.CTRL_BREAK_EVENT)

就以ctrl+c的方式杀死了这个进程。

os.wait() -> (pid, status)找到任一个僵死子进程,或者等待任一个子进程的SIGCHLD信号

os.waitpid(pid, options) -> (pid, status) 等待给定进程结束

 

除了利用os模块实现多进程和通信,还有一个模块multiprocessing封装了很多创建进程和进程间通信的操作,发挥多核的威力。

附:

文件操作也是与操作系统相关的操作,也被封装在os模块。

文件操作的详细内容见  http://www.cnblogs.com/xinchrome/p/5011304.html

 

2、sys模块

同样是一个与系统相关的模块,它们都表示了程序运行的上下文环境。但是与os模块不同的是,os模块主要封装系统操作,sys模块主要封装系统中的各种环境参数。

比如文件操作、进程线程操作封装在os模块中;

标准输入输出stdout/stdin、命令行参数argv、环境变量path、平台platform等参数封装在sys模块中;

不过sys中也含有一些进程操作,比如sys.exit(n)和sys.exit('Unable to create first child.')

 

多进程multiprocessing

 multiprocessing模块的内容:

multiprocessing.Process(target=run)类的实例表示一个进程,具有字段 pid,方法 start() join()等

multiprocessing.Pool(processes=4) 类的实例表示一个进程池

multiprocessing.Lock类的实例表示一个锁,具有acquire()和release() 方法

multiprocessing.Semaphore(2) 信号量类的实例表示一个信号量,可以指定初始值,具有 acquire() 和 release() 方法

multiprocessing.Event() 表示一个信号,用于实现多进程等待某一个进程的情况

进程间要实现通信,除了锁、信号量、事件,还有队列multiprocessing.Queue。

 

复制代码
import multiprocessingdef writer_proc(q):      try:         q.put(1, block = False) except:         pass   def reader_proc(q):      try:         print q.get(block = False) except:         passif __name__ == "__main__":q = multiprocessing.Queue()writer = multiprocessing.Process(target=writer_proc, args=(q,))  writer.start()   reader = multiprocessing.Process(target=reader_proc, args=(q,))  reader.start()  reader.join()  writer.join()
复制代码

 

multiprocessing.Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

 

 

进程同步互斥实例:

 

复制代码
import multiprocessing
import time# 信号量实现同步,进程间也可以使用信号量
def preq(s):print "a"s.release()s.release()def worker(s,i):s.acquire()s.acquire()print(multiprocessing.current_process().name + " acquire")time.sleep(i)print(multiprocessing.current_process().name + " release")s.release()
if __name__ == "__main__":s = multiprocessing.Semaphore(0)for i in range(1):pre = multiprocessing.Process(target=preq, args=(s,))pre.start()p = multiprocessing.Process(target=worker, args=(s,1))p.start()# 锁实现进程间对文件的互斥访问
def worker_with(lock, f):with lock:fs = open(f,"a+")fs.write('Lock acquired via with\n')fs.close()def worker_no_with(lock, f):lock.acquire()try:fs = open(f,"a+")fs.write('Lock acquired directly\n')fs.close()finally:lock.release()
if __name__ == "__main__":f = "file.txt"lock = multiprocessing.Lock()w = multiprocessing.Process(target=worker_with, args=(lock, f))nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))w.start()nw.start()w.join()nw.join()# Event实现同步,Event类的实例表示一个信号,进程调用它的wait方法等待被唤醒,调用set方法唤醒所有正在等待的进程
def wait_for_event(e):"""Wait for the event to be set before doing anything"""print ('wait_for_event: starting')e.wait()print ('wait_for_event: e.is_set()->' + str(e.is_set()))
def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print ('wait_for_event_timeout: starting')e.wait(t)print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(name='block', target=wait_for_event,args=(e,))w1.start()w2 = multiprocessing.Process(name='non-block', target=wait_for_event_timeout, args=(e, 2))w2.start()time.sleep(3)e.set()print ('main: event is set')
复制代码

 

Process类中定义的方法

| is_alive(self)
| Return whether process is alive
|
| join(self, timeout=None)
| Wait until child process terminates
|
| run(self)
| Method to be run in sub-process; can be overridden in sub-class
|
| start(self)
| Start child process
|
| terminate(self)
| Terminate process; sends SIGTERM signal or uses TerminateProcess()

以上来自于Python自带帮助

 

进程处理信号 signal

 利用signal模块,进程可以捕获信号,根据相应的handler做处理。

信号(signal)-- 进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。
几个常用信号:
    SIGINT 终止进程 中断进程 (control+c)

    SIGQUIT 退出进程

    SIGTERM 终止进程: 软件终止信号 (命令行中输入kill命令时,向进程发送的默认信号) 当直接写kill PID,默认是向进程发送SIGTERM

    SIGKILL 终止进程:杀死进程,捕捉这个信号会报错,也就是进程不能捕捉此信号(kill -9)
    SIGALRM 闹钟信号。Alarms信号是一个特殊信号类型,它可以让程序要求系统经过一段时间对自己发送通知。os 标准模块中指出,它可用于避免无限制阻塞 I/O 操作或其它系统调用。

    SIGCHLD 子进程退出时对父进程发出的信号,如果父进程还没有处理它,子进程将会停留在僵死状态等待其父进程调用wait函数,这个状态下的子进程就是僵死进程

PS:常用信号简介:

ctrl-c 发送 SIGINT 信号给前台进程组中的所有进程。常用于终止正在运行的程序。
ctrl-z 发送 SIGTSTP 信号给前台进程组中的所有进程,常用于挂起一个进程。
ctrl-d 不是发送信号,而是表示一个特殊的二进制值,表示 EOF,也就是输入流(例如普通文件或者stdin)的结束。
ctrl-\ 发送 SIGQUIT 信号给前台进程组中的所有进程,终止前台进程并生成 core 文件。

 

常常会在python程序被关闭之前加一个钩子,用atexit模块以及signal模块来实现

父进程捕获终止信号后,还需要向每个子进程发送终止信号。

注意信号处理函数需要接受两个参数。

 

复制代码
#!/usr/bin python# 正常退出或者被ctl+c终止时,进程捕获信号,调用处理函数(钩子)
import atexit
from signal import signal, SIGTERMdef test():print 'exit........'
atexit.register(test)
signal(SIGTERM, lambda signum, stack_frame: exit(1))while True:pass# 进程发送信号终止其他进程
import os  
import signal  #发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改  
os.kill(16175,signal.SIGTERM)  
os.kill(16175,signal.SIGUSR1) # Linux编程范式:fork(),等待SIGCHLD信号
import os  
import signal  
from time import sleep  def onsigchld(a,b):  print '收到子进程结束信号'  
signal.signal(signal.SIGCHLD,onsigchld)  pid = os.fork()  
if pid == 0:  print '我是子进程,pid是',os.getpid()  sleep(2)  
else:  print '我是父进程,pid是',os.getpid()  os.wait()  #等待子进程结束  # 闹钟信号,用于告诉操作系统向自己发送信号
import signal
import timedef receive_alarm(signum, stack):print 'Alarm :', time.ctime()# Call receive_alarm in 2 seconds
signal.signal(signal.SIGALRM, receive_alarm)
signal.alarm(2)print 'Before:', time.ctime()
time.sleep(10)
print 'After :', time.ctime() 
复制代码

 

 

多线程threading & thread

  与进程不同,线程要实现同步,直接用Python自带的Queue模块即可。Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

     python多线程编程,一般使用thread和threading模块。thread模块想对较底层,threading模块对thread模块进行了封装,更便于使用。所有,通常多线程编程使用threading模块。线程的创建一般有两种:①将创建的函数传递进threading.Thread()对象的target字段,可以是函数或者定义了__call__方法的类实例。②继承threading.Thread类,通常重写run()方法。

1 threading模块的内容

Thread类的实例可以引用一个线程,这是我们用的最多的一个类,你可以指定目标线程函数执行或者自定义继承自它的子类都可以实现子线程功能;

Timer类是Thread类的子类,表示等待一段时间后才开始运行某段代码,或者重复运行某段代码;

(Python自带的)Queue类的实例是实现了多生产者(Producer)、多消费者(Consumer)的队列,支持锁原语,能够在多个线程之间提供很好的同步支持;

Lock类的实例引用了一个锁原语,这个我们可以对全局变量互斥时使用,提供acquire和release方法;

RLock的实例表示可重入锁,使单线程可以再次获得已经获得的锁;

Condition类的实例表示条件变量,能让一个线程停下来,等待其他线程满足某个“条件”,除了提供acquire和release方法外,还提供了wait和notify方法,相当于一个多功能信号量;

Event 类的实例表示通用的条件变量。多个线程可以等待某个事件发生,在事件发生后,所有的线程都被激活;

Semaphore类的实例表示信号量,为等待资源的线程提供一个类似队列的结构,提供acquire和release方法,初始化的时候可以指定初值Semaphore(3);

BoundedSemaphore 与semaphore类似,但不允许超过初始值;

 

 threading.Thread类的内容:

getName(self) 返回线程的名字

isAlive(self) 布尔标志,表示这个线程是否还在运行中

isDaemon(self) 返回线程的daemon标志

join(self, timeout=None) 程序挂起,直到线程结束,如果给出timeout,则最多阻塞timeout秒

run(self) 定义线程的功能函数

setDaemon(self, daemonic) 把线程的daemon标志设为daemonic

setName(self, name) 设置线程的名字

start(self) 开始线程执行

ps:th.join()方法可能不是很安全,如果th对应的线程没有被真正启动,那么调用th.join()的线程将不会等待,而会继续运行下去。用信号量更好。

 

多线程举例:

复制代码
#coding:utf-8
import threading, time #最简单的启动线程的方式
def sayHi(): time.sleep(1) print 'Hi, linuxapp' th=threading.Thread(target=sayHi) 
th.start() 
th.join() # 使用threading.Thread(),设置线程类实例的target属性,表示一个线程
def T():print threading.current_thread().getName()t1 = threading.Thread(target=T, name='tt11')
t1.start()
t1.join()# 通过threading.Thread类的子类实例表示线程,注意父类构造方法__init__不能省略
class T2(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):print "in run() of T2  " + threading.current_thread().getName()# threading.Lock类的实例表示一个互斥锁,一个资源被加锁后,其他线程不能访问
class T3(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0;self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1print self.counterself.mutex.release()# 如果同一个线程需要多次获得资源,如果不使用 mutex = threading.RLock() ,就会死锁
class T4(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1if self.mutex.acquire():self.counter += 1self.mutex.release()self.mutex.release()def main():t = T3()t.start()if __name__ == '__main__':main()# threading.Condition类的实例表示一个条件变量,相当于多功能信号量
condition = threading.Condition()  
products = 0  class Producer(threading.Thread):  def __init__(self):  threading.Thread.__init__(self)  def run(self):  global condition, products  while True:  if condition.acquire():  if products < 10:  products += 1;  print "Producer(%s):deliver one, now products:%s" %(self.name, products)  condition.notify()  else:  print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)  condition.wait();  condition.release()  time.sleep(2)  class Consumer(threading.Thread):  def __init__(self):  threading.Thread.__init__(self)  def run(self):  global condition, products  while True:  if condition.acquire():  if products > 1:  products -= 1  print "Consumer(%s):consume one, now products:%s" %(self.name, products)  condition.notify()  else:  print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)  condition.wait();  condition.release()  time.sleep(2)  if __name__ == "__main__":  for p in range(0, 2):  p = Producer()  p.start()  for c in range(0, 10):  c = Consumer()  c.start()# threading.Event类的实例表示一个信号,如果信号signal为true,那么等待这个signal的所有线程都将可以运行
class MyThread(threading.Thread):  def __init__(self, signal):  threading.Thread.__init__(self) self.singal = signal  def run(self):  print "I am %s,I will sleep ..."%self.name  # 进入等待状态
        self.singal.wait()  print "I am %s, I awake..." %self.name  if __name__ == "__main__":# 初始 为 False singal = threading.Event()  for t in range(0, 3):  thread = MyThread(singal)  thread.start()  print "main thread sleep 3 seconds... "  time.sleep(3)  # 唤醒含有signal, 处于等待状态的线程  singal.set()
复制代码

 

python多线程的限制
python多线程有个讨厌的限制,全局解释器锁(global interpreter lock),这个锁的意思是任一时间只能有一个线程使用解释器,跟单cpu跑多个程序也是一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着,试想你的多个线程中有这么一个线程,多线程生生被搞成串行;当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了!所以是否使用这个模块需要考虑面对的任务类型。

 

转载自:http://www.cnblogs.com/xinchrome/p/5031497.html


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

练习1

方法一&#xff1a; 统计在一个队列中的数字&#xff0c;有多少个正数&#xff0c;多少个负数&#xff0c;如[1, 3, 5, 7, 0, -1, -9, -4, -5, 8] lists [1, 3, 5, 7, 0, -1, -9, -4, -5, 8]positive 0 negative 0for number in lists:if number > 0:positive 1elif nu…

python sort()、sorted()

python sort、sorted排序 这篇文章主要介绍了python sort、sorted高级排序技巧,本文讲解了基础排序、升序和降序、排序的稳定性和复杂排序、cmp函数排序法等内容. python list内置sort()方法用来排序&#xff0c;也可以用python内置的全局sorted()方法来对可迭代的序列排…

电脑内存占用莫名很高_CPU占用高,电脑莫名卡顿?万能的重启拯救不了就用这3招,妥了!...

大家还记得windows 10 1903推送时发生的大翻车事件吗&#xff1f;那次的更新主要是修复早期版本的Visual Basic 6、VBA和VBScript无反应、远端桌面出现当机黑屏幕等问题&#xff0c;但win10的更新总是“捡了芝麻&#xff0c;丢了西瓜”&#xff0c;解决早期问题而又出现新的问题…

5. 多线程程序如何让 IO 和“计算”相互重叠,降低 latency?

基本思路是&#xff0c;把 IO 操作&#xff08;通常是写操作&#xff09;通过 BlockingQueue 交给别的线程去做&#xff0c;自己不必等待。 例1: logging 在多线程服务器程序中&#xff0c;日志 (logging) 至关重要&#xff0c;本例仅考虑写 log file 的情况&#xff0c;不考…

tomcat web应用_具有可执行Tomcat的独立Web应用程序

tomcat web应用在部署应用程序时&#xff0c;简单性是最大的优势。 您将了解到&#xff0c;尤其是在项目发展且需要在环境中进行某些更改时。 将您的整个应用程序打包到一个独立且自足的JAR中似乎是个好主意&#xff0c;特别是与在目标环境中安装和升级Tomcat相比。 过去&#…

anaconda在ubuntu中添加环境变量

在ubuntu上安装好anaconda后&#xff0c;如果输入conda命令报错&#xff0c;很有可能需要以下修改注册表&#xff08;相当于windows中将路径添加到系统环境变量&#xff09; ~ /anaconda3/bin为.Sh所在home目录路径 在终端输入&#xff1a;sudo gedit ~/.bashrc 打开注册表后…

webpackjsonp 还原_具有催化CO2还原性能的非贵金属配合物的配体设计

Non-noble metal-based molecular complexes for CO2 reduction: From the ligand design perspectiveDong-Cheng Liu, Di-Chang Zhong, Tong-Bu LuEneryChem, 2, 100034 (2020).DOI: https://doi.org/10.1016/j.enchem.2020.100034全文链接https://www.sciencedirect.com/jour…

【数据库系统概论】第3章-关系数据库标准语言SQL(1)

文章目录 3.1 SQL概述3.2 学生-课程数据库3.3 数据定义3.3.1 数据库定义3.3.2 模式的定义3.3.3 基本表的定义3.3.4 索引的建立与删除3.3.5 数据字典 3.1 SQL概述 动词 分类 三级模式 3.2 学生-课程数据库 3.3 数据定义 3.3.1 数据库定义 创建数据库 tips&#xff1a;[ ]表…

适用于Java开发人员的Elasticsearch教程

课程大纲 Elasticsearch是基于Lucene的搜索引擎。 它提供了具有HTTP Web界面和无模式JSON文档的分布式多租户全文搜索引擎。 Elasticsearch是用Java开发的&#xff0c;并根据Apache许可的条款作为开源发布。 Elasticsearch是最受欢迎的企业搜索引擎&#xff0c;紧随其后的也是基…

shell实战之tomcat看门狗

1、脚本简介 tomcat看门狗&#xff0c;在tomcat进程异常退出时会自动拉起tomcat进程并记录tomcat运行的日志。 1 函数说明&#xff1a; 2 log_info&#xff1a;打印日志的函数&#xff0c;入参为需要在日志中打印的msg 3 start_tom&#xff1a;启动tomcat的函数…

tensorflow tf.train.batch()

tf.train.batch([example, label],batch_sizebatch_size, capacitycapacity) [example, label]表示样本和样本标签&#xff0c;这个可以是一个样本和一个样本标签&#xff0c;batch_size是返回的一个batch样本集的样本个数。capacity是队列中的容量。这主要是按顺序组合成一个b…

苹果6s上市时间_iPhone7的A10处理器还能战多长时间?2-3年不成问题!

iPhone 7采用A10 Fusion处理器&#xff0c;简称A10处理器&#xff0c;在2018年依然是处于高端处理器&#xff0c;再加上苹果自己的系统优化和资源调度&#xff0c;流畅度甚至超过其他安卓835机子。16年上市的iPhone7的A10还能再战多长时间&#xff1f;小编今天来分析一下。A10处…

tf.summary.FileWriter

ummary_waiter tf.summary.FileWriter("log",tf.get_default_graph()) log是事件文件所在的目录&#xff0c;这里是工程目录下的log目录。第二个参数是事件文件要记录的图&#xff0c;也就是tensorflow默认的图。

83998 连接服务器出错_服务端 TCP 连接的 TIME_WAIT 问题分析与解决

民工哥技术之路 写在开头&#xff0c;大概 4 年前&#xff0c;听到运维同学提到 TIME_WAIT 状态的 TCP 连接过多的问题&#xff0c;但是当时没有去细琢磨&#xff1b;最近又听人说起&#xff0c;是一个新手进行压测过程中&#xff0c;遇到的问题&#xff0c;因此&#xff0c;花…

SqlServer 时间格式化

select GETDATE() as 当前日期, DateName(year,GetDate()) as 年,DateName(month,GetDate()) as 月,DateName(day,GetDate()) as 日,DateName(dw,GetDate()) as 星期,DateName(week,GetDate()) as 周数,DateName(hour,GetDate()) as 时,DateName(minute,GetDate()) as 分,DateN…

[EBOOK]十大Java性能问题

有兴趣了解更多吗&#xff1f; 然后&#xff0c;您应该在此处下载相关的电子书。 Java中的大多数性能问题都可以归因于一些根本原因。 当然&#xff0c;偶尔会有一些奇怪的极端情况突然出现&#xff0c;并在应用程序中造成了严重破坏&#xff0c;但是在大多数情况下&#xff0…

请上传sku预览图后重新操作_拼多多商家版APP新增商品操作步骤

① 点击右下角“添加商品”按钮&#xff0c;进入创建商品页面&#xff1b;② 快速创建商品&#xff1a;目前手机版支持快速上传商品啦&#xff1a;仅通过上传商品标题、商品轮播图、商品分类、价格和库存&#xff0c;点击创建按钮&#xff0c;即可快速上传您的第一件店…

消息队列概述[幻灯片]

昨天我发表了一个演讲&#xff0c;涉及使用消息队列的所有方面。 我以前曾写过“您可能不需要消息队列” –现在的结论有些细微差别&#xff0c;但我仍然坚持简单性的观点。 演讲探讨了使用消息队列的各种好处和用例&#xff0c;并讨论了典型“消息队列代理”体系结构的替代方…

tf.reshape()

_image tf.reshape(x, [-1,28, 28, 1]) # -1表示任意数量的样本数,大小为28x28深度为一的张量 # 可以忽略(其实是用深度为28的,28x1的张量,来表示28x28深度为1的张量)

面向对象进阶-反射(二)重要知识点

# 面向对象的进阶# 其他常用模块# 作业 考试题# 网络编程 2天# ftp作业# class A:pass# class B(A):pass# a A()# print(isinstance(a,A))返回true&#xff0c;判断a是不是A的对象# print(issubclass(B,A))返回true&#xff0c;判断B是不是A的子类# print(issubclass(A,B))》…