目录

    Phoenix监控平台技术解析(二十二):代理端数据转发与命令执行器模式

    前几篇我们逐一拆解了代理端的各类采集器——服务器信息采集(oshi/Sigar)、Docker 容器监控、网络设备 SNMP 采集。采集只是「生产」数据的第一步,接下来面临两个关键问题:采集到的数据如何可靠地送达服务端?服务端下发的命令如何被正确路由和执行?本篇聚焦代理端的数据转发机制与命令执行器模式,揭开这两个问题背后的设计思路。


    一、代理端的「中转站」角色

    在 Phoenix 的整体架构中,代理端不仅是一个数据采集器,更是客户端与服务端之间的「中转站」。数据流向如下:

    客户端SDK → 代理端 → 服务端
    

    为什么需要这一层中转?主要有三个原因:

    • 网络隔离:客户端(业务应用)可能处于内网环境,无法直接访问服务端,代理端作为网关打通链路
    • 统一管理:多个客户端的数据由代理端统一收集、转发,减少服务端的连接压力
    • 链路追踪:代理端可以对数据进行预处理、日志记录,方便排查问题

    二、三层转发架构

    代理端内部采用 Controller → Client Service → Server Service 的三层设计,职责明确:

    ┌────────────────────────────────────────────────────────┐
    │                        代理端                           │
    │                                                        │
    │  ┌──────────────┐    ┌──────────────┐    ┌──────────┐ │
    │  │  Controller  │───▶│Client Service│───▶│Server    │ │
    │  │  (接收数据)   │    │ (业务处理)    │    │Service   │ │
    │  └──────────────┘    └──────────────┘    │(转发服务端)│ │
    │                                          └──────────┘ │
    └────────────────────────────────────────────────────────┘
    
    • Controller 层business.client.controller):接收客户端发来的各类数据包——心跳、告警、JVM、服务器、Docker、网络设备等
    • Client Service 层business.client.service):处理来自客户端的数据,通过 MethodExecuteHandler 发起转发调用
    • Server Service 层business.server.service):真正执行向服务端的 HTTP 转发

    这种分层使得「接收」和「转发」彻底解耦——Controller 不关心数据最终去哪里,Server Service 不关心数据从哪里来。


    三、命令执行器模式:Invoker 机制

    这是本篇的核心。代理端没有让 Client Service 直接 @Autowired 注入 Server Service 来调用,而是引入了一套「命令执行器」模式。

    3.1 为什么不直接注入 Service 调用?

    直接注入看似简单,但存在问题:Client Service 位于 business.client 包,Server Service 位于 business.server 包,如果直接依赖,两层就耦合在一起了。更关键的是,MethodExecuteHandler 作为一个静态工具类(非 Spring Bean),无法使用 @Autowired。命令执行器模式通过反射 + 注册表的方式,在不引入直接依赖的前提下完成方法调用。

    3.2 Invoker:命令的封装

    Invoker 是命令模式的核心——它封装了「对哪个对象调用哪个方法」:

    @Slf4j
    @Setter
    public class Invoker {
        private Object target;   // 目标对象
        private Method method;   // 目标方法
    
        public static Invoker valueOf(Object target, Method method) {
            Invoker invoker = new Invoker();
            invoker.setTarget(target);
            invoker.setMethod(method);
            return invoker;
        }
    
        public Object invoke(Object... paramValues) throws Exception {
            try {
                return method.invoke(target, paramValues);
            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                log.error("代理执行业务方法异常!", e);
                throw new Exception(e.getCause());
            }
        }
    }
    

    设计很精练:valueOf 工厂方法创建实例,invoke 通过反射执行目标方法。调用方只需持有 Invoker 引用,完全不需要知道具体的 Service 类型。

    3.3 InvokerHolder:执行器的注册表

    有了 Invoker,还需要一个地方把它们「存起来」。InvokerHolder 使用二级 Map 结构 [接口类 → [方法名 → Invoker]]

    public class InvokerHolder {
        private static final Map<Class<?>, Map<String, Invoker>> INVOKERS = new HashMap<>();
    
        public static Invoker getInvoker(Class<?> clazz, String method) {
            Map<String, Invoker> map = INVOKERS.get(clazz);
            if (map != null) {
                return map.get(method);
            }
            return null;
        }
    
        public static void addInvoker(Class<?> clazz, String method, Invoker invoker) {
            Map<String, Invoker> map = INVOKERS.computeIfAbsent(clazz, k -> new HashMap<>(16));
            map.put(method, invoker);
        }
    }
    

    查找时只需要两个参数:接口类型 + 方法名。例如 InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage") 就能精准定位到心跳转发的执行器。

    3.4 自动注册:BusinessAgencyScanner

    手动注册显然不现实——代理端有近十种数据类型需要转发。Phoenix 通过 BeanPostProcessor 机制实现了自动注册:

    首先定义两个标记注解:

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface TargetInf { }    // 标记「这是一个可被代理的接口」
    
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface TargetMethod { } // 标记「这是一个可被代理的方法」
    

    然后在 BusinessAgencyScanner 中扫描并注册:

    @Component
    public class BusinessAgencyScanner implements BeanPostProcessor {
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) {
            Class<?> clazz = bean.getClass();
            String clazzName = clazz.getName();
            // 处理内部类(类名含 "$")的情况
            if (StringUtils.contains(clazzName, "$")) {
                String[] clazzNames = StringUtils.split(clazzName, "$");
                for (String name : clazzNames) {
                    try {
                        Class<?> tempClazz = Class.forName(name);
                        this.register(bean, tempClazz);
                    } catch (ClassNotFoundException ignored) { }
                }
            } else {
                this.register(bean, clazz);
            }
            return bean;
        }
    
        private void register(Object bean, Class<?> clazz) {
            Class<?>[] interfaces = clazz.getInterfaces();
            for (Class<?> interFace : interfaces) {
                TargetInf targetInf = interFace.getAnnotation(TargetInf.class);
                if (targetInf == null) { continue; }
                Method[] methods = interFace.getMethods();
                for (Method method : methods) {
                    TargetMethod targetInfMethod = method.getAnnotation(TargetMethod.class);
                    if (targetInfMethod == null) { continue; }
                    Invoker invoker = Invoker.valueOf(bean, method);
                    // 防止重复注册
                    if (InvokerHolder.getInvoker(interFace, method.getName()) == null) {
                        InvokerHolder.addInvoker(interFace, method.getName(), invoker);
                    } else {
                        log.error("重复注册执行器!");
                    }
                }
            }
        }
    }
    

    Spring 容器启动时,每个 Bean 初始化完成后都会经过 BusinessAgencyScanner。扫描器还考虑了两个细节:内部类处理(类名含 $ 时逐段解析)和重复注册检查(避免同一个 Invoker 被注册两次)。如果某个 Bean 实现了标注 @TargetInf 的接口,其中标注 @TargetMethod 的方法就会被自动注册为 Invoker。零配置,全自动。


    四、MethodExecuteHandler:转发的枢纽

    MethodExecuteHandler 是代理端转发逻辑的中枢。每种数据类型对应一个 sendXxxPackage2Server() 方法,内部统一通过 execute() 执行:

    public class MethodExecuteHandler {
    
        // 双重身份的包构造器:非 Spring Bean 通过静态单例获取
        private static final AgentPackageConstructor AGENT_PACKAGE_CONSTRUCTOR = AgentPackageConstructor.getInstance();
    
        public static BaseResponsePackage sendHeartbeatPackage2Server(HeartbeatPackage heartbeatPackage) {
            Invoker invoker = InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage");
            return execute(invoker, heartbeatPackage);
        }
    
        public static BaseResponsePackage execute(Invoker invoker, Object... objects) {
            BaseResponsePackage responsePackage;
            try {
                assert invoker != null;
                Object object = invoker.invoke(objects);
                responsePackage = (BaseResponsePackage) object;
            } catch (Exception e) {
                Result result = Result.builder().isSuccess(false).msg(e.getMessage()).build();
                responsePackage = AGENT_PACKAGE_CONSTRUCTOR.structureBaseResponsePackage(result);
            }
            return responsePackage;
        }
    }
    

    注意 AGENT_PACKAGE_CONSTRUCTOR 的获取方式——AgentPackageConstructor 同时具备**Spring Bean(@Component)和饿汉单例(getInstance())**双重身份。在 Spring 管理的类中可通过 @Autowired 注入,而在 MethodExecuteHandler 这样的静态工具类中,则通过 getInstance() 获取单例。这种设计解决了「非 Spring Bean 无法使用依赖注入」的实际问题。

    这里有三个设计亮点:

    1. 统一异常处理execute 方法捕获所有异常并包装为失败响应,调用方无需关心底层网络错误
    2. 开闭原则:新增数据类型只需添加一个 sendXxx 方法 + 对应的 @TargetInf 接口,不修改已有代码
    3. 泛型抽象演进:最新版本还引入了 send(T data, Class<T> dataType) 泛型方法,进一步统一了数据包的构建与发送流程,调用方只需传入数据对象和类型即可,无需关心具体的包构建逻辑

    五、HTTP 转发:加密传输与失败重试

    Server Service 最终通过 HttpServiceImpl 完成 HTTP 转发,流程分四步:

    @Service
    public class HttpServiceImpl implements IHttpService {
    
        @Autowired
        private RestTemplate restTemplate;
    
        @Override
        @Retryable  // Spring Retry:失败自动重试
        public BaseResponsePackage sendHttpPost(String json, String url) {
            // 1. 可选压缩 + 加密:明文JSON → CiphertextPackage
            CiphertextPackage requestCiphertextPackage = MsgPayloadUtils.encryptPayloadTo(json);
            // 2. 构造HTTP请求
            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
            HttpEntity<CiphertextPackage> entity = new HttpEntity<>(requestCiphertextPackage, headers);
            // 3. 发送POST请求
            ResponseEntity<CiphertextPackage> responseEntity =
                    this.restTemplate.exchange(url, HttpMethod.POST, entity, CiphertextPackage.class);
            // 4. 解密响应(如有压缩则自动解压)
            CiphertextPackage responseciphertextPackage = Objects.requireNonNull(responseEntity.getBody());
            String decryptStr = MsgPayloadUtils.decryptPayloadFrom(responseciphertextPackage);
            return JSON.parseObject(decryptStr, BaseResponsePackage.class);
        }
    }
    
    步骤 作用 关键类/方法
    可选压缩 + 加密 按阈值判断是否 Gzip 压缩,再按配置算法加密 MsgPayloadUtils.encryptPayloadTo()
    构造请求 封装为 Spring HTTP 实体 HttpEntity<CiphertextPackage>
    发送请求 RestTemplate 连接池发送 restTemplate.exchange()
    解密 + 可选解压 按配置算法解密,按标志位判断是否 Gzip 解压 MsgPayloadUtils.decryptPayloadFrom()

    @Retryable 注解让方法在网络抖动时自动重试,无需手写重试逻辑。

    加密算法并非固定为 AES,而是通过配置文件动态选择。SecureUtils 内部通过 InitSecure 的静态初始化块,从配置属性 monitoring.secure.encryption-algorithm-type 加载加密类型,然后通过 SecurerEnums 枚举动态派发到对应的算法实现:

    // SecureUtils 加密入口:根据配置的 SECRET_TYPE 动态选择算法
    public static String encrypt(String str, Charset charset) {
        if (StringUtils.isBlank(SECRET_TYPE)) {
            return str;  // 未配置则不加密
        }
        return SecurerEnums.valueOf(StringUtils.upperCase(SECRET_TYPE)).encrypt(str, charset);
    }
    

    目前支持三种加密算法:

    算法 实现类 说明
    DES DesEncryptUtils 经典对称加密
    AES AesEncryptUtils 高级加密标准
    SM4 Sm4EncryptUtils 国密算法

    若配置中未指定加密类型,系统将自动禁用加密,直接传输明文。

    另外,MsgPayloadUtils 内部通过 ZipUtils.isNeedGzip() 按数据大小阈值判断是否需要 Gzip 压缩——小数据包直接加密,大数据包先压缩再加密。CiphertextPackage 中的 isUnGzipEnabled 标志位告诉接收方是否需要解压。


    六、一个完整的转发周期

    以心跳包为例,完整链路如下:

    客户端SDK
        │
        │ HTTP POST(密文)
        ▼
    HeartbeatController(代理端)
        │ @RequestBody 自动解密反序列化 → 获取明文 HeartbeatPackage
        ▼
    Client HeartbeatService
        │ 调用 MethodExecuteHandler.sendHeartbeatPackage2Server()
        ▼
    MethodExecuteHandler
        │ InvokerHolder.getInvoker(IHeartbeatService.class, "sendHeartbeatPackage")
        │ → 获取 Invoker
        │ → invoker.invoke(heartbeatPackage)  // 反射调用
        ▼
    Server HeartbeatServiceImpl
        │ 添加链路信息 → 序列化 → httpService.sendHttpPost(json, url)
        ▼
    HttpServiceImpl
        │ 可选压缩 + 配置算法加密 → RestTemplate POST → 解密 + 可选解压
        ▼
    服务端(接收并处理)
    

    注意:Controller 层的解密并非手动完成——Spring MVC 框架通过自定义的消息转换器(或请求拦截器)自动将密文请求体解密并反序列化为 Java 对象,Controller 的 @RequestBody 拿到的已经是明文对象。

    整个过程中,Controller 只管接收,Client Service 只管调度,Server Service 只管发送——每一层都只做自己该做的事。


    七、传输通道的演进:从 HTTP 到 WebSocket

    值得关注的是,代理端的数据传输通道正在经历一次重要的架构演进。目前源码中,多个 HTTP 方式的服务接口(如 IHeartbeatServiceIAlarmServiceIServerService)及其实现类、MethodExecuteHandler 中的多个 sendXxxPackage2Server() 方法已标注 @Deprecated

    以 Docker 数据为例,DockerServiceImpl 已不再走 HTTP 通道,而是改为 WebSocket 方式传输:

    @Service
    public class DockerServiceImpl implements IDockerService {
    
        @Override
        public BaseResponsePackage sendDockerPackage(DockerPackage dockerPackage) {
            if (!DataExchanger.isReady()) {
                Result result = Result.builder().isSuccess(false).msg("数据交换器未准备好,请稍后再试!").build();
                return this.agentPackageConstructor.structureBaseResponsePackage(result);
            }
            // 添加链路信息
            dockerPackage.setChain(this.agentPackageConstructor.getChain(dockerPackage));
            // 通过 WebSocket 通道发送,而非 HTTP
            WebSocketPackage requestPackage = new WebSocketPackage();
            requestPackage.setClassName(DockerPackage.class.getName());
            requestPackage.setPayload(dockerPackage);
            DataExchanger.sendMessage(requestPackage);
            Result result = Result.builder().isSuccess(true).msg(ResultMsgConstants.SUCCESS).build();
            return this.agentPackageConstructor.structureBaseResponsePackage(result);
        }
    }
    

    相比 HTTP 的请求-响应模式,WebSocket 通道具有以下优势:

    • 实时性更强:Docker Stats 等持续监控数据天然适合流式推送,无需反复建连
    • 双向通信:服务端可以主动下发命令,不再受限于 HTTP 的「客户端发起」模型
    • 连接复用:一条长连接承载多种数据类型,减少 TCP 握手和 TLS 协商开销

    这一演进趋势表明,Phoenix 正在将数据传输逐步从 HTTP 迁移到 WebSocket,命令执行器模式(Invoker 机制)作为调用层的抽象,在这一迁移过程中充当了稳定的「缓冲层」——上层调用方式不变,只需替换底层 Server Service 的实现。


    八、小结

    本篇深入分析了代理端的数据转发机制,核心要点如下:

    • 三层转发架构:Controller → Client Service → Server Service,层次清晰、职责单一,接收与发送彻底解耦
    • 命令执行器模式:Invoker + InvokerHolder + BusinessAgencyScanner 三件套,实现了转发逻辑的松耦合调用
    • MethodExecuteHandler 枢纽:统一入口、统一异常处理,新增数据类型只需扩展不需修改;最新版本的泛型 send 方法进一步提升了抽象程度
    • AgentPackageConstructor 双重身份:同时作为 Spring Bean 和饿汉单例,打通了 Spring 管理类与静态工具类之间的访问通道
    • 加密传输与自动重试:HttpServiceImpl 通过可选 Gzip 压缩 + 可配置加密算法(支持 DES/AES/SM4 三种)保证安全,@Retryable 保证可靠
    • 注解驱动零配置注册:@TargetInf + @TargetMethod + BeanPostProcessor,Spring 启动时全自动完成 Invoker 注册,并内置重复注册检查
    • 传输通道演进:部分数据类型(如 Docker)已从 HTTP 迁移到 WebSocket,Invoker 机制作为稳定的调用抽象层,使得底层传输方式的切换对上层透明

    下一篇我们将转向服务端,探讨服务端的数据接收架构——代理端把数据发出去了,服务端是如何通过 Controller 层设计统一接收并分发处理这些数据的?敬请期待。


    项目地址
    https://gitcode.com/monitoring-platform/phoenix
    https://gitee.com/monitoring-platform/phoenix
    https://github.com/709343767/phoenix

    欢迎关注微信公众号获取更多技术干货
    微信公众号·披锋斩棘

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-04-20 15:04
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云