十大免费建站app怎样做网站初中生
web/
2025/10/6 18:55:56/
文章来源:
十大免费建站app,怎样做网站初中生,制作网站公司那家好,用wordpress建企业网站目录
一#xff0c;项目介绍
1.1 关于高并发内存池
1.2 关于池化技术
1.3 关于malloc
二#xff0c;定长内存池实现
2.1 实现详情
2.2 完整代码
三#xff0c;高并发内存池整体设计
四#xff0c;threadcache设计
4.1 整体设计
4.2 哈希桶映射对齐规则
4.3 …目录
一项目介绍
1.1 关于高并发内存池
1.2 关于池化技术
1.3 关于malloc
二定长内存池实现
2.1 实现详情
2.2 完整代码
三高并发内存池整体设计
四threadcache设计
4.1 整体设计
4.2 哈希桶映射对齐规则
4.3 申请内存函数实现
4.4 TLS无锁访问
五centralcache设计
5.1 整体设计
5.2 Span的设计
5.3 完善thread cache类的申请内存函数
5.4 centralcache类的实现
六pagecache设计
6.1 整体设计
6.2 page cache详细设计
七申请内存过程联调
7.1 第一次申请
7.2 第二次申请
7.3 第1025次申请
八回收内存设计
8.1 threadcache回收内存
8.2 centralcache回收内存
8.3 pagecache回收内存
九回收内存过程联调 十优化
10.1 大于256KB内存块的申请和释放
10.2 释放对象时优化不传大小
10.3 读取映射关系时的加锁问题
十一多线程下对比malloc性能测试
十二使用基数树优化
12.1 性能瓶颈分析
12.2 关于基数树
12.3 使用基数树优化代码
12.4 最终测试
12.5 周边问题 一项目介绍
1.1 关于高并发内存池
当前项目是实现一个高并发的内存池原型为 Google 的一个开源项目 tcmalloc 全称 Threead-Caching Malloc现成缓存malloc能够将内存配合线程进行高效的管理最终实现的接口用于替换系统内存分配的的相关函数mallcofree tcmalloc以比传统malloc更高的的效率出名比如微软的Go语言就直接用它做了自己底层的内存分配器 该项目目的是为了学习tcmalloc的精华理解其核心思想简化tcmalloc的核心框架进而模拟实现一个迷你版的高并发内存池 该项目设计的技术栈有C/C数据结构操作系统内存管理单例模式多线程与线程安全等
1.2 关于池化技术
“池化技术”就是程序预先向系统申请一块资源然后由程序自行管理之所以要这么做是因为申请资源和释放都有成本的就比如你给对方打电话然后说一个子打一次电话说十个字就要打十次电话所以不如打一次电话一次性说十个字每次申请资源时直接从“池”中获取释放时将资源重新放回“池中”这样可以使程序申请释放内存的效率变高很多在计算机中除“内存池”外还有线程池对象池连接池等所以池化技术的应用非常广泛 问题内存池主要解决什么问题 解答内存池主要是为了提高申请释放内存时的效率能够避免让程序因为频繁的申请和释放内存导致整体效率降低除此之外我们还需要尝试解决外部碎片和内部碎片的问题 外部碎片我们有一些空闲的小内存块但由于这些内存块不连续所以就可能导致总体的空内存够用但是还是程序还是会申请失败的问题内部碎片主要是由于一些对齐的需求导致分配出去的空间中一些内存无法使用 所以要同时兼顾解决两种内存碎片是本项目的一大亮点同时也是一大难点 1.3 关于malloc
C/C中我们要动态申请内存是通过malloc函数去申请的包括C中的new实际上也是封装了malloc函数本质上还是malloc去申请malloc申请内存的流程大致如下图
malloc的实现方式有很多种一般不同编译器平台用的都是不同的。比如微软的VS系列中的malloc就是微软自行实现的而Linux下的gcc用的是glibc中的ptmalloc 问题有malloc了为啥还要搞一个内存池呢 解答malloc其实就是一个通用的内存池在任何场景下都可以使用但是malloc并不是针对某种场景专门设计的这也意味着malloc在任何场景下都不会有很高的性能而内存池就是抓住了这一点内存池的任务就是为了提高效率 二定长内存池实现
定长内存池就是针对固定大小内存块的申请和释放的内存池由于定长内存池只需要支持固定大小内存块的申请和释放因此我们可以将其性能做到极致并且在实现定长内存池时不需要考虑内存碎片等问题因为我们 申请/释放 的都是固定大小的内存块
实现定长内存池的目的是为了熟悉一下对简单内存池的控制其次这个定长内存池后面会作为高并发内存池的一个基础组件 2.1 实现详情 ①如何实现定长 方法一使用非类型模板参数使得该内存池中申请到的对象的大小都是N
templatesize_t N //非类型模板参数
class ObjectPool
{};
方法二定长内存池也叫做对象池在创建对象池时对象池可以根据传入的对象类型的大小来实现定长因为传入对象的大小都是固定的
templateclass T //也可以用T来代替N实现定长的作用因为T的大小是固定的
class ObjectPool
{}; ②定长内存池的成员变量有哪些 private://void* _memory nullptr;char* _memory nullptr; //void*在切割内存池时会不好用所以换成char*它代表一个字节方便申请时操作size_t _remainBytes 0; //表示当前内存池中还剩下的空间大小防止用完后继续申请造成越界void* _freeList nullptr; //自由链表用来管理进程还给我的空间对于向堆申请到的大块内存我们可以用一个指针来对其进行管理然后再用一个变量来记录这块内存的长度。 再然后释放回来的内存块也需要管理由于释放回来的内存块都是一个一个小的块我们可以将这些内存块搞成一个链表这个链表叫做自由链表第三个变量就是指向这个自由链表的指针 ③内存池如何管理进程释放回来的内存块 进程还回来的定长内存块用自由链表组织取来由于链表的每个节点需要一个相互连接的指针所以我们可以让内存块的前4个字节32位或8个字节64位作为连接指针里面存的就是后面内存块的起始地址因此我们的操作进行一个简单的头插先往内存块的前4或前8个字节里面写入一个指向自由链表中第一个节点的指针然后更新 _freeList 即可 代码就这样写 但是有个问题上面的程序在32位下能跑在64位下就挂掉了32位下我们对内存块的前4个字节进行写入64位下我们就需要对内存块的钱8个字节进行写入但由于我们是int*int*解引用后只能访问4个字节需要把int换成long long但是我们并不知道目前是32位还是64位所以有了下面的问题 问题如何让obj在32位下访问4个字节在64位下访问8个字节 解答我们可以先把obj强转成(void**)然后再进行解引用如下代码 *(void**)obj _freeList; //void**解引用看的是void*32位下void*是四字节64位下void*是八字节这里其实只要是二级指针都可以int**也可以 首先obj最开始是T*强转成了void**一个二级指针int* obj解引用后就是取一个int的大小也就是4字节这个32位和64位下都是固定的而void**解引用后看的是void*的大小而这个void*就很有讲究了在32位下void*的大小是4字节64位下void*的大小是8字节注意是这个指针的大小不是这个指针访问空间的大小而对void**解引用后访问的空间大小取决于void*的大小而void*的大小在32位下是4字节在64位下是8字节所以也就达到了我们的目的如上述代码 所以我们的释放函数Delete就是
void Delete(T* obj)
{//直接头插其中_freeList是指向自由链表第一个节点的指针不是哨兵节点哦//显示调用析构函数清理对象obj-~T();//此时obj是一个T*所以我也不知道obj是多大所以我们把obj强转成一个int*然后再对这个int*解引用就可以访问到内存块的前4个字节了//*(int*)obj _freeList; //读取内存块的前4个字节然后解引用进行写入//32位下上面的程序没问题但是64位下就挂了因为32位下是四字节64位下是8个字节了所以32位下用int64位下要用long long*(void**)obj _freeList; //void**解引用看的是void*32位下void*是四字节64位下void*是八字节这里其实只要是二级指针都可以int**也可以_freeList obj;
} ④内存池如何为我们申请对象 最开始的时候先申请一大块内存
申请资源时内存池应该优先把还回来的内存块对象再次重复利用因此如果自由链表当中有内存块的话就直接从自由链表头删一个内存块进行返回即可 如果自由链表当中没有合适内存块那么我们就在大块内存中切出合适长度的内存块进行返回当内存块切出后及时更新_memory指针的指向以及的_remainBytes值即可 两个注意事项 由于我们后面进程释放内存块给内存池时我们需要将内存块头插到自由链表中那么我们必须保证切出来的空间至少能存下一个地址假设用户只申请了1个字节我们也要至少切4个字节或8个字节然后递过去此外当大块剩余的大块内存已经不足以切出合适大小时我们应该再次向堆申请一块更大的空间并更新 _memory指针和 _remainBytes 的值 所以我们的New函数这样写
T* New()
{T* obj nullptr; //指向申请的内存块的开头把开头的指针返回给上一层if (_freeList ! nullptr) //优先把还回来的内存块对象重复利用看一下自由链表{void* next *(void**)_freeList;obj (T*)_freeList;_freeList next;return obj;}else //自由链表没有合适内存块时再去大块内存去切{if (_remainBytes sizeof(T)) //当剩余的空间不够一个T对象大小时重新开大空间{_memory (char*)malloc(128 * 1024);_remainBytes 128 * 1024;if (_memory nullptr) throw std::bad_alloc(); //开失败了表示系统也没有更多的内存块了抛异常}obj (T*)_memory;//32位下void*大小是4字节64位下是8字节如果T的大小也就是进程要申请空间的大小不足以存下一个指针时我们要单独处理一下size_t objSize sizeof(T) sizeof(void*) ? sizeof(void*) : sizeof(T);_memory objSize;_remainBytes - objSize;}//只开了空间未初始化也不好//定位new显示调用T的构造函数初始化new(obj)T;return obj;
} ⑤如何跳过malloc直接向堆申请空间 上面我们是用malloc的所以我们是先调用malloc然后malloc再去向系统申请那么我们可以不可以跳过malloc我们自己直接去向系统申请呢 ①在Windows下我们用的接口是一个 VirtualAlloc函数这个函数的作用就是直接跳过malloc按页为单位直接向堆要内存空间 ②在Linux下也有这样的系统调用 brk函数作用是将数据段的最高指针mmap函数是在进程的虚拟地址空间中在堆栈中间称为文件映射区的区域找一块空闲的虚拟内存 这两种方式分配的都是虚拟内存并没有时机分配物理内存。在第一次访问的时候会发生缺页中断操作系统分配物理内存然后建立虚拟内存和物理内存之间的映射关系 下面通过条件编译分别实现下
#ifdef _WIN32#include Windows.h
#else//...
#endif//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr VirtualAlloc(0, kpage13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr nullptr)throw std::bad_alloc();return ptr;
}2.2 完整代码 我们的定长内存池包括头文件和测试代码 下面是ObjectPool.h的代码 #pragma once
#includeiostream
#includevector
#includetime.h#ifdef _WIN32
#include Windows.h
#else
//...
#endif//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t t)
{
#ifdef _WIN32void* ptr VirtualAlloc(0, t 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else // linux下brk mmap等//由于brk和mmap涉及到虚拟内存的扩展和收缩有点复杂所以linux环境下还是用mallocvoid* ptr malloc(128 * 1024)
#endifif (ptr nullptr)throw std::bad_alloc();return ptr;
}//定长内存池
//templatesize_t N //非类型模板参数
templateclass T //也可以用T来代替N实现定长的作用因为T的大小是固定的
class ObjectPool
{
public:T* New(){T* obj nullptr; //指向申请的内存块的开头把开头的指针返回给上一层if (_freeList ! nullptr) //优先把还回来的内存块对象重复利用看一下自由链表{void* next *(void**)_freeList;obj (T*)_freeList;_freeList next;return obj;}else //自由链表没有合适内存块时再去大块内存去切{//32位下void*大小是4字节64位下是8字节如果T的大小也就是进程要申请空间的大小不足以存下一个指针时我们要单独处理一下size_t objSize sizeof(T) sizeof(void*) ? sizeof(void*) : sizeof(T);if (_remainBytes objSize) //当剩余的空间不够一个T对象大小时重新开大空间{_remainBytes 128 * 1024; //每次开空间后更新剩余空间大小//_memory (char*)malloc(128 * 1024);_memory (char*)SystemAlloc(_remainBytes 13);if (_memory nullptr) throw std::bad_alloc(); //开失败了表示系统也没有更多的内存块了抛异常}obj (T*)_memory;_memory objSize;_remainBytes - objSize;}//只开了空间未初始化也不好//定位new显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){//直接头插其中_freeList是指向自由链表第一个节点的指针不是哨兵节点哦//显示调用析构函数清理对象obj-~T();//此时obj是一个T*所以我也不知道obj是多大所以我们把obj强转成一个int*然后再对这个int*解引用就可以访问到内存块的前4个字节了//*(int*)obj _freeList; //读取内存块的前4个字节然后解引用进行写入//32位下上面的程序没问题但是64位下就挂了因为32位下是四字节64位下是8个字节了所以32位下用int64位下要用long long*(void**)obj _freeList; //void**解引用看的是void*32位下void*是四字节64位下void*是八字节这里其实只要是二级指针都可以int**也可以_freeList obj;}private://void* _memory nullptr;char* _memory nullptr; //void*在切割内存池时会不好用所以换成char*它代表一个字节方便申请时操作size_t _remainBytes 0; //表示当前内存池中还剩下的空间大小防止用完后继续申请造成越界void* _freeList nullptr; //自由链表用来管理进程还给我的空间
}; 下面是test.cpp的代码主要负责测定长内存池的性能 #includeObjectPool.hstruct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds 3;// 每轮申请释放多少次const size_t N 1000000;std::vectorTreeNode* v1;v1.reserve(N);size_t begin1 clock();for (size_t j 0; j Rounds; j){for (int i 0; i N; i) v1.push_back(new TreeNode);for (int i 0; i N; i) delete v1[i];v1.clear();}size_t end1 clock();ObjectPoolTreeNode TNPool;std::vectorTreeNode* v2;v2.reserve(N);size_t begin2 clock();for (size_t j 0; j Rounds; j){for (int i 0; i N; i) v2.push_back(TNPool.New());for (int i 0; i N; i) TNPool.Delete(v2[i]);v2.clear();}size_t end2 clock();std::cout 传统 new 和 delete: end1 - begin1 std::endl;std::cout 定长内存池 New 和 Delete: end2 - begin2 std::endl;
}int main()
{TestObjectPool();
}
结果如下 上述源码地址放在文章结尾
三高并发内存池整体设计 项目主要解决什么问题 现代很多开发环境都是多核多线程那么申请内存必然存在激烈的锁竞争问题malloc内部源码虽然也有针对多线程的申请锁的设计但是我们项目的原型tcmalloc在多线程高并发的场景下能拥有更高的小所以我们实现的内存池主要考虑下面几个问题
性能问题多线程环境下锁竞争问题内存碎片问题 整体设计 主要由下面三个层次组成 thread cache每个线程都会有一个这个用于分配小于等于256KB的内存由于每个线程都有一个所以没有锁能提高效率central cache这个是所有线程共享的所以有锁作用是按时机分配或回收thread cache的内存比如thread内存不够时给它内存thread内存过多时回收内存简单来说就是线程缓存受中心缓存管理page cache 负责向操作系统申请并管理以页为单位的大块内存会在合适时机对central cache的内存进行补给和回收主要将回收的内存尽可能进行合并组成更大的连续内存块缓解内存碎片问题
其实central cache是一个哈希桶的结构只有当多个线程访问到一个桶的时候才会加锁所以这里的锁竞争不会很激烈后面会详细讲解 四threadcache设计
4.1 整体设计
前面我们实现了定长内存池由于其只支持固定大小内存块的申请和释放所以里面只需要一个自由链表管理释放回来的内存块那么现在我们要支持申请和释放不同大小的内存块那么就需要多个自由链表来管理因此thread cache实际上是一个哈希桶结构每个桶都是一个自由链表thread cache支持小于等于256KB也就是256 * 1024字节的内存空间的申请如果我们针对每种字节数的内存块都搞一个自由链表进行管理那么就需要20多万个自由链表光是存储这些自由链表的头指针就需要消耗大量空间显然得不偿失这时我们可以做一些平衡的牺牲和结构体那样对字节数自定义一种对齐规则假设我们的对齐数是8那么thread cache的结构如下图此时当线程申请1~8字节的内存时会直接给出8字节而当线程申请9~16字节的内存时会直接给出16字节以此类推
当线程要申请某一大小的内存块时
①先经过计算得到对齐后的字节数找到对应的哈希桶②如果该哈希桶中的自由链表中有内存块那就从自由链表中头删一个内存块进行返回③如果该自由链表已经为空了那么就向下一层的central cache进行获取
但此时由于对齐的原因就可能会导致一些碎片化的内存无法利用比如线程只申请了6字节的内存而thread cache给了8字节这多给出的2字节就浪费掉了这就是内存碎片的内部碎片 由于后面我们会频繁使用到自由链表于是干脆将其单独分离出来组成一个类放到一个公共头文件 Common.h static void* NextObj(void* obj) //获取空间的前四个或前八个字节
{return *(void**)obj;
}//自由链表管理切分好的小对象
class FreeList
{
public:void Push(void* obj) //头插{assert(obj);NextObj(obj) _freeList;_freeList obj;_size;}void PushRange(void* start, void* end, size_t n) //支持插入多个对象{assert(start);assert(end);NextObj(end) _freeList;_freeList start;_size n;}void* Pop() //头删{assert(_freeList);void* obj _freeList;_freeList NextObj(obj);--_size;return obj;}bool Empty(){return _freeList nullptr;}size_t MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList nullptr;size_t _maxSize 1; //size_t _size 0; //记录桶的数据个数
}; 4.2 哈希桶映射对齐规则
映射规则我们也搞成一个类叫做SizeClass也放在Common.h里 ①如何对齐 上面说过我们要自己定义一个对齐规则而对齐规则主要就是看对齐数8这个数字是最合适的因为不论是32位还是64位机器都至少要保证内存块能存下一个指针但是如果是8那么我们需要256 * 1024 / 8 32768 个桶这个数量还是有点多了所以我们可以分阶段每个阶段匹配一个对齐数tcmalloc源码里面更复杂下面是简化版 每阶段字节数每阶段对齐数哈希桶下标[1, 128]8[0, 16)[128 1, 1024]16[16, 72)[1024 1, 8 * 1024]128[72, 128)[8 * 1024 1, 64 * 1024)1024[128, 184)[64 * 1024 1, 256 * 1024]8 * 1024[184, 208) 浪费率 公式浪费率 浪费的字节数 / 对齐后的字节数 以上面的公式为例我们要得到最大浪费率就要让分子最大分母最小比如以[128 1, 1024]这个区间为例对齐数是16那么最大浪费的字节数是15最小对齐就是129要变成16的倍数也就是144那么该区间的最大浪费率就是 15 / 144 ≈ 10.4%同理后面两个区间的最大浪费率也都在10%左右这个浪费率已经控制得非常小了当然tcmalloc的浪费率更低当然也很复杂 对齐我们需要搞两个函数一个用来获取向上对齐后的字节数一个获取对应哈希桶的下标
//计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)第一个只能是8字节对齐因为 存了指针的// [1281,1024] 16byte对齐 freelist[16,72)// [10241,8*1024] 128byte对齐 freelist[72,128)// [8*10241,64*1024] 1024byte对齐 freelist[128,184)// [64*10241,256*1024] 8*1024byte对齐 freelist[184,208)//获取向上对齐后的字节数static inline size_t RoundUp(size_t bytes);//获取对应哈希桶的下标static inline size_t Index(size_t bytes);
};在获取某一字节数向上对齐的字节数时先判断该字节数属于哪一个阶段然后调用子函数根据每个阶段的对齐数得到最终thread cache要分配给进程的字节数
static const size_t PAGE_SHIFT 13;static inline size_t RoundUp(size_t size) 搞成内联是因为方便其它类不需要通过创建对象就能通过SizeClass访问该函数
{if (size 128){return _RoundUp(size, 8);}else if (size 1024){return _RoundUp(size, 16);}else if (size 8 * 1024){return _RoundUp(size, 128);}else if (size 64 * 1024){return _RoundUp(size, 1024);}else if (size 256 * 1024){return _RoundUp(size, 8*1024);}else //大于256K{return _RoundUp(size, 1 PAGE_SHIFT);}
}
//第一个参数是要申请的字节数第二个参数是该阶段的对齐数
static inline size_t _RoundUp(size_t size, size_t alignNum)
{//1常规方法// size_t//if (size % alignNum 0) return size; //正好是对应阶段的对齐数的倍数所以不需要处理//else return (size / alignNum 1) * alignNum;//2巧妙方法return ((size alignNum - 1) ~(alignNum - 1));//假设是区间[1, 8]那么前面括号的size可能是1到8的任意一个数最大值就是8 8 - 1 15//假设alignNum是8也就是这个区间的对齐数那么右边括号就是77的二进制为000111取反后就是111000//左边括号可以表示的数在[8, 15]假设是15二进制为001111所以15与7的结果就是001000是8//由于7的二进制的后面三位全是0所以就算size是该区间的任何值与的结果的后三位也都会是0而前面的保持不变//再举个例子比如size是3对齐数是8前面括号的值是10二进制是001010后面括号仍然是7二进制是000111//7取反后是111000所以拿这个二进制和10与一下结果仍然是001000还是8//上面两个例子就可以说明通过这种方法最终得到的值的二进制就都与8对齐了//当对齐数是其它区间的对齐数时也是同样的结果
} 再之后我们需要根据size先知道该字节数属于哪一个区间然后通过子函数返回该字节数对应的哈希桶的下标
// 计算映射的是哪一个自由链表桶
static inline size_t Index(size_t bytes)
{assert(bytes MAX_BYTES);// 每个区间有多少个链static int group_array[4] { 16, 56, 56, 56 };if (bytes 128) {return _Index(bytes, 3); //第二个参数是2的次方因为子函数是直接通过位移得到对齐数的}else if (bytes 1024) { //计算出的桶是在该区间的桶的数量所以要加上全面区间桶的数量才是算出的桶的下标return _Index(bytes - 128, 4) group_array[0];}else if (bytes 8 * 1024) {return _Index(bytes - 1024, 7) group_array[1] group_array[0];}else if (bytes 64 * 1024) {return _Index(bytes - 8 * 1024, 10) group_array[2] group_array[1] group_array[0];}else if (bytes 256 * 1024) {return _Index(bytes - 64 * 1024, 13) group_array[3] group_array[2] group_array[1] group_array[0];}else {assert(false);}return -1;
}static inline size_t _Index(size_t size, size_t alignNum)
{/*if (bytes % alignNum 0)return bytes / alignNum - 1;elsereturn bytes / alignNum;*/return ((size (1 alignNum) - 1) alignNum) - 1;
} 4.3 申请内存函数实现 ①类的总体设计 按照上面的规则我们自由链表个数是208thread cache允许申请的最大内存是256KB所以我们现在Common.h里定义好参数
tatic const size_t MAX_BYTES 256 * 1024;
static const size_t NFREE_LIST 208; //总的桶的数量
然后就是ThreadCache类我们需要实现四个成员函数目前先实现申请内存的函数后面三个函数后面再讲
class ThreadCache
{
public:void* Allocate(size_t size); //线程就通过这个函数来申请空间void Deallocate(void* ptr, size_t size) //size要告诉我在哪个桶//如果要申请的内存块大于256KB就通过这个函数从中心缓存获取更多资源void* FetchFromCentralCache(size_t index, size_t size);//如果还给我的内存块数量超过了桶的最大存储数目就要往上层传递void ListTooLong(FreeList list, size_t size);private:FreeList _freeLists[NFREE_LIST]; //开256 * 1024个字节的空间
};
目前我们主要实现申请和释放内存的那两个后面两个我们到后面设计中心缓存时在来设计 ②线程申请内存的函数实现 线程往thread cache申请内存时先计算对齐后的实际分配数和对应的哈希桶下标 如果申请的空间大小在自由链表上有合适的空间就返回这个空间如果不满足就去中心缓存申请
void* Allocate(size_t size) //线程就通过这个函数来申请空间
{assert(size MAX_BYTES); //MAX_BYTES在公共头文件中定义为256 * 1024 KBsize_t alignSize SizeClass::RoundUp(size); //计算size_t index SizeClass::Index(size); //一共是208个桶计算该字节数在哪一个桶然后再去这个桶里拿内存if (!_freeLists[index].Empty()) //如果申请的空间大小在自由链表上有空闲的空间就返回这个空间return _freeLists[index].Pop();elsereturn FetchFromCentralCache(index, alignSize); //后面实现
} 4.4 TLS无锁访问
有个问题我们的线程如何才能获取到这个thread cache呢
要实现每个线程都能无锁访问自己的thread cache需要用到线程本地存储TLS(Thread Local Storage)这是一种变量存储方法通过TLS创建的变量在它所在的线程是全局可访问的但是不能呢被其它线程访问到这样就确保了独立性 Windows和Linux下都有自己的TLS机制 Windows线程本地存储Thread Local Storage - 坦坦荡荡 - 博客园 LinuxThread Local Storage线程局部存储TLS - 知乎 直接在ThreadCache.h文件下加上
static _declspec(thread) ThreadCache* pTLS_ThreadCache nullptr; //通过TLS每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache nullptr)
{pTLSThreadCache new ThreadCache;
}下面是对thread cache分配内存和验证TLS的测试
由于我们不知道线程要申请和释放的内存是多大所以我们不能让线程直接访问thread cache的Allocate函数需要再封装一层在头文件ConcurrentAlloc.h里面实现 #pragma once#includeCommon.h
#include ThreadCache.hstatic void* ConcurrentAlloc(size_t size) //线程通过调用这个函数来获取资源
{if (size MAX_BYTES) //如果线程申请的内存小于256KB就先获取属于自己的thread cache然后再调用它的Allocate{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache nullptr){pTLS_ThreadCache new ThreadCache;}cout std::this_thread::get_id() : pTLS_ThreadCache endl;return pTLS_ThreadCache-Allocate(size);}else //如果大于256KB就去上层找{//后面实现}
}static void ConcurrentFree(void* ptr) //线程通过这个函数来释放内存
{//后面实现
}
然后我们搞一个UnitTest.cpp文件专门用来测试
includeConcurrentAlloc.h
void TLStest()
{//每个线程都各自打印TLS的地址且地址不变验证了TLS是存在的std::thread t1([]() {for (size_t i 0; i 5; i){void* ptr ConcurrentAlloc(6);}});t1.join();std::thread t2([]() {for (size_t i 0; i 5; i){void* ptr ConcurrentAlloc(7);}});//如果把t1.join()写在这里由于是一起执行打印结果会混乱不清t2.join();
}int main()
{TLStest();return 0;
} 五centralcache设计
5.1 整体设计
thread cache自己最先向操作系统申请256KB大小的内存当这256KB空间用完之后thread cache就要向central cache申请了并且当thread cache拥有超过256KB的空间时要把多余的内存还给central这就是central cache主要的作用
central cache的结构与thread cache一样都是哈希桶结构并且遵循相同的对齐映射规则这样的好处就是当thread cache的某个桶中没有内存了就可以直接到central cache中对应的哈希桶里去取内存就行了并且共用同一个映射规则发生错误了也容易排查 中心缓存是所有线程共享的所以要有加锁访问中心缓存在加锁时不是全加的因为它和thread cache都是哈希桶结构桶与桶之间是没有竞争关系的所以只有当多个线程通过映射规则访问到同一个桶时才会加锁这个锁我们叫做“桶锁”thread cache的每个桶下面挂的都是自由链表是一个一个已经切好的内存块而central cache挂的是一个一个的spanspan本身有两个指针一个负责SpanList链表的连接一个则指向另一个自由链表这个自由链表里就算一个一个切好了的内存块这些内存块的大小是和哈希桶的编号是对应的 然后就是中心缓存的申请和释放内存 申请内存 当thread cache内存用完时就会批量向central cache申请一些这里的批量申请有点类似与Tcp协议拥塞控制检测网络拥堵法检测报文的那个慢开始算法然后去central cache找对应的桶然后去一个一个遍历span去找合适的内存块并返回这个过程是要加锁的不过是桶锁所以效率不会降低很多当中心缓存里的SpanList的所有Span都没有空间时会再去向page cache申请一个更大的Span然后按大小切好作为自由链表连接到一起然后再取合适的Span给thread cache再central cache的Span类中搞一个变量use_count记录分配了多少个对象出去每分配一个出去就use_count 释放内存 当thread cache的内存超过256KB或者线程销毁时会将内存释放会central cache中并--use_count当use_count减到0时表示所有的对象都回到了Span中则将Span释放给page cache然后page cache会将内存块进行合并 问题如上图为什么需要多个Span为什么第三个Span是空的 解答 假设只有一个Span假设在8Byte这个桶下那么一页要切成8字节的小块那么就要切一千多个小块我们不可能把这1000多个块一次性就给thread cache所以一次给thread cache一个批量的比如一次给5个或8个这就是慢开始所以我们的逻辑就是刚开始只有一个Span下面挂4个小块当这四个小块用完后就再来了一个Span里面再挂几个小块就这样一路下去就变成了多个Span然后再是以上图为例可以看到第三个Span是空的表明第三个Span的小块全被申请走了但是前面两个Span不为空为啥呢不是说好的只有一个span的小块用完之后才会再来一个Span的吗因为线程不仅仅是社申请也在释放假设第三个Span分配完后线程把空间又还回给第一和第二个Span了这就出现了上图的情况 所以Central cache起到的作用主要是“承上启下” 5.2 Span的设计
central cache里面挂的也是一个一个的哈希桶每个桶是一个链表但是不是和thread cache一样的自由链表而是先搞了一层SpanList链表然后链表的每个节点就是一个一个的Span每个Span再挂的就是自由链表
所以我们要搞两个东西一个是Span一个是SpanList这两玩意儿都放在Common.h头文件里 Span设计 每个进程都有自己的地址空间在32位下进程地址空间的大小是2^32在64位下就是2^64页的大小一般设4KB或8KB我们以8KB也就是2^13个字节为例在32位下进制地址空间可以被切分为2^(32 - 13) 2^19个页而在64位下达到了2^51个页如果需要给页号编号在64位下的取值返回是[0, 2^51)因此不能直接用一个无符号整型来存储页号这里我们采用条件编译来解决这个问题如下代码
//在32位下进程地址空间为4G2^32个字节一个页是8K为2^13所以会被分成2^19大概50w个页
//在64位下进程地址空间为16G2^64字节一个页8K会被分成2^51个页
//所以需要分开搞用条件编译
#ifdef _WIN64 //在Win32配置下_Win32有定义_Win64没有定义但是在x64环境下_Win32和_Win64都有定义所以要把_WIN64放在前面
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else//linux
#endifstruct Span //span管理多个连续页大块内存跨度结构
{PAGE_ID _pageId 0; //大块内存起始页的页号便于之后page cache进行前后页的合并size_t _n 0; //表示该Span拥有页的数量Span* _next nullptr; //双向链表Span* _prev nullptr;size_t _useCount 0; //表示已经分配给thread cache的小块内存的计数void* _freeList nullptr; //切好的小块内存的自由链表bool _isUse false; //表示该span是否在被使用默认为false为了加锁使用size_t _objSize 0; //这个表示切好的对象的大小方便释放内存时计算大小
};
设计成双链表是为了方便把内存还给page cache后删除这个span所以搞成了带头双向循环的链表结构 SpanList设计 //带头双向循环链表
class SpanList
{
public:SpanList(){_head new Span;_head-_next _head;_head-_prev _head;}Span* Begin(){return _head-_next;}Span* End(){return _head;}bool Empty(){return _head-_next _head; //如果为真就返回空}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront(){Span* front _head-_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan) //在指定位置前面插入{assert(pos);assert(newSpan);Span* prev pos-_prev;prev-_next newSpan;newSpan-_prev prev;newSpan-_next pos;pos-_prev newSpan;}void Erase(Span* pos){assert(pos);assert(pos ! _head); //不能把哨兵干掉Span* prev pos-_prev;Span* next pos-_next;prev-_next next;next-_prev prev;}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};
//注意删除的span不需要delete释放因为是还给page cache 5.3 完善thread cache类的申请内存函数
当线程通过thread cache申请空间时如果申请的空间大于256KB或者thread cache里已经没有足够的空间时thread cache就会去向central cache要内存 上面说过thread cache找中心缓存要空间时采用的慢开始算法
size_t NumMoveSize(size_t size) //thread cache一次从中心缓存获取多少个对象慢开始算法
{assert(size 0);int num MAX_BYTES / size; //先计算出单个对象大小//如果对象很小一次给你多一点点如果对象很大给你少一点点if (num 2) num 2;if (num 512) num 512;return num; //就是给一个折中的大小
}
通过满开始算法就可以让thread cache找中心缓存要内存时一次别要太多减少浪费
//如果要申请的内存块大于256KB或剩余空间不足时就去从中心缓存获取更多资源
void* FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法//1一次不会向中心缓存要很多、、太多对象因为太多了用不完会浪费//2随着申请对象数量增加batchNum会不断直至上限//3size越大一次向central cache要的batchNum就越小//3size越小一次向central cache要的batchNum就越大size_t batchNum min(_freeLists[index].MaxSize(), NumMoveSize(size));if (_freeLists[index].MaxSize() batchNum){_freeLists[index].MaxSize() 1;}void* start nullptr;void* end nullptr;//往central cache要内存我们要batchNum个size大小的内存然后通过start和end返获得开始和结束地址//假设我们要10但是中心缓存的对应的桶下面没有10了那我thread cache要不要呢虽然不够但我还是要//假设我只要1个但是由于前面满开始算法可能会给我2个或者更多//所以返回值表示中心缓存实际给我们的个数size_t actualNum CentralCache::GetInstance()-FetchRangeObj(start, end, batchNum, size);assert(actualNum 0);//批量申请的好处假设我只要一个中心缓存给了我3个那么我把剩下的两个挂在自由链表上那么当下一次我又只要一个的时候就不需要再去page里要了if (actualNum 1){assert(start end);return start;}else //走到这里就是获取了多个就把这多个挂到对应的桶的自由链表里去{_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);return start;} Common.h里的FreeList类也要提供支持插入多个对象 //class FreeList
void PushRange(void* start, void* end, size_t n) //支持插入多个对象
{assert(start);assert(end);NextObj(end) _freeList;_freeList start;_size n;
} 在C的algorithm头文件中有一个min函数这是一个函数模板而在windows.h头文件中也有一个min这是一个宏。由于调用函数模板时需要进行参数类型的推演因此当我们调用min函数时编译器会优先匹配windows.h当中以宏的形式实现的min此时当我们以std::min的形式调用min函数时会编译报错这时我们只能选择将std::去掉让编译器调用windows.h当中的min宏。 #include algorithm//algorithm算法头文件里有一个函数模板但是windows.h里面有个宏冲突了
#ifdef _WIN32#include windows.h
#else//Linux相关头文件
#endif 5.4 centralcache类的实现 ①结构设计 由于central cache所有线程共享所以我们搞成单例模式并提供一个全局访问点这样所有线程就都可以通过这个访问点访问中心缓存了这里我们搞成饿汉模式
#pragma once
#include Common.h
#include PageCache.h//它也是一个哈希桶thread cache下面挂的是切好的小对象但是不能每个位置给桶所以进行了一定程度的对齐和哈希映射
//它定义了一个管理大块内存对象的结构——span每个桶下面有很多span是双向链表
//单例模式
class CentralCache
{
public:static CentralCache* GetInstance() { return _sInst; } //全局访问点//获取一个非空的SpanSpan* GetOneSpan(SpanList list, size_t size);//从中心缓存获取一定数量的对象给thread cache该函数计算从span中申请的对象数量size_t FetchRangeObj(void* start, void* end, size_t batchNum, size_t size);void ReleaseListToSpans(void* start, size_t size);private:SpanList _spanLists[NFREE_LIST]; //thread cache有几个桶我就有几个桶
private://单例模式CentralCache() {} //把构造函数搞成私有CentralCache(const CentralCache) delete; //把拷贝构造干掉static CentralCache _sInst; //一上来直接搞一个对象
}; ②获取一定数量的对象给thread cache //获取一定数量的对象给thread cache
size_t FetchRangeObj(void* start, void* end, size_t batchNum, size_t size)
{size_t index SizeClass::Index(size); //算出在桶中对应的位置_spanLists[index]._mtx.lock(); //桶锁对单个桶加锁防止多个线程访问到同一个桶Span* span GetOneSpan(_spanLists[index], size); //要获取一个非空的spanassert(span); //找到的span不为空assert(span-_freeList); //span里也不为空start span-_freeList; //将span的地址通过start返回相当于将内存返回给了thread cache//①假设span有4个块thread要3个我们让end走两步走到第三个块然后把start和end返回//②然后第三个块的前4或8个字节指向的是第四个块的地址所以我们直接将end也就是第三个块的前4或8个地址赋值给span相当于把这三个块切出去了end start;size_t actualNum 1; //该变量用来返回表示实际给了threaad cache了多少个对象//如果span中只有4个对象但是thread cache要5个那么end访问到最后会崩所以这种情况就是有多少给多少for (size_t i 0; i batchNum - 1; i){if (NextObj(end) nullptr) break;end NextObj(end); //end往后移actualNum;}span-_freeList NextObj(end); //赋值span完成切块NextObj(end) nullptr; //将end的前4或8个字节赋值为空使其成为链表的最后一个节点span-_useCount actualNum;_spanLists[index]._mtx.unlock(); //解锁return actualNum; //从span中获取batchNum个对象如果不够batchNum有几个返回几个
} ③获取一个非空的Span GetOneSpan这个函数就是在central cache里获取一个span然后返回去如果central cache一个span都没有了就去page cache里获取并且把获取到的页进行切分然后放进central cache的桶里面 问题如何将page给我的大块span进行切分 解答page cache给我的是一个Span对象指针而Span本身不存储大块内存只负责存储大块内存的信息所以我们需要算出大块内存的起始地址 ①我们可以用这个span起始页号乘以一页的大小就能得到这个span的起始地址②然后用这个span的页数乘以一页的大小就可以得到整个span锁管理的内存块的大小 ③最后用起始地址加上内存块的大小就可以得到内存块的结束位置 再然后就是切分的详细过程可以配合代码加注释理解 //计算一次向page cache要几页空间返回要的页数
size_t NumMovePage(size_t size)
{size_t num NumMoveSize(size); //计算一次要多少个对象size_t npage num * size; //计算一次要多少个字节对象数目*单个对象大小npage PAGE_SHIFT; //总对象除以一个页的大小结果就是要申请的页数if (npage 0) npage 1; //如果要申请的大小一个页都没有我们默认申请一个页return npage;
}//获取一个非空的Span
Span* GetOneSpan(SpanList list, size_t size)
{//先在中心缓存查看有没有非空的SpanSpan* it list.Begin();while (it ! list.End()){if (it-_freeList ! nullptr)return it; //说明有空余内存块的span返回剩余的elseit it-_next; //往后遍历}//走到这里说明没有任何有内存块的span了需要往page cache获取list._mtx.unlock(); //先把中心缓存的桶锁解掉这样如果其它线程释放内存对象回来时不会阻塞PageCache::GetInstance()-GetMtx().lock(); //往page cache申请时需要把整个page cache锁住原因后面会讲Span* span PageCache::GetInstance()-NewSpan(NumMovePage(size)); //找page cache要一定数量的spanspan-_isUse true; //表明该span正在使用等释放时再设为falsespan-_objSize size; //保存切的数量PageCache::GetInstance()-GetMtx().unlock();//下面就是开始切分page cache给我的大块span这个不需要加锁因为这会儿其它线程访问不到这个spanchar* pageStart (char*)(span-_pageId PAGE_SHIFT); //算起始地址size_t bytes span-_n PAGE_SHIFT; //计算span的大块内存的大小单位字节char* pageEnd pageStart bytes; //计算内存块的结束地址//下面就是把大块内存切成自由链表链接起来span-_freeList pageStart; //先把大块内存的起始地址放在链表头节点上pageStart size; //找到下一个对象大小的位置void* tail span-_freeList; //辅助指针负责遍历大内存块while (pageStart pageEnd){NextObj(tail) pageStart; //把当前空间前4个字节搞成下一个空间的地址tail NextObj(tail); //tail指向下一个位置然后继续把下一个位置的前4个或8个字节搞成下下一个空间的地址pageStart size; //pageStart的地址也往后移//整个切分的过程其实就是把一块连续空间分成相等大小空间的起始地址头插到自由链表里所以空间本质是还是连续的}NextObj(tail) nullptr; //标记链表结尾list._mtx.lock(); //当切好span以后需要把span挂到桶里去的时候再重新加锁list.PushFront(span);return span;
} 六pagecache设计
6.1 整体设计
当thread cache内存不够或者要大于256KB的空间时找central cache要空间然后当central cache没有合适的Span时central cache则会去找page cache要更大的空间
page cache的基础结构和前面两个一样都是哈希桶但是和前面两个有很大区别
page cache采用的是直接定址法比如1号桶挂的都是大小为1页的span2号桶挂的是大小为2页的span依次类推page cache里的span是没有切分的完整的span因为page cache是服务于central cache的所以page cache只会把一个完整的span给central cache至于如何对这个span细分那就是central cache的事情了 然后就是page cache的申请和释放内存 申请内存 当central cache向page cache要内存时page cache先检查自己的对应大小的桶有没有span有就切一个返回如果没有则向更大页的桶找一个如果找到了就分裂成两个。加入我要4页page但是4号桶没有就一直往上面找找到10号桶有page就把这10页的page分成4页和6页的两个page如果到128号桶都没有合适的span则page cache调用mmapbrk或者VirtualAlloc等方式正式向系统申请一个128页的大内存然后挂在链表中接着重复上面的过程所以刚开始的时候page cache里没有任何的span所以最开始是向堆申请128页大小的内存块而page cache里面那些小的span和最终要返回给central cache的span都是由这个128页的span切出来的并且每次申请都是申请128页大小的span所以我们可以套用我们之前的定长内存池 释放内存 如果central cache释放回一个span则依次寻找span的前后page id的没有在使用的空闲span看看是否可以合并如果合并继续向前寻找这样就可以将切小的内存合并成一个大的span减少内存碎片 问题为什么最大是128 解答线程申请的单个对象最大限制为256KB也就是256 * 1024个字节前面说过一页大小是2或8KB我们定义为8KB所以128个页能被切分成4个256KB的对象是足够的并且这个最大不一定非得是128可以根据具体的需求进行设置 6.2 page cache详细设计 ①框架设计 central cache是桶锁这是因为多个thread cache向中心缓存申请时是可以同时访问不同的桶的但是当central cache往page cache要内存时是同时向page cache申请的所以在访问page cache的桶的时候要加锁page cache要加全局锁不能加桶锁因为我们可能要访问page cache的多个桶如果page cache用桶锁就会有频繁的加锁和解锁得不偿失所以我们用全局锁把整个page cache锁住page cache也可也以用单例模式这里我们仍然用饿汉模式刚开始就创建变量然后提供一个全局访问点
#pragma once#include Common.h
#include ObjectPool.h
#include PageMap.h
class PageCache
{
public:static PageCache* GetInstance() { return _sInst; } //全局访问点//获取从对象到span的映射Span* MapObjectToSpan(void* obj); //后面实现Span* NewSpan(size_t k); //新获取一个k数量页的Span返回给central cachevoid ReleaseSpanToPageCache(Span* span) //将中心缓存中完整的span还给page cache重新融合成一个大页std::mutex GetMtx() { return _pageMtx; }private: //单例模式 -- 饿汉模式PageCache() {}PageCache(const PageCache) delete;
private:SpanList _spanLists[NPAGES]; //129大小因为数组下标是从0开始的1映射1128映射128ObjectPoolSpan _spanPool; //如果需要new可以直接套用我们前面的定长内存池再次优化性能std::mutex _pageMtx; //不能用桶锁需要全部锁起来因为是整个PageCache活动的和span的分裂和合并有关系static PageCache _sInst; //饿汉模式一开始就把对象搞出来
}; ②申请内存设计 Span* NewSpan(size_t k) //新获取一个k数量页的Span{assert(k 0 k NPAGES);//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span就返回{Span* kSpan _spanLists[k].PopFront();return kSpan;}else //第K号桶是空的就去后面的桶找并做切分{for (size_t i k 1; i NPAGES; i) //循环往后面找{//找到不为空的桶就开始切最后就是k页的span返回给中心缓存n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan _spanLists[i].PopFront(); //找到一个大的span切成k页和n-k页的两个span//Span* kSpan new Span;Span* kSpan _spanPool.New();//在nSpan的头部切一个k页下来只要变页号就可以了kSpan-_pageId nSpan-_pageId; //kSpan是管理nSpan的也就是上面图片每个内存块前面的那个spankSpan-_n k; //保存页数nSpan-_pageId k; nSpan-_n - k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan-_n].PushFront(nSpan);return kSpan;}}//循环如果走完后意味着page cache里也没有span了所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时central cache和page cache里是啥也没有的就会走到这个位置 //Span* bigSpan new Span;Span* bigSpan _spanPool.New();void* ptr SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan-_pageId (PAGE_ID)ptr PAGE_SHIFT; //设置页号bigSpan-_n NPAGES - 1; //设置页的数量就是129-1128个页_spanLists[bigSpan-_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上面然后递归调用自己return NewSpan(k); //为了避免代码重复所以递归调用自己}} 七申请内存过程联调
7.1 第一次申请
申请内存的过程我们用单线程即可多线程不方便调试只要观察在一个桶当中能否正确申请到内存即可下面是测试函数 void* p1 ConcurrentAlloc(6);
void* p2 ConcurrentAlloc(8);
void* p3 ConcurrentAlloc(1); ①当线程第一次申请内存时线程通过TLS获取到自己专属的thread cache对象然后通过这个thread cache对象去申请内存 ②第一次申请的是6字节通过计算索引到了thread cache的第0号桶但是此时第0号桶中没有对象所以thread cache要向central cache申请 ③thread cache一次最多向central cache申请8字节大小也就是512个对象 所以在向中心缓存申请内存前还需要将该上限值与自由链表的_maxSize进行比较而第一次申请8字节也就是一个对象所以得出本次thread cache向central申请1个8字节的对象。 由于我们是使用的慢开始算法第一次申请一个对象第二次如果又申请一个对象就会自增长到申请2个 ④然后就是central cache找span给thread cache了先加锁获取到一个非空的span然后将这个span简单处理后返回给thread cache就ok了 ⑤然后就是要获取一个非空的span先找到第0号桶遍历span双链表看看有没有非空的span但是第一次申请肯定是没有的所以我们就要先向page cache申请。 当page cache给我们一个大span后我们需要将这个span进行加工然后挂到我自己的桶里面然后返回一个非空的span ⑥然后就是向page cache要空间但是page cache是要根据页申请的所以我们需要先计算下要向page cache申请多少个页 ⑦然后就是page cache给大块的span给central cache了但是第一次申请对象时page cache的所有桶中都是没有span的所以要直接向堆申请一个128页的大小的bigSpan 申请到之后填充span的信息然后挂到第128号桶下递归调用自己 ⑧然后把128页的内存拿出来切成1页和127页的两个span把1页的span返回把127页的span再次挂到127号桶下完成page cache给central cache内存的过程 7.2 第二次申请 ①当线程第二次申请8字节时由于thread cache的第0号桶第一次向central cache申请的8字节给了线程第一次申请的6字节了所以thread cache得再次向cache申请对象 ②第二次向central cache申请对象时由于慢增长机制所以第二次会向central cache申请2个8字节大小的空间第三次就会申请3个这就是满增长 ③然后就是central cache分配span给thread cache了由于第一次central cache向page cache申请了一页的内存块并将其切成了1024个8字节大小的对象因此这次central cache是可以直接从0号桶搞2个8字节空间给thread cache的不用再向page cache申请 ④线程只要1个8字节大小的对象所以thread cache把1个对象返回另一个对象就挂在自己的第0号桶上这样第三次再次申请1个对象时就不再去central cache要了第二次申请挂在第0号桶的那个返回即可 7.3 第1025次申请
我们先让线程申请1024次8字节对象然后观察在第1025次申请时central cache是否会再向page cache申请内存块以此测试代码是否正确central cache第一次向page cache申请一页内存这一页内存被 切成了1024个8字节大小的对象所以当第1025次申请时正常情况下会再次向page cache申请内存块 可以看到当第1025次申请时central cache第0号桶中的这个span的_useCount已经增加到了1024也就是说这1024个对象已经全部给线程用了此时central cache就要再向page cache申请一页的span来切分往后就是page cache把127号桶的那个127页大小的span拿出来切成1页和126页的两个span把1页的span返回给central cache126页的那个继续挂到126号桶上 每次调试页号不一样是正常的因为页号是根据地址来的每次重新编译运行后的地址也是不一样的 八回收内存设计
8.1 threadcache回收内存
当线程把空间还给thread cache后需要将该对象插入到对应哈希桶的自由链表当中即可但是随着还回来的空间不断增多我们对应桶的自由链表也会越来越长所以这些堆积在一个线程的thread cache是一种浪费我们应该把这些内存还给central cache好让其它线程去申请所以当thread cache中某个桶的自由链表超过它一次批量向central cache申请的个数时就要把这个自由链表里的内存还给central cache
//class ThreadCache
void Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size MAX_BYTES);size_t index SizeClass::Index(size); //找对应的桶号_freeLists[index].Push(ptr); //把指针放进去// 当链表长度大于一次批量申请的内存时就开始还一段list给central cacheif (_freeLists[index].Size() _freeLists[index].MaxSize())ListTooLong(_freeLists[index], size);
} 还给central cache就是从自由链表中一次取出多个对象然后将这些对象还给central cache
//如果还给我的内存块数量超过了桶的最大存储数目就要往上层传递
void ListTooLong(FreeList list, size_t size)
{void* start nullptr; //做输出型参数void* end nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()-ReleaseListToSpans(start, size);
}
然后我们的自由链表也需要支持一次删除多个对象的接口
void PopRange(void* start, void* end, size_t n)
{assert(n _size);start _freeList;end start;for (size_t i 0; i n - 1; i) //取n个只要控制end就好了{//n - 1是因为end指向第一个节点我走两步即可三个返回给你第三个后面要置空end NextObj(end);}_freeList NextObj(end); //把后面的空间移到前面来NextObj(end) nullptr;_size - n;
} 补充 当thread cache的某个自由链表过长时就把这个自由链表中的所有对象全都还给central cache但是我们的PushRange接口还是设置了三个参数第三个参数表示指定数量的对象因为有时候即使链表很长了但是我们不一定要把所有对象全还给central cache这样是为了代码的可修改性 并且我们这里只是简单实现tcmalloc源码考虑的更多更多比如判断thread cache是否要还对象给central cache时还综合考虑了每个thread cache整体大小。比如当某个thread cache的总占用超过一定阈值时就将该thread cache的部分对象还一些给central 8.2 centralcache回收内存
当thread cache中某个自由链表太长时会将自由链表中地这些对象还给central cache中对应的span但是central cache的每个桶中不止一个span所以我们计算出还回来的对象应该挂到哪个桶里还要知道应该挂到这个桶的哪个span下那么如何根据thread cache还给我的内存块的地址推测出这个内存块原本所在的span我们大体的解决方法就是先存储页号和span之间的映射当thread cache还一个内存块给central cache时是给一个地址给中心缓存的然后就可以根据之前建立的映射根据页号就可以找到span了 ①如 page何根据内存块的地址找到其所在的页号 公式该页的页号 该页的地址 / 该页的大小
假设一页的大小是100那么地址0~99的页通过计算都等于0而地址100~188都属于第1页 ②如何找到一个对象对应的span映射机制 在page cache第一次分配span给central cache时就可以建立页号和span之间的映射这个映射在后面page cache合并页块时也会用到我们使用C的unordered_map用作映射容器 Span* NewSpan(size_t k) //新获取一个k数量页的Span
{assert(k 0 k NPAGES);//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span就返回{Span* kSpan _spanLists[k].PopFront();//建立id和span的映射方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}else //第K号桶是空的就去后面的桶找并做切分{for (size_t i k 1; i NPAGES; i) //循环往后面找{//找到不为空的桶就开始切最后就是k页的span返回给中心缓存n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan _spanLists[i].PopFront(); //找到一个大的span切成k页和n-k页的两个span//Span* kSpan new Span;Span* kSpan _spanPool.New();//在nSpan的头部切一个k页下来只要变页号就可以了kSpan-_pageId nSpan-_pageId; //kSpan是管理nSpan的也就是上面图片每个内存块前面的那个spankSpan-_n k; //保存页数nSpan-_pageId k; nSpan-_n - k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan-_n].PushFront(nSpan);//建立id和span的映射方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}}//循环如果走完后意味着page cache里也没有span了所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时central cache和page cache里是啥也没有的就会走到这个位置 //Span* bigSpan new Span;Span* bigSpan _spanPool.New();void* ptr SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan-_pageId (PAGE_ID)ptr PAGE_SHIFT; //设置页号bigSpan-_n NPAGES - 1; //设置页的数量就是129-1128个页_spanLists[bigSpan-_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上return NewSpan(k); //为了避免代码重复所以递归调用自己}
}
我们还要提供一个提供一个能让central cache根据页号获取span的函数
//class PageCache
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj)
{PAGE_ID id ((PAGE_ID)obj PAGE_SHIFT); //算出页号auto ret _idSpanMap.find(id); //根据页号查找映射关系if (ret ! _idSpanMap.end()) //找到了{return ret-second; //返回该对象属于的span指针}else //没找到{//一般来说不会找不到如果找不到说明代码逻辑有问题应全面检查assert(false);return nullptr;}
} ③central cache回收内存 知道了thread cache还给我的内存块属于哪个span后就可以把这写内存块挂到对应的span自由链表中了然后更新一下span的_useCount计数即可如果在之后某个span的_useCount减为0了说明span的正在使用的小块全部回来了我们需要把这个span再还给page cache
//class CentralCache
void ReleaseListToSpans(void* start, size_t size)
{size_t index SizeClass::Index(size); //先算要还给哪个桶_spanLists[index]._mtx.lock();//开始把还给我的对象再还给对应的spanwhile (start){void* next NextObj(start);Span* span PageCache::GetInstance()-MapObjectToSpan(start); //获取对象对应的span//然后只需要把对象头插到对应的span即可NextObj(start) span-_freeList;span-_freeList start;span-_useCount--;if (span-_useCount 0) //当这个span的正在使用的小块全部回来了我们需要还给page cache{//这个span就可以再回收给page cache了page cache就可以去做前后页的合并_spanLists[index].Erase(span); //将这个span从链表中整个分离出来span-_freeList nullptr;span-_next nullptr;span-_prev nullptr;//释放span给page cache时使用page cache的大锁就可以了//这时把桶锁解掉为了让其它线程能够访问这个桶_spanLists[index]._mtx.unlock();PageCache::GetInstance()-GetMtx().lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span); //还给page cachePageCache::GetInstance()-GetMtx().unlock();_spanLists[index]._mtx.lock();}start next; //往后走}_spanLists[index]._mtx.unlock();
} 8.3 pagecache回收内存
当central cache中某个span的_useCount减到0了那么central cache就要将这个span还给page cache了大体过程起始很简单page cache只要将换回来的span挂到对应的哈希桶上就行了但是我们的page cache最主要需要解决的问题是“缓解内存碎片”所以需要将换回来的span与其它空闲的span进行合并 合并的过程分为前合并和后合并
假设还回来的span的起始页号是num该span管理n页的空间那么首先往前合并时就要判断第num-1页对应的span是否空闲在一个桶内如果空闲就进行合并假设8页的span和8页的span合并成了一个16页的span那么就要继续把这个16页的span继续往前面找空闲的span继续合并直到不能合并位置往后合并也是同理所以在合并span时是需要通过页号获取对应span的所以我们之前的映射就派上用场了当我们通过页号找到对应的span时这个span可能挂在page cache也可能挂在central cache因为page cache刚分配一个span给central cache时这个span的_useCount默认就是等于0的这时候我们正在对该span进行切分的时候page cache就把这个span拿去合并了这显然不对这时候我们span结构中的_isUse成员就派上用场了负责标记该span是否正在被使用 下面就是page cache前后页的合并的具体步骤 ①建立映射 注意起始page cache和central cache都有一个映射之前的那个映射是page cache要分配给central cache的span的映射而page cache自己也是有span的所以这个映射是建立page cache自己的span的映射 ②进行span的合并 对span前后的页尝试进行合并缓解外内存碎片问题假如我的pageId为1000n为1我们可以找999页或者1001页是不是空闲的假如我的pageId为2000n为5我们可以找1999页或者2005页是不是空闲的然后1999的为2我就可以合并为一个7页的大页然后2005的n为5我可以再合成一个12页的那么这样是咋缓解内存碎片的问题的呢假设我们不进行合并那么就有两个5页的span和一个2页的span假设central cache要申请一个12页的span虽然我们的总数是够的但是因为是分散的无法分出12页的span给central cache这就是内存碎片问题加上合并后该问题就能缓解
void ReleaseSpanToPageCache(Span* span) //将中心缓存中完整的span还给page cache重新融合成大页
{while (1) //向前合并{PAGE_ID prevId span-_pageId - 1; //先找前面的页有没有空闲的auto ret _idSpanMap.find(prevId);if (ret _idSpanMap.end()) break; //没有找到空闲的无法合并直接breakSpan* prevSpan ret-second;if (prevSpan-_isUse true) break; //前面的页正在使用不合并if (prevSpan-_n span-_n NPAGES - 1) break; //合并出超过128页的span没办法管理也不合并//走到这里就可以往前合并了其实合并就是改变下对应span的属性span-_pageId prevSpan-_pageId;span-_n prevSpan-_n;//合并完后把空的span干掉_spanLists[prevSpan-_n].Erase(prevSpan);//delete prevSpan;_spanPool.Delete(prevSpan);}while (1) //向后合并{//假设一个span有5个页所以我们需要跳过这个span才能找到下一个span的起始位置//假设页号是1000有5个页就要走到1004的位置才是下一个span的开始也就是1000 5 - 1PAGE_ID nextId span-_pageId span-_n;auto ret _idSpanMap.find(nextId);if (ret _idSpanMap.end()) break; //和前面一样Span* nextSpan ret-second;if (nextSpan-_isUse true) break;if (nextSpan-_n span-_n NPAGES - 1) break;//走到这里开始和后面的span合并往后合并就不需要改变页号了span-_n nextSpan-_n;_spanLists[nextSpan-_n].Erase(nextSpan);//delete nextSpan;_spanPool.Delete(nextSpan);}_spanLists[span-_n].PushFront(span); //把合并好的span挂回双链表去span-_isUse false;//然后老样子建立该span与其尾页的映射_idSpanMap[span-_pageId] span;_idSpanMap[span-_pageId span-_n - 1] span;
} 九回收内存过程联调 ①线程释放内存函数 前面说过由于我们不知道线程要申请和释放的内存是多大所以我们不能让线程直接访问thread cache的Allocate和Deallocate函数需要再封装一层在头文件ConcurrentAlloc.h里面实现
static void ConcurrentFree(void* ptr, size_t size)
{pTLS_ThreadCache-Deallocate(ptr, size);
} ②主函数实现 前面申请内存时我们是进行了三次申请那么老样子我们这次仍然进行三次申请但加了三次释放以此来测试我们的释放流程是否符合预期
void TestFreeAlloc()
{void* p1 ConcurrentAlloc(6);void* p2 ConcurrentAlloc(8);void* p3 ConcurrentAlloc(1);ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);
}
这三次申请和释放的对象大小对齐后都是8字节所以涉及的桶就是thread cache和central cache的第0号桶以及page cache1号桶 ③thread cache的释放流程 前两次释放不会把内存块还给central cache 当第三次释放时_size的值为3了所以就要把对象还给central cache了 先将0号桶的对象全部弹出然后调用central cache的ReleaseListToSpans函数来回收内存块到span ④central cache的释放流程 由于我们还回来了3个对象所以循环会指向三次_useCount也会减减三次 当第三次减减时_useCount为0时就要将这个span近一步还给page ⑤page cache回收流程 当page cache拿到内存时第一件事就是向往前或往后合并缓解内存碎片问题但是由于第一次申请是把128切成了1页和127页的span所以不会往前合并 往后合并时找到127页的span合并成一个128页的span然后将原来127页的span干掉紧接着把这个128页的span挂到第128号桶中建立span与首尾页的映射最后再将span的状态设置成未使用即可 十优化 10.1 大于256KB内存块的申请和释放
我们可以将我们涉及到的内存块大小分为三个阶段一页的大小是8K 8 * 1024字节
释放内存的大小申请方式释放方式x 256KB32页向thread cache申请释放给thread cache32页 x 128页向page cache申请释放给page cachex 128页直接向堆申请直接释放给堆 ①申请大块内存 我们前面实现对齐函数RoundUp时早就预先对大于256K的情况做了判断 所以对之前的申请逻辑就要新增一些东西了当申请对象大于256KB时不用再向thread cache申请了直接调用page cache的NewSpan申请指定页数的span即可
static void* ConcurrentAlloc(size_t size) //线程通过调用这个函数来获取资源
{if (size MAX_BYTES) //如果线程申请的内存小于256KB就先获取属于自己的thread cache然后再调用它的Allocate{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache nullptr){static ObjectPoolThreadCache tcPool;pTLS_ThreadCache tcPool.New();//pTLS_ThreadCache new ThreadCache;}return pTLS_ThreadCache-Allocate(size);}else //如果大于256KB就去上层找{size_t alignSize SizeClass::RoundUp(size);size_t kpage alignSize PAGE_SHIFT;PageCache::GetInstance()-GetMtx().lock();Span* span PageCache::GetInstance()-NewSpan(kpage);span-_objSize size;PageCache::GetInstance()-GetMtx().unlock();void* ptr (void*)(span-_pageId PAGE_SHIFT);return ptr;}
} 256KB就32页当申请的页数大于128页时直接向堆申请如果小于128页就继续在page cache里申请
Span* NewSpan(size_t k) //新获取一个k数量页的Span
{assert(k 0 k NPAGES);if (k NPAGES - 1) //当k大于256K就直接向堆申请{void* ptr SystemAlloc(k);//Span* span new Span;Span* span _spanPool.New();span-_pageId (PAGE_ID)ptr PAGE_SHIFT;span-_n k;_idSpanMap[span-_pageId] span; //存一下span的页号方便释放return span;}//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span就返回{Span* kSpan _spanLists[k].PopFront();//建立id和span的映射方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}else //第K号桶是空的就去后面的桶找并做切分{for (size_t i k 1; i NPAGES; i) //循环往后面找{//找到不为空的桶就开始切最后就是k页的span返回给中心缓存n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan _spanLists[i].PopFront(); //找到一个大的span切成k页和n-k页的两个span//Span* kSpan new Span;Span* kSpan _spanPool.New();//在nSpan的头部切一个k页下来只要变页号就可以了kSpan-_pageId nSpan-_pageId; //kSpan是管理nSpan的也就是上面图片每个内存块前面的那个spankSpan-_n k; //保存页数nSpan-_pageId k; nSpan-_n - k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan-_n].PushFront(nSpan);//存储nSpan的页号跟nSpan的映射方便page cache回收内存时进行前后合并_idSpanMap[nSpan-_pageId] nSpan;_idSpanMap[nSpan-_pageId nSpan-_n - 1] nSpan;//-1是因为假如我从1000页开始到1004就是5个span了所以是1000 5 - 1所以要-1//建立id和span的映射方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}}//循环如果走完后意味着page cache里也没有span了所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时central cache和page cache里是啥也没有的就会走到这个位置 //Span* bigSpan new Span;Span* bigSpan _spanPool.New();void* ptr SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan-_pageId (PAGE_ID)ptr PAGE_SHIFT; //设置页号bigSpan-_n NPAGES - 1; //设置页的数量就是129-1128个页_spanLists[bigSpan-_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上return NewSpan(k); //为了避免代码重复所以递归调用自己}
} ②释放大块内存 释放我们是一样的当释放的内存块大于256KB时我们直接调用page cache的释放逻辑所以我们要先获取对应的span把锁加好在释放
static void ConcurrentFree(void* ptr, size_t size)
{if (size MAX_BYTES){assert(pTLS_ThreadCache);pTLS_ThreadCache-Deallocate(ptr, size);}else{Span* span PageCache::GetInstance()-MapObjectToSpan(ptr);PageCache::GetInstance()-GetMtx().lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-GetMtx().unlock();}
}
然后在page cache释放内存时如果span的大小大于128页了则直接释放给堆否则继续由page cache回收 对于SystemFree和SystemAlloc是一样的效率比free更快
//Common.h
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//linux下sbrk unmmap等这里仍然用freefree(ptr);
#endif
} ③测试 void BigAlloc()
{void* p1 ConcurrentAlloc(257 * 1024);ConcurrentFree(p1, 257 * 1024);void* p2 ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
} ①申请257KB大小内存 当申请大于256KB内存块时直接向page cache申请由于257KB对齐后是33页所以page cache向内存申请128页大小的span然后分成33页和96页的span把33页的span直接返回释放时也是直接调用page cache回收函数由于是33页不大于128页所以会尝试将33页进行向上合并最终和95页的合并成128页再将128页的span挂到第128号桶下 ②申请129页大小内存 首先129页是大于256KB的所以会先调用page cache的申请函数然后page cache再次判断发现要申请的129页是大于128页的所以直接去找堆申请建立好映射后返回129页的span释放129页内存时也是直接还给堆不过这个不确定因为我们page cache的最大页数是可以自定义的假如我们把桶的最大存储页数改为200那么就不会直接释放到堆里去了 10.2 释放对象时优化不传大小
我们的申请和释放还是有点不方便因为目前我们申请内存时指明了大小但是在释放时仍然要指定大小太麻烦了而且联想到malloc和freefree是直接传入一个指针不需要传入大小的所以我们可以把我们的释放逻辑函数也搞成只传指针的
这时候我们span结构中的_objSize成员派上用场了这个代表这个span管理的内存块被切成一个个对象的大小 最后我们释放逻辑的代码可以优化成不传大小的了
static void ConcurrentFree(void* ptr)
{Span* span PageCache::GetInstance()-MapObjectToSpan(ptr);size_t size span-_objSize;if (size MAX_BYTES){assert(pTLS_ThreadCache);pTLS_ThreadCache-Deallocate(ptr, size);}else{PageCache::GetInstance()-GetMtx().lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-GetMtx().unlock();}
}
多线程测试代码
void TestMultiThread()
{std::thread t1([]() {std::vectorvoid* v;for (size_t i 0; i 5; i){void* ptr ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e);}});std::thread t2([]() {std::vectorvoid* v;for (size_t i 0; i 5; i){void* ptr ConcurrentAlloc(6);}for (auto e : v){ConcurrentFree(e);}});t1.join();t2.join();
} 10.3 读取映射关系时的加锁问题
span与页号的映射关系是存储在PageCache类中的当我们访问这个映射关系时需要加锁因为page cache申请和释放时也会访问甚至改变映射关系所以我们需要加锁我们可以使用C的只能指针也可以用普通的锁 十一多线程下对比malloc性能测试
下面是多线程下对比malloc的测试代码
#includeConcurrentAlloc.h
#includestdio.h
//调试技巧1函数栈帧 条件断点 2// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vectorstd::thread vthread(nworks);std::atomicsize_t malloc_costtime 0;std::atomicsize_t free_costtime 0;for (size_t k 0; k nworks; k){vthread[k] std::thread([, k]() {std::vectorvoid* v;v.reserve(ntimes);for (size_t j 0; j rounds; j){size_t begin1 clock();for (size_t i 0; i ntimes; i){v.push_back(malloc(16));//v.push_back(malloc((16 i) % 8192 1));}size_t end1 clock();size_t begin2 clock();for (size_t i 0; i ntimes; i){free(v[i]);}size_t end2 clock();v.clear();malloc_costtime (end1 - begin1);free_costtime (end2 - begin2);}});}for (auto t : vthread){t.join();}printf(%u个线程并发执行%u轮次每轮次malloc %u次: 花费%u ms\n,nworks, rounds, ntimes, (int)malloc_costtime);printf(%u个线程并发执行%u轮次每轮次free %u次: 花费%u ms\n,nworks, rounds, ntimes, (int)free_costtime);printf(%u个线程并发mallocfree %u次总计花费%u ms\n,nworks, nworks * rounds * ntimes, malloc_costtime free_costtime);
}// 三个参数分别代表单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vectorstd::thread vthread(nworks);std::atomicsize_t malloc_costtime 0;std::atomicsize_t free_costtime 0;for (size_t k 0; k nworks; k){vthread[k] std::thread([]() {std::vectorvoid* v;v.reserve(ntimes);for (size_t j 0; j rounds; j){size_t begin1 clock();for (size_t i 0; i ntimes; i){/*cout i endl;if (j 1 i 1024){int x 0;}*/v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 i) % 8192 1));}size_t end1 clock();size_t begin2 clock();for (size_t i 0; i ntimes; i){ConcurrentFree(v[i]);}size_t end2 clock();v.clear();malloc_costtime (end1 - begin1);free_costtime (end2 - begin2);}});}for (auto t : vthread){t.join();}printf(%u个线程并发执行%u轮次每轮次concurrent alloc %u次: 花费%u ms\n, nworks, rounds, ntimes, (int)malloc_costtime);printf(%u个线程并发执行%u轮次每轮次concurrent dealloc %u次: 花费%u ms\n,nworks, rounds, ntimes, (int)free_costtime);printf(%u个线程并发concurrent allocdealloc %u次总计花费%u ms\n,nworks, nworks * rounds * ntimes, malloc_costtime free_costtime);
}int main()
{size_t n 10000;cout endl;BenchmarkConcurrentMalloc(n, 4, 10);cout endl endl;BenchmarkMalloc(n, 4, 10);cout endl;return 0;
} ①第一次测试 固定大小内存的释放和申请
v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16)); 结果如下 每个线程都是申请释放固定大小的对象那么就全都访问的是各自thread cache的同一个桶所以对应的都会访问到central cache的同一个桶那么central cache就没用了
所以下面的第二次测试就是测试申请和释放不同大小的内存 ②第二次测试 第二次测试是测试不同大小内存的申请和释放
v.push_back(malloc((16 i) % 8192 1));
v.push_back(ConcurrentAlloc((16 i) % 8192 1));
结果如下 ③分析结果 可以发现我们现在的内存池的效率和malloc比起来还是有差距的所以下面我们来分析下效率低下的原因在哪里然后尝试解决 十二使用基数树优化
12.1 性能瓶颈分析 VS2022环境下的性能分析步骤 我们要分析我们自己的内存池所以我们把申请次数调小一点并屏蔽malloc的
int main()
{size_t n 1000;cout endl;BenchmarkConcurrentMalloc(n, 4, 10);cout endl endl;//BenchmarkMalloc(n, 4, 10);cout endl;return 0;
} ①打开性能诊断 在debug模式下我们的调试选框打开后有一个选项叫做“性能诊断”不同版本的VS可能叫法不一样 点击后会重新打开一个窗口 ②查看结果 前面是系统内置函数可以发现我们自己写的函数就这两个的时间占比最多加起来达到了差不多70%
而在Deallocate中 再扒开ReleaseListToSpans看 可以看到是我们MapObjectToSpan这个函数里的这个锁消耗了很多时间其次就是我们central cache的那个桶锁 ③分析结果 上面的结果表示了我们自己实现的内存池效率低下的主要原因就是在锁的竞争上面也就是调用MapObjectToSpan函数访问映射关系时的加锁问题tcmalloc中针对这一点使用了基数树进行优化可以使在读取这个映射关系时可以不加锁 12.2 关于基数树
基数树是一个分层的哈希表根据层数可以分为单层基数树二层基数树三层基数树等 ①单层基数树 单层基数树采用直接定址法的哈希表key就是页号value就是一个指针本质就是一个数组 最终数组的长度就是2^19 524288每一个页号对应span的地址就存储在数组对应下标的位置总共占用位置 2^19 * 4 2 ^ 21 2M左右所以单层基数树只在32位下可以用如果放到64位下那么数组的大小就是2^51 * 8 2^23 G显然不行所以如果是64位我们要使用三层基数树
下面是单层基数树的代码当我们需要建立映射时调用set函数读取映射时调用get即可
template int BITS
class TCMalloc_PageMap1
{
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1() {size_t size sizeof(void*) BITS; //要开辟的数组大小size_t alignSize SizeClass::_RoundUp(size, 1 PAGE_SHIFT);array_ (void**)SystemAlloc(alignSize PAGE_SHIFT);memset(array_, 0, sizeof(void*) BITS);}void* get(Number k) const {if ((k BITS) 0) return NULL;return array_[k]; //返回页号对应的span}void set(Number k, void* v) {assert((k BITS) 0); //k的范围必须在[0, 2^BITS - 1]之内array_[k] v;}
private:static const int LENGTH 1 BITS; //页的数量void** array_; //value就是一个指针
}; ②二层基数树 二层基数树就是在单层基数树的基础上把value的指针进行再映射比如用前5个比特位再基数树的第一层进行映射得到对应的第二层然后用剩下的比特位再进行映射最终得到该页的span* 第一层占用2^5 * 4 2^7比特空间第二层和单层基数树一样占用大约2M的空间一层基数树一开始就需要把2M的空间全部开出来而二层数组不是一开始只需要把第一层的数组搞出来当某一页号映射时再开辟对应的第二层数组即可
下面是二层基数树代码
template int BITS
class TCMalloc_PageMap2
{
private:static const int ROOT_BITS 5; //第一层给5个二进制位对应页号的前5个比特位static const int ROOT_LENGTH 1 ROOT_BITS; //第一层存储个个数static const int LEAF_BITS BITS - ROOT_BITS; //第二层对应页号的其余比特位static const int LEAF_LENGTH 1 LEAF_BITS; //第二层存储个数struct Leaf { void* values[LEAF_LENGTH]; }; //第一层数组中存储的元素类型Leaf* root_[ROOT_LENGTH]; //第一层数组void* (*allocator_)(size_t);
public:typedef uintptr_t Number;explicit TCMalloc_PageMap2() {memset(root_, 0, sizeof(root_)); //初始化第一层空间Ensure(0, 1 BITS);//在32位下。我们将二层基数树的两层数组全开辟出来也就消耗了2M的空间所以我们可以最开始就全搞出来64位下就要写在set里面了}void* get(Number k) const {const Number i1 k LEAF_BITS; //获取第一层对应下标const Number i2 k (LEAF_LENGTH - 1); //获取第二层对应下标if ((k BITS) 0 || root_[i1] NULL) return NULL; //当页号不在范围内或者无映射关系直接返回空return root_[i1]-values[i2];}void set(Number k, void* v) {const Number i1 k LEAF_BITS; //第一层对应下标const Number i2 k (LEAF_LENGTH - 1); //第二层对应下标ASSERT(i1 ROOT_LENGTH);root_[i1]-values[i2] v; //建立映射}//该函数是我为了保证当需要建立某一页号与span之间的映射关系时//需要先调用该函数确保用于映射的空间是已经开辟了的没有就现场开辟bool Ensure(Number start, size_t n) {for (Number key start; key start n - 1;) {const Number i1 key LEAF_BITS;if (i1 ROOT_LENGTH) return false; //映射页号超出返回返回错误if (root_[i1] NULL) //第一层对应下标空间未开辟直接现场开辟{static ObjectPoolLeaf leafPool;Leaf* leaf (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] leaf;}key ((key LEAF_BITS) 1) LEAF_BITS;}return true;}
}; ③三层基数树 看了单层和二层基数树那么三层基数树也不难了就是在二层基数树的基础上搞了三层数组映射三次 三层基数树适用于64位此时当要建立某一页号的映射关系时再开辟对应的数组空间而没有建立映射的页号就可以摆在那不管不开辟空间此时就能很大程度地节省内存空间 下面是三层基数树的代码
template int BITS
class TCMalloc_PageMap3
{
private:static const int INTERIOR_BITS (BITS 2) / 3; //第1和2层对应页号static const int INTERIOR_LENGTH 1 INTERIOR_BITS; //第1和2层存储元素static const int LEAF_BITS BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数static const int LEAF_LENGTH 1 LEAF_BITS; //第三次存储元素的个数struct Node { Node* ptrs[INTERIOR_LENGTH]; };struct Leaf { void* values[LEAF_LENGTH]; };Node* root_; //根节点Node* NewNode(){Node* result reinterpret_castNode*();if (result ! NULL) memset(result, 0, sizeof(*result));return result;}
public:typedef uintptr_t Number;explicit TCMalloc_PageMap3() { root_ NewNode(); } //构造函数void* get(Number k) const{const Number i1 k (LEAF_BITS INTERIOR_BITS); //获取第一层对应下标const Number i2 (k LEAF_BITS) (INTERIOR_LENGTH - 1); //第二层const Number i3 k (LEAF_LENGTH - 1); //第三层//页号超出返回或对应的空间未开辟if ((k BITS) 0 || root_-ptrs[i1] NULL || root_-ptrs[i1]-ptrs[i2] NULL) return NULL;return reinterpret_castLeaf*(root_-ptrs[i1]-ptrs[i2])-values[i3]; //返回页号对应的span*}void set(Number k, void* v){ASSERT(k BITS 0);const Number i1 k (LEAF_BITS INTERIOR_BITS); //第一层const Number i2 (k LEAF_BITS) (INTERIOR_LENGTH - 1); //第二层const Number i3 k (LEAF_LENGTH - 1); //第三层Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的reinterpret_castLeaf*(root_-ptrs[i1]-ptrs[i2])-values[i3] v; //建立映射}bool Ensure(Number start, size_t n){for (Number key start; key start n - 1;){const Number i1 key (LEAF_BITS INTERIOR_BITS); //第一层对应的下标const Number i2 (key LEAF_BITS) (INTERIOR_LENGTH - 1); //第二层if (i1 INTERIOR_LENGTH || i2 INTERIOR_LENGTH) return false; //下标超出返回直接返回错if (root_-ptrs[i1] NULL) //第一层下标对应空间为开辟现场开辟{Node* n NewNode();if (n NULL) return false;root_-ptrs[i1] n;}if (root_-ptrs[i1]-ptrs[i2] NULL) //第二层下标对应空间未开辟{Leaf* leaf reinterpret_castLeaf*((*allocator_)(sizeof(Leaf)));if (leaf NULL) return false;memset(leaf, 0, sizeof(*leaf));root_-ptrs[i1]-ptrs[i2] reinterpret_castNode*(leaf);}key ((key LEAF_BITS) 1) LEAF_BITS;}return true;}
}; 12.3 使用基数树优化代码 ①然后我们就可以用基数树代替我们的unordered_map了 但是我们目前就在32位下进行测试因为64位和Linux一样需要做一些额外的处理 ②然后我们前面所有和映射相关的地方都要改下首先从NewSpan开始 ③然后就是释放函数 ①最后就是造成我们效率低下的罪魁祸首MapObjectToSpan函数了 //获取从对象到span的映射
Span* MapObjectToSpan(void* obj)
{PAGE_ID id ((PAGE_ID)obj PAGE_SHIFT); //算出页号//std::unique_lockstd::mutex lock(_pageMtx); //RAII的智能指针类自动构建锁和释放锁//auto ret _idSpanMap.find(id); //根据页号查找映射关系//if (ret ! _idSpanMap.end()) //找到了//{// return ret-second; //返回该对象属于的span指针//}//else //没找到//{// assert(false);// return nullptr;//}//用这个就不用加锁了其它的线程访问unordered_map的时候它底层的红黑树可能会旋转节点之间的关系可能会变所以要加锁//用哈希表的话线程1要扩容但是扩容后的结果还没替换原来的表时线程2也来扩容这样就会有很大的问题所以要加锁//1只有在ReleaseSpanToPageCache和NewSpan这两个函数中才会去建立id和span的映射也就是说会去写//2基数树写之前会提前开好空间写数据过程中不会动结构//3它的读写是分离的。假如线程1对一个位置读写的时候线程2不可能对这个位置写间接完成了锁的功能auto ret (Span*)_idSpanMap.get(id);assert(ret ! nullptr);return ret;
} STL的容器都是没有线程安全的C的map的底层是红黑树unordered_map的底层是哈希表两者再插入数据时底层的结构都有可能变化比如红黑树插入数据后会旋转哈希表插入后可能会扩容造成数据不一致问题需要加锁而基数树我们是采用的固定长度的空间不论什么时候去获取某个页的映射都是在一个固定的位置进行读取建立映射操作都是在page cache进行的也就是说读取映射的都是对应的span的_useCount不为0的页建立映射时都是对应的span的_useCount等于0的页所以读取和建立同一个页的映射操作不会同时进行 12.4 最终测试 现在可以看到我们的效率比malloc快了差不多十倍了 12.5 周边问题 问题tcmalloc是如何替代malloc的原理是什么 解答Google开源的tcmalloc是会直接替换malloc的不同平台替换的方式不同比如Unix系统上的glibc使用了weak alias的技术替换Windows使用了hook的钩子技术进行替换不用改原来的代码只需用钩子将代码使用malloc的地方勾过来让其只需该内存池的代码这样所有的malloc的调用都跳转到了tcmalloc的逻辑所谓的外挂就是这样搞得用来进行系统层面的函数更改 问题能否将threadcache和centralcache合并成一个减掉一层 解答不能前面说过CentralCache作用是承上启下假设去掉中心缓存那么thread cache就直接和central cache直接对接会产生许多问题 本来span的切分工作是central cache搞的去掉central cache后page cache中的span就同时存在切好的和未切好的那么就有可能给了thread cache一部分但是自己却仍然有一部分并且在申请内存时page cache不知道哪些是切好的哪些没切所以如何把这些混乱的span分配给thread cache是一个大问题page cache是用一个大锁把桶全部锁住的所以就效率这块远远没central cache的桶锁高得不偿失 总结 central cache负责均衡多个线程之间的同一个大小的内存需求central cache负责管理切好的spanpage cache负责管理大块span能够让span合理分配使用central cache的桶锁的效率远比page cache的大锁高 问题 thread cache销毁了但是但是它还有内存没释放给central cache怎么办假设线程有内存泄漏有可能还有一些内存没回来或者挂在thread cache中但是这个线程销毁了导致这些内存没有回到central cache怎么办 解答给项目注册一个回调函数规定只要线程结束就调用回调函数先把thread cache里的内存释放给central cache可以在创建TLS时就绑定回调函数在new线程的thread cache时绑定一个回调函数一旦线程挂了崩溃了就调用这个回调函数进行清理 自主实现的定长内存池源码高并发内存池/定长内存池 · 小堃学编程/项目集合 - 码云 - 开源中国
tcmalloc项目完整源码地址为GitHub - google/tcmalloc
或者国内镜像tcmalloc: TCMalloc (google-perftools) 是用于优化C写的多线程应用比glibc 2.3的malloc快。这个模块可以用来让MySQL在高并发下内存占用更加稳定。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/88063.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!