欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

2025/6/27 1:09:53 来源:https://blog.csdn.net/xiewenfeng520/article/details/146562443  浏览:    关键词:夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

文章目录

  • 夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发
    • 1.前言
    • 2.需求
    • 3.思路
      • 3.1 `@kafkaListener` 注解的实现
        • 3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
        • 3.1.2 监听器端点注册
        • 3.1.3 监听器容器创建
        • 3.1.4 动态代理生成(重点)
        • 3.1.5 总结
      • 3.2 参考实现
    • 4.开发
      • 3.1 @EvalEventListener 注解
      • 3.2 EvalEventAnnotationDefinition 注解定义
      • 3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器
      • 3.4 EvalEventListenerAnnotationThread 消费逻辑
    • 5.测试
      • 5.1 测试代码
      • 5.2 源码调试
      • 5.3 测试结果

1.前言

在文章 《夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event》 留了一个功能未实现

  • @EvalEventListener 监听分布式事件

本文来讨论这个自定义注解如何开发。

源码已更新到 github

  • https://github.com/huajiexiewenfeng/eval-event

2.需求

通过 @EvalEventListener 监听到分布式事件 MyEvent(自定义事件)

@EvalEventListener 
public void onEvalEvent(MyEvent event){...
}

3.思路

3.1 @kafkaListener 注解的实现

我们先看看 kafka 官方如何实现 @kafkaListener,然后参照这个来进行实现。

3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
  • 作用:在 Spring 容器初始化时扫描所有 Bean,识别带有 @KafkaListener 注解的方法。
  • 流程
    1. 遍历所有 Bean 的类方法。
    2. 通过反射检查方法是否标注 @KafkaListener
    3. 将符合条件的监听方法封装为 MethodKafkaListenerEndpoint
3.1.2 监听器端点注册
  • 将解析后的端点信息注册到 KafkaListenerEndpointRegistry(管理所有监听容器的中央注册表)。
3.1.3 监听器容器创建
  • 容器工厂 ConcurrentKafkaListenerContainerFactory
  • 作用:根据 @KafkaListener 的配置创建 ConcurrentMessageListenerContainer
3.1.4 动态代理生成(重点)
  • 对带有 @KafkaListener 的方法生成动态代理,使其具备消息处理能力。
3.1.5 总结

一共分为三步

  • 在 Bean 的初始化后生命周期阶段,找到所有标注 @KafkaListener 的方法和类
  • 整理好所有需要的元数据信息(Bean Method topic 等等),用于动态代理+消费 kafka 消息
  • 动态代理,执行类的方法,即标注 @KafkaListener 的方法

3.2 参考实现

我们大致实现流程如下:

  • EvalEventListenerAnnotationBeanPostProcessor 实现 BeanPostProcessor

  • 遍历所有 Bean 的类方法,通过反射检查方法是否标注 @EvalEventListener

  • 找到 topic 和 class 映射关系

  • 启动线程,拉取 topic 消息

  • 找到 @EvalEventListener 对应的 class,以及 method

  • 反射执行目标类的 method 方法

4.开发

3.1 @EvalEventListener 注解

package com.csdn.event.kafka.annotation;import java.lang.annotation.*;@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EvalEventListener {/*** 指定监听的事件类型(默认根据方法参数推断)*/Class<?>[] eventType() default {};
}

3.2 EvalEventAnnotationDefinition 注解定义

用于存储 Bean Method topic 等等信息

public class EvalEventAnnotationDefinition {private String topic;// 事件的目标类private Class<?> targetClass;// 事件的类型private Class<?> eventClass;private Method method;...
}

3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器

  • 找到标注 @EvalEventListener 类和方法
  • 循环执行 processEvalEventListener 方法
    • 封装 EvalEventAnnotationDefinition 信息
    • 启动 EvalEventListenerAnnotationThread 线程
/*** This class processes beans after their initialization to find methods annotated with* {@link EvalEventListener} and register them as event listeners.*/
@Component
public class EvalEventListenerAnnotationBeanPostProcessor<T extends EvalEvent> implements BeanPostProcessor, ApplicationContextAware {...@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);Map<Method, Set<EvalEventListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<EvalEventListener>>) method -> {Set<EvalEventListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (annotatedMethods.isEmpty()) {// No methods foundlogger.debug("No @EvalEventListener annotations found on bean '{}'", beanName);} else {// Non-empty set of methodsfor (Map.Entry<Method, Set<EvalEventListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (EvalEventListener listener : entry.getValue()) {processEvalEventListener(listener, method, bean);}}logger.debug(" {} @KafkaListener methods processed on bean '{}'", annotatedMethods.size(), beanName + ": " + annotatedMethods);}return bean;}private void processEvalEventListener(EvalEventListener listener, Method method, Object bean) {// topicClass<?>[] eventTypeArr = listener.eventType();Class<?> eventType = eventTypeArr.length > 0 ? eventTypeArr[0] : null;if (eventType == null) {// 从方法参数中获取eventType = method.getParameterTypes()[0];}if (eventType == null) {throw new IllegalStateException("Event type not specified for @EvalEventListener on method: " + method);}try {Object o = eventType.newInstance();if (!(o instanceof EvalEvent)) {throw new IllegalStateException("Event type must be a subclass of EvalEvent: " + eventType);}EvalEvent event = (EvalEvent) o;EvalEventAnnotationDefinition evalEventDefinition = new EvalEventAnnotationDefinition(event.getTopic(), bean.getClass(), eventType, method);EvalEventListenerAnnotationThread<T> evalEventListenerThread =new EvalEventListenerAnnotationThread<>(evalEventDefinition, eventKafkaConsumerFactory, applicationContext);evalEventListenerThread.start();} catch (Exception e) {throw new RuntimeException(e);}}
...
}

3.4 EvalEventListenerAnnotationThread 消费逻辑

  • 创建KafkaConsumer
  • 拉取 kafka 消息
  • 动态代理执行目标类的方法
  • ack 提交
public class EvalEventListenerAnnotationThread<T extends EvalEvent> extends Thread {...@Overridepublic void run() {// 1. 创建KafkaConsumerKafkaConsumer<String, ?> consumer;try {consumer = eventKafkaConsumerFactory.buildKafkaConsumer(evalEventAnnotationDefinition.getTargetClass());List<String> topicList = new ArrayList<>();topicList.add(evalEventAnnotationDefinition.getTopic());consumer.subscribe(topicList);} catch (Exception e) {log.error("KafkaConsumer构造失败", e);e.printStackTrace();return;}// 2. 消费消息try {while (true) {try {// 3. 拉取消息ConsumerRecords<String, ?> records = consumer.poll(Duration.ofMillis(500));if (records.isEmpty()) {continue;}// 4. 处理消息dispatch(records);// 5. 使用异步提交规避阻塞consumer.commitAsync();} catch (Exception e) {log.error("消息处理异常", e);}}} finally {try {// 6.最后一次提交使用同步阻塞式提交consumer.commitSync();} finally {consumer.close();}}}private void dispatch(ConsumerRecords<String, ?> records) {for (ConsumerRecord<String, ?> record : records) {try {Object data = record.value();if (data == null) {log.warn("接收到空消息记录,跳过处理");continue;}// 转换事件数据为指定类型T event = (T) ConvertUtil.convertEvent(data, evalEventAnnotationDefinition.getEventClass());if (event == null) {log.error("事件数据转换失败,跳过处理");continue;}Class<?> targetClass = evalEventAnnotationDefinition.getTargetClass();Method method = evalEventAnnotationDefinition.getMethod();if (targetClass == null || method == null) {log.error("目标类或方法为空,无法处理事件");continue;}// 从 Spring 容器获取 bean 实例Object target;try {// 先尝试从 Spring 容器获取target = applicationContext.getBean(targetClass);} catch (NoSuchBeanDefinitionException e) {// 如果容器中没有,尝试创建新实例log.warn("Spring 容器中未找到 {} 的实例,将创建新实例", targetClass.getSimpleName());target = targetClass.getDeclaredConstructor().newInstance();// 可选:如果需要依赖注入,可以在这里自动装配autowireBean(target);}method.setAccessible(true);// 调用目标方法method.invoke(target, event);} catch (InstantiationException | IllegalAccessException e) {log.error("创建目标类实例失败: {}", e.getMessage(), e);} catch (NoSuchMethodException e) {log.error("找不到目标类的无参构造方法: {}", e.getMessage(), e);} catch (InvocationTargetException e) {log.error("方法调用失败: {}", e.getTargetException().getMessage(), e.getTargetException());} catch (Exception e) {log.error("事件处理失败: {}", e.getMessage(), e);}}}...}

5.测试

5.1 测试代码

@Component
public class UserAnnotationEventListener  {@EvalEventListenerpublic void onEvent(UserCreatedEvent event) {System.out.println("Received event: " + JSONObject.toJSONString(event));}
}

5.2 源码调试

断点1 EvalEventListenerAnnotationBeanPostProcessor

  • 扫描到 UserAnnotationEventListener 以及对应的方法

请添加图片描述

断点2

  • 封装 EvalEventAnnotationDefinition

请添加图片描述

断点3 EvalEventListenerAnnotationThread

  • 拉取 kafka 消息

请添加图片描述

断点4 EvalEventListenerAnnotationThread 动态代理执行方法

请添加图片描述

5.3 测试结果

请添加图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词