一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。Python 中常用的线程通信有以下方法。
共享变量
共享变量是最简单的线程通信方式,比如进行计数更新等操作,但需要配合锁来保证线程安全。
import threading # 共享变量 shared_data = 0 lock = threading.Lock() def worker(): global shared_data with lock: # 使用锁保证线程安全 shared_data += 1 threads = [] for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start() for t in threads: t.join() print(f"最终结果: {shared_data}") # 应该是5Queue队列
最常用的线程安全通信方式,是生产者-消费者模式的理想选择。比如使用优先级队列优先消费高优先级的数据(序号越低,优先级越高,越优先消费)。
from time import sleep from random import random, randint from threading import Thread from queue import PriorityQueue queue = PriorityQueue() def producer(queue): print('Producer: Running') for i in range(5): # create item with priority value = random() priority = randint(0, 5) item = (priority, value) queue.put(item) # wait for all items to be processed queue.join() queue.put(None) print('Producer: Done') def consumer(queue): print('Consumer: Running') while True: # get a unit of work item = queue.get() if item is None: break sleep(item[1]) print(item) queue.task_done() print('Consumer: Done') producer = Thread(target=producer, args=(queue,)) producer.start() consumer = Thread(target=consumer, args=(queue,)) consumer.start() producer.join() consumer.join()Producer: Running Consumer: Running (0, 0.9945246262101098) (2, 0.35853829355476663) (2, 0.4794139132317813) (3, 0.8460111545035349) (5, 0.6047655828611674) Producer: Done Consumer: DoneEvent事件
线程模提供了 Event 用于线程间的简单信号传递。Event 对象管理内部标志的状态。标志初始为False,并通过 set() 方法变为 True,通过 clear() 方法重新设置为 False。wait() 方法会阻塞,直到标志变为 True。
比如下面使用 Event 通知,模拟交通信号灯周期性变化及车辆通行之间的协同运行。车辆根据信号灯的状态判断是否通行还是等待;车辆通行完毕以后,只剩下信号灯周期性变化。
from threading import * import time def signal_state(): while True: time.sleep(5) print("Traffic Police Giving GREEN Signal") event.set() time.sleep(10) print("Traffic Police Giving RED Signal") event.clear() def traffic_flow(): num = 0 while num < 10: print("Waiting for GREEN Signal") event.wait() print("GREEN Signal ... Traffic can move") while event.is_set(): num = num + 1 print("Vehicle No:", num, " Crossing the Signal") time.sleep(2) print("RED Signal ... Traffic has to wait") event = Event() t1 = Thread(target=signal_state) t2 = Thread(target=traffic_flow) t1.start() t2.start()Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 1 Crossing the Signal Vehicle No: 2 Crossing the Signal Vehicle No: 3 Crossing the Signal Vehicle No: 4 Crossing the Signal Vehicle No: 5 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 6 Crossing the Signal Vehicle No: 7 Crossing the Signal Vehicle No: 8 Crossing the Signal Vehicle No: 9 Crossing the Signal Vehicle No: 10 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Traffic Police Giving GREEN Signal Traffic Police Giving RED Signal Traffic Police Giving GREEN Signal Traffic Police Giving RED Signal ...Condition条件对象
线程模块中的 Condition 类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。Condition 用于更复杂的线程同步,允许线程等待特定条件。比如上面的 Event 的实现,其内部也是在使用 Condition。
import threading import time # 共享资源 buffer = [] MAX_ITEMS = 5 condition = threading.Condition() def producer(): """生产者""" for i in range(10): time.sleep(0.2) with condition: while len(buffer) >= MAX_ITEMS: print("Buffer full,wait...") condition.wait() # 等待缓冲区有空位 item = f"item-{i}" buffer.append(item) print(f"Producer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知消费者 def consumer(): """消费者""" for i in range(10): time.sleep(0.8) with condition: while len(buffer) == 0: print("Buffer empty,wait...") condition.wait() # 等待缓冲区有数据 item = buffer.pop(0) print(f"Consumer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知生产者 # 创建线程 prod = threading.Thread(target=producer) cons = threading.Thread(target=consumer) prod.start() cons.start() prod.join() cons.join()Producer: item-0, Buffer: 1 Producer: item-1, Buffer: 2 Producer: item-2, Buffer: 3 Consumer: item-0, Buffer: 2 Producer: item-3, Buffer: 3 Producer: item-4, Buffer: 4 Producer: item-5, Buffer: 5 Buffer full,wait... Consumer: item-1, Buffer: 4 Producer: item-6, Buffer: 5 Buffer full,wait... Consumer: item-2, Buffer: 4 Producer: item-7, Buffer: 5 Buffer full,wait... Consumer: item-3, Buffer: 4 Producer: item-8, Buffer: 5 Buffer full,wait... Consumer: item-4, Buffer: 4 Producer: item-9, Buffer: 5 Consumer: item-5, Buffer: 4 Consumer: item-6, Buffer: 3 Consumer: item-7, Buffer: 2 Consumer: item-8, Buffer: 1 Consumer: item-9, Buffer: 0Semaphore信号量
Semaphore 信号量控制对共享资源的访问数量。信号量的基本概念是使用一个内部计数器,每个 acquire() 调用将其递减,每个 release() 调用将其递增。计数器永远不能低于零;当 acquire() 发现计数器为零时,它会阻塞,直到某个其他线程调用 release()。当然,从源码看,信号量也是通过 Condition 条件对象来进行实现的。
import threading import time # 信号量,限制最多3个线程同时访问 semaphore = threading.Semaphore(3) def access_resource(thread_id): """访问共享资源""" with semaphore: print(f"Thread {thread_id} acquire\n", end="") time.sleep(2) # 模拟资源访问 print(f"Thread {thread_id} release\n", end="") # 创建10个线程 threads = [] for i in range(10): t = threading.Thread(target=access_resource, args=(i,)) threads.append(t) t.start() for t in threads: t.join()Thread 0 acquire Thread 1 acquire Thread 2 acquire Thread 0 release Thread 3 acquire Thread 1 release Thread 2 release Thread 4 acquire Thread 5 acquire Thread 3 release Thread 6 acquire Thread 4 release Thread 7 acquire Thread 5 release Thread 8 acquire Thread 6 release Thread 9 acquire Thread 7 release Thread 8 release Thread 9 releaseBarrier屏障
Barrier 使多个线程等待,直到指定数目的线程都到达某个点,这些线程才会被同时唤醒,然后继续往后执行(需要注意的是:如果没有设置 timeout,且总的线程数无法整除给定的线程数 parties 时,会导致线程阻塞,形成死锁)。
import threading import time # 创建屏障,等待3个线程(注意:如果总的线程数无法整除3,则会导致线程阻塞) barrier = threading.Barrier(3) def worker(worker_id): """工作线程""" print(f"Worker {worker_id} start") time.sleep(worker_id) # 模拟不同工作速度 print(f"Worker {worker_id} arrive") barrier.wait() # 等待所有线程到达 print(f"Worker {worker_id} continue") # 创建3个线程 threads = [] for i in range(6): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() for t in threads: t.join()Worker 0 start Worker 0 arrive Worker 1 start Worker 2 start Worker 3 start Worker 4 start Worker 5 start Worker 1 arrive Worker 2 arrive Worker 2 continue Worker 0 continue Worker 1 continue Worker 3 arrive Worker 4 arrive Worker 5 arrive Worker 5 continue Worker 3 continue Worker 4 continue不管以什么样的方式进行线程通信,最重要的当属线程安全,线程通信的各种设计,也是建立在通过锁的机制保证线程安全的情况下来实现各种功能的。