前两篇我们分别剖析了服务端 Controller 层的数据接收架构和 CompletableFuture 并行编排机制。本篇将目光聚焦于一个看似简单实则精巧的子系统——应用实例管理:一个心跳包从到达服务端到最终触发告警,中间经历了哪些环节?服务端又是如何判定实例的在线/离线状态、处理服务启停带来的状态抖动的?让我们深入源码一探究竟。
一、为什么心跳机制是监控平台的基石?
在分布式监控场景中,应用实例与监控服务端之间通常是松耦合的网络通信关系。服务端需要回答一个最基本的问题:此刻,哪些应用实例还活着?
这个问题的答案无法通过「推论」获得,只能通过「事实」确认——客户端必须定期向服务端报告「我还在」。这就是心跳机制的核心理念:通过周期性的主动上报,让服务端在可接受的时间窗口内感知到应用实例的存活状态。
Phoenix 的心跳机制不只是简单地接收一个信号,它围绕心跳构建了一套完整的实例生命周期管理模型:
实例注册 → 心跳续约 → 超时检测 → 状态流转 → 告警触发 → 恢复检测
这个模型的每一个环节都有值得推敲的设计细节。
二、心跳包的数据结构
2.1 HeartbeatPackage 的继承体系
心跳包在 Phoenix 中被建模为 HeartbeatPackage,它的继承结构如下:
HeartbeatPackage
└── BaseRequestPackage(id、dateTime、extraMsg)
└── AbstractSuperPackage(instanceId、instanceName、instanceDesc、instanceEndpoint、ip、computerName、chain、instanceLanguage、appServerType)
└── AbstractSuperBean(toJsonString)
HeartbeatPackage 自身只定义了四个字段:
public class HeartbeatPackage extends BaseRequestPackage {
// 心跳频率(秒)
private long rate;
// 是否启用Arthas在线诊断
private boolean isEnableArthas = false;
// 是否收集JVM指标
private boolean isCollectVmMetrics = true;
// 是否收集线程池指标
private boolean isCollectThreadPoolMetrics = false;
}
设计意图:心跳包的核心职责是「保活」,因此它携带的信息分为两类——身份标识(继承自父类的 instanceId、instanceName、ip 等)和能力声明(rate、isEnableArthas、isCollectVmMetrics、isCollectThreadPoolMetrics 等布尔标记)。后者告诉服务端「我这个实例支持采集哪些数据」,服务端将这些能力信息存储到数据库中,供 UI 端据此决定是否展示 JVM 信息、线程池信息等监控面板。需要注意的是,服务端并不主动请求这些数据,所有监控数据都是由客户端定时上报、服务端被动接收的。
2.2 心跳频率:客户端与服务端的契约
rate 字段是客户端与服务端之间的一个关键契约。客户端通过它告诉服务端「我会每隔 rate 秒发一次心跳」。服务端在判定超时时,会将这个频率作为基准来计算判决时间窗口。这个设计使得不同应用实例可以配置不同的心跳频率,不必一刀切。
三、心跳接收的双通道架构
Phoenix 支持两种通信通道接收心跳包:HTTP 和 WebSocket。两种通道最终都会调用同一个 IHeartbeatService.dealHeartbeatPackage() 方法,区别在于入口不同。
3.1 HTTP 通道:HeartbeatController(已标记 @Deprecated)
@Deprecated
@RestController
@RequestMapping("/heartbeat")
public class HeartbeatController {
@PostMapping("/accept-heartbeat-package")
public BaseResponsePackage acceptHeartbeatPackage(
@RequestBody HeartbeatPackage heartbeatPackage) throws NetException {
TimeInterval timer = DateUtil.timer();
Result result = this.heartbeatService.dealHeartbeatPackage(heartbeatPackage);
BaseResponsePackage response = this.serverPackageConstructor
.structureBaseResponsePackage(result);
if (timer.intervalSecond() > 1) {
log.warn("处理心跳包耗时:{}", timer.intervalPretty());
}
return response;
}
}
HTTP 通道的入口是 HeartbeatController,处理流程清晰:计时 → 调用 Service → 构造响应 → 慢处理告警。值得注意的是,虽然该类标注了 @Deprecated(新版本推荐使用 WebSocket),但它的处理逻辑仍然完整可用。
HTTP 通道的增强逻辑通过 AOP 切面 HeartbeatAspect 实现:在心跳处理方法执行前,切面会回调所有注册的 ILinkListener,用于维护应用实例之间的链路拓扑关系。
3.2 WebSocket 通道:事件驱动模型
WebSocket 通道采用完全不同的事件驱动架构。当 Netty 接收到 WebSocket 文本帧后,处理链路如下:
MonitoringFrameHandler.onMessageReceived()
→ WebSocketPackage.convert()(解密解压 + 白名单校验)
→ WebSocketMessageDispatcher.dispatch()(路由分发)
→ 发布 HeartbeatEvent(Spring ApplicationEvent)
→ HeartbeatListener.handleHeartbeatPackage()(@Async + @EventListener)
→ IHeartbeatService.dealHeartbeatPackage()
分发器的路由设计值得一提。WebSocketMessageDispatcher 在静态初始化块中注册了一张映射表:
static {
register(HeartbeatPackage.class,
(ctx, payload) -> new HeartbeatEvent(ctx, (HeartbeatPackage) payload));
register(AlarmPackage.class,
(ctx, payload) -> new AlarmEvent(ctx, (AlarmPackage) payload));
register(JvmPackage.class,
(ctx, payload) -> new JvmEvent(ctx, (JvmPackage) payload));
// ... 其他包类型
}
分发时根据 WebSocketPackage 中的 className 字段查表,找到对应的工厂 Lambda,创建事件对象并发布。这种表驱动 + 工厂模式的设计使得新增包类型时只需添加一行 register 代码,分发逻辑完全不需要改动。
HeartbeatListener 使用 @Async + @EventListener 组合注解,确保心跳处理在独立线程中异步执行,不阻塞 Netty 的 I/O 线程:
@Async
@EventListener
public void handleHeartbeatPackage(HeartbeatEvent event) {
TimeInterval timer = DateUtil.timer();
HeartbeatPackage heartbeatPackage = event.getHeartbeatPackage();
this.beforeWakeUp(heartbeatPackage); // 回调链路监听器
this.heartbeatService.dealHeartbeatPackage(heartbeatPackage);
if (timer.intervalSecond() > 1) {
log.warn("处理心跳包耗时:{}", timer.intervalPretty());
}
}
3.3 两种通道的对比
| 维度 | HTTP 通道 | WebSocket 通道 |
|---|---|---|
| 入口 | HeartbeatController | HeartbeatListener |
| 增强机制 | AOP 切面(HeartbeatAspect) | 监听器内部直接调用 |
| 线程模型 | Servlet 容器线程池 | @Async 线程池 |
| 状态 | @Deprecated | 推荐 |
| 请求/响应 | 标准请求-响应 | 事件驱动,无直接响应 |
两种通道的增强逻辑本质相同——都是在心跳处理前回调 ILinkListener 来更新链路信息。区别在于 HTTP 通道通过 AOP 切面拦截,WebSocket 通道通过监听器内部显式调用。之所以保留两种方式,是为了向后兼容。
四、心跳处理的核心逻辑
4.1 HeartbeatServiceImpl:极简的入口
@Service
public class HeartbeatServiceImpl implements IHeartbeatService {
@Autowired
private IInstanceService instanceService;
@Override
public Result dealHeartbeatPackage(HeartbeatPackage heartbeatPackage) {
this.instanceService.operateMonitorInstance(heartbeatPackage);
return Result.builder().isSuccess(true).msg(ResultMsgConstants.SUCCESS).build();
}
}
HeartbeatServiceImpl 的实现极其简洁——只有一行有效代码。这种设计将心跳处理与实例管理解耦:HeartbeatService 只负责接收和路由,InstanceService 负责实际的数据库操作。
4.2 InstanceServiceImpl.operateMonitorInstance():插入 or 更新
这是心跳处理的核心方法:
@Retryable
@Transactional(rollbackFor = Throwable.class)
@Override
public void operateMonitorInstance(HeartbeatPackage heartbeatPackage) {
String instanceId = heartbeatPackage.getInstanceId();
Date currentTime = new Date();
// 查询数据库中有没有当前应用实例
LambdaQueryWrapper<MonitorInstance> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(MonitorInstance::getInstanceId, instanceId);
int selectCountDb = this.count(wrapper);
// 封装实例对象
MonitorInstance entity = new MonitorInstance();
entity.setInstanceId(instanceId);
entity.setInstanceName(heartbeatPackage.getInstanceName());
entity.setInstanceDesc(heartbeatPackage.getInstanceDesc());
entity.setEndpoint(heartbeatPackage.getInstanceEndpoint());
entity.setIp(heartbeatPackage.getIp());
entity.setConnFrequency((int) heartbeatPackage.getRate());
entity.setIsEnableArthas(heartbeatPackage.isEnableArthas());
entity.setIsCollectVmMetrics(heartbeatPackage.isCollectVmMetrics());
entity.setIsCollectThreadPoolMetrics(heartbeatPackage.isCollectThreadPoolMetrics());
entity.setLanguage(heartbeatPackage.getInstanceLanguage());
entity.setAppServerType(heartbeatPackage.getAppServerType().getName());
entity.setIsOfflineNotice(ZeroOrOneConstants.ZERO);
if (selectCountDb == 0) {
// 首次注册:插入新记录
entity.setInsertTime(currentTime);
entity.setOfflineCount(0);
entity.setIsEnableMonitor(ZeroOrOneConstants.ONE);
entity.setIsEnableAlarm(ZeroOrOneConstants.ONE);
this.save(entity);
} else {
// 已有记录:更新 update_time
entity.setUpdateTime(currentTime);
LambdaUpdateWrapper<MonitorInstance> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(MonitorInstance::getInstanceId, instanceId);
this.update(entity, updateWrapper);
}
}
这段代码揭示了几个关键设计:
1. 幂等的注册机制
通过 instanceId 查询数据库,如果不存在则插入,存在则更新。这意味着客户端首次发送心跳时自动完成注册,无需预先在服务端配置。instanceId 是客户端的唯一标识,由 InstanceGenerator 基于 IP、端口号、应用名等信息生成,确保同一应用实例在不同环境下生成不同的 ID。
2. updateTime 是状态判定的命脉
更新操作中最关键的是 entity.setUpdateTime(currentTime)。这个时间戳就是服务端判断实例是否存活的依据——如果 updateTime 距当前时间超过了阈值,实例就被判定为离线。因此每次心跳成功处理,本质上是刷新了这个「续约时间戳」。
3. 首次注册的默认策略
新注册的实例默认开启监控(isEnableMonitor = "1")和告警(isEnableAlarm = "1"),离线次数初始化为 0。这种「默认全开」的设计减少了运维配置成本,后续可通过 UI 界面按需调整。
4. @Retryable 容错机制
方法上标注了 @Retryable,当数据库操作因网络抖动、连接超时等瞬态故障失败时,Spring Retry 会自动重试,提高系统的容错能力。
5. isOfflineNotice 重置为 0
每次心跳都会将 isOfflineNotice 重置为 0,表示「未收到离线通知」。这个字段在后续的在线状态判定中扮演重要角色——它用来区分「心跳超时导致的离线」和「主动下线通知导致的离线」。
4.3 MONITOR_INSTANCE 表结构
对应的数据表 MONITOR_INSTANCE 包含以下核心字段:
| 字段 | 类型 | 说明 |
|---|---|---|
ID |
BIGINT | 自增主键 |
INSTANCE_ID |
VARCHAR | 应用实例唯一标识(业务主键) |
INSTANCE_NAME |
VARCHAR | 应用实例名称 |
INSTANCE_DESC |
VARCHAR | 应用实例描述(来自客户端) |
INSTANCE_SUMMARY |
VARCHAR | 应用实例摘要(用户通过 UI 设置,优先级更高) |
ENDPOINT |
VARCHAR | 端点类型:client / agent / server |
IP |
VARCHAR | IP 地址 |
INSERT_TIME |
DATETIME | 首次注册时间 |
UPDATE_TIME |
DATETIME | 最后一次心跳更新时间 |
IS_ONLINE |
CHAR(1) | 在线状态:0-离线,1-在线 |
IS_ENABLE_MONITOR |
CHAR(1) | 是否开启监控 |
IS_ENABLE_ALARM |
CHAR(1) | 是否开启告警 |
IS_OFFLINE_NOTICE |
CHAR(1) | 是否收到离线通知:0-否,1-是 |
OFFLINE_COUNT |
INT | 累计离线次数 |
CONN_FREQUENCY |
INT | 心跳频率(秒) |
LANGUAGE |
VARCHAR | 编程语言 |
APP_SERVER_TYPE |
VARCHAR | 应用服务器类型 |
MONITOR_ENV |
VARCHAR | 监控环境(用户自定义) |
MONITOR_GROUP |
VARCHAR | 监控分组(用户自定义) |
IS_OFFLINE_NOTICE 是一个容易忽略但至关重要的字段。它标识实例离线的原因来源——是心跳超时推断出来的,还是收到了客户端主动发送的 OfflinePackage。这两者在状态恢复时的处理逻辑不同。
五、在线状态判定:InstanceMonitorJob
心跳处理只是「续约」动作,真正的状态判定发生在定时任务 InstanceMonitorJob 中。
5.1 任务调度配置
在 QuartzConfig 中,实例监控任务被配置为每 30 秒执行一次:
@Bean
public Trigger instanceMonitorTrigger() {
return TriggerBuilder.newTrigger()
.forJob(this.instanceMonitorJobDetail())
.withIdentity("instanceMonitorTrigger", TRIGGER_GROUP)
.startAt(new DateTime().plusSeconds(5).toDate())
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(30).repeatForever())
.build();
}
启动延迟 5 秒,之后每 30 秒扫描一次。这个频率与客户端默认的心跳频率一致。
5.2 @DisallowConcurrentExecution 的意义
@DisallowConcurrentExecution
public class InstanceMonitorJob extends QuartzJobBean
implements CommandLineRunner, DisposableBean, IOfflineListener {
@DisallowConcurrentExecution 确保 Quartz 不会在上一次任务执行完毕之前启动下一次任务。考虑到扫描逻辑涉及数据库查询和状态更新,如果允许并发执行,可能导致同一实例被两个线程同时判定为离线,发送重复告警。
5.3 多重身份:不只是 Quartz 任务
InstanceMonitorJob 同时实现了三个接口:
| 接口 | 职责 | 触发时机 |
|---|---|---|
QuartzJobBean |
定时扫描实例状态 | 每 30 秒 |
CommandLineRunner |
启动时重置实例状态 | 应用启动后 |
DisposableBean |
关闭时通知自身离线 | 应用关闭前 |
IOfflineListener |
响应其他实例的下线通知 | 收到 OfflinePackage 时 |
这种「多重身份」的设计将实例状态管理的所有逻辑集中在一个类中,避免了散落在各处的状态操作导致的不一致问题。
5.4 启动时的状态重置
@Override
public void run(String... args) {
List<MonitorInstance> initMonitorInstances = this.instanceService.list(
new LambdaQueryWrapper<>());
initMonitorInstances.forEach(instance -> {
boolean isOnline = StringUtils.equals(
instance.getIsOnline(), ZeroOrOneConstants.ONE);
if (isOnline) {
MonitorInstance monitorInstance = MonitorInstance.builder()
.id(instance.getId())
.updateTime(new Date())
.isOfflineNotice(ZeroOrOneConstants.ZERO)
.build();
this.instanceService.updateById(monitorInstance);
}
});
commandLineRunnerHasRun = true;
}
这段代码解决了一个非常实际的问题:服务端重启后,数据库中可能残留着上次运行时标记为「在线」的实例。如果不处理,这些实例会因为 updateTime 过旧而被立即判定为离线,产生大量虚假告警。
解决方案很优雅:启动时把所有在线实例的 updateTime 刷新为当前时间,相当于给予一个「宽限期」。这样服务端重启后的第一次扫描不会误判,客户端的心跳会在宽限期内续约。
commandLineRunnerHasRun 是一个 volatile 标志位,确保只有启动初始化完成后,定时任务和监听器回调才会生效,避免在初始化过程中就开始处理业务逻辑。
5.5 核心扫描逻辑:executeInternal()
@Override
protected void executeInternal(@NonNull JobExecutionContext context) {
if (!commandLineRunnerHasRun) return;
// 双重开关检查
boolean isEnable = this.monitoringConfigPropertiesLoader
.getMonitoringProperties().getInstanceProperties().isEnable();
if (!isEnable) return;
boolean isStatusEnable = this.monitoringConfigPropertiesLoader
.getMonitoringProperties().getInstanceProperties()
.getInstanceStatusProperties().isEnable();
if (!isStatusEnable) return;
synchronized (InstanceMonitorJob.class) {
List<MonitorInstance> monitorInstances =
this.instanceService.list(new LambdaQueryWrapper<>());
for (MonitorInstance monitorInstance : monitorInstances) {
// 跳过未开启监控的实例
if (!StringUtils.equals(ZeroOrOneConstants.ONE,
monitorInstance.getIsEnableMonitor())) continue;
// 计算判决时间窗口
int thresholdSecond = monitorInstance.getConnFrequency()
* this.monitoringConfigPropertiesLoader
.getMonitoringProperties().getThreshold();
Date dateTime = monitorInstance.getUpdateTime() == null
? monitorInstance.getInsertTime()
: monitorInstance.getUpdateTime();
DateTime judgeDateTime = new DateTime(dateTime)
.plusSeconds(thresholdSecond).plusSeconds(30);
if (judgeDateTime.isBeforeNow()) {
this.offLine(monitorInstance, false); // 心跳超时 → 离线
} else {
if (StringUtils.equals(monitorInstance.getIsOfflineNotice(),
ZeroOrOneConstants.ZERO)) {
this.onLine(monitorInstance); // 心跳恢复 → 上线
}
}
}
}
}
这段代码的核心逻辑可以用一个时间轴来直观理解:
最后心跳时间 判决时间点 当前时间
│ │ │
│◄──── threshold ────►│◄─── 30s 缓冲 ───►│
│ │
│◄───────── judgeDateTime ─────────────►│
│ │
updateTime judgeDateTime.isBeforeNow() = true
→ 判定为离线
判决时间的计算公式:
judgeDateTime = 最后心跳时间 + (connFrequency × threshold) + 30秒缓冲
其中:
connFrequency:该实例配置的心跳频率(秒)threshold:全局监控阈值,默认为 5- 30 秒:固定的网络抖动缓冲时间
以默认配置为例:客户端每 30 秒发一次心跳,阈值为 5,那么判决窗口 = 30 × 5 + 30 = 180 秒。也就是说,如果客户端连续 3 分钟没有心跳,服务端才会判定它离线。这个设计给予了充足的网络波动容忍度。
synchronized 的必要性:虽然 @DisallowConcurrentExecution 阻止了 Quartz 层面的并发,但 IOfflineListener.notifyOffline() 可能被其他线程(如 WebSocket 监听器线程)并发调用。synchronized 确保状态更新操作的原子性。
5.6 离线处理:offLine()
private void offLine(MonitorInstance instance, boolean isOfflineNotice) {
// 先发告警,再更新状态
try {
this.sendAlarmInfo("应用程序离线",
AlarmLevelEnums.FATAL, AlarmReasonEnums.NORMAL_2_ABNORMAL, instance);
} catch (Exception e) {
log.error("应用程序告警异常!", e);
}
boolean isOnline = StringUtils.equals(
instance.getIsOnline(), ZeroOrOneConstants.ONE);
if (isOnline) {
int offlineCount = instance.getOfflineCount() == null
? 0 : instance.getOfflineCount();
instance.setOfflineCount(offlineCount + 1);
instance.setIsOnline(ZeroOrOneConstants.ZERO);
instance.setIsOfflineNotice(
isOfflineNotice ? ZeroOrOneConstants.ONE : ZeroOrOneConstants.ZERO);
this.instanceService.updateById(instance);
}
}
关键细节:
-
先告警再更新:先发送告警通知,再更新数据库状态。这确保了即使在数据库更新失败的情况下,运维人员也能收到告警。
-
幂等的离线处理:
isOnline检查的作用是确保只有当前在线的实例才会执行数据库状态更新(更新isOnline、累加offlineCount),避免重复计数和重复状态写入。至于告警的去重,则由告警子系统(alarmService)在更上层处理。 -
离线次数累加:每次从在线变为离线,
offlineCount加 1。这个统计数据可以在 UI 上展示,帮助运维人员识别不稳定的实例。 -
isOfflineNotice 的区分:
isOfflineNotice参数标识离线来源。如果是收到OfflinePackage导致的离线,值为true(通知离线);如果是心跳超时推断的离线,值为false。
5.7 上线处理:onLine()
private void onLine(MonitorInstance instance) {
try {
if (StringUtils.isBlank(instance.getIsOnline())) {
// 首次发现:发送「发现新应用程序」通知
this.sendAlarmInfo("发现新应用程序",
AlarmLevelEnums.INFO, AlarmReasonEnums.DISCOVERY, instance);
} else {
// 离线恢复:发送「应用程序上线」通知
this.sendAlarmInfo("应用程序上线",
AlarmLevelEnums.INFO, AlarmReasonEnums.ABNORMAL_2_NORMAL, instance);
}
} catch (Exception e) {
log.error("应用程序告警异常!", e);
}
boolean isOnline = StringUtils.equals(
instance.getIsOnline(), ZeroOrOneConstants.ONE);
if (!isOnline) {
instance.setIsOnline(ZeroOrOneConstants.ONE);
instance.setIsOfflineNotice(ZeroOrOneConstants.ZERO);
this.instanceService.updateById(instance);
}
}
这里有一个精妙的细节:isOnline 为空白时的语义。当 IS_ONLINE 字段为 null 或空字符串时,意味着这是刚注册的实例——心跳处理时只设置了 INSERT_TIME 和基础信息,但没有设置 IS_ONLINE。onLine() 方法据此区分「首次发现」和「离线恢复」两种场景,发送不同级别的通知。
为什么心跳超时恢复需要检查 isOfflineNotice?
回到 executeInternal() 中的上线判定逻辑:
else {
if (StringUtils.equals(monitorInstance.getIsOfflineNotice(),
ZeroOrOneConstants.ZERO)) {
this.onLine(monitorInstance);
}
}
如果 isOfflineNotice = "1"(收到了主动下线通知),即使心跳时间在判决窗口内,也不会恢复在线。这是因为:收到 OfflinePackage 意味着客户端主动下线了,此时虽然客户端已不再运行,但由于离线发生得很突然(紧接着最后一次心跳之后),updateTime 仍然处于 judgeDateTime 判决窗口之内。服务端不应仅凭时间戳就将其恢复为在线,必须等到下一次真正的心跳到来。
而正常的心跳处理会将 isOfflineNotice 重置为 "0",所以下一次心跳成功后,定时任务就能正常将其恢复为在线。
六、主动下线:优雅的关闭通知
6.1 OfflinePackage 与监听器回调
当客户端正常关闭时,会向服务端发送 OfflinePackage:
public class OfflinePackage extends BaseRequestPackage {
// 离线类型:可以同时指定多种监控类型的下线
private List<MonitorTypeEnums> monitorTypes;
}
monitorTypes 使用 List 类型是为了支持扩展性,不同的监听器可以关注不同类型的离线事件。当前客户端在构造 OfflinePackage 时只会添加 INSTANCE 类型,InstanceMonitorJob 通过 CollectionUtil.contains(monitorTypes, MonitorTypeEnums.INSTANCE) 判断是否需要处理该离线事件。这种设计使得新增监控类型时,只需在枚举中添加新值并编写对应的监听器即可,无需修改已有代码。
服务端通过 IOfflineListener 接口通知所有关注下线事件的组件。InstanceMonitorJob 实现了该接口:
@Override
public void notifyOffline(OfflinePackage offlinePackage) {
if (!commandLineRunnerHasRun) return;
// 检查是否包含 INSTANCE 类型的下线
if (CollectionUtil.contains(
offlinePackage.getMonitorTypes(), MonitorTypeEnums.INSTANCE)) {
String instanceId = offlinePackage.getInstanceId();
MonitorInstance instance = this.instanceService.getOne(
new LambdaQueryWrapper<MonitorInstance>()
.eq(MonitorInstance::getInstanceId, instanceId));
if (instance != null && StringUtils.equals(
ZeroOrOneConstants.ONE, instance.getIsEnableMonitor())) {
this.offLine(instance, true); // isOfflineNotice = true
}
}
}
与心跳超时推断的离线不同,主动下线的 isOfflineNotice = true,这会影响后续的状态恢复判定逻辑。
6.2 服务端自身的优雅关闭
InstanceMonitorJob 还实现了 DisposableBean,在服务端自身关闭时:
@Override
public void destroy() {
if (!commandLineRunnerHasRun) return;
// 获取服务端自身的实例ID
String instanceId = InstanceGenerator.getInstanceId();
MonitorInstance instance = this.instanceService.getOne(
new LambdaQueryWrapper<MonitorInstance>()
.eq(MonitorInstance::getInstanceId, instanceId));
if (instance != null && StringUtils.equals(
ZeroOrOneConstants.ONE, instance.getIsEnableMonitor())) {
this.offLine(instance, true);
}
}
这段代码解决了监控领域的一个经典问题:监控者自身的状态谁来监控? Phoenix 的答案是——监控服务端自己也是被监控的实例。当服务端关闭时,它会在 destroy() 方法中主动将自己标记为离线,确保集群中的其他服务端不会误判。
七、链路拓扑的维护
在心跳处理之前,无论是 HTTP 通道的 HeartbeatAspect 还是 WebSocket 通道的 HeartbeatListener,都会回调 ILinkListener.wakeUpMonitor()。当前唯一的实现是 LinkMonitor,它负责维护应用实例之间的链路拓扑关系。
HeartbeatPackage 继承自 AbstractSuperPackage,其中包含 Chain 对象:
public class Chain extends AbstractSuperBean {
// 网络链路(IP 层级)
private LinkedHashSet<String> networkChain;
// 应用链路(实例ID 层级)
private LinkedHashSet<String> instanceChain;
// 时间链路(每跳的时间戳)
private LinkedHashSet<String> timeChain;
}
instanceChain 记录了心跳包从客户端到服务端经过的所有实例 ID(可能经过代理端中转)。LinkMonitor 收到回调后,将这条链路信息存入 MONITOR_LINK 表,供 UI 端的拓扑图功能使用。
八、告警信息的构造
sendAlarmInfo() 方法构造的告警信息内容非常详实:
应用ID:{instanceId},应用名称:{instanceName},应用描述:{instanceDesc/instanceSummary},
应用环境:{monitorEnv},应用分组:{monitorGroup},应用端点:{endpoint},
IP地址:{ip},时间:{currentTime}
值得注意的设计细节:
- 告警码的唯一性:使用
instanceId作为告警码(code),确保同一实例的告警在告警收敛时能被正确去重。 - 摘要优先级:
instanceSummary(用户通过 UI 设置)优先于instanceDesc(来自客户端上报),因为用户手动设置的摘要更符合运维视角。 - 三级开关:告警触发需要同时满足——全局配置中实例告警开关打开、该实例的告警开关打开、该实例的监控开关打开。
九、完整的状态流转图
将上述所有逻辑串起来,一个应用实例的完整生命周期如下:
首次心跳到达
│
▼
┌─────────────────────┐
│ 新注册 │
│ IS_ONLINE=null │
│ IS_OFFLINE_NOTICE=0 │
└──────────┬──────────┘
│ onLine() → 发现新应用程序
▼
┌─────────────┐
┌─────────▶ │ 在 线 │
│ │ IS_ONLINE=1 │
│ └──────┬──────┘
│ │
│ ┌──────────┴─────────┐
│ │ │
│ 心跳超时 收到 OfflinePackage
│ │ │
│ ▼ ▼
│ ┌──────────┐ ┌──────────────┐
│ │ 离 线 │ │ 离 线 │
│ │ NOTICE=0 │ │ NOTICE=1 │
│ └────┬─────┘ └──────┬───────┘
│ │ │
│ 心跳恢复 下次心跳到达
│ (无需额外条件) (重置 NOTICE=0)
│ │ │
│ ▼ ▼
│ onLine() 心跳续约后
│ 应用程序上线 再由定时任务判定
└─────┴──────────────────────┘
十、小结
Phoenix 的应用实例管理系统设计得相当完善,体现了几个值得借鉴的工程思想:
- 心跳即注册:免预注册,第一个心跳自动完成实例注册,降低接入成本。
- 多层级的容错设计:
@Retryable应对瞬态故障、@DisallowConcurrentExecution防止并发异常、synchronized保护临界区、超时缓冲容忍网络抖动。 - 启动时的状态重置:服务端重启时刷新所有在线实例的时间戳,避免误告警。这是一个容易被忽视但极其重要的工程细节。
- isOfflineNotice 的状态细分:区分心跳超时和主动下线两种离线场景,使状态流转更加精确,避免状态抖动。
- 关注点分离:
HeartbeatService只做路由、InstanceService只做持久化、InstanceMonitorJob只做状态判定和告警。每个组件职责清晰,便于独立测试和演进。 - 多重身份的 Job 类:将定时扫描、启动初始化、关闭通知、下线监听四个职责集中在一个类中,避免了状态管理逻辑的分散。
下一篇我们将继续深入服务端核心,探讨服务器监控数据的处理——多维信息的入库与更新策略。
项目地址:
https://gitcode.com/monitoring-platform/phoenix
https://gitee.com/monitoring-platform/phoenix
https://github.com/709343767/phoenix

评论