夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发
文章目录
- 夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发
- 1.前言
- 2.需求
- 3.思路
- 3.1 `@kafkaListener` 注解的实现
- 3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
- 3.1.2 监听器端点注册
- 3.1.3 监听器容器创建
- 3.1.4 动态代理生成(重点)
- 3.1.5 总结
- 3.2 参考实现
- 4.开发
- 3.1 @EvalEventListener 注解
- 3.2 EvalEventAnnotationDefinition 注解定义
- 3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器
- 3.4 EvalEventListenerAnnotationThread 消费逻辑
- 5.测试
- 5.1 测试代码
- 5.2 源码调试
- 5.3 测试结果
1.前言
在文章 《夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event》 留了一个功能未实现
@EvalEventListener
监听分布式事件
本文来讨论这个自定义注解如何开发。
源码已更新到 github
- https://github.com/huajiexiewenfeng/eval-event
2.需求
通过 @EvalEventListener
监听到分布式事件 MyEvent(自定义事件)
@EvalEventListener
public void onEvalEvent(MyEvent event){...
}
3.思路
3.1 @kafkaListener
注解的实现
我们先看看 kafka 官方如何实现 @kafkaListener
,然后参照这个来进行实现。
3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
- 作用:在 Spring 容器初始化时扫描所有 Bean,识别带有
@KafkaListener
注解的方法。 - 流程:
- 遍历所有 Bean 的类方法。
- 通过反射检查方法是否标注
@KafkaListener
。 - 将符合条件的监听方法封装为
MethodKafkaListenerEndpoint
。
3.1.2 监听器端点注册
- 将解析后的端点信息注册到
KafkaListenerEndpointRegistry
(管理所有监听容器的中央注册表)。
3.1.3 监听器容器创建
- 容器工厂
ConcurrentKafkaListenerContainerFactory
- 作用:根据
@KafkaListener
的配置创建ConcurrentMessageListenerContainer
。
3.1.4 动态代理生成(重点)
- 对带有
@KafkaListener
的方法生成动态代理,使其具备消息处理能力。
3.1.5 总结
一共分为三步
- 在 Bean 的初始化后生命周期阶段,找到所有标注
@KafkaListener
的方法和类 - 整理好所有需要的元数据信息(Bean Method topic 等等),用于动态代理+消费 kafka 消息
- 动态代理,执行类的方法,即标注
@KafkaListener
的方法
3.2 参考实现
我们大致实现流程如下:
-
EvalEventListenerAnnotationBeanPostProcessor 实现 BeanPostProcessor
-
遍历所有 Bean 的类方法,通过反射检查方法是否标注
@EvalEventListener
-
找到 topic 和 class 映射关系
-
启动线程,拉取 topic 消息
-
找到
@EvalEventListener
对应的 class,以及 method -
反射执行目标类的 method 方法
4.开发
3.1 @EvalEventListener 注解
package com.csdn.event.kafka.annotation;import java.lang.annotation.*;@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EvalEventListener {/*** 指定监听的事件类型(默认根据方法参数推断)*/Class<?>[] eventType() default {};
}
3.2 EvalEventAnnotationDefinition 注解定义
用于存储 Bean Method topic 等等信息
public class EvalEventAnnotationDefinition {private String topic;// 事件的目标类private Class<?> targetClass;// 事件的类型private Class<?> eventClass;private Method method;...
}
3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器
- 找到标注
@EvalEventListener
类和方法 - 循环执行 processEvalEventListener 方法
- 封装 EvalEventAnnotationDefinition 信息
- 启动 EvalEventListenerAnnotationThread 线程
/*** This class processes beans after their initialization to find methods annotated with* {@link EvalEventListener} and register them as event listeners.*/
@Component
public class EvalEventListenerAnnotationBeanPostProcessor<T extends EvalEvent> implements BeanPostProcessor, ApplicationContextAware {...@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);Map<Method, Set<EvalEventListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<EvalEventListener>>) method -> {Set<EvalEventListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (annotatedMethods.isEmpty()) {// No methods foundlogger.debug("No @EvalEventListener annotations found on bean '{}'", beanName);} else {// Non-empty set of methodsfor (Map.Entry<Method, Set<EvalEventListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (EvalEventListener listener : entry.getValue()) {processEvalEventListener(listener, method, bean);}}logger.debug(" {} @KafkaListener methods processed on bean '{}'", annotatedMethods.size(), beanName + ": " + annotatedMethods);}return bean;}private void processEvalEventListener(EvalEventListener listener, Method method, Object bean) {// topicClass<?>[] eventTypeArr = listener.eventType();Class<?> eventType = eventTypeArr.length > 0 ? eventTypeArr[0] : null;if (eventType == null) {// 从方法参数中获取eventType = method.getParameterTypes()[0];}if (eventType == null) {throw new IllegalStateException("Event type not specified for @EvalEventListener on method: " + method);}try {Object o = eventType.newInstance();if (!(o instanceof EvalEvent)) {throw new IllegalStateException("Event type must be a subclass of EvalEvent: " + eventType);}EvalEvent event = (EvalEvent) o;EvalEventAnnotationDefinition evalEventDefinition = new EvalEventAnnotationDefinition(event.getTopic(), bean.getClass(), eventType, method);EvalEventListenerAnnotationThread<T> evalEventListenerThread =new EvalEventListenerAnnotationThread<>(evalEventDefinition, eventKafkaConsumerFactory, applicationContext);evalEventListenerThread.start();} catch (Exception e) {throw new RuntimeException(e);}}
...
}
3.4 EvalEventListenerAnnotationThread 消费逻辑
- 创建KafkaConsumer
- 拉取 kafka 消息
- 动态代理执行目标类的方法
- ack 提交
public class EvalEventListenerAnnotationThread<T extends EvalEvent> extends Thread {...@Overridepublic void run() {// 1. 创建KafkaConsumerKafkaConsumer<String, ?> consumer;try {consumer = eventKafkaConsumerFactory.buildKafkaConsumer(evalEventAnnotationDefinition.getTargetClass());List<String> topicList = new ArrayList<>();topicList.add(evalEventAnnotationDefinition.getTopic());consumer.subscribe(topicList);} catch (Exception e) {log.error("KafkaConsumer构造失败", e);e.printStackTrace();return;}// 2. 消费消息try {while (true) {try {// 3. 拉取消息ConsumerRecords<String, ?> records = consumer.poll(Duration.ofMillis(500));if (records.isEmpty()) {continue;}// 4. 处理消息dispatch(records);// 5. 使用异步提交规避阻塞consumer.commitAsync();} catch (Exception e) {log.error("消息处理异常", e);}}} finally {try {// 6.最后一次提交使用同步阻塞式提交consumer.commitSync();} finally {consumer.close();}}}private void dispatch(ConsumerRecords<String, ?> records) {for (ConsumerRecord<String, ?> record : records) {try {Object data = record.value();if (data == null) {log.warn("接收到空消息记录,跳过处理");continue;}// 转换事件数据为指定类型T event = (T) ConvertUtil.convertEvent(data, evalEventAnnotationDefinition.getEventClass());if (event == null) {log.error("事件数据转换失败,跳过处理");continue;}Class<?> targetClass = evalEventAnnotationDefinition.getTargetClass();Method method = evalEventAnnotationDefinition.getMethod();if (targetClass == null || method == null) {log.error("目标类或方法为空,无法处理事件");continue;}// 从 Spring 容器获取 bean 实例Object target;try {// 先尝试从 Spring 容器获取target = applicationContext.getBean(targetClass);} catch (NoSuchBeanDefinitionException e) {// 如果容器中没有,尝试创建新实例log.warn("Spring 容器中未找到 {} 的实例,将创建新实例", targetClass.getSimpleName());target = targetClass.getDeclaredConstructor().newInstance();// 可选:如果需要依赖注入,可以在这里自动装配autowireBean(target);}method.setAccessible(true);// 调用目标方法method.invoke(target, event);} catch (InstantiationException | IllegalAccessException e) {log.error("创建目标类实例失败: {}", e.getMessage(), e);} catch (NoSuchMethodException e) {log.error("找不到目标类的无参构造方法: {}", e.getMessage(), e);} catch (InvocationTargetException e) {log.error("方法调用失败: {}", e.getTargetException().getMessage(), e.getTargetException());} catch (Exception e) {log.error("事件处理失败: {}", e.getMessage(), e);}}}...}
5.测试
5.1 测试代码
@Component
public class UserAnnotationEventListener {@EvalEventListenerpublic void onEvent(UserCreatedEvent event) {System.out.println("Received event: " + JSONObject.toJSONString(event));}
}
5.2 源码调试
断点1 EvalEventListenerAnnotationBeanPostProcessor
- 扫描到 UserAnnotationEventListener 以及对应的方法
断点2
- 封装 EvalEventAnnotationDefinition
断点3 EvalEventListenerAnnotationThread
- 拉取 kafka 消息
断点4 EvalEventListenerAnnotationThread 动态代理执行方法