共享内存是自python3.8开始引入的新功能。
1. 共享内存 – multiprocessing.shared_memory
multiprocessing.shared_memory
— 可跨进程直接访问的共享内存,是从python3.8 开始引入的功能。
该模块提供了一个 SharedMemory
类,用于分配和管理多核或对称多处理器(SMP)机器上进程间的共享内存。为了协助管理不同进程间的共享内存生命周期,multiprocessing.managers
模块也提供了一个 BaseManager
的子类: SharedMemoryManager
。
本模块中,共享内存是指 “System V 类型” 的共享内存块(虽然可能和它实现方式不完全一致)而不是 “分布式共享内存”。这种类型的的共享内存允许不同进程读写一片公共(或者共享)的易失性存储区域。一般来说,进程被限制只能访问属于自己进程空间的内存,但是共享内存允许跨进程共享数据,从而避免通过进程间发送消息的形式传递数据。相比通过磁盘、套接字或者其他要求序列化、反序列化和复制数据的共享形式,直接通过内存共享数据拥有更出色性能。
class multiprocessing.shared_memory.SharedMemory
(name=None, create=False, size=0)
创建一个新的共享内存块或者连接到一片已经存在的共享内存块。每个共享内存块都被指定了一个全局唯一的名称。通过这种方式,一个进程可以通过提供一个特定的名字创建一个共享内存区块,然后其他进程使用同样的名字连接到这个共享内存块。
作为一种跨进程共享数据的方式,共享内存块的寿命可能超过创建它的原始进程。一个共享内存块可能同时被多个进程使用,当一个进程不再需要访问这个共享内存块的时候,应该调用 close()
方法。当一个共享内存块不被任何进程使用的时候,应该调用 unlink()
方法以执行必要的清理。
name 是共享内存的唯一名称,字符串类型。如果创建一个新共享内存块的时候,名称指定为 None
(默认值),将会随机产生一个新名称。
create 指定创建一个新的共享内存块 (True
) 还是连接到已存在的共享内存块 (False
) 。
如果是新创建共享内存块则 size 用于指定块的大小为多少字节。由于某些平台是使用特定内存页大小为最小单位来分配的,最终得到的内存块大小可能大于或等于要求的大小。如果是连接到已经存在的共享内存块, size
参数会被忽略。
-
close
()关闭实例对于共享内存的访问连接。所有实例确认自己不再需要使用共享内存的时候都应该调用
close()
,以保证必要的资源清理。调用close()
并不会销毁共享内存区域。 -
unlink
()请求销毁底层的共享内存块。 为了执行必要的资源清理,在所有使用这个共享内存块的进程中,
unlink()
应该调用一次(且只能调用一次)。 发出此销毁请求后,共享内存块可能会、也可能不会立即销毁,且此行为在不同操作系统之间可能不同。 调用unlink()
后再尝试访问其中的数据可能导致内存错误。 注意:最后一个关闭共享内存访问权限的进程可以以任意顺序调用unlink()
和close()
。 -
buf
共享内存块内容的 memoryview 。
-
name
共享内存块的唯一标识,只读属性。
-
size
共享内存块的字节大小,只读属性。
示例1:展示了 SharedMemory
的基础用法
from multiprocessing import shared_memory
import arrayif __name__ == '__main__':shm_a = shared_memory.SharedMemory(create=True, size=40)print('type: ', type(shm_a))print('name: ', shm_a.name)print('size: ', shm_a.size)buffer = shm_a.bufbuffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at onceprint('buffer_a: ', [i for i in bytes(buffer[:4])])buffer[4] = 100 # Modify single byte at a timeprint('buffer_a: ', [i for i in bytes(buffer[:5])])shm_b = shared_memory.SharedMemory(shm_a.name)shm_b_array = array.array('b', shm_b.buf[:5]) # Copy the data into a new array.arrayprint('shm_b_array: ', shm_b_array)shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytesprint('buffer_a: ', bytes(shm_b.buf[:5])) # Access via shm_ashm_b.close() # Close each SharedMemory instanceshm_a.close()shm_a.unlink() # Call unlink only once to release the shared memory
输出:
type: <class 'multiprocessing.shared_memory.SharedMemory'>
name: wnsm_97359521
size: 40
buffer_a: [22, 33, 44, 55]
buffer_a: [22, 33, 44, 55, 100]
shm_b_array: array('b', [22, 33, 44, 55, 100])
buffer_a: b'howdy'
示例2:展示了 SharedMemory
类结合NumPy
数组的读写示例
import multiprocessing
import time
from multiprocessing import shared_memory
import numpy as npclass ChildProcess(multiprocessing.Process):def __init__(self, shm_name=None, shape=None, dtype=None):super().__init__()self._shm_name = shm_nameself._shape = shapeself._dtype = dtypedef run(self) -> None:existing_shm = shared_memory.SharedMemory(name=self._shm_name, create=False) # Attach to the existing shared memory blockprint(existing_shm.name)c = np.ndarray(shape=self._shape, dtype=self._dtype, buffer=existing_shm.buf)print('c0: ', c)c[-1] = 888print('c1: ', c)del cexisting_shm.close()if __name__ == '__main__':a = np.array([1, 1, 2, 3, 5, 6]) # Start with an existing NumPy arrayprint('a: ', a)print('a.shape: ', a.shape)print('a type: ', type(a))shm = shared_memory.SharedMemory(name=None, create=True, size=a.nbytes)b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) # Now create a NumPy array backed by shared memoryprint('b0: ', b)b[:] = a[:] # Copy the original data into shared memoryprint('b1: ', b)print('shm name: ', shm.name)child_process = ChildProcess(shm_name=shm.name, shape=b.shape, dtype=b.dtype)child_process.start()time.sleep(2)print('b2: ', b)shm.close()shm.unlink() # Free and release the shared memory block at the very end
输出:
a: [1 1 2 3 5 6]
a.shape: (6,)
a type: <class 'numpy.ndarray'>
b0: [0 0 0 0 0 0]
b1: [1 1 2 3 5 6]
shm name: wnsm_cb62400b
wnsm_cb62400b
c0: [1 1 2 3 5 6]
c1: [ 1 1 2 3 5 888]
b2: [ 1 1 2 3 5 888]
示例3:展示了 SharedMemory
类结合NumPy
图像的示例
from multiprocessing import shared_memory
import numpy as np
import cv2if __name__ == '__main__':image = cv2.imread('1.jpg')print(type(image))print(image.shape)image_shape = image.shapeimage_dtype = image.dtypeshm = shared_memory.SharedMemory(name=None, create=True, size=2448*2048*3*1) # 图像像素2448x2048, RGB 3通道shm_ndarray = np.ndarray(shape=image_shape, dtype=image_dtype, buffer=shm.buf)shm_ndarray[:] = imageexisting_shm = shared_memory.SharedMemory(name=shm.name, create=False)c = np.ndarray(shape=image_shape, dtype=image_dtype, buffer=existing_shm.buf)cv2.imwrite('2.jpg', c)shm.close()existing_shm.close()existing_shm.unlink()
2. 共享内存队列
Github项目地址: https://hub.nuaa.cf/soloist-v/SharedMemoryQueue/tree/main 使用其main分支。
(不要使用Gitee上的,Gitee上的没有更新,运行有问题)
示例:使用共享内存队列传输图像
示例连接,https://download.csdn.net/download/WonderThink/88792215
from shared_memory_queue.shared_memory_record import SharedMemoryRecorder
from shared_memory_queue.kfifo_queue import Queue
import multiprocessing
import os
import cv2def producer(shm_queue: Queue):print('producer start.')for i in range(0, len(os.listdir('./img/'))):image_path = './img/' + str(i) + '.jpg'if os.path.exists(image_path):image_ndarray = cv2.imread(image_path)print(f'i: {i}, shape: {image_ndarray.shape}')shm_queue.put(item=image_ndarray, block=True, timeout=1)print('producer end.')def consumer(shm_queue: Queue, shape=None):print('consumer start.')i = 0while True:try:data = shm_queue.get(block=True, timeout=2)except:breakimage = data.reshape(shape)new_image_dir = './img2/'if not os.path.exists(new_image_dir):os.makedirs(new_image_dir)cv2.imwrite(os.path.join(new_image_dir, str(i)+'.jpg'), image)i += 1print('consumer end.')if __name__ == '__main__':image_shape = (2048, 2448, 3) # numpy shape: 行数、列数、RGB三通道数shm_queue = Queue(buffer_size=image_shape[0] * image_shape[1] * image_shape[2] * 10)print(shm_queue.sm.get_sm_name())producer_process = multiprocessing.Process(target=producer, args=(shm_queue,))producer_process.start()consumer_process = multiprocessing.Process(target=consumer, args=(shm_queue, image_shape,))consumer_process.start()producer_process.join()consumer_process.join()shm_queue.sm.close()# SharedMemoryRecorder.release_last_sm()