从零开始实现简易版Netty(九) MyNetty 实现池化内存的线程本地缓存

从零开始实现简易版Netty(九) MyNetty 实现池化内存的线程本地缓存

从零开始实现简易版Netty(九) MyNetty 实现池化内存的线程本地缓存

1. Netty 池化内存线程本地缓存介绍

在上一篇博客中,截止lab8版本MyNetty已经实现了Normal和Small规格的池化内存分配。按照计划,在lab9中MyNetty将实现池化内存的线程本地缓存功能,以完成池化内存功能的最后一块拼图。
由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。

  • lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
  • lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
  • lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现
  • lab4版本博客:从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现
  • lab5版本博客:从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现
  • lab6版本博客:从零开始实现简易版Netty(六) MyNetty ByteBuf实现
  • lab7版本博客:从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配
  • lab8版本博客:从零开始实现简易版Netty(八) MyNetty 实现Small规格的池化内存分配

在lab7、lab8的实现中可以发现,出于空间利用率的考虑,一个PoolArena会同时被多个线程并发访问。因此无论是Normal还是Small规格的池化内存分配,Netty在进行实际的池化内存分配时都或多或少的需要使用互斥锁来确保用于追踪池化内存状态的元数据PoolArena、PoolSubPage等不会被并发的更新而出现问题。
jemalloc的论文中提到,内存分配作为一个高频的操作需要尽可能的减少线程的同步竞争以提高效率,大量线程都阻塞在同步锁上会大大降低内存分配的整体吞吐率,通过引入线程本地缓存可以显著减少同步竞争的频率。
"The main goal of thread caches is to reduce the volume of synchronization events."
"Minimize lock contention. jemalloc's independent arenas were inspired by lkmalloc, but as time went on, tcmalloc made it abundantly clear that it's even better to avoid synchronization altogether, so jemalloc also implements thread-specific caching."

引入线程本地缓存后,当前线程在释放池化内存时,不会直接将空闲的池化内存对象还给公共的PoolArena中,而是优先尝试放入独属于本线程的本地缓存中。同时,在尝试申请池化内存分配时,也会优先查询线程本地缓存中是否存在对应规格的可用池化内存段,如果有则直接使用,而无需通过公共的PoolArena获取。
有了线程本地缓存,线程在绝大多数情况下都只和独属于自己的本地缓存进行交互,因此能够大幅减少与其它线程争抢公共PoolArena元数据互斥锁的场景并提高所访问内存空间的缓存局部性,从而大幅提升内存分配的吞吐量。
当然,线程本地缓存也不是没有缺点的,线程本地缓存毫无疑问增加了内存的开销,规格繁多的本地池化内存段对象多数时候都只会静静地在缓存中等待被使用(视为内部碎片),因此线程本地所能缓存的池化内存段数量是被严格限制的,使用者需要在池化内存分配效率与空间利用率的取舍上达成平衡。具体的实现细节,我们在下文中结合源码再展开介绍。

2. MyNetty 池化内存线程本地缓存源码实现

在jemalloc的论文中提到,为了减少线程之间对Arena的争抢,jemalloc设置了多个Arena区域,并使用特别的算法使得每个Arena尽可能的被线程均匀的使用。Arena与线程是一对多的关系,而一个线程在进行池化内存分配前选择并永久绑定一个Arena。

2.1 PoolThreadLocalCache实现解析

在Netty中,参考jemalloc也同样是设置多个PoolArena,并令一个线程在进行最初的池化内存分配之前绑定一个PoolArena。
具体的逻辑在PooledByteBufAllocator中,PooledByteBufAllocator中为基于堆内存的HeapByteBuffer和基于堆外直接内存的DirectByteBuffer以数组的形式分别维护了N个PoolArena(heapArenas、directArenas)。
具体N为多少可以在allocator分配器的构造方法中通过参数设置,默认情况下其值取决与处理器的数量和内存大小。而具体的当前线程与其中某一个PoolArena进行绑定的逻辑则位于PoolThreadLocalCache这一核心数据结构之中。

MyNetty PooledByteBufAllocator实现源码
public class MyPooledByteBufAllocator extends MyAbstractByteBufAllocator{private final MyPoolArena<byte[]>[] heapArenas;private final MyPoolThreadLocalCache threadLocalCache;public MyPooledByteBufAllocator() {this(false);}public MyPooledByteBufAllocator(boolean useCacheForAllThreads) {// 简单起见,arena的数量与处理器核数挂钩(netty中有更复杂的方式去配置,既可以构造参数传参设置,也可以配置系统参数来控制默认值)int arenasNum = Runtime.getRuntime().availableProcessors() * 2;// 初始化好heapArena数组heapArenas = new MyPoolArena.HeapArena[arenasNum];for (int i = 0; i < heapArenas.length; i ++) {MyPoolArena.HeapArena arena = new MyPoolArena.HeapArena(this);heapArenas[i] = arena;}// 创建threadLocalCache,让线程绑定到唯一的PoolArena中,并且在small/normal分配时,启用相关的内存块缓存this.threadLocalCache = new MyPoolThreadLocalCache(useCacheForAllThreads,this);}@Overrideprotected MyByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 从ThreadLocal中获得当前线程所绑定的PoolArena(如果是线程第一次分配,则ThreadLocal初始值获取时会进行绑定)MyPoolThreadCache cache = threadLocalCache.get();MyPoolArena<byte[]> targetArena = cache.heapArena;return targetArena.allocate(cache, initialCapacity, maxCapacity);}
}
MyNetty PoolThreadLocalCache实现源码
public class MyPoolThreadLocalCache extends MyFastThreadLocal<MyPoolThreadCache> {private final boolean useCacheForAllThreads;private static final int DEFAULT_SMALL_CACHE_SIZE = 256;private static final int DEFAULT_NORMAL_CACHE_SIZE = 32;private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024;private static final int DEFAULT_CACHE_TRIM_INTERVAL = 8192;private MyPooledByteBufAllocator myPooledByteBufAllocator;private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);private final Runnable trimTask = new Runnable() {@Overridepublic void run() {myPooledByteBufAllocator.trimCurrentThreadCache();}};MyPoolThreadLocalCache(boolean useCacheForAllThreads,MyPooledByteBufAllocator myPooledByteBufAllocator) {this.useCacheForAllThreads = useCacheForAllThreads;this.myPooledByteBufAllocator = myPooledByteBufAllocator;}@Overrideprotected synchronized MyPoolThreadCache initialValue() {// 从allocator所包含的HeapArena中挑选出一个最合适的HeapArena与当前线程绑定// 什么是最合适的?就是被其它线程绑定次数最少的(最少被使用 leastUsed),也就是相对最空闲的PoolArenafinal MyPoolArena<byte[]> heapArena = leastUsedArena(myPooledByteBufAllocator.getHeapArenas());final Thread current = Thread.currentThread();// 如果没有配置useCacheForAllThreads=true,则只有FastThreadLocalThread等特殊场景才启用PoolThreadCache缓存功能if (useCacheForAllThreads ||// If the current thread is a FastThreadLocalThread we will always use the cachecurrent instanceof MyFastThreadLocalThread) {final MyPoolThreadCache cache = new MyPoolThreadCache(heapArena, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE,DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);return cache;}else{// No caching so just use 0 as sizes.// 不启用缓存,则返回一个smallCacheSize/normalCacheSize都为0的特殊PoolThreadCache// 但是挑选一个heapArena和当前线程绑定的逻辑依然是存在的,只是没有small/normal具体分配时的线程本地缓存return new MyPoolThreadCache(heapArena, 0, 0, 0, 0);}}@Overrideprotected void onRemoval(MyPoolThreadCache threadCache) {threadCache.free(false);}private <T> MyPoolArena<T> leastUsedArena(MyPoolArena<T>[] arenas) {if (arenas == null || arenas.length == 0) {return null;}MyPoolArena<T> minArena = arenas[0];// optimized// If it is the first execution, directly return minarena and reduce the number of for loop comparisons belowif (minArena.numThreadCaches.get() == 0) {// 当前Allocator第一次分配PoolArena,快速返回第一个即可return minArena;}// 否则从所有的PoolArena中找到相对来说被最少得线程绑定的那个PoolArenafor (int i = 1; i < arenas.length; i++) {MyPoolArena<T> arena = arenas[i];if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {minArena = arena;}}return minArena;}
}
  • MyNetty由于只支持了堆内存的池化内存分配,因此只有heapArenas数组。同时也简化了设置Arenas个数的逻辑。
  • PoolThreadLocalCache是一个FastThreadLocal类型的数据结构。每一个线程都有一个自己所独有的PoolThreadCache,这个结构就是用于存放PooledByteBuf池化内存对象的线程本地缓存。
    initialValue方法中可以看到,Netty并不会无脑的为所有线程都开启线程本地缓存,默认情况下useCacheForAllThreads为false,则只会为FastThreadLocalThread类型的线程设置线程本地缓存。
    对于普通线程,则返回了一个参数均为0的,本质上是无缓存作用的PoolThreadCache对象(PoolThreadCache内部的工作原理我们在下一小节展开)。
  • 在初始化PoolThreadCache时,通过leastUsedArena方法找到当前绑定线程最少的PoolArena与线程专属的PoolThreadCache缓存进行关联。
    通过这一方式,实现了上述jemalloc中提到的将线程与PoolArena进行绑定的功能,并尽可能的使每个PoolArena负载平衡。

2.2 PoolThreadCache实现解析

  • 在PoolThreadCache中,类似Small规格的池化内存分配,为每一种特定的规格都维护了一个对象池。
    对象池是MyMemoryRegionCache结构,small规格的对象池是smallSubPageHeapCaches数组,normal规格的对象池是normalHeapCaches数组。
  • 默认情况下,所有的small规格都会进行缓存;而normal规格中只有32kb的这一最小规格才会被缓存,更大的规格将不会进行线程本地缓存。
    这样的设计出于两方面的考虑,首先是每个线程都要维护线程本地缓存,缓存的池化内存段会占用大量的内存空间,所要缓存的规格越多,则内存碎片越多,空间利用率越低。
    其次,绝大多数情况下越大规格的内存申请的频率越低,进行线程本地缓存所带来的吞吐量的提升越小。基于这两点,netty将最大的本地缓存规格设置为了32kb。
    当然,如果应用的开发者的实际场景中就是有大量的大规格池化内存的分配需求,netty也允许使用对应的参数来控制实际需要进行线程本地缓存的最大规格。
  • MyMemoryRegionCache中都维护了一个队列存放所缓存的池化内存段对象(挂载在Entry节点上,handle连续内存段);与PooledByteBuf对象池的设计一样,该队列也是专门针对多写单读的并发场景优化的MpscQueue。
    因为从线程本地缓存中获取池化内存段的只会是持有者线程,而归还时则可能在经过多次传递后,由其它线程进行归还而写回队列。
PoolThreadCache结构示意图

img

MyNetty PoolThreadCache实现源码
/*** 池化内存分配线程缓存,完全参考Netty的PoolThreadCache* */
public class MyPoolThreadCache {final MyPoolArena<byte[]> heapArena;private final int freeSweepAllocationThreshold;// Hold the caches for the different size classes, which are small and normal.private final MyMemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MyMemoryRegionCache<byte[]>[] normalHeapCaches;private int allocations;private final AtomicBoolean freed = new AtomicBoolean();MyPoolThreadCache(MyPoolArena<byte[]> heapArena,int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,int freeSweepAllocationThreshold) {this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;this.heapArena = heapArena;if (heapArena != null) {// 为每一种Small类型的size,都创建一个SubPageMemoryRegionCache来做缓存smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);// 为每一种Normal类型的size,都创建一个NormalMemoryRegionCache来做缓存normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);// 当前Arena所绑定的ThreadCache数量加1heapArena.numThreadCaches.getAndIncrement();} else {// No heapArea is configured so just null out all cachessmallSubPageHeapCaches = null;normalHeapCaches = null;}// Only check if there are caches in use.if ((smallSubPageHeapCaches != null || normalHeapCaches != null)&& freeSweepAllocationThreshold < 1) {throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)");}}private static <T> MyMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {if (cacheSize > 0 && numCaches > 0) {// 为每一种Small类型的size,都创建一个SubPageMemoryRegionCache来做缓存MyMemoryRegionCache<T>[] cache = new MyMemoryRegionCache[numCaches];for (int i = 0; i < cache.length; i++) {cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);}return cache;} else {return null;}}@SuppressWarnings("unchecked")private static <T> MyMemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, MyPoolArena<T> area) {if (cacheSize > 0 && maxCachedBufferCapacity > 0) {// 所能缓存的buf规格的最大值,由chunk和用户参数指定的最小值决定的int max = Math.min(area.mySizeClasses.getChunkSize(), maxCachedBufferCapacity);List<MyMemoryRegionCache<T>> cache = new ArrayList<>() ;// 为每一种Normal类型的size,都创建一个NormalMemoryRegionCache来做缓存int nSizes = area.getMySizeClasses().getSmallAndNormalTotalSize();for (int idx = area.numSmallSubpagePools;idx < nSizes && area.getMySizeClasses().sizeIdx2size(idx).getSize() <= max ; idx++) {cache.add(new NormalMemoryRegionCache<>(cacheSize));}return cache.toArray(new MyMemoryRegionCache[0]);} else {return null;}}/*** Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise*/boolean allocateSmall(MyPoolArena<?> area, MyPooledByteBuf<?> buf, int reqCapacity, MySizeClassesMetadataItem sizeClassesMetadataItem) {return allocate(cacheForSmall(area, sizeClassesMetadataItem), buf, reqCapacity);}/*** Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise*/boolean allocateNormal(MyPoolArena<?> area, MyPooledByteBuf<?> buf, int reqCapacity, MySizeClassesMetadataItem sizeClassesMetadataItem) {return allocate(cacheForNormal(area, sizeClassesMetadataItem), buf, reqCapacity);}@SuppressWarnings({ "unchecked", "rawtypes" })private boolean allocate(MyMemoryRegionCache<?> cache, MyPooledByteBuf buf, int reqCapacity) {if (cache == null) {// no cache found so just return false herereturn false;}boolean allocated = cache.allocate(buf, reqCapacity, this);if (++ allocations >= freeSweepAllocationThreshold) {allocations = 0;trim();}return allocated;}boolean add(MyPoolArena<?> area, MyPoolChunk chunk,  ByteBuffer nioBuffer, long handle, int normCapacity) {MySizeClassesMetadataItem mySizeClassesMetadataItem = area.getMySizeClasses().size2SizeIdx(normCapacity);MyMemoryRegionCache<?> cache = cache(area, mySizeClassesMetadataItem);if (cache == null) {// 当前无法缓存return false;}if (freed.get()) {return false;}return cache.add(chunk, nioBuffer, handle, normCapacity);}/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.@Overrideprotected void finalize() throws Throwable {try {super.finalize();} finally {free(true);}}/***  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache*/void free(boolean finalizer) {// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure// we only call this one time.if (freed.compareAndSet(false, true)) {// 当前ThreadLocal被销毁时,会调用free方法,在free方法中需要将当前已缓存的、未实际释放掉的内存都放回到PoolArena中// 只有这样,才能避免内存泄露free(smallSubPageHeapCaches, finalizer);free(normalHeapCaches, finalizer);if (heapArena != null) {heapArena.numThreadCaches.getAndDecrement();}}}private static int free(MyMemoryRegionCache<?>[] caches, boolean finalizer) {if (caches == null) {return 0;}int numFreed = 0;for (MyMemoryRegionCache<?> c: caches) {numFreed += free(c, finalizer);}return numFreed;}private static int free(MyMemoryRegionCache<?> cache, boolean finalizer) {if (cache == null) {return 0;}return cache.free(finalizer);}private MyMemoryRegionCache<?> cacheForSmall(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {return cache(smallSubPageHeapCaches, sizeClassesMetadataItem.getSize());}private MyMemoryRegionCache<?> cacheForNormal(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {int sizeIdx = sizeClassesMetadataItem.getTableIndex();// We need to substract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.int idx = sizeIdx - area.numSmallSubpagePools;return cache(normalHeapCaches, idx);}private MyMemoryRegionCache<?> cache(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {switch (sizeClassesMetadataItem.getSizeClassEnum()) {case NORMAL:return cacheForNormal(area, sizeClassesMetadataItem);case SMALL:return cacheForSmall(area, sizeClassesMetadataItem);default:throw new Error();}}private static <T> MyMemoryRegionCache<T> cache(MyMemoryRegionCache<T>[] cache, int sizeIdx) {if (cache == null || sizeIdx > cache.length - 1) {// 当前规格无法缓存return null;}return cache[sizeIdx];}void trim() {trim(smallSubPageHeapCaches);trim(normalHeapCaches);}private static void trim(MyMemoryRegionCache<?>[] caches) {if (caches == null) {return;}for (MyMemoryRegionCache<?> c: caches) {trim(c);}}private static void trim(MyMemoryRegionCache<?> cache) {if (cache == null) {return;}cache.trim();}/*** Cache used for buffers which are backed by TINY or SMALL size.*/public static final class SubPageMemoryRegionCache<T> extends MyMemoryRegionCache<T> {SubPageMemoryRegionCache(int size) {super(size, SizeClassEnum.SMALL);}@Overrideprotected void initBuf(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, MyPooledByteBuf<T> buf, int reqCapacity,MyPoolThreadCache threadCache) {chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);}}/*** Cache used for buffers which are backed by NORMAL size.*/public static final class NormalMemoryRegionCache<T> extends MyMemoryRegionCache<T> {NormalMemoryRegionCache(int size) {super(size, SizeClassEnum.NORMAL);}@Overrideprotected void initBuf(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, MyPooledByteBuf<T> buf, int reqCapacity,MyPoolThreadCache threadCache) {chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);}}
}
public abstract class MyMemoryRegionCache<T> {private final int size;private final Queue<Entry<T>> queue;private final SizeClassEnum sizeClassEnum;private int allocations;@SuppressWarnings("rawtypes")private static final MyObjectPool<Entry> RECYCLER = MyObjectPool.newPool(handle -> new Entry(handle));MyMemoryRegionCache(int size, SizeClassEnum sizeClassEnum) {this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);queue = new MpscUnpaddedArrayQueue<>(this.size);this.sizeClassEnum = sizeClassEnum;}/*** Add to cache if not already full.* @return true 当前线程释放内存时,成功加入到本地线程缓存,不需要实际的回收*         false 当前线程释放内存时,加入本地线程缓存失败,需要进行实际的回收*/@SuppressWarnings("unchecked")public final boolean add(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);// 尝试加入到当前Cache的队列里boolean queued = queue.offer(entry);if (!queued) {// If it was not possible to cache the chunk, immediately recycle the entryentry.recycle();}return queued;}public final boolean allocate(MyPooledByteBuf<T> buf, int reqCapacity, MyPoolThreadCache threadCache) {Entry<T> entry = queue.poll();if (entry == null) {return false;}// 之前已经缓存过了,直接把对应的内存段拿出来复用,给本次同样规格的内存分配initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);entry.recycle();// allocations is not thread-safe which is fine as this is only called from the same thread all time.++ allocations;return true;}public final int free(boolean finalizer) {return free(Integer.MAX_VALUE, finalizer);}private int free(int max, boolean finalizer) {int numFreed = 0;for (; numFreed < max; numFreed++) {// 遍历所有已缓存的entry,一个接着一个进行实际的内存释放Entry<T> entry = queue.poll();if (entry != null) {freeEntry(entry, finalizer);} else {// all clearedreturn numFreed;}}return numFreed;}public final void trim() {int free = size - allocations;allocations = 0;// We not even allocated all the number that areif (free > 0) {free(free, false);}}@SuppressWarnings({ "unchecked", "rawtypes" })private  void freeEntry(Entry entry, boolean finalizer) {// Capture entry state before we recycle the entry object.MyPoolChunk chunk = entry.chunk;long handle = entry.handle;int normCapacity = entry.normCapacity;if (!finalizer) {// recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of// a finalizer.entry.recycle();}// 将当前entry中缓存的handle内存段进行实际的回收,放回到所属的PoolChunk中chunk.arena.freeChunk(chunk, handle, normCapacity);}protected abstract void initBuf(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,MyPooledByteBuf<T> buf, int reqCapacity, MyPoolThreadCache threadCache);@SuppressWarnings("rawtypes")private static Entry newEntry(MyPoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry entry = RECYCLER.get();entry.chunk = chunk;entry.nioBuffer = nioBuffer;entry.handle = handle;entry.normCapacity = normCapacity;return entry;}static final class Entry<T> {final MyObjectPool.Handle<Entry<?>> recyclerHandle;MyPoolChunk<T> chunk;ByteBuffer nioBuffer;long handle = -1;int normCapacity;Entry(MyObjectPool.Handle<Entry<?>> recyclerHandle) {this.recyclerHandle = recyclerHandle;}void recycle() {chunk = null;nioBuffer = null;handle = -1;recyclerHandle.recycle(this);}}
}

2.3 引入线程本地缓存后的池化内存分配与释放

在实现了线程本地缓存的功能后,我们再完整的梳理一下Netty中池化内存的分配与释放的流程。

池化内存分配
  1. 入口为PooledByteBufAllocator,当前线程获得独属于自己的线程本地缓存PoolThreadCache。
    初始化时,通过最少使用算法(leastUsedArena方法)找到目前负载最低的PoolArena与当前线程建立绑定关系。
    获取到当前线程所绑定的PoolArena后,通过其allocate方法进行池化内存分配。
  2. PoolArena的allocate方法中,先从Recycle对象池中获取一个未绑定底层内存的裸PooledByteBuf对象(newByteBuf方法),然后再尝试为其分配底层内存。
    通过计算所要申请的内存规格(SizeClasses类计算规范化后的规格),判断其是Small、Normal还是Huge规格级别,分别走不同的分配逻辑。
    分配时,优先从线程本地缓存中获取可用的内存段。如果本地缓存中已缓存对应规格的handle内存段,则直接将其与当前PooledByteBuf进行绑定后返回,完成内存分配。
  3. 如果本地缓存无法满足当前分配,则需要从所绑定的PoolArena中获取可供使用的空闲内存段。
    Huge规格内存申请由于较为少见,且缓存空间代价过大,因此Netty中不进行池化,通过单独的额外分配内存空间以满足其需求。
    Normal规格的池化内存分配基于伙伴算法,以PoolChunk为基础单位管理连续的内存段(PoolChunkList管理不同使用率的PoolChunk),在通过分割和合并内存段的方式来追踪内存段的使用情况。
    Small规格的池化内存分配使用slab算法,通过一系列的PoolSubpage集合管理相同规格的内存段插槽,使用bitmap来追踪各个内存段插槽的分配情况(已分配/未分配)。
  4. 被分配出去的底层内存,以handle的形式进行表征。handle是一个64位的long类型结构,共划分为5个属性,分别是runOffset、size、isUsed、isSubpage、bitmapIdx of subpage。
    第一个属性是runOffset,即当前内存段的起始位置在第几个Page页。
    第二个属性是size,即handle所标识的连续内存段有多长,一共几个Page页大小(类似一家人买了个连号,都挨着坐一起)。
    第三个属性是isUsed,是否使用。一个handle被分配出去了,isUsed=1;未被分配出去则isUsed=0。
    第四个属性是isSubPage,是否是SubPage类型的内存段,0代表是Normal级别,1代表是Small级别。
    第五个属性是bitmapIdx of subpage,仅用于small级别的分配(Normal分配所有位全为0),标识当前内存段位于所属PoolSubPage的第几个插槽。
  5. 分配完成后,对应的底层内存段被标识为已分配,其所对应的handle值与PooledByteBuf进行绑定,在后续释放内存时基于该handle对象定位到对应的连续内存段将其释放。
    PooledByteBuf在初始化时,计算好其在对应PoolChunk底层内存中的offset偏移量,在实际使用时通过该偏移量才能正确的读写到实际分配出去的对应内存地址。
池化内存释放
  1. Netty中通过引用计数法追踪PooledByteBuf对象的使用情况,在PooledByteBuf被初始化时其被引用数refCnt为1。后续每次被额外依赖,被引用数自增1;不再被使用时,通过release方法减少其被引用数。当refCnt被减为0时,说明当前PooledByteBuf已不再被使用,需要进行释放。
  2. 释放时,通过PooledByteBuf的deallocate方法进行释放。通过当前Buf对象所属的PoolArena的free方法,释放归还此前分配出的底层内存段。
    和分配一样,释放时也优先通过PoolThreadCache.add方法尝试将对应的handle内存段放入线程本地缓存中。
    如果线程本地缓存能够存放的了,则将当前内存段放入对应规格的MemoryRegionCache所对应的队列中,完成底层内存的释放。
  3. 如果线程本地缓存因为一些原因无法缓存,则找到当前内存段所属的PoolChunk,判断其大小规格,进行对应的释放逻辑。
    如果释放后满足一些条件,比如对应的PoolSubPage或PoolChunk完全空闲,则可能会把对应的PoolSubPage、PoolChunk等结构也一并回收掉。
  4. reallocate方法释放底层池化内存后,再尝试将PooledByteBuf归还给其所属的对象池中,以供后续新的池化内存分配申请使用。

总结

至此,在实现了线程本地缓存的功能后,MyNetty终于完成了目标中池化内存分配的全部功能。
为了简化理解的难度,相比Netty做了大量的简化工作,比如没有实现堆外内存的池化,内存泄露的自动检测,统计追踪,写死了本应灵活配置的各项参数等等。

在简化逻辑的基础上,再结合jemalloc论文有机的拆分原本耦合很紧密、非常复杂的Netty整体实现,大幅降低了读者在理解Netty关于池化内存管理的难度。
可以看到,在内存分配这一对性能非常敏感的底层功能上,大佬们绞尽脑汁进行了各方面的优化,以提升时间和空间效率;而在无法同时兼顾空间与时间的情况下,基于实际的使用场景,Netty站在巨人的肩膀上,参考jemalloc做到了非常好的平衡。
比如为高频使用Small规格内存池化,使用更占空间的slab算法;而相对低频使用的Normal规格内存池化,则使用更节约空间的伙伴算法;至于较为罕见的huge规格,则干脆放弃池化。再比如,为了提升吞吐量避免大量的互斥锁争抢,默认允许FastLocalThread线程启用线程本地缓存。但本地缓存的内存段数量被严格控制,同时能被缓存的规格大小也被严格控制,避免空间效率过低。

博客中展示的完整代码在我的github上:https://github.com/1399852153/MyNetty (release/lab9_thread_local_cache 分支)。
希望MyNetty系列博客能够帮助到对Netty感兴趣的读者,内容如有错误,还请多多指教。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/939175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

杏帘招客饮,在望有山庄

五道口体校赌石五字班小生,专业方向举棋不定中. 一些为了忘却的纪念: 曾经的记忆:编程模板合集 曾经的我:我的2022Updated on 2025/10/17 V1.0.0

洛谷 P8512

有长度为 \(m\) 的序列 \(a\)(初始全为 \(0\))以及 \(n\) 次操作,每次操作形如 \(l, r, v\),表示将 \(a_{l} \sim a_r\) 变为 \(v\)。现在给定 \(q\) 组询问,每组询问给定 \(l, r\),输出若依次执行第 \(l \sim r…

从libtorch_cuda.so中提取某个函数的sass汇编指令

研究cuda算子用得着。 用cuobjdump工具,linux window上都有。安装cuda toolkit后就有了,和nvcc一套的wget https://download.pytorch.org/libtorch/cu128/libtorch-shared-with-deps-2.9.0%2Bcu128.zip 下载后解压 c…

【题解】成外友谊赛

T1 舞蹈机器⼈题目大意: 给一个可以向四个方向移动的小点,对于每次移动,如果上或下⽅向进⾏了⼀次移动,那么,下⼀次就只能往左或右⽅向进⾏⼀次移动,反之亦然。 求该点可以到达的位置数量。STEP 1. 对于这个问题…

小程序商城客服系统

vx:llike620 网址:gofly.v1kf.com 微信小程序对接有两种方式:webview组件嵌入页面,小程序客服组件对接消息 使用webview组件嵌入聊天页面形式。这种形式更加的灵活可控,可以传递更多的信息给到客服,例如可以把用户…

ubuntu 主机创建虚拟 ip,应对容器内部配置了宿主固定 ip,宿主迁移网络环境后容器报错

配置文件:/etc/netplan/01-network-manager-all.yaml# Let NetworkManager manage all devices on this system network:version: 2renderer: NetworkManager 备份配置:cd /etc/netplan cp 01-network-manager-all.…

2025权威报告:微信编辑器排版Top 10工具推荐(全链路解决方案)

作为一名深耕新媒体领域多年的运营者,我深知内容创作者在选题、写作、配图到分发过程中面临的痛点。从文案质量把控到微信编辑器排版效率提升,再到多平台同步分发,每个环节都需要耗费大量精力。为了帮助大家找到真正…

洛谷 P10149

给定序列 \(a_1,\dots,a_n(n \le 5 \times 10^5, 3s)\) ,\(m\) 次询问,每次询问给出 \(l,r\) ,问有多少组 \((i,j,k)\) 满足 \(l\le i<j<k\le r,\;a_i=a_k>a_j\) 。 这个题看起来如果离线下来按 \(r\) 排序…

从0到1构建企业数据资产 - 智慧园区

在当今这个时代,每一个制造企业都漂浮在一片浩瀚的数据海洋之上。然而,拥有数据并不等同于拥有洞察力,更不意味着拥有了真正的数据资产。对于许多制造业者而言,数据常常是沉睡的、割裂的、甚至混乱的,它们是亟待开…

2025.10.17

今天没课,在宿舍躺了一天。

一行代码清空所有 docker 容器的日志文件

一行代码搞定cd $(docker info 2>/dev/null | awk /Docker Root Dir/ {print $4})/containers 2>/dev/null && find . -name "*-json.log" -exec truncate -s 0 {} \; && find . -nam…

塔吊施工 “隐形风险” 克星!思通数科 AI 卫士精准识别核心部件隐患

在塔吊施工安全巡检中,传统人工排查模式常陷入 “看得见却查不细、查得慢还易遗漏” 的困境:钢管开裂、锈蚀等部件损耗,靠肉眼难发现细微痕迹,尤其在光线不足、雨天或夜间作业时,识别准确率骤降;扣件是否合规、是…

ubuntu配置vsftpd

在Ubuntu上快速创建FTP服务器,推荐使用vsftpd(Very Secure FTP Daemon)。以下是完整步骤: 1. 安装vsftpdbashsudo apt update sudo apt install vsftpd 2. 配置vsftpd 备份原始配置bashsudo cp /etc/vsftpd.conf /…

时序数据库 Apache IoTDB 等你“打卡”!2025 OSCAR 开源产业大会完整版议程揭晓

10月 28 日,期待与您在北京相见!开源作为一种开放共享的新型生产模式,已成为数字经济创新、开放、共享和可持续发展的核心驱动力。由中国通信标准化协会牵头主办,中国信息通信研究院具体承办的 2025 OSCAR 开源产业…

2024 CCPC Final F

F. Witnessing the Miracle / 见证奇迹 动态规划。 打表发现当一个被拿走的磁铁集合确定之后,未被拿走的磁铁的方向和距离可以由它左边被拿走的磁铁数量确定,因此,拿走磁铁的先后顺序不影响最终局面的状态,即从 \(…

Windows关闭端口占用

Microsoft Windows [版本 10.0.18363.535] (c) 2019 Microsoft Corporation。保留所有权利。 C:\Users\Administrator> netstat -ano | find "1099"TCP 0.0.0.0:1099 0.0.0.0:0 …

洛谷 P12865

给定长度为 \(n\) 的序列 \(a\) 和 \(q\) 次操作。每次操作为对 \(a\) 进行一次冒泡排序(\(a_i > a_{i + 1}\) 时交换)或者查询 \(a_l \sim a_r\) 之和。 对于一次冒泡排序,显然会把最大值挪到最后面。所以,当 …

ubuntu清理内存缓存

sudo sync; echo 3 | sudo tee /proc/sys/vm/drop_caches

ubuntu常用技巧

获取qq的pid sudo netstat -anpt | grep qq | awk {print $7} | awk -F / {print $1} | head -n 1 获取无线网卡的ip地址 ifconfig wlp2s0 | grep inet | head -n 1 | awk {print $2} 获取首行 全局范围的IPv6地址…

单线程如何撑起百万连接?I/O多路复用:现代网络架构的基石

单线程如何撑起百万连接?I/O多路复用:现代网络架构的基石I/O多路复用(I/O Multiplexing)是一种允许单个线程同时监视多个文件描述符的I/O模型。其核心价值在于,它将应用程序从低效的I/O等待中解放出来,实现了“一…