最新要闻

广告

手机

高温橙色预警8连发,晴热高温天气持续……

高温橙色预警8连发,晴热高温天气持续……

八月第三周,神采飞扬,魅力十足的星座

八月第三周,神采飞扬,魅力十足的星座

家电

springboot~ApplicationContextAware和Interceptor产生了真感情

来源:博客园


(资料图片)

看着题目,有点一头污水吧,事实上,没有经历过,很难去说ApplicationContextAware在什么时候会用到,直接在一个bean对象里,你可以直接使用构造方法注入或者Autowired属性注入的方式来使用其它的bean对象,这在springboot里是非常自然的,也是天然支持的;但如果你的这个bean不是由spring ioc自动注入的,而是通过拦截器动态配置的,这时你使用@Autowired时,是无法获取到其它bean对象的;这时你需要使用ApplicationContextAware接口,再定义一个静态的ApplicationContext实例,在你的拦截器执行方法里使用它就可以了。【应该和拦截器里的动态代理有关】

一个kafka的ConsumerInterceptor实例

在这个例子中,我们通过ConsumerInterceptor实现了一个TTL的延时队列,当topic过期时,再通过KafkaTemplate将消息转发到其它队列里

  • DelayPublisher.publish发送延时topic的方法
/** * 发送延时消息 * @param message 消息体 * @param delaySecondTime 多个秒后过期 * @param delayTopic 过期后发送到的话题 */public void publish(String message, long delaySecondTime, String delayTopic) {ProducerRecord producerRecord = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), delayTopic, message,new RecordHeaders().add(new RecordHeader("ttl", toBytes(delaySecondTime))));kafkaTemplate.send(producerRecord);}
  • ConsumerInterceptorTTL
/** * @author lind * @date 2023/8/18 8:33 * @since 1.0.0 */@Componentpublic class ConsumerInterceptorTTL implements ConsumerInterceptor, ApplicationContextAware {// 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取beanprivate static ApplicationContext applicationContext;@Overridepublic ConsumerRecords onConsume(ConsumerRecords records) {long now = System.currentTimeMillis();Map>> newRecords = new HashMap<>();for (TopicPartition tp : records.partitions()) {List> tpRecords = records.records(tp);List> newTpRecords = new ArrayList<>();for (ConsumerRecord record : tpRecords) {Headers headers = record.headers();long ttl = -1;for (Header header : headers) {if (header.key().equals("ttl")) {ttl = toLong(header.value());}}// 消息超时判定if (ttl > 0 && now - record.timestamp() < ttl * 1000) {// 可以放在死信队列中System.out.println("消息超时了,需要发到topic:" + record.key());KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);kafkaTemplate.send(record.key(), record.value());}else { // 没有设置TTL,不需要超时判定newTpRecords.add(record);}}if (!newRecords.isEmpty()) {newRecords.put(tp, newTpRecords);}}return new ConsumerRecords<>(newRecords);}@Overridepublic void onCommit(Map offsets) {offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));}@Overridepublic void close() {}@Overridepublic void configure(Map configs) {}// 它的时机是在KafkaListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法中,applicationContext应该定时成static,否则在实例对象中,它的值可能是空@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
  • 配置文件中注入拦截器
spring:  kafka:    consumer:      properties:        interceptor.classes: com.example.ConsumerInterceptorTTL 

关键词: