前几篇我们逐一拆解了代理端的各类采集器——服务器信息采集(oshi/Sigar)、Docker 容器监控、网络设备 SNMP 采集。采集只是「生产」数据的第一步,接下来面临两个关键问题:采集到的数据如何可靠地送达服务端?服务端下发的命令如何被正确路由和执行?本篇聚焦代理端的数据转发机制与命令执行器模式,揭开这两个问题背后的设计思路。
一、代理端的「中转站」角色
在 Phoenix 的整体架构中,代理端不仅是一个数据采集器,更是客户端与服务端之间的「中转站」。数据流向如下:
客户端SDK → 代理端 → 服务端
为什么需要这一层中转?主要有三个原因:
- 网络隔离:客户端(业务应用)可能处于内网环境,无法直接访问服务端,代理端作为网关打通链路
- 统一管理:多个客户端的数据由代理端统一收集、转发,减少服务端的连接压力
- 链路追踪:代理端可以对数据进行预处理、日志记录,方便排查问题
二、三层转发架构
代理端内部采用 Controller → Client Service → Server Service 的三层设计,职责明确:
┌────────────────────────────────────────────────────────┐
│ 代理端 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Controller │───▶│Client Service│───▶│Server │ │
│ │ (接收数据) │ │ (业务处理) │ │Service │ │
│ └──────────────┘ └──────────────┘ │(转发服务端)│ │
│ └──────────┘ │
└────────────────────────────────────────────────────────┘
- Controller 层(
business.client.controller):接收客户端发来的各类数据包——心跳、告警、JVM、服务器、Docker、网络设备等 - Client Service 层(
business.client.service):处理来自客户端的数据,通过MethodExecuteHandler发起转发调用 - Server Service 层(
business.server.service):真正执行向服务端的 HTTP 转发
这种分层使得「接收」和「转发」彻底解耦——Controller 不关心数据最终去哪里,Server Service 不关心数据从哪里来。
三、命令执行器模式:Invoker 机制
这是本篇的核心。代理端没有让 Client Service 直接 @Autowired 注入 Server Service 来调用,而是引入了一套「命令执行器」模式。
3.1 为什么不直接注入 Service 调用?
直接注入看似简单,但存在问题:Client Service 位于 business.client 包,Server Service 位于 business.server 包,如果直接依赖,两层就耦合在一起了。更关键的是,MethodExecuteHandler 作为一个静态工具类(非 Spring Bean),无法使用 @Autowired。命令执行器模式通过反射 + 注册表的方式,在不引入直接依赖的前提下完成方法调用。
3.2 Invoker:命令的封装
Invoker 是命令模式的核心——它封装了「对哪个对象调用哪个方法」:
@Slf4j
@Setter
public class Invoker {
private Object target; // 目标对象
private Method method; // 目标方法
public static Invoker valueOf(Object target, Method method) {
Invoker invoker = new Invoker();
invoker.setTarget(target);
invoker.setMethod(method);
return invoker;
}
public Object invoke(Object... paramValues) throws Exception {
try {
return method.invoke(target, paramValues);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
log.error("代理执行业务方法异常!", e);
throw new Exception(e.getCause());
}
}
}
设计很精练:valueOf 工厂方法创建实例,invoke 通过反射执行目标方法。调用方只需持有 Invoker 引用,完全不需要知道具体的 Service 类型。
3.3 InvokerHolder:执行器的注册表
有了 Invoker,还需要一个地方把它们「存起来」。InvokerHolder 使用二级 Map 结构 [接口类 → [方法名 → Invoker]]:
public class InvokerHolder {
private static final Map<Class<?>, Map<String, Invoker>> INVOKERS = new HashMap<>();
public static Invoker getInvoker(Class<?> clazz, String method) {
Map<String, Invoker> map = INVOKERS.get(clazz);
if (map != null) {
return map.get(method);
}
return null;
}
public static void addInvoker(Class<?> clazz, String method, Invoker invoker) {
Map<String, Invoker> map = INVOKERS.computeIfAbsent(clazz, k -> new HashMap<>(16));
map.put(method, invoker);
}
}
查找时只需要两个参数:接口类型 + 方法名。例如 InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage") 就能精准定位到心跳转发的执行器。
3.4 自动注册:BusinessAgencyScanner
手动注册显然不现实——代理端有近十种数据类型需要转发。Phoenix 通过 BeanPostProcessor 机制实现了自动注册:
首先定义两个标记注解:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TargetInf { } // 标记「这是一个可被代理的接口」
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TargetMethod { } // 标记「这是一个可被代理的方法」
然后在 BusinessAgencyScanner 中扫描并注册:
@Component
public class BusinessAgencyScanner implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
Class<?> clazz = bean.getClass();
String clazzName = clazz.getName();
// 处理内部类(类名含 "$")的情况
if (StringUtils.contains(clazzName, "$")) {
String[] clazzNames = StringUtils.split(clazzName, "$");
for (String name : clazzNames) {
try {
Class<?> tempClazz = Class.forName(name);
this.register(bean, tempClazz);
} catch (ClassNotFoundException ignored) { }
}
} else {
this.register(bean, clazz);
}
return bean;
}
private void register(Object bean, Class<?> clazz) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> interFace : interfaces) {
TargetInf targetInf = interFace.getAnnotation(TargetInf.class);
if (targetInf == null) { continue; }
Method[] methods = interFace.getMethods();
for (Method method : methods) {
TargetMethod targetInfMethod = method.getAnnotation(TargetMethod.class);
if (targetInfMethod == null) { continue; }
Invoker invoker = Invoker.valueOf(bean, method);
// 防止重复注册
if (InvokerHolder.getInvoker(interFace, method.getName()) == null) {
InvokerHolder.addInvoker(interFace, method.getName(), invoker);
} else {
log.error("重复注册执行器!");
}
}
}
}
}
Spring 容器启动时,每个 Bean 初始化完成后都会经过 BusinessAgencyScanner。扫描器还考虑了两个细节:内部类处理(类名含 $ 时逐段解析)和重复注册检查(避免同一个 Invoker 被注册两次)。如果某个 Bean 实现了标注 @TargetInf 的接口,其中标注 @TargetMethod 的方法就会被自动注册为 Invoker。零配置,全自动。
四、MethodExecuteHandler:转发的枢纽
MethodExecuteHandler 是代理端转发逻辑的中枢。每种数据类型对应一个 sendXxxPackage2Server() 方法,内部统一通过 execute() 执行:
public class MethodExecuteHandler {
// 双重身份的包构造器:非 Spring Bean 通过静态单例获取
private static final AgentPackageConstructor AGENT_PACKAGE_CONSTRUCTOR = AgentPackageConstructor.getInstance();
public static BaseResponsePackage sendHeartbeatPackage2Server(HeartbeatPackage heartbeatPackage) {
Invoker invoker = InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage");
return execute(invoker, heartbeatPackage);
}
public static BaseResponsePackage execute(Invoker invoker, Object... objects) {
BaseResponsePackage responsePackage;
try {
assert invoker != null;
Object object = invoker.invoke(objects);
responsePackage = (BaseResponsePackage) object;
} catch (Exception e) {
Result result = Result.builder().isSuccess(false).msg(e.getMessage()).build();
responsePackage = AGENT_PACKAGE_CONSTRUCTOR.structureBaseResponsePackage(result);
}
return responsePackage;
}
}
注意 AGENT_PACKAGE_CONSTRUCTOR 的获取方式——AgentPackageConstructor 同时具备**Spring Bean(@Component)和饿汉单例(getInstance())**双重身份。在 Spring 管理的类中可通过 @Autowired 注入,而在 MethodExecuteHandler 这样的静态工具类中,则通过 getInstance() 获取单例。这种设计解决了「非 Spring Bean 无法使用依赖注入」的实际问题。
这里有三个设计亮点:
- 统一异常处理:
execute方法捕获所有异常并包装为失败响应,调用方无需关心底层网络错误 - 开闭原则:新增数据类型只需添加一个
sendXxx方法 + 对应的@TargetInf接口,不修改已有代码 - 泛型抽象演进:最新版本还引入了
send(T data, Class<T> dataType)泛型方法,进一步统一了数据包的构建与发送流程,调用方只需传入数据对象和类型即可,无需关心具体的包构建逻辑
五、HTTP 转发:加密传输与失败重试
Server Service 最终通过 HttpServiceImpl 完成 HTTP 转发,流程分四步:
@Service
public class HttpServiceImpl implements IHttpService {
@Autowired
private RestTemplate restTemplate;
@Override
@Retryable // Spring Retry:失败自动重试
public BaseResponsePackage sendHttpPost(String json, String url) {
// 1. 可选压缩 + 加密:明文JSON → CiphertextPackage
CiphertextPackage requestCiphertextPackage = MsgPayloadUtils.encryptPayloadTo(json);
// 2. 构造HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
HttpEntity<CiphertextPackage> entity = new HttpEntity<>(requestCiphertextPackage, headers);
// 3. 发送POST请求
ResponseEntity<CiphertextPackage> responseEntity =
this.restTemplate.exchange(url, HttpMethod.POST, entity, CiphertextPackage.class);
// 4. 解密响应(如有压缩则自动解压)
CiphertextPackage responseciphertextPackage = Objects.requireNonNull(responseEntity.getBody());
String decryptStr = MsgPayloadUtils.decryptPayloadFrom(responseciphertextPackage);
return JSON.parseObject(decryptStr, BaseResponsePackage.class);
}
}
| 步骤 | 作用 | 关键类/方法 |
|---|---|---|
| 可选压缩 + 加密 | 按阈值判断是否 Gzip 压缩,再按配置算法加密 | MsgPayloadUtils.encryptPayloadTo() |
| 构造请求 | 封装为 Spring HTTP 实体 | HttpEntity<CiphertextPackage> |
| 发送请求 | RestTemplate 连接池发送 | restTemplate.exchange() |
| 解密 + 可选解压 | 按配置算法解密,按标志位判断是否 Gzip 解压 | MsgPayloadUtils.decryptPayloadFrom() |
@Retryable 注解让方法在网络抖动时自动重试,无需手写重试逻辑。
加密算法并非固定为 AES,而是通过配置文件动态选择。
SecureUtils内部通过InitSecure的静态初始化块,从配置属性monitoring.secure.encryption-algorithm-type加载加密类型,然后通过SecurerEnums枚举动态派发到对应的算法实现:// SecureUtils 加密入口:根据配置的 SECRET_TYPE 动态选择算法 public static String encrypt(String str, Charset charset) { if (StringUtils.isBlank(SECRET_TYPE)) { return str; // 未配置则不加密 } return SecurerEnums.valueOf(StringUtils.upperCase(SECRET_TYPE)).encrypt(str, charset); }目前支持三种加密算法:
算法 实现类 说明 DES DesEncryptUtils经典对称加密 AES AesEncryptUtils高级加密标准 SM4 Sm4EncryptUtils国密算法 若配置中未指定加密类型,系统将自动禁用加密,直接传输明文。
另外,
MsgPayloadUtils内部通过ZipUtils.isNeedGzip()按数据大小阈值判断是否需要 Gzip 压缩——小数据包直接加密,大数据包先压缩再加密。CiphertextPackage中的isUnGzipEnabled标志位告诉接收方是否需要解压。
六、一个完整的转发周期
以心跳包为例,完整链路如下:
客户端SDK
│
│ HTTP POST(密文)
▼
HeartbeatController(代理端)
│ @RequestBody 自动解密反序列化 → 获取明文 HeartbeatPackage
▼
Client HeartbeatService
│ 调用 MethodExecuteHandler.sendHeartbeatPackage2Server()
▼
MethodExecuteHandler
│ InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage")
│ → 获取 Invoker
│ → invoker.invoke(heartbeatPackage) // 反射调用
▼
Server HeartbeatServiceImpl
│ 添加链路信息 → 序列化 → httpService.sendHttpPost(json, url)
▼
HttpServiceImpl
│ 可选压缩 + 配置算法加密 → RestTemplate POST → 解密 + 可选解压
▼
服务端(接收并处理)
注意:Controller 层的解密并非手动完成——Spring MVC 框架通过自定义的消息转换器(或请求拦截器)自动将密文请求体解密并反序列化为 Java 对象,Controller 的 @RequestBody 拿到的已经是明文对象。
整个过程中,Controller 只管接收,Client Service 只管调度,Server Service 只管发送——每一层都只做自己该做的事。
七、传输通道的演进:从 HTTP 到 WebSocket
值得关注的是,代理端的数据传输通道正在经历一次重要的架构演进。目前源码中,多个 HTTP 方式的服务接口(如 IHeartbeatService、IAlarmService、IServerService)及其实现类、MethodExecuteHandler 中的多个 sendXxxPackage2Server() 方法已标注 @Deprecated。
以 Docker 数据为例,DockerServiceImpl 已不再走 HTTP 通道,而是改为 WebSocket 方式传输:
@Service
public class DockerServiceImpl implements IDockerService {
@Override
public BaseResponsePackage sendDockerPackage(DockerPackage dockerPackage) {
if (!DataExchanger.isReady()) {
Result result = Result.builder().isSuccess(false).msg("数据交换器未准备好,请稍后再试!").build();
return this.agentPackageConstructor.structureBaseResponsePackage(result);
}
// 添加链路信息
dockerPackage.setChain(this.agentPackageConstructor.getChain(dockerPackage));
// 通过 WebSocket 通道发送,而非 HTTP
WebSocketPackage requestPackage = new WebSocketPackage();
requestPackage.setClassName(DockerPackage.class.getName());
requestPackage.setPayload(dockerPackage);
DataExchanger.sendMessage(requestPackage);
Result result = Result.builder().isSuccess(true).msg(ResultMsgConstants.SUCCESS).build();
return this.agentPackageConstructor.structureBaseResponsePackage(result);
}
}
相比 HTTP 的请求-响应模式,WebSocket 通道具有以下优势:
- 实时性更强:Docker Stats 等持续监控数据天然适合流式推送,无需反复建连
- 双向通信:服务端可以主动下发命令,不再受限于 HTTP 的「客户端发起」模型
- 连接复用:一条长连接承载多种数据类型,减少 TCP 握手和 TLS 协商开销
这一演进趋势表明,Phoenix 正在将数据传输逐步从 HTTP 迁移到 WebSocket,命令执行器模式(Invoker 机制)作为调用层的抽象,在这一迁移过程中充当了稳定的「缓冲层」——上层调用方式不变,只需替换底层 Server Service 的实现。
八、小结
本篇深入分析了代理端的数据转发机制,核心要点如下:
- 三层转发架构:Controller → Client Service → Server Service,层次清晰、职责单一,接收与发送彻底解耦
- 命令执行器模式:Invoker + InvokerHolder + BusinessAgencyScanner 三件套,实现了转发逻辑的松耦合调用
- MethodExecuteHandler 枢纽:统一入口、统一异常处理,新增数据类型只需扩展不需修改;最新版本的泛型
send方法进一步提升了抽象程度 - AgentPackageConstructor 双重身份:同时作为 Spring Bean 和饿汉单例,打通了 Spring 管理类与静态工具类之间的访问通道
- 加密传输与自动重试:HttpServiceImpl 通过可选 Gzip 压缩 + 可配置加密算法(支持 DES/AES/SM4 三种)保证安全,@Retryable 保证可靠
- 注解驱动零配置注册:@TargetInf + @TargetMethod + BeanPostProcessor,Spring 启动时全自动完成 Invoker 注册,并内置重复注册检查
- 传输通道演进:部分数据类型(如 Docker)已从 HTTP 迁移到 WebSocket,Invoker 机制作为稳定的调用抽象层,使得底层传输方式的切换对上层透明
下一篇我们将转向服务端,探讨服务端的数据接收架构——代理端把数据发出去了,服务端是如何通过 Controller 层设计统一接收并分发处理这些数据的?敬请期待。
项目地址:
https://gitcode.com/monitoring-platform/phoenix
https://gitee.com/monitoring-platform/phoenix
https://github.com/709343767/phoenix

评论