文章目录
- 1. 前言
- 2. 客户端心跳检测服务
- 3. scanExceptionChannel 扫描不活跃的连接
-
- 3.1 ProducerManager#scanNotActiveChannel
- 3.2 ConsumerManager#scanNotActiveChannel
- 3.3 FilterServerManager#scanNotActiveChannel
- 4. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ Broker 相关源码】- 清除不活跃的 broker
上一篇文章我们探究了 NameServer 是如何清除不活跃的 broker,而这篇文章我们也来看下 broker 是如何清除不活跃的生产者、消费者、过滤服务器连接。
2. 客户端心跳检测服务
在 broker#start 方法启动 broker 的时候会启动客户端心跳检测定时任务,这个任务会在启动 10s 之后执行,每隔 10s 执行一次。 在 start 方法中会调用 scanExceptionChannel 去扫描过期的连接,这个连接里面包括生产者连接、消费者连接和过滤服务器连接,之前 broker 启动的文章我们也分析过了,这个版本下过滤服务器应该是不再用了,而且在 5.0.0 之后这个过滤服务也会从 RocketMQ 中删掉,推荐用 TAG 过滤和 SQL92 过滤。
/**
* 启动 10s 之后执行,每隔 10s 执行一次
*/
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 扫描过期的连接
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}
/**
* 扫描出不活跃的连接
*/
private void scanExceptionChannel() {
// 生产者连接
this.brokerController.getProducerManager().scanNotActiveChannel();
// 消费者连接
this.brokerController.getConsumerManager().scanNotActiveChannel();
// 过滤服务器连接
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
3. scanExceptionChannel 扫描不活跃的连接
3.1 ProducerManager#scanNotActiveChannel
这个方法就是扫描生产者的非活跃连接,生产者和消费者启动的时候会通过定时任务每隔 30s 上报一次心跳信息。而由于生产者和消费者都有组的概念,所以 broker 存储连接通道会使用 Map 存储,key 就是生产者组,value 就是这个消费者组下面的所有生产者。
scanNotActiveChannel 的逻辑就是扫描所有的生产者组下面的生产者,然后判断如果当前距离上一次上报心跳信息已经超过了 120s,说明是一个不活跃的连接,这种情况下会关闭这个连接的通道,然后将这个连接从 groupChannelTable 集合中删掉。 lastUpdateTimestamp 记录的就是上一次上报心跳的事件。
/**
* 扫描不活跃的生产者连接
*/
public void scanNotActiveChannel() {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
// 生产者组
final String group = entry.getKey();
// 生产者组下面的生产者连接
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
// 遍历所有连接
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();
// 如果当前距离上一次上报心跳事件已经超过了 120s, 说明是一个不活跃的连接, 生产者和消费者会每隔 30s 上报一次心跳信息
long diff = System.currentTimeMillis() – info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
// 关闭连接
RemotingUtil.closeChannel(info.getChannel());
}
}
}
}
3.2 ConsumerManager#scanNotActiveChannel
/**
* 扫描出非活跃的消费者连接
*/
public void scanNotActiveChannel() {
// 遍历所有消费者组
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
// 消费者组
String group = next.getKey();
// 消费者组下面的消费者连接集合
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
// 遍历所有消费者连接
while (itChannel.hasNext()) {
// 获取连接的信息
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
// 如果距离上一次上报心跳已经超过 120s, 说明连接不活跃
long diff = System.currentTimeMillis() – clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
// 关闭连接, 将这个连接从消费者组连接集合中删掉
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
}
// 如果这个消费者组连接都删掉了, 就把这个消费者组也删掉
if (channelInfoTable.isEmpty()) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
}
}
}
这个方法跟上面生产者的差不多的,都是扫描消费者连接,如果这个消费者距离上一次上报心跳已经超过 120s,说明连接不活跃,将这个连接删掉,消费者也是一样 30s 上报一次心跳。
3.3 FilterServerManager#scanNotActiveChannel
/**
* 扫描不活跃的过滤服务连接
*/
public void scanNotActiveChannel() {
// 遍历所有过滤服务连接
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, FilterServerInfo> next = it.next();
// 上一次上报的心跳事件
long timestamp = next.getValue().getLastUpdateTimestamp();
Channel channel = next.getKey();
// 如果超过 30s 没有上报信息
if ((System.currentTimeMillis() – timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
// 删除并关闭连接
log.info("The Filter Server<{}> expired, remove it", next.getKey());
it.remove();
RemotingUtil.closeChannel(channel);
}
}
}
这里逻辑跟上面的差不多,但是因为过滤服务器这个版本不再用了,所以直接看注释就行。
4. 小结
好了,这篇文章我们就探究了 broker 是如何扫描出不活跃的的生产者、消费者和过滤服务器,生产者和消费者在启动的时候都会每隔 30s 上报一次心跳到 broker,所以 broker 这里的判断是如果生产者和消费者 120s 内没有再次上报心跳,就认为这个连接是不活跃的,过期的,可能是生产者或者消费者遇到了什么问题,这种情况下 broker 就会断开这个连接,等待后续服务恢复之后再次和 broker 建连。
如有错误,欢迎指出!!!!
评论前必须登录!
注册