云计算百科
云计算领域专业知识百科平台

【RocketMQ Broker 相关源码】- 清除不活跃的生产者、消费者、过滤服务器连接

文章目录

  • 1. 前言
  • 2. 客户端心跳检测服务
  • 3. scanExceptionChannel 扫描不活跃的连接
    • 3.1 ProducerManager#scanNotActiveChannel
    • 3.2 ConsumerManager#scanNotActiveChannel
    • 3.3 FilterServerManager#scanNotActiveChannel
  • 4. 小结

本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ Broker 相关源码】- 清除不活跃的 broker

上一篇文章我们探究了 NameServer 是如何清除不活跃的 broker,而这篇文章我们也来看下 broker 是如何清除不活跃的生产者、消费者、过滤服务器连接。

2. 客户端心跳检测服务

在 broker#start 方法启动 broker 的时候会启动客户端心跳检测定时任务,这个任务会在启动 10s 之后执行,每隔 10s 执行一次。在这里插入图片描述 在 start 方法中会调用 scanExceptionChannel 去扫描过期的连接,这个连接里面包括生产者连接、消费者连接和过滤服务器连接,之前 broker 启动的文章我们也分析过了,这个版本下过滤服务器应该是不再用了,而且在 5.0.0 之后这个过滤服务也会从 RocketMQ 中删掉,推荐用 TAG 过滤和 SQL92 过滤。

/**
* 启动 10s 之后执行,每隔 10s 执行一次
*/

public void start() {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 扫描过期的连接
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}

/**
* 扫描出不活跃的连接
*/

private void scanExceptionChannel() {
// 生产者连接
this.brokerController.getProducerManager().scanNotActiveChannel();
// 消费者连接
this.brokerController.getConsumerManager().scanNotActiveChannel();
// 过滤服务器连接
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}

3. scanExceptionChannel 扫描不活跃的连接

3.1 ProducerManager#scanNotActiveChannel

这个方法就是扫描生产者的非活跃连接,生产者和消费者启动的时候会通过定时任务每隔 30s 上报一次心跳信息。而由于生产者和消费者都有组的概念,所以 broker 存储连接通道会使用 Map 存储,key 就是生产者组,value 就是这个消费者组下面的所有生产者。

scanNotActiveChannel 的逻辑就是扫描所有的生产者组下面的生产者,然后判断如果当前距离上一次上报心跳信息已经超过了 120s,说明是一个不活跃的连接,这种情况下会关闭这个连接的通道,然后将这个连接从 groupChannelTable 集合中删掉。 lastUpdateTimestamp 记录的就是上一次上报心跳的事件。

/**
* 扫描不活跃的生产者连接
*/

public void scanNotActiveChannel() {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
// 生产者组
final String group = entry.getKey();
// 生产者组下面的生产者连接
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

// 遍历所有连接
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();

// 如果当前距离上一次上报心跳事件已经超过了 120s, 说明是一个不活跃的连接, 生产者和消费者会每隔 30s 上报一次心跳信息
long diff = System.currentTimeMillis() info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
// 关闭连接
RemotingUtil.closeChannel(info.getChannel());
}
}
}
}

3.2 ConsumerManager#scanNotActiveChannel

/**
* 扫描出非活跃的消费者连接
*/

public void scanNotActiveChannel() {
// 遍历所有消费者组
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
// 消费者组
String group = next.getKey();
// 消费者组下面的消费者连接集合
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();

Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
// 遍历所有消费者连接
while (itChannel.hasNext()) {
// 获取连接的信息
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
// 如果距离上一次上报心跳已经超过 120s, 说明连接不活跃
long diff = System.currentTimeMillis() clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
// 关闭连接, 将这个连接从消费者组连接集合中删掉
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
}

// 如果这个消费者组连接都删掉了, 就把这个消费者组也删掉
if (channelInfoTable.isEmpty()) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
}
}
}

这个方法跟上面生产者的差不多的,都是扫描消费者连接,如果这个消费者距离上一次上报心跳已经超过 120s,说明连接不活跃,将这个连接删掉,消费者也是一样 30s 上报一次心跳。

3.3 FilterServerManager#scanNotActiveChannel

/**
* 扫描不活跃的过滤服务连接
*/

public void scanNotActiveChannel() {

// 遍历所有过滤服务连接
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, FilterServerInfo> next = it.next();
// 上一次上报的心跳事件
long timestamp = next.getValue().getLastUpdateTimestamp();
Channel channel = next.getKey();
// 如果超过 30s 没有上报信息
if ((System.currentTimeMillis() timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
// 删除并关闭连接
log.info("The Filter Server<{}> expired, remove it", next.getKey());
it.remove();
RemotingUtil.closeChannel(channel);
}
}
}

这里逻辑跟上面的差不多,但是因为过滤服务器这个版本不再用了,所以直接看注释就行。

4. 小结

好了,这篇文章我们就探究了 broker 是如何扫描出不活跃的的生产者、消费者和过滤服务器,生产者和消费者在启动的时候都会每隔 30s 上报一次心跳到 broker,所以 broker 这里的判断是如果生产者和消费者 120s 内没有再次上报心跳,就认为这个连接是不活跃的,过期的,可能是生产者或者消费者遇到了什么问题,这种情况下 broker 就会断开这个连接,等待后续服务恢复之后再次和 broker 建连。

如有错误,欢迎指出!!!!

赞(0)
未经允许不得转载:网硕互联帮助中心 » 【RocketMQ Broker 相关源码】- 清除不活跃的生产者、消费者、过滤服务器连接
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!