ConcurrentHashMap
如何正确使用ConcurrentHashMap?
错误示例
public static void main(String[] args) {
demo(
// 创建 map 集合,使用 ConcurrentHashMap
() -> new ConcurrentHashMap<String, Integer>(),
(map, words) -> {
for (String word : words) {
Integer counter = map.get(word);
int newValue = counter == null ? 1 : counter + 1;
map.put(word, newValue);
}
}
);
}
ConcurrentHashMap的线程安全是指它每一个方法是原子的,get/put是原子的,但是get+put不是原子的。
如何解决
public static void main(String[] args) {
demo(
// 创建 ConcurrentHashMap 的工厂方法
() -> new ConcurrentHashMap<String, Integer>(),
// 处理 map 和 words 的逻辑
(map, words) -> {
for (String word : words) {
// 同步块,锁定 map 对象
synchronized (map) {
Integer counter = map.get(word);
int newValue = counter == null ? 1 : counter + 1;
map.put(word, newValue);
}
}
}
);
}
可以直接加锁来保证多个操作的原子性,但是并不好,ConcurrentHashMap底层是通过更加细粒度的锁来实现的,直接给整个map加锁先当于直接退化了,并发度和性能都不高
解法:
public class Main {
public static void main(String[] args) {
// 调用 demo 方法,传入两个 Lambda 表达式:
// 1. 用于创建 ConcurrentHashMap 的工厂
// 2. 用于处理 map 和 words 的逻辑
demo(
// 创建 ConcurrentHashMap<String, LongAdder> 的工厂方法
// 延迟创建集合,通过 Lambda 表达式封装创建逻辑
() -> new ConcurrentHashMap<String, LongAdder>(),
// 处理 map 和 words 的逻辑,BiConsumer 接收两个参数:
// ConcurrentHashMap<String, LongAdder> map 和 单词集合 words(推测为 String 数组/集合)
(map, words) -> {
// 遍历单词集合中的每个单词
for (String word : words) {
// computeIfAbsent 是 ConcurrentHashMap 的原子操作:
// 1. 检查 map 中是否存在指定 key(word)
// 2. 如果不存在,则执行 (key) -> new LongAdder() 创建新的 LongAdder 作为 value
// 3. 如果存在,则直接获取已有的 LongAdder
// 该操作是线程安全的,适合多线程环境
LongAdder value = map.computeIfAbsent(word, (key) -> new LongAdder());
// 执行累加操作,LongAdder 是线程安全的累加器,比 AtomicLong 更适合高并发场景
value.increment();
// 以下是被注释的旧实现逻辑,保留用于对比说明:
/*
// 旧逻辑:先 get 再 put,非原子操作(多线程下可能有问题)
Integer counter = map.get(word);
int newValue = counter == null ? 1 : counter + 1;
map.put(word, newValue);
*/
}
}
);
}
computeIfAbsent保证三个操作的原子性,LongAdder保证累加操作是原子的
HashMap并发死链(jdk7)
究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能 够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)
jdk8
构造器分析
可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建
// ConcurrentHashMap 的构造函数,允许指定初始容量、负载因子、并发级别
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 1. 参数合法性校验
// 负载因子必须 > 0,初始容量不能 < 0,并发级别不能 <= 0
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// 2. 调整初始容量(与并发级别关联)
// 如果初始容量小于并发级别,将初始容量调整为并发级别
// 原因:ConcurrentHashMap 内部用分段(桶)存储数据,至少要保证桶数量 >= 并发级别,
// 避免哈希冲突过于集中,影响并发性能
if (initialCapacity < concurrencyLevel) {
initialCapacity = concurrencyLevel;
}
// 3. 计算表的目标大小(基于初始容量和负载因子)
// 公式:size = 1.0 + (初始容量 / 负载因子),向上取整(转为 long 避免溢出)
// 作用:根据初始容量和负载因子,估算需要的哈希表容量,为后续调整为 2 的幂做准备
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 4. 限制容量最大值
// MAXIMUM_CAPACITY 是 ConcurrentHashMap 允许的最大容量(通常是 1<<30 等,不同版本可能有差异)
// 如果计算出的 size 超过最大值,就用 MAXIMUM_CAPACITY,否则调整为最接近的 2 的幂
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY :
// tableSizeFor 方法会返回大于等于给定数值的最小的 2 的幂(如输入 5 返回 8,输入 8 返回 8 )
tableSizeFor((int)size);
// 5. 初始化 sizeCtl 字段
// sizeCtl 在 ConcurrentHashMap 中是一个多功能的控制字段:
// – 初始化时,存储哈希表的容量(后续会调整为其他状态,如 -1 表示正在初始化 )
// – 扩容时,用于协调多线程的扩容操作等
this.sizeCtl = cap;
}
get流程
全程不加锁
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 计算 key 的哈希值:
// – 先调用 key.hashCode() 获取原始哈希
// – spread 方法通过异或和位运算,确保结果为正数(避免哈希冲突)
int h = spread(key.hashCode());
// 2. 检查哈希表状态:
// – tab = table:获取当前哈希表数组
// – n = tab.length > 0:哈希表已初始化且长度有效
// – e = tabAt(…):通过 tabAt 方法(线程安全)获取桶位置的头节点
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n – 1) & h)) != null) {
// 3. 检查头节点是否匹配目标 key:
// – eh = e.hash:获取头节点的哈希值
// – 先对比哈希值(快速筛选),再对比 key 对象(== 或 equals)
if ((eh = e.hash) == h) {
if (((ek = e.key) == key) || (ek != null && key.equals(ek)))
return e.val; // 找到匹配,返回值
}
// 4. 处理特殊节点(哈希值为负数的情况):
// – eh < 0 表示节点是 ForwardingNode(扩容中)或 TreeBin(红黑树)
// – 调用 find 方法递归查找(扩容时遍历新、旧表;红黑树时按树结构查找)
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 5. 遍历链表查找匹配的 key:
// – 哈希值匹配时,再对比 key 对象(== 或 equals)
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val; // 找到匹配,返回值
}
}
// 6. 未找到 key,返回 null
return null;
}
计算哈希:
通过 spread 方法计算 key 的哈希值,确保结果为正数,避免哈希冲突。
检查哈希表状态:
验证哈希表是否初始化、长度是否有效,并用 tabAt 线程安全地获取桶位置的头节点。
匹配头节点:
先对比头节点的哈希值,再用 == 或 equals 对比 key,匹配则直接返回值。
处理特殊节点:
若头节点哈希为负数(扩容中或红黑树),调用 find 方法递归查找目标 key。
遍历链表查找:
哈希值匹配时,逐一遍历链表节点,用 == 或 equals 对比 key,找到则返回值。
未找到返回 null:
若遍历完所有节点仍未匹配,返回 null。
put流程
以下数组简称(table),链表简称(bin)
public V put(K key, V value) {
return putVal(key, value, false);
}
- 作用:对外提供 put 操作的入口。
- 参数:
- key:键(不能为 null)。
- value:值(不能为 null)。
- onlyIfAbsent:是否仅在键不存在时插入(false 表示覆盖旧值)。
- 返回值:旧值(如果没有旧值则返回 null)。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. 校验 key 和 value 不为 null
if (key == null || value == null) throw new NullPointerException();
// 2. 计算 key 的哈希值
int hash = spread(key.hashCode());
int binCount = 0; // 记录链表长度(用于判断是否需要树化)
// 3. 无限循环,确保操作完成
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f; // 当前桶的头节点
int n, i, fh; // n: 哈希表长度; i: 桶的下标; fh: 头节点的哈希值
// 3.1 初始化哈希表(延迟初始化)
if (tab == null || (n = tab.length) == 0) {
tab = initTable(); // 使用 CAS 初始化,无需 synchronized
}
// 3.2 定位桶的位置,并检查桶是否为空
else if ((f = tabAt(tab, i = (n – 1) & hash)) == null) {
// 桶为空,使用 CAS 插入新节点
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null))) {
break; // 插入成功,退出循环
}
}
// 3.3 处理扩容中的 ForwardingNode(简化版,实际 JDK 中需要处理扩容逻辑)
else if ((fh = f.hash) == -1) {
// 帮助扩容(简化版,实际 JDK 中需要迁移数据)
tab = table; // 重新获取最新的哈希表
}
// 3.4 处理非空桶(链表或红黑树)
else {
V oldVal = null;
// 加锁(简化版,实际 JDK 中使用 synchronized 或 CAS 锁)
synchronized (f) {
if (tabAt(tab, i) == f) { // 再次检查头节点是否变化
if (fh >= 0) { // 头节点是普通节点(链表)
binCount = 1;
// 遍历链表
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 3.4.1 找到相同 key 的节点
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) {
e.val = value; // 更新值
}
break;
}
Node<K, V> pred = e;
// 3.4.2 遍历到链表末尾,插入新节点
if ((e = e.next) == null) {
pred.next = new Node<>(hash, key, value, null);
break;
}
}
}
// 3.5 处理红黑树(简化版,实际 JDK 中需要树化逻辑)
else {
// … 树化逻辑(省略) …
}
}
}
// 3.6 判断是否需要树化(简化版,实际 JDK 中需要判断链表长度)
if (binCount != 0) {
if (binCount >= 8) {
// 链表转红黑树(简化版,实际 JDK 中需要树化操作)
}
if (oldVal != null) {
return oldVal; // 返回旧值
}
break;
}
}
}
// 4. 更新 size(简化版,实际 JDK 中需要 CAS 更新)
// … 省略更新 size 的逻辑 …
return null; // 返回旧值(如果没有旧值则返回 null)
}
校验参数:
检查 key 和 value 是否为 null,若为 null 则抛出 NullPointerException。
计算哈希:
通过 spread 方法计算 key 的哈希值,确保哈希分布均匀。
初始化哈希表:
如果哈希表未初始化(tab == null 或长度为 0),则通过 initTable 延迟初始化。
定位桶并插入节点:
- 如果桶为空,使用 CAS 插入新节点,避免 synchronized。
- 如果桶不为空,遍历链表或红黑树,找到相同 key 则更新值,否则插入新节点。
处理扩容:
如果遇到扩容中的 ForwardingNode,协助完成扩容(简化版,实际逻辑更复杂 )。
链表树化:
如果链表长度超过阈值(默认 8),将链表转换为红黑树,提升查询效率。(仅仅只锁住桶)
更新 size:
使用 CAS 更新哈希表的大小(简化版,实际逻辑更复杂 )。
返回旧值:
如果插入前存在旧值,则返回旧值;否则返回 null。
如果遇到扩容中的 ForwardingNode,协助完成扩容(锁住链表帮忙扩容)
initTable流程
保证只有一个线程能创建hash表,其他线程都是盲等待
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
// 循环尝试初始化:只要 table 未初始化(null 或长度 0 ),就持续尝试
while ((tab = table) == null || tab.length == 0) {
// sizeCtl < 0 说明有其他线程正在初始化,当前线程主动让出 CPU,避免竞争
if ((sc = sizeCtl) < 0) {
Thread.yield();
}
// 尝试用 CAS 将 sizeCtl 从 sc(原状态)改为 -1,标记“正在初始化”
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 二次检查:防止其他线程已经初始化了 table
if ((tab = table) == null || tab.length == 0) {
// 确定初始化容量:有自定义容量(sc > 0 )则用,否则用默认 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建 Node 数组,长度为 n,作为哈希表的存储结构
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
// 将新数组赋值给 table,完成初始化
table = tab = nt;
// 计算扩容阈值:n – (n >>> 2) 等价于 n * 0.75(负载因子 0.75 )
sc = n – (n >>> 2);
}
} finally {
// 更新 sizeCtl 为扩容阈值,后续用于判断是否需要扩容
sizeCtl = sc;
}
break; // 初始化完成,跳出循环
}
}
return tab; // 返回初始化好的哈希表数组
}
持续检查 table 是否未初始化(null 或长度为 0 ),未初始化则进入初始化流程。
若 sizeCtl < 0,说明其他线程正在初始化,当前线程调用 Thread.yield() 让出 CPU,避免冲突。
通过 UNSAFE.compareAndSwapInt 将 sizeCtl 设为 -1,标记 “当前线程正在初始化”,保证线程安全。
再次确认 table 未被其他线程初始化,防止竞争导致重复创建。
自定义容量 sc > 0 则用 sc,否则用默认容量(如 16 )。
新建长度为 n 的 Node 数组,作为哈希表的底层存储。
计算 sizeCtl 为 n – (n >>> 2)(即 0.75 * n ),作为后续扩容的触发条件。
更新 sizeCtl 并跳出循环,返回初始化好的 table。
addCount流程
利用了Longadder的思想,设置了多个累加单元,减少CAS操作的冲突
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 分支 1:优先使用 CounterCells 或 CAS 更新 baseCount
if ((as = counterCells) != null ||
// 若 counterCells 不存在,尝试 CAS 更新 baseCount(基础计数 )
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true; // 标记 Cell 的 CAS 是否成功
// 分支 2:处理 CounterCells 的竞争
if (as == null || (m = as.length – 1) < 0 ||
// 计算线程对应的 Cell 下标(线程本地随机数 & 长度 -1 )
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// 尝试 CAS 累加 Cell 的值
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 竞争失败,进入全量累加逻辑(创建 Cell 或扩容 CounterCells )
fullAddCount(x, uncontended);
return;
}
// 分支 3:检查是否需要触发扩容(check 参数控制 )
if (check <= 1)
return;
// 计算总计数(baseCount + 所有 Cell 的值 )
s = sumCount();
}
// 分支 4:根据总计数检查是否需要扩容哈希表
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { // 哈希表正在扩容
// 检查扩容状态,避免重复扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else { // 哈希表未在扩容,尝试触发扩容
if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
s = sumCount(); // 重新计算总计数,判断是否仍需扩容
}
}
}
优先累加尝试:
若 CounterCells 存在则用其累加,否则尝试 CAS 更新 baseCount,避免锁竞争。
处理 Cell 竞争:
计算线程对应的 Cell 下标,尝试 CAS 累加 Cell 的值;若失败则进入 fullAddCount 处理。
全量累加逻辑:
竞争失败时,创建新 Cell 或扩容 CounterCells,保证计数不丢失。
检查扩容条件:
根据 check 参数判断是否需要触发哈希表扩容,通过 sumCount 计算总计数。
触发哈希表扩容:
若总计数超过阈值,通过 transfer 方法扩容哈希表,多线程协作迁移数据。
size计算流程
借鉴 LongAdder 的设计思路,将计数分为 baseCount(无竞争 )和 counterCells(有竞争 ),size 计算时再累加,避免实时维护精确计数带来的高并发竞争。
高并发下,baseCount 和 counterCells 的更新可能存在延迟,size 方法返回的是 “近似准确” 的计数(最终会一致,但获取瞬间可能有偏差 ),这是为了换取高并发场景下的性能提升。
- size 计算实际发生在 put、remove 等改变集合元素的操作之中
- 计数累加规则:
- 没有竞争发生时,向 baseCount 累加计数
- 有竞争发生时,新建 counterCells,向其中一个 cell 累加计数
- counterCells 初始有两个 cell
- 若计数竞争较激烈,会创建新的 cell 来累加计数
// size 方法,用于获取 ConcurrentHashMap 中元素的数量
public int size() {
// 调用 sumCount 方法,计算 baseCount 和所有 counterCells 中的计数值总和
long n = sumCount();
// 处理计数值的边界情况:
// – 如果总和 n 小于 0,返回 0(理论上不会出现,这里做防御性处理 )
// – 如果总和 n 超过 Integer 的最大值,返回 Integer.MAX_VALUE
// – 否则,将 long 类型的总和转换为 int 类型返回
return (n < 0L) ? 0 :
(n > (long) Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) n;
}
// sumCount 方法,计算 ConcurrentHashMap 的总计数
final long sumCount() {
// 获取当前的分段计数数组 counterCells
CounterCell[] as = counterCells;
CounterCell a;
// 先将基础计数 baseCount 的值累加到总和中,baseCount 用于无竞争场景的计数
long sum = baseCount.get();
// 如果分段计数数组不为 null,说明存在竞争场景,需要遍历累加每个 Cell 的计数值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
// 取出当前 Cell,若 Cell 不为 null,将其计数值累加到总和
if ((a = as[i]) != null) {
sum += a.value;
}
}
}
// 返回累加后的总计数,代表 ConcurrentHashMap 中元素的大致数量(高并发下可能有延迟 )
return sum;
}
调用 sumCount 方法,获取 ConcurrentHashMap 中元素数量的总计数。
进入 sumCount 方法,开始计算总计数。
先将 baseCount(无竞争场景的计数 )的值累加到总和 sum 中。
若 counterCells(分段计数数组,有竞争场景使用 )不为 null,遍历数组中的每个 CounterCell,将非 null Cell 的计数值累加到 sum。
sumCount 返回累加后的总计数,代表集合元素的大致数量。
对 sumCount 返回的总计数做边界处理,转换为 int 类型返回,超过 Integer.MAX_VALUE 则返回最大值,小于 0 则返回 0 。
transfer流程
多线程协作扩容:
通过 transferIndex 分配迁移范围,允许多线程同时迁移不同桶,提升扩容效率。
渐进式迁移:
每次迁移一个桶,避免长时间持有锁,保证高并发下的响应性。
自动重定向:
ForwardingNode 自动重定向访问到新数组,保证扩容时读写操作的正确性。
链表与红黑树拆分:
适配不同节点类型(链表 / 红黑树 ),保证拆分和迁移的效率。
// 标记节点正在迁移(hash 值)
private static final int MOVED = -1;
// CPU 核心数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 最小迁移步长
private static final int MIN_TRANSFER_STRIDE = 16;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程的迁移步长(stride)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 保证步长不小于 16
// 初始化 nextTab(扩容后的新数组)
if (nextTab == null) {
try {
nextTab = (Node<K,V>[])new Node<?,?>[n << 1]; // 新容量为原容量的 2 倍
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab; // 记录新数组
transferIndex = n; // 初始化迁移索引
}
int nextn = nextTab.length;
// 创建 ForwardingNode,标记当前桶正在迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // 标记是否继续迁移下一个桶
boolean finishing = false; // 标记是否完成所有迁移
// 循环迁移每个桶
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 确定当前线程的迁移范围
while (advance) {
int nextIndex, nextBound;
if (–transferIndex >= 0) {
nextIndex = transferIndex;
nextBound = (nextIndex > stride ? nextIndex – stride : 0);
bound = nextBound;
i = nextIndex;
advance = false;
} else {
advance = false;
if (finishing) { // 所有桶迁移完成
nextTable = null;
table = nextTab; // 替换为新数组
sizeCtl = (n << 1) – (n >>> 1); // 更新扩容阈值
return;
}
}
}
// 检查当前桶是否越界
if (i < 0 || i >= n || i + n >= nextn)
continue;
// 获取当前桶的头节点
if ((f = tabAt(tab, i)) == null) {
advance = casTabAt(tab, i, null, fwd); // 标记桶为迁移中
}
// 处理正在迁移的桶(跳过)
else if ((fh = f.hash) == MOVED) {
advance = true;
}
// 迁移当前桶的节点
else {
synchronized (f) { // 加锁当前桶的头节点
if (tabAt(tab, i) == f) { // 二次检查,防止并发修改
Node<K,V> ln, hn;
if (fh >= 0) { // 头节点是普通链表节点
int runBit = fh & n; // 计算迁移后的位置
Node<K,V> lastRun = f;
Node<K,V> p = f.next;
// 找到最后一段连续相同 runBit 的节点
while (p != null) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
p = p.next;
}
// 将最后一段节点直接放入新桶
if (runBit == (fh & n)) {
ln = lastRun;
hn = null;
} else {
// 拆分链表为两部分
ln = null;
hn = lastRun;
for (p = f; p != lastRun; p = p.next) {
int b = p.hash & n;
Node<K,V> newNode = new Node<K,V>(p.hash, p.key, p.val, null);
if (b == runBit)
ln = (ln == null) ? newNode : ln.next = newNode;
else
hn = (hn == null) ? newNode : hn.next = newNode;
}
}
// 将拆分后的链表放入新桶
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
setTabAt(tab, i, fwd);
advance = true;
} else if (f instanceof TreeBin) { // 头节点是红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历红黑树节点,拆分到新桶
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if (loTail == null) lo = p;
else loTail.next = p;
loTail = p;
lc++;
} else {
if (hiTail == null) hi = p;
else hiTail.next = p;
hiTail = p;
hc++;
}
}
// 判断是否需要转换为链表
ln = (lc <= 6) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= 6) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
// 放入新桶
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
// 辅助方法:CAS 设置桶的节点
private void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
计算迁移步长:
根据 CPU 核心数和原数组长度,确定每个线程的迁移桶数量。
初始化新数组:
创建扩容后的新数组(容量翻倍),标记迁移状态。
创建 ForwardingNode:
标记桶迁移状态,用于重定向访问。
分配迁移范围:
通过 transferIndex 分配线程的迁移桶范围,避免竞争。
处理空桶:
标记为空桶为迁移中(fwd )。
处理迁移中桶:
跳过已标记为 MOVED 的桶。
拆分链表 / 红黑树:
加锁后拆分节点,分别放入新数组的原位置和扩容位置。
标记迁移完成:
替换原数组为新数组,更新扩容阈值。
jdk7
维护一个 segment 数组,每个 segment 对应一把锁
- 优点:多个线程访问不同 segment 时无冲突,和 JDK8 中类似(JDK8 虽底层实现不同,但也有分散竞争的思想 )
- 缺点:Segments 数组默认大小为 16,容量初始化后不可改变,且不是懒惰初始化(创建时就分配好资源,未延迟到首次使用 )
构造器分析
// 最大 Segment 数量(必须是 2 的幂,且不超过 1<<16)
static final int MAX_SEGMENTS = 1 << 16;
// 每个 Segment 中哈希表的最小容量(必须是 2 的幂,默认 2)
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 1. 参数合法性校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// 2. 限制 concurrencyLevel 的最大值
if (concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}
// 3. 计算 Segment 数组的大小(ssize):必须是 2 的幂
int sshift = 0; // 左移次数
int ssize = 1; // Segment 数组的初始大小(至少为 1)
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; // 左移 1 位(乘以 2),直到 ssize >= concurrencyLevel
}
// 4. 计算 segmentShift 和 segmentMask
this.segmentShift = 32 – sshift; // 用于计算 Segment 下标
this.segmentMask = ssize – 1; // 用于计算 Segment 下标的掩码
// 5. 限制 initialCapacity 的最大值
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
// 6. 计算每个 Segment 的容量(cap)
int c = initialCapacity / ssize; // 平均每个 Segment 的容量
if (c * ssize < initialCapacity) {
++c; // 向上取整
}
int cap = MIN_SEGMENT_TABLE_CAPACITY; // 每个 Segment 的最小容量
while (cap < c) {
cap <<= 1; // 左移 1 位(乘以 2),直到 cap >= c
}
// 7. 创建第一个 Segment(s0)
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 8. 创建 Segment 数组(ss)
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 9. 初始化 Segment 数组的第一个元素(有序写,避免指令重排)
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss; // 赋值给全局变量
}
参数校验:
确保 loadFactor、initialCapacity、concurrencyLevel 合法。
限制并发级别:
concurrencyLevel 超过最大值时,截断为 MAX_SEGMENTS。
计算 Segment 数组大小:
找到大于等于 concurrencyLevel 的最小 2 的幂(ssize )。
计算哈希偏移量:
通过 segmentShift 和 segmentMask 定位 Segment。
限制初始容量:
initialCapacity 超过最大值时,截断为 MAXIMUM_CAPACITY。
计算 Segment 容量:
每个 Segment 的容量(cap )是 2 的幂,至少为 MIN_SEGMENT_TABLE_CAPACITY。
创建第一个 Segment:
初始化第一个 Segment,包含哈希表和扩容阈值。
创建 Segment 数组:
初始化 Segment 数组,大小为 ssize。
有序写入第一个 Segment:
通过 UNSAFE 确保线程安全,避免指令重排。
计算 segmentShift 和 segmentMask
this.segmentShift = 32 – sshift;
this.segmentMask = ssize – 1;
- 作用:
- segmentShift:用于计算 key 的哈希值在 Segment 维度的偏移量。
- segmentMask:用于计算 Segment 数组的下标(掩码)。
- 细节:结合 key 的哈希值,通过 (hash >>> segmentShift) & segmentMask 定位到具体的 Segment。
put流程
public V put(K key, V value) {
// 1. 校验 value 不为 null(JDK 7 不允许 value 为 null)
if (value == null) {
throw new NullPointerException();
}
// 2. 计算 key 的哈希值
int hash = hash(key);
// 3. 计算 key 对应的 Segment 下标
int j = (hash >>> segmentShift) & segmentMask;
// 4. 获取或创建对应的 Segment
Segment<K,V> s;
if ((s = (Segment<K,V>)UNSAFE.getObject(
segments, (j << SSHIFT) + SBASE)) == null) {
// 若 Segment 为 null,通过 ensureSegment 安全创建
s = ensureSegment(j);
}
// 5. 调用 Segment 的 put 方法插入数据
return s.put(key, hash, value, false);
}
校验 value:
value 为 null 时抛出异常。
计算哈希值:
二次哈希优化,减少冲突。
定位 Segment:
通过 segmentShift 和 segmentMask 计算 Segment 下标。
获取或创建 Segment:
用 UNSAFE 直接访问 Segment 数组;若为 null,通过 ensureSegment 安全创建。
调用 Segment 的 put:
进入 Segment 内部的锁逻辑,插入键值对。
segment继承了可重入锁(ReentrantLock),它的put方法为
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 1. 尝试加锁(非阻塞锁)
HashEntry<K,V> node = tryLock() ? null :
// 加锁失败,进入扫描并加锁流程(最多尝试 64 次)
scanAndLockForPut(key, hash, value);
// 2. 加锁成功后,执行插入逻辑
V oldValue;
try {
HashEntry<K,V>[] tab = table; // 当前 Segment 的哈希表
int index = (tab.length – 1) & hash; // 计算桶下标
HashEntry<K,V> first = entryAt(tab, index); // 获取桶的头节点
// 3. 遍历桶的链表,查找或插入节点
for (HashEntry<K,V> e = first;;) {
if (e != null) { // 桶不为空
K k;
// 3.1 找到相同 key 的节点,更新值
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) { // 是否仅在不存在时插入
e.value = value;
++modCount; // 记录结构修改次数
}
break;
}
e = e.next; // 遍历下一个节点
} else { // 桶为空,插入新节点
if (node != null) { // 复用 scanAndLockForPut 创建的节点
node.setNext(first);
} else { // 创建新节点
node = new HashEntry<K,V>(hash, key, value, first);
}
int c = count + 1; // 当前 Segment 的元素数量
// 检查是否需要扩容(超过负载因子)
if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
// 扩容并迁移数据
rehash(node);
} else {
// 插入新节点到桶的头部
setEntryAt(tab, index, node);
}
++modCount;
count = c; // 更新元素数量
oldValue = null;
break;
}
}
} finally {
// 4. 释放锁
unlock();
}
return oldValue;
}
尝试加锁:
非阻塞加锁失败后,进入 scanAndLockForPut 流程(最多尝试 64 次 )。
遍历链表查找节点:
找到相同 key 则更新值,否则准备插入新节点。
插入新节点:
桶为空时插入新节点,检查是否需要扩容(rehash )。
扩容与迁移:
超过负载因子时,扩容为原容量的 2 倍,拆分并迁移链表。
释放锁:
操作完成后释放锁,保证线程安全
rehash流程
找到链表中连续相同下标的最长子链表(lastRun ),直接迁移,减少节点复制次数,提升扩容效率。
private void rehash(HashEntry<K,V> node) {
// 1. 保存旧哈希表数据
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2. 计算新容量(扩容为原容量的 2 倍)
int newCapacity = oldCapacity << 1;
// 3. 更新扩容阈值(新容量 * 负载因子)
threshold = (int)(newCapacity * loadFactor);
// 4. 创建新哈希表
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity – 1; // 新的掩码(用于计算桶下标)
// 5. 遍历旧哈希表,迁移数据到新哈希表
for (int i = 0; i < oldCapacity; ++i) {
HashEntry<K,V> e = oldTable[i]; // 当前桶的头节点
if (e != null) { // 桶不为空
HashEntry<K,V> next = e.next; // 下一个节点
int idx = e.hash & sizeMask; // 计算新桶下标
// 5.1 桶中只有一个节点(链表长度为 1)
if (next == null) {
newTable[idx] = e;
} else {
// 5.2 桶中有多个节点,复用连续相同下标的节点
HashEntry<K,V> lastRun = e; // 记录连续相同下标的最后一个节点
int lastIdx = idx; // 记录当前下标
// 遍历链表,找到连续相同下标的最长子链表
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将连续相同下标的子链表直接放入新桶
newTable[lastIdx] = lastRun;
// 5.3 遍历剩余节点,拆分到新桶
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
// 插入到新桶的头部(头插法)
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 6. 处理当前插入的新节点(扩容时被暂时保存的节点)
int nodeIndex = node.hash & sizeMask; // 计算新下标
node.setNext(newTable[nodeIndex]); // 插入到新桶的头部
newTable[nodeIndex] = node; // 更新新桶的头节点
// 7. 替换为新哈希表
table = newTable;
}
保存旧表数据:
记录旧哈希表的引用和容量。
计算新容量:
扩容为原容量的 2 倍(oldCapacity << 1 )。
更新扩容阈值:
新容量乘以负载因子,作为下一次扩容的触发条件。
创建新哈希表:
初始化新哈希表,容量为 newCapacity。
迁移旧表数据:
- 遍历旧哈希表的每个桶,拆分链表并迁移节点。
- 复用连续相同下标的子链表,减少节点复制。
插入新节点:
将扩容时暂时保存的新节点插入到新哈希表。
替换哈希表:
将 Segment 的哈希表替换为新哈希表,完成扩容。
get流程
每个 Segment 独立存储数据,get 时只需访问对应 Segment,无需加锁。
通过 UNSAFE.getObjectVolatile 保证 Segment 和 HashEntry 的可见性,避免脏读。
- 扩容过程中:
- 若 get 先发生(相对于扩容操作 ),从旧表取内容
- 若 get 后发生(相对于扩容操作 ),从新表取内容
public V get(Object key) {
// 1. 计算 key 的哈希值
int h = hash(key);
// 2. 计算 key 对应的 Segment 偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 3. 获取对应的 Segment
Segment<K,V> s = (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
HashEntry<K,V>[] tab;
// 4. 检查 Segment 和哈希表是否为空
if (s != null && (tab = s.table) != null) {
// 5. 计算桶下标,并获取桶的头节点
long t = (((long)(tab.length – 1) & h) << TSHIFT) + TBASE;
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, t);
e != null;
e = e.next) { // 遍历链表
K k;
// 6. 找到相同 key 的节点,返回对应值
if ((k = e.key) == key || (e.hash == h && key.equals(k))) {
return e.value;
}
}
}
return null; // 未找到对应 key,返回 null
}
计算哈希值:
二次哈希优化,减少哈希冲突。
定位 Segment:
通过哈希值和 Segment 数组参数,计算内存偏移量,获取对应 Segment。
检查非空:
确保 Segment 和哈希表已初始化,避免空指针。
定位桶并遍历链表:
计算桶下标,获取头节点,遍历链表查找匹配的 key。
匹配节点并返回值:
找到 key 和 hash 都匹配的节点,返回对应值;否则返回 null。
size计算流程
- 初步计算:计算元素个数前,先不加锁计算两次,若前后结果一致,认为个数正确并返回
- 重试与加锁:若两次结果不一致,进行重试;重试次数超 3 次时,锁住所有 segment,重新计算个数后返回
- 先尝试无锁计算,利用 modCount 检测并发修改。若两次计算的 modCount 之和一致,认为结果稳定,避免加锁开销。
// 重试次数阈值:超过该次数则加锁所有 Segment
static final int RETRIES_BEFORE_LOCK = 2;
public int size() {
// 1. 初始化变量
final Segment<K,V>[] segments = this.segments;
int size; // 最终返回的元素数量
boolean overflow; // 是否溢出 32 位整数
long sum; // 所有 Segment 的 modCount 之和
long last = 0L; // 上一次计算的 sum
int retries = -1; // 重试次数(初始为 -1,第一次循环不视为重试)
try {
for (;;) { // 无限循环,直到获取稳定结果
// 2. 检查是否需要加锁所有 Segment
if (retries++ == RETRIES_BEFORE_LOCK) {
// 遍历所有 Segment,加锁(确保 Segment 已创建)
for (int j = 0; j < segments.length; ++j) {
ensureSegment(j).lock(); // 加锁,force creation
}
}
// 3. 重置计算参数
sum = 0L;
size = 0;
overflow = false;
// 4. 遍历所有 Segment,累加元素数量
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount; // 累加 modCount(记录结构变化)
int c = seg.count; // 当前 Segment 的元素数量
if (c < 0 || (size += c) < 0) {
overflow = true; // 溢出判断
}
}
}
// 5. 检查两次计算结果是否一致
if (sum == last) { // 若 sum 与上一次相同,说明无结构变化
break;
}
last = sum; // 记录当前 sum,用于下次对比
}
} finally {
// 6. 释放所有 Segment 的锁(仅当加锁过才释放)
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j) {
segmentAt(segments, j).unlock();
}
}
}
// 7. 处理溢出或返回结果
return overflow ? Integer.MAX_VALUE : size;
}
// 辅助方法:获取指定位置的 Segment(确保可见性)
private Segment<K,V> segmentAt(Segment<K,V>[] segments, int j) {
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments,
((long)j << SSHIFT) + SBASE);
}
// 辅助方法:确保 Segment 已创建(延迟初始化)
private Segment<K,V> ensureSegment(int j) {
// … 逻辑与之前解析一致,确保 Segment 创建 …
}
初始化变量:
定义结果、溢出标记、modCount 累加和等变量。
检查加锁条件:
重试次数超过阈值时,加锁所有 Segment,确保结果准确。
重置计算参数:
每次计算前清空临时变量,避免干扰。
遍历累加数量:
遍历所有 Segment,累加 modCount 和 count,检测溢出。
检查结果稳定性:
若 modCount 之和稳定(两次计算相同 ),认为结果准确,跳出循环。
释放锁:
加锁后需释放,避免性能损耗。
返回结果:
根据溢出情况返回最终结果。
总结
底层数据结构 | 由Segment数组和HashEntry数组组成,每个Segment类似于一个小型的HashMap,Segment继承自ReentrantLock,是可重入锁,HashEntry组成链表来处理哈希冲突 | 由Node数组、链表和红黑树组成,数组中的每个元素是一个Node,如果链表长度超过阈值(默认 8),会将链表转换为红黑树以提升查询效率 |
锁机制 | 分段锁,每个Segment对应一把锁,多线程访问不同Segment时不会产生冲突,提高了并发度,但锁粒度相对较粗。当进行写操作时,需要锁定对应的Segment | 锁粒度更细,引入了CAS操作和Synchronized关键字。在链表节点层面,对于插入、更新等操作,先尝试使用CAS操作,失败后再使用synchronized同步代码块锁定单个链表节点或红黑树 |
初始化方式 | 不是懒惰初始化,在创建ConcurrentHashMap时,会根据指定的concurrencyLevel初始化Segment数组,每个Segment内部的HashEntry数组也会进行一定的初始化 | 采用懒惰初始化,在首次插入元素时才会初始化底层的数组 |
扩容机制 | 当某个Segment中的元素数量超过其负载因子对应的阈值时,该Segment进行扩容,扩容时会创建一个新的HashEntry数组,然后将旧数组中的元素重新哈希并迁移到新数组中,采用头插法,在多线程环境下可能会出现链表成环问题 | 当整个ConcurrentHashMap的元素数量超过负载因子对应的阈值时,会触发扩容。扩容时采用复制迁移的方式,将旧数组中的元素复制到新数组中,并且采用尾插法,避免了链表成环问题。同时,扩容支持多线程协作,通过ForwardingNode标记节点,让其他线程知道该节点正在迁移 |
读操作 | 未加锁,通过UNSAFE方法保证可见性。在扩容过程中,如果get操作先于扩容发生,则从旧表取内容;如果get操作后于扩容发生,则从新表取内容 | 读操作大部分情况下无锁,通过volatile关键字保证可见性。在链表上查找时直接遍历,在红黑树上查找时按照红黑树的查找算法进行 |
写操作 | 对需要操作的Segment加锁,然后进行插入、更新等操作。如果在一个Segment内链表长度过长,不会转换为红黑树 | 先通过哈希值确定要操作的数组索引位置,若该位置为空,使用CAS操作插入新节点;若不为空,判断是链表还是红黑树,然后采用不同的方式插入,当链表长度达到阈值(默认 8)且数组长度大于 64 时,会将链表转换为红黑树 |
计算元素个数(size) | 先不加锁计算两次modCount之和以及每个Segment的元素数量总和,如果前后两次计算结果一样,认为个数正确返回;如果不一样,进行重试,重试次数超过 3 次,将所有Segment锁住,重新计算个数返回 | 通过累加baseCount和counterCells数组中所有CounterCell的value值来计算元素个数。在多线程并发修改的情况下,由于counterCells的存在,能较好地处理并发计数问题 |
性能特点 | 在多线程读多写少的场景下,由于分段锁机制,并发性能较好,但写操作时锁粒度相对较粗,可能存在竞争。初始化时会占用一定内存 | 在多线程读写场景下性能都有较大提升,锁粒度更细,CAS和Synchronized结合减少了锁竞争,并且采用了红黑树优化查询,内存使用更高效,初始化时按需分配内存 |
锁机制 | 段级别的锁,每个Segment对应一把锁 | 更细粒度的锁(Synchronized )结合无锁操作(CAS ) | Java 8 减少了锁竞争,允许更多线程同时访问,提高了并发性。例如,在高并发写入场景下,Java 7 可能因锁冲突导致线程阻塞,Java 8 能让更多线程并行执行写入操作 |
get操作 | 未加锁,通过UNSAFE方法保证可见性 | 无锁操作,利用volatile关键字保证可见性 | Java 8 的get操作在保证可见性的同时,减少了UNSAFE带来的潜在风险和复杂性,性能更优。并且volatile语义更符合 Java 内存模型规范,可读性和维护性更好 |
put操作 | 对Segment加锁,采用头插法 | 使用CAS 优先尝试插入,失败后使用细粒度的synchronized锁,采用尾插法 | Java 8 的put操作减少了锁的使用频率,CAS操作避免了不必要的锁竞争,尾插法避免了多线程扩容时链表成环问题,提高了插入和更新的性能和稳定性 |
扩容机制 | 某个Segment元素数量超阈值时,该Segment单独扩容,采用头插法迁移数据 | 整个ConcurrentHashMap元素数量超阈值时触发扩容,多线程协作扩容,逐步迁移节点,采用尾插法 | Java 8 的扩容机制减少了扩容对整体性能的影响,多线程协作加速了扩容过程,且尾插法保证了数据结构的正确性,避免了链表成环 |
查找机制 | 链表结构,查找时间复杂度平均为 O (n) | 链表长度超过阈值(默认 8)时转换为红黑树,查找时间复杂度平均为 O (log n) | Java 8 在数据量较大且哈希冲突较多时,红黑树结构降低了查找时间复杂度,提高了查找效率 |
内存使用 | 不是懒惰初始化,创建时会初始化Segment数组及内部部分资源 | 懒惰初始化,首次插入元素时才初始化底层数组 | Java 8 避免了不必要的内存占用,在内存使用上更加高效,适用于内存敏感的应用场景 |
评论前必须登录!
注册