🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343 🔥 系列专栏:https://blog.csdn.net/qinjh_/category_13016668.html

目录
从零实现一个高并发内存池
项目介绍
这个项目做的是什么?
什么是内存池
1.池化技术
2.内存池
3.内存池主要解决的问题
4.malloc
开胃菜–先设计一个定长的内存池
高并发内存池整体框架设计
高并发内存池–thread cache
高并发内存池–central cache
高并发内存池–page cache
前言
💬 hello! 各位铁子们大家好哇。
今日更新了高并发内存池的内容 🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝
从零实现一个高并发内存池
项目介绍
这个项目做的是什么?
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称 Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc、free)。
这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就 是学习tcamlloc的精华。
什么是内存池
1.池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过 量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快 捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务 器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端 的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠 状态。
2.内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操 作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3.内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎 片的问题。那么什么是内存碎片呢?
再需要补充说明的是内存碎片分为外碎片和内碎片,上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请 需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
4.malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获 取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程 序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方 式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套, linux gcc用的glibc中的ptmalloc 。
开胃菜–先设计一个定长的内存池
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景 下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计 一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学 习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组 件。
代码样例:
ObjectPool.h
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
//优先把还回来的内存块对象,再次重复利用
if (_freeList)
{
void* next = *((void**)_freeList); //next是自由链表的第一个节点的头四/八个字节,它存放下一个节点的地址
obj = (T*)_freeList;//obj就是第一块内存块
_freeList = next;//再让next变成第一个节点
}
else
{
if (_remainBytes < sizeof(T))//如果剩余内存小于所需内存,就去申请大块空间
{
_remainBytes = 128 * 1024;
_memory = (char*)malloc(_remainBytes);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;//把大块内存切成小块,分出去
size_t objSize=sizeof(T)<sizeof(void*)?sizeof(void*):sizeof(T);//保证至少能存放下一个对象的地址
_memory += objSize;
_remainBytes -= objSize;
}
//malloc只开了空间,没有初始化,要进行初始化
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
void Delete(T* obj)
{
//if (_freeList == nullptr)
//{
//_freeList = obj;
////*(int*)obj=nullptr; 强转成int*类型,此时只能在32位机器上跑,因为64位机器上,指针大小是8字节。
//*(void**)obj = nullptr; //这里使用二级指针即可解决,void**解引用后,就是void*,此时还是指针,在32位机器上就是4字节,在64位机器上就是8字节,可以自动适应
//}
//else
//{
////头插
//*(void**)obj = _freeList; //先让小内存块指向自由链表的第一个节点,因为freeList指向的就是第一个节点。
//_freeList = obj; //接着再让obj变成第一个节点,让freelist指向boj。
//}
//显示调用析构函数清理对象
obj->~T();
//头插
*(void**)obj = _freeList; //实际上不需要区分情况,因为空的时候,直接头插也可以
_freeList = obj;
}
private:
char* _memory=nullptr;//因为一个char是一个字节,切内存块时好用。指向大块内存的指针
size_t _remainBytes = 0;//大块内存在切分过程中剩余字节数
void* _freeList=nullptr;//还回来过程中链接的自由链表的头指针
};
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身 其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们 实现的内存池需要考虑以下几方面的问题。
concurrent memory pool主要由以下3个部分构成:
高并发内存池–thread cache
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会 有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
释放内存:
高并发内存池–central cache
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每 个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一 个个小内存块对象挂在span的自由链表中。
申请内存:
释放内存:
当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时– use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache, page cache中会对前后相邻的空闲页进行合并。
高并发内存池–page cache
申请内存:
释放内存:
整体代码
码云:https://gitee.com/qinjianhao0/project-warehouse/tree/master/FAPool
CentralCache.h
#pragma once
#include"Common.h"
//单例模式 懒汉
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
Common.h
#pragma once
#include<iostream>
#include<vector>
#include<unordered_map>
#include<algorithm>
#include<thread>
#include<mutex>
#include<time.h>
#include<assert.h>
using std::cout;
using std::endl;
#ifdef _WIN32
#include<windows.h>
#else
//…
#endif
// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;// thread cache 和 central cache自由链表哈希桶的表大小。thread cache中桶的个数,也就是自由链表的个数是208。
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; // <<PAGE_SHIFT 表示乘以8k的大小。页大小转换偏移, 即一页定义为2^13,也就是8KB
#ifdef _WIN64
typedef unsigned long long PAGE_ID; //64位下8字节
#elif _WIN32//32位下4字节
typedef size_t PAGE_ID;
#else
//linux
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage * (1 << 12), MEM_COMMIT | MEM_RESERVE,
PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
// 获取内存对象中存储的头4 or 8字节值,即链接的下一个对象的地址
static void*& NextObj(void* obj)//返回引用后,就可以赋值
{
return *(void**)obj;
}
//管理切分好的小对象的自由链表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
//头插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void PushRange(void* start, void* end,size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n – 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(obj);
return obj;
–_size;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList=nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
//计算对象大小的对齐映射规则
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐freelist[0,16) //最开始最少就是8字节对齐,因为指针是4或者8字节,要取8字节算,这样能兼容4字节。
// [128+1,1024] 16byte对齐freelist[16,72)
// [1024+1,8*1024] 128byte对齐freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
//size_t _RoundUp(size_t size, size_t alignNum)
//{
//size_t alignSize;
//if (size % alignNum != 0)//如果不能整除
//{
//alignSize = (size / alignNum + 1) * alignNum;//向上对齐,如:(5/8+1)*8=8,即可把5对齐到8。
//}
//else
//{
//alignSize = size;
//}
//return alignSize;
//}
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return (((bytes)+alignNum – 1) & ~(alignNum – 1));
}
//搞成静态的,就可以不用对象去调,里面也没有什么成员
static inline size_t RoundUp(size_t size)//把size向上对齐
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <=8* 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64*1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256*1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
return _RoundUp(size, 1 << PAGE_SHIFT);
}
}
//size_t _Index(size_t bytes,size_t alignNum) alignNum指对齐数
//{
//if (bytes%alignNum==0)
//{
//return bytes / alignNum – 1;
//}
//else
//{
//return bytes / alignNum;
//}
//}
static inline size_t _Index(size_t bytes, size_t align_shift) //align_shift指2的几次方,align_shift==3时,左移3位就是乘以8,即2的3次方。
{
return ((bytes + (1 << align_shift) – 1) >> align_shift) – 1;
}
// 计算映射到哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
return _Index(bytes – 128, 4) + group_array[0];//记得加上前面的桶的数量
}
else if (bytes <= 8 * 1024) {
return _Index(bytes – 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes – 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes – 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
// 一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size) //批量要多个,这样多余的,下次就可以直接在thread cache中拿
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 大对象一次批量上限低
size_t num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// …
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;//算出要多少个字节
npage >>= PAGE_SHIFT;//总字节数除以一页的字节数,得出需要几页。
if (npage == 0)//,不足一页的,最少也需要一页
npage = 1;
return npage;
}
};
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId=0;//大块内存起始页的页号
size_t _n=0;//页的数量
Span* _next=nullptr;//双向链表的结构
Span* _prev=nullptr;
size_t _objSize = 0;//切好的小对象的大小
size_t _useCount=0;//切好的小块内存,被分配给thread cache的计数
void* _freeList=nullptr;//切好的小块内存的自由链表
bool _isUse=false;//是否在被使用
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next = _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
//prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx;//桶锁
};
ConcurrentAlloc.h
#pragma once
#include"Common.h"
#include"ThreadCache.h"
#include"PageCache.h"
#include"ObjectPool.h"
//搞成静态的,保持在当前文件可见。 如果全局的不给静态的话,在多个文件里面,链接属性会冲突。
static void* ConcurrentAlloc(size_t size)
{
if (size>MAX_BYTES)
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
ObjectPool.h
#pragma once
#include"Common.h"
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
//优先把还回来的内存块对象,再次重复利用
if (_freeList)
{
void* next = *((void**)_freeList); //next是自由链表的第一个节点的头四/八个字节,它存放下一个节点的地址
obj = (T*)_freeList;//obj就是第一块内存块
_freeList = next;//再让next变成第一个节点
}
else
{
if (_remainBytes < sizeof(T))//如果剩余内存小于所需内存,就去申请大块空间
{
_remainBytes = 128 * 1024;
_memory = (char*)malloc(_remainBytes);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;//把大块内存切成小块,分出去
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//保证至少能存放下一个对象的地址
_memory += objSize;
_remainBytes -= objSize;
}
//malloc只开了空间,没有初始化,要进行初始化
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
void Delete(T* obj)
{
//if (_freeList == nullptr)
//{
//_freeList = obj;
////*(int*)obj=nullptr; 强转成int*类型,此时只能在32位机器上跑,因为64位机器上,指针大小是8字节。
//*(void**)obj = nullptr; //这里使用二级指针即可解决,void**解引用后,就是void*,此时还是指针,在32位机器上就是4字节,在64位机器上就是8字节,可以自动适应
//}
//else
//{
////头插
//*(void**)obj = _freeList; //先让小内存块指向自由链表的第一个节点,因为freeList指向的就是第一个节点。
//_freeList = obj; //接着再让obj变成第一个节点,让freelist指向boj。
//}
//显示调用析构函数清理对象
obj->~T();
//头插
*(void**)obj = _freeList; //实际上不需要区分情况,因为空的时候,直接头插也可以
_freeList = obj;
}
private:
char* _memory=nullptr;//因为一个char是一个字节,切内存块时好用。指向大块内存的指针
size_t _remainBytes = 0;//大块内存在切分过程中剩余字节数
void* _freeList=nullptr;//还回来过程中链接的自由链表的头指针
};
//struct TreeNode
//{
//int _val;
//TreeNode* _left;
//TreeNode* _right;
//TreeNode()
//:_val(0)
//, _left(nullptr)
//, _right(nullptr)
//{}
//};
//void TestObjectPool()
//{
//// 申请释放的轮次
//const size_t Rounds = 3;
//// 每轮申请释放多少次
//const size_t N = 100000;
//std::vector<TreeNode*> v1;
//v1.reserve(N);
//
//size_t begin1 = clock();
//for (size_t j = 0; j < Rounds; ++j)
//{
//for (int i = 0; i < N; ++i)
//{
//v1.push_back(new TreeNode);
//}
//for (int i = 0; i < N; ++i)
//{
//delete v1[i];
//}
//v1.clear();
//}
//size_t end1 = clock();
//std::vector<TreeNode*> v2;
//v2.reserve(N);
//ObjectPool<TreeNode> TNPool;
//size_t begin2 = clock();
//
//for (size_t j = 0; j < Rounds; ++j)
//{
//for (int i = 0; i < N; ++i)
//{
//v2.push_back(TNPool.New());
//}
//for (int i = 0; i < N; ++i)
//{
//TNPool.Delete(v2[i]);
//}
//v2.clear();
//}
//size_t end2 = clock();
//cout << "new cost time:" << end1 – begin1 << endl;
//cout << "object pool cost time:" << end2 – begin2 << endl;
//}
PageCache.h
#pragma once
#include"Common.h"
#include"ObjectPool.h"
#include"PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
// 获取一个K页的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
TCMalloc_PageMap1<32 – PAGE_SHIFT> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
PageMap.h
#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS – ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH – 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH – 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n – 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS – 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH – 1);
const Number i3 = k & (LEAF_LENGTH – 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH – 1);
const Number i3 = k & (LEAF_LENGTH – 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n – 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH – 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)
(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
使用tcmalloc源码中实现基数树进行优化
ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
//申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放对象时,链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
//TLS thread local storage //每个线程起来后,就会调用申请内存的接口获得thread cache,而不是直接就有各自的thread cache
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; //让变量都只在它所在的线程可全局访问,但不能被其他线程访问到。TLS(线程局部存储)
Benchmark.h
#include"ConcurrentAlloc.h"
//ntimes 一轮申请和释放内存的次数
//rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 – begin1);
free_costtime += (end2 – begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\\n",
nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 – begin1);
free_costtime += (end2 – begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\\n",
nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}
int main()
{
size_t n = 10000;
cout << "==========================================================" <<
endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" <<
endl;
return 0;
}
多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比
CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//查看当前的spanlist中是否还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
//先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//走到这里说明没有空闲的span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();
//对获取span进行切分,不需要加锁,因为这时候其他线程访问不到这个span
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//把大块内存切成自由链表链接起来
//1.先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail);// 或者tail=start
start += size;
}
NextObj(tail) = nullptr;
//切好span以后,需要把span挂到桶里去的时候,再加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);//找出是哪个桶的
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象
//如果不够batchNum个,有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;//因为是少走一步的,所以起始就已经有一个了
while(i < batchNum – 1 && NextObj(end)!=nullptr)
{
end = NextObj(end);
i++;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
_spanLists[index]._mtx.unlock();
return actualNum;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);//根据size算出是哪个桶的
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);//找到对应的span
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount–;
//==0,说明span切分出去的所有小块内存都回来了
//这个span就可以再回收给page cache,pagecache就可以再尝试去做前后页的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);//把满了的span从链表里拿出来
span->_freeList = nullptr;//虽然这个span里的小块内存已经无序,但是我们已经知道起始和终止地址,所以可以置空
span->_next = nullptr;
span->_prev = nullptr;
//释放span给page cache时,使用page cache的锁即可
//这时要把桶锁解掉
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
//大于128 page的直接向堆申请
if (k > NPAGES – 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i,kSpan);
}
return kSpan;
}
//检查一下后面的桶里有没有span,如果有可以把它进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
//在nSpan的头部切一个k页下来
//k页的span返回
//nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId; //切出来的页号给kSpan
kSpan->_n = k; //切出来的页数是k
nSpan->_pageId += k;//nSpan的起始页号增加
nSpan->_n -= k;//nSpan的页数减少k
_spanLists[nSpan->_n].PushFront(nSpan);//把nSpan头插到对应的桶上
//这里只需要首尾映射,因为合并的时候,只需要找到首或尾的即可
//存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存时进行合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId+nSpan->_n-1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n – 1, nSpan);
//这里是因为kspan会被切成一小块一小块的,所以需要每一个小块内存都映射
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n;++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
//走到这个位置就说明后面没有大页的span了
//这时就去找堆要一个128页的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES – 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES – 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
//std::unique_lock<std::mutex> lock(_pageMtx);
//
//auto ret = _idSpanMap.find(id);
//if (ret != _idSpanMap.end())
//{
//return ret->second;
//}
//else
//{
//assert(false);
//return nullptr;
//}
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//大于128 page的直接还给堆
if (span->_n > NPAGES – 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
//对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId = span->_pageId – 1;
//auto ret = _idSpanMap.find(prevId);
////前面的页号没有,不合并了
//if (ret == _idSpanMap.end())
//{
//break;
//}
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)
{
break;
}
//前面相邻页的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}
//合并出超过128页的span没办法管理,也不合并
if (prevSpan->_n + span->_n > NPAGES – 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
//向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
/*auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}*/
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES – 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId + span->_n – 1] = span;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n – 1, span);
}
ThreadCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//1.最开始不会一次向central cache一次批量要太多,因为要太多可能用不完
//2.如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
//3.size越大,一次向central cache要的batchNum就越小
//4.size越小,一次向central cache要的batchNum就越大
size_t batchNum =min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;//最开始MaxSize是1,所以慢慢增长,一次增长一个,越到后面,一次批量给的就越多。
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);//实际span里挂的链表的对象可能不足batchNum个
assert(actualNum > 0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end,actualNum-1); //第一个start要返回
return start;
}
}
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size); //计算在哪一个桶(自由链表)里面取
if (!_freeLists[index].Empty()) //如果自由链表不为空,就在这里拿
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize); //否则就去中心缓存拿
}
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//找对应映射的自由链表桶,把对象插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//当链表长度大于一次批量申请的内存时就开始还一段list给central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
UnitTest.cpp
#include"ObjectPool.h"
#include"ConcurrentAlloc.h"
void Alloc1()
{
for (size_t i = 0; i < 5; i++)
{
void* ptr = ConcurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i <5 ; i++)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
void TestConcurrentAlloc1()
{
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
void* p4 = ConcurrentAlloc(7);
void* p5 = ConcurrentAlloc(8);
cout << p1 << endl;
cout << p2 << endl;
cout << p3 << endl;
cout << p4 << endl;
cout << p5 << endl;
ConcurrentFree(p1);
ConcurrentFree(p2);
ConcurrentFree(p3);
ConcurrentFree(p4);
ConcurrentFree(p5);
}
void TestConcurrentAlloc2()
{
for (size_t i = 0; i < 1024; i++)
{
void* p1 = ConcurrentAlloc(6);
cout << p1 << endl;
}
void* p2 = ConcurrentAlloc(8);
cout << p2 << endl;
}
void TestAddressShift()
{
PAGE_ID id1 = 2000;
PAGE_ID id2 = 2000;
char* p1 = (char*)(id1 << PAGE_SHIFT);
char* p2 = (char*)(id1 << PAGE_SHIFT);
while (p1 < p2)
{
cout << (void*)p1 << ":" << ((PAGE_ID)p1 >> PAGE_SHIFT) << endl;
p1 += 8;
}
}
//int main()
//{
////TestObjectPool();
////TLSTest();
//TestConcurrentAlloc1();
//return 0;
//}
网硕互联帮助中心






评论前必须登录!
注册