最新要闻

广告

手机

光庭信息跌4.57% 2021上市超募11亿2022扣非降74% 时快讯

光庭信息跌4.57% 2021上市超募11亿2022扣非降74% 时快讯

搜狐汽车全球快讯 | 大众汽车最新专利曝光:仪表支持拆卸 可用手机、平板替代-环球关注

搜狐汽车全球快讯 | 大众汽车最新专利曝光:仪表支持拆卸 可用手机、平板替代-环球关注

家电

Spring事件监听在业务使用中的优化

来源:博客园

事件驱动的开发方式在业务系统开发中扮演着关键角色,若能妥善运用,将对系统的拓展性产生至关重要的影响。Spring框架对事件驱动进行了出色的封装,然而在实际业务应用中,我们发现了一些可优化的领域,因此针对这些问题进行了封装和优化工作。

Spring事件驱动的使用

在电商订单创建的场景中,一旦订单成功创建,必须执行一系列附加的业务操作。这些操作包括向用户发送相关信息,并将订单信息同步到相关系统中。考虑到此场景的特点,事件驱动的开发方式被广泛认为是最合适的选择,因其能够在不对业务系统造成负面影响的前提下进行处理。


【资料图】

事件定义

在订单创建事件中,可以定义包含订单对象以及与订单相关的属性,例如订单ID等。通过此事件,能够有效地传递订单信息,并为其他业务操作提供必要的数据依据。

@Data@AllArgsConstructorpublic class CreateTradeEvent{        // 可以在此定义订单创建事件中的相关属性    private Trade trade;    }

事件发布

在订单成功创建之后,我们可以获取到订单实体或其他相关属性。基于这些内容,我们可以创建一个订单创建事件的对象,并利用Spring框架提供的事件发布机制进行订单创建事件的发布。通过这种方式,我们能够有效地将订单创建事件传递出去,以便其他业务逻辑可以相应地对该事件进行处理。

public class TradeServiceImpl{        @Autowired    private ApplicationEventPublisher applicationEventPublisher;         public void createTrade(CreateTradeCmd crateTradeCmd){                // 一些业务操作后得到订单实体        Trade trade = new Trade();                // 发布订单创建的事件        applicationEventPublisher.publishEvent(new CreateTradeEvent(trade));            }}

事件监听

一旦订单创建事件被发布,我们需要对其进行监听进行一些额外的业务操作。Spring框架提供了出色的事件监听机制,只需在方法上添加@EventListener注解即可实现对特定事件的监听。这种方式使得对订单创建事件的监听和处理变得便捷,无需繁琐的配置和编写冗余代码。

@Componentpublic class TradeEventProcessor {    @EventListener    public void handleCreateTradeEventForNotify(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息发送到通知系统中            }    @EventListener    public void handleCreateTradeEventForEs(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息同步到Es中            }}

需要注意的是,尽管上述示例在代码结构上实现了业务的分离,但监听者仍然是在同一线程中按顺序执行的。如果我们需要自定义多个监听者的执行顺序,可以使用@Order注解来指定优先级。通过为不同的监听者添加不同的优先级值,可以控制它们的执行顺序。这种方式允许我们精确地定义监听者的执行顺序,以满足特定的业务需求。

@Componentpublic class TradeEventProcessor {    @EventListener    @Order(1)    public void handleCreateTradeEventForNotify(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息发送到通知系统中            }    @EventListener    @Order(2)    public void handleCreateTradeEventForEs(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息同步到Es中            }}

针对那些在订单创建完成后并不影响主要流程的次要操作,例如向用户发送通知信息,我们倾向于使用异步线程进行执行,以提高系统的响应速度。

在Spring框架中,我们可以通过添加@EnableAsync注解来启用异步操作功能,并在监听者方法上使用@Async注解来指定任务的异步执行。通过这种方式,我们能够将次要操作与主要流程分离,异步执行它们,从而实现更快的系统响应速度。

需要注意的是,在使用@Async注解时,最好自定义一个线程池进行注入。默认情况下,Spring会使用一个简单的线程池来处理异步方法调用。通过注入自定义线程池,我们能够更精确地控制并发执行的线程数量以及线程的生命周期。

@Component@EnableAsyncpublic class TradeEventProcessor {    @EventListener    @Order(1)    @Async("customThreadPool")    public void handleCreateTradeEventForNotify(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息发送到通知系统中            }    @EventListener    @Order(2)    public void handleCreateTradeEventForEs(CreateTradeEvent createTradeEvent) {        // 处理订单创建事件,并将订单信息同步到Es中            }}

使用过程中的思考

在事件驱动的开发方式中,通常有许多监听者处理的是不影响主要业务流程的业务。针对这种情况,我们倾向于将这些监听者的执行异步化,以提高系统的响应性能。然而,在使用@Async注解将这些监听者方法异步执行时,可能会遇到一些问题需要注意。

  1. 尽管我们可以在事件中定义一些与事件相关的属性,但在监听者进行业务操作时,这些属性往往不足以满足需求,需要在业务监听者中进行数据查询。然而,这带来了一个问题:在异步执行期间,如果主线程在对相关数据进行操作时被阻塞(例如锁表),而事务尚未成功提交,那么异步线程读取的可能是操作之前的数据状态,这将对后续监听者的操作产生错误的影响。

  2. 在异步任务执行过程中,由于网络问题或其他因素的影响,监听者的执行可能会失败。尽管这些操作并不影响主流程业务,但我们仍然希望能够尽最大努力确保监听者的成功执行,并在发生错误时进行相应的通知。为了实现这一目标,我们需要对这些业务监听进行持久化,以便可以进行重试操作。如果没有进行持久化,还可能因为重启等操作导致任务丢失。

    通过将业务监听持久化,我们能够提供可靠的任务重试机制,保证监听者的执行成功性。这样即使出现了故障或异常情况,系统能够在恢复正常运行后重新执行未成功的任务。这种持久化机制可以保证任务的可靠性和一致性,从而提供更可靠的系统行为。

  3. 在项目开发中,通常会将一些公共属性(例如当前用户信息)放入线程变量中,并在接口调用结束后清除线程变量的数据,以避免内存溢出。然而,在异步执行时,我们无法直接获取主线程中的线程变量数据,这给业务代码编写带来了一些挑战。

对于异步线程执行任务的场景下,我们需要能够做到以下几点:

  1. 为了解决异步线程中的查询问题,我们采用了一种策略:只有在主线程事务成功提交后,才执行异步线程中的业务操作。

    通过这样的处理方式,确保了在主线程执行相关数据操作并成功提交后,再触发异步线程的执行。这样可以避免异步线程在查询数据时读取到未提交的数据状态,从而保证了查询结果的准确性和一致性。

    这种异步处理策略有效地解决了异步线程中的查询问题,并确保了数据的正确性。同时,它不会对Spring框架原有的功能产生任何负面影响,保持了系统的可靠性和稳定性。

  2. 在任务执行出错的情况下,我们采用持久化操作来确保任务不会丢失,并为后续重试提供支持。通过将执行的任务进行持久化,我们可以将其记录到持久存储系统(如数据库或消息队列)中。

    通过这一策略,我们能够确保任务的持久性和可靠性,在发生错误时能够尽最大努力地进行重试,从而提高系统的稳定性和可靠性,并为任务的重试提供了可行的手段。

  3. 针对错误任务的持久化,我们需要支持多种持久化方法,包括常见的数据库持久化,并能够兼容不同的持久化框架(如JDBC、MyBatis、JPA等)。

    通过这样的设计,我们可以在持久化任务时根据需求选择合适的持久化方法和框架。无论是使用关系型数据库、非关系型数据库,还是其他持久化方案,都能够进行任务的持久化操作。

    这种灵活性和扩展性的设计使得我们能够适应各种数据存储需求,并与不同的持久化框架协同工作,为任务的持久化提供了广泛的支持和可选性。

  4. 我们需要支持多种线程变量数据的传递,无论业务系统在线程变量中存放什么类型的数据,我们都需要能够支持。

技术分析

通过以上的问题分析以及业务要求,我们如果想要优化Spring的监听者在异步执行下场景,那么会遇到下面的开发问题

  1. 如何对相应的事件监听进行增强
  2. 如何支持不同的持久化方案,如数据库,消息队列等
  3. 如何提供公用的补偿机制
  4. 如何向异步线程中传递数据问题,不同系统线程变量不同,如何处理?

初步分析考虑对Spring的EventListener创建一个切面进行增强,考虑到我们需要支持同步和异步的调用方式,考虑创建一个自定义的注解,然后对这个自定义注解进行增强。那么这个自定义的注解需要支持原本EventListener所有的功能,并且能够支持对任务的排序,异步执行以及同步执行。

其次,我们需要将业务监听者的类名、方法名以及参数等进行持久化,并且支持不同的持久化方案。

从业务使用场景下要求我们能够支持不同的持久化方案,我们可以考虑定义统一的接口,然后由不同的业务系统去定义实现,业务系统如何持久化由个业务系统自行处理。如果这样处理,我们在对注解增强的地方如何通过接口去调用实现,完成持久化呢?初步有两个方案,一是使用Java的SPI机制,二是通过Spring容器直接获取相应的实现类完成持久化操作。

完成持久化后,如何提供公用的补偿机制?首先能够查询出来,然后通过反射调用的方式能够实现对监听者的调用。查询可以和持久化一样,通过定义接口,各业务系统定义实现的方式进行。

最后还有线程变量的问题,向异步线程中传递数据相对简单,可以在主线程中先获取出数据,然后传递到子线程中,子线程结束时清除子线程中的数据,这样看的话,也是有三步,获取数据,设置数据,清除数据。那么我们也是可以定义公共的接口,由各业务系统去完成这三个实现,以支持各业务系统线程变量不同的要求。

功能实现

通过以上分析,我们决定创建一个项目来扩展原有的Spring事件监听功能。幸运的是,Spring的starter机制使得集成类似的功能变得非常简便。

基于这个决策,我们可以利用Spring的starter机制,创建一个自定义的starter项目。该项目将提供一套扩展的事件监听功能,包括异步执行、错误处理和持久化等特性。

通过创建自定义的starter项目,我们能够轻松地将这些功能集成到现有的Spring应用程序中。开发者只需添加我们自定义的starter依赖,并进行适当的配置即可使用这些扩展功能。

自定义注解

为了确保不影响原有的Spring功能,我们计划自定义一个注解,以实现与原有Spring事件监听相同的功能和配置。

通过自定义注解并实现原有Spring事件监听的全部功能和配置,我们能够在不影响原有Spring功能的前提下,提供更多的定制化选项和增强功能,以满足特定应用场景的需求。

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@EventListener@Orderpublic @interface EventEnhanceListener {    /**     * 是否异步执行,默认同步执行     */    boolean async() default true;    /**     * 自定义线程池,用于异步执行任务     */    String threadPool() default "";    /**     * 执行顺序     */    @AliasFor(annotation = Order.class, attribute = "value")    int order() default Ordered.LOWEST_PRECEDENCE;    /**     * 持久化接口     */    String persistenceService() default "";    // EventListener的属性    @AliasFor(annotation = EventListener.class, attribute = "classes")    Class[] value() default {};    @AliasFor(annotation = EventListener.class, attribute = "value")    Class[] classes() default {};    @AliasFor(annotation = EventListener.class, attribute = "condition")    String condition() default "";}

对自定义注解的增强

创建注解类对象的后置处理器

@Configuration@Role(BeanDefinition.ROLE_SUPPORT)public class EventEnhanceConfiguration implements ApplicationContextAware {    private ApplicationContext applicationContext;    @Nullable    protected Supplier executor;    @Nullable    protected Supplier exceptionHandler;    @Bean    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)    public EventEnhanceListenerBeanPostProcessor advisor(){        EventEnhanceListenerBeanPostProcessor bpp = new EventEnhanceListenerBeanPostProcessor();        bpp.configure(this.executor, this.exceptionHandler, applicationContext);        return bpp;    }    @Autowired(required = false)    void setConfigurers(Collection configurers) {        if (CollectionUtils.isEmpty(configurers)) {            return;        }        if (configurers.size() > 1) {            throw new IllegalStateException("Only one AsyncConfigurer may exist");        }        AsyncConfigurer configurer = configurers.iterator().next();        this.executor = configurer::getAsyncExecutor;        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }}

后置处理器中创建一个切面类

public class EventEnhanceListenerBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {    @Nullable    private Supplier executor;    @Nullable    private Supplier exceptionHandler;    private ApplicationContext applicationContext;    public EventEnhanceListenerBeanPostProcessor() {        setBeforeExistingAdvisors(true);    }    public void configure(@Nullable Supplier executor,                          @Nullable Supplier exceptionHandler,                          ApplicationContext applicationContext) {        this.executor = executor;        this.exceptionHandler = exceptionHandler;        this.applicationContext = applicationContext;    }    @Override    public void setBeanFactory(BeanFactory beanFactory) {        super.setBeanFactory(beanFactory);        EventEnhanceListenerAdvisor advisor = new EventEnhanceListenerAdvisor(this.executor, this.exceptionHandler, this.applicationContext);        advisor.setBeanFactory(beanFactory);        this.advisor = advisor;    }}

切面类中通过EventEnhanceListenerExecutionInterceptor对自定义注解进行增强

public class EventEnhanceListenerAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {    private Advice advice;    private Pointcut pointcut;    private ApplicationContext applicationContext;    public EventEnhanceListenerAdvisor(@Nullable Supplier executor,                                       @Nullable Supplier exceptionHandler,                                       ApplicationContext applicationContext) {        Set> annotationTypes = new LinkedHashSet<>(2);        annotationTypes.add(EventEnhanceListener.class);        this.applicationContext = applicationContext;        this.advice = buildAdvice(executor, exceptionHandler);        this.pointcut = buildPointcut(annotationTypes);    }    protected Advice buildAdvice(@Nullable Supplier executor,                                 @Nullable Supplier exceptionHandler) {        EventEnhanceListenerExecutionInterceptor interceptor = new EventEnhanceListenerExecutionInterceptor(null, this.applicationContext);        interceptor.configure(executor, exceptionHandler);        return interceptor;    }    protected Pointcut buildPointcut(Set> asyncAnnotationTypes) {        ComposablePointcut result = null;        for (Class annotationType : asyncAnnotationTypes) {            Pointcut cpc = new AnnotationMatchingPointcut(annotationType, true);            Pointcut mpc = new AnnotationMatchingPointcut(null, annotationType, true);            if (result == null) {                result = new ComposablePointcut(cpc);            } else {                result.union(cpc);            }            result = result.union(mpc);        }        return (result != null ? result : Pointcut.TRUE);    }    @Override    public Pointcut getPointcut() {        return this.pointcut;    }    @Override    public Advice getAdvice() {        return this.advice;    }    @Override    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {        if (this.advice instanceof BeanFactoryAware) {            ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);        }    }}
public class EventEnhanceListenerExecutionInterceptor extends AsyncExecutionInterceptor {    private ApplicationContext applicationContext;    private AsyncTaskDataTransfer asyncTaskDataTransfer;    public EventEnhanceListenerExecutionInterceptor(@Nullable Executor defaultExecutor,                                                    ApplicationContext applicationContext) {        super(defaultExecutor);        this.applicationContext = applicationContext;    }    @Override    public Object invoke(MethodInvocation invocation) throws Throwable {        EventEnhanceListener eventEnhanceListener = AnnotatedElementUtils.findMergedAnnotation(invocation.getMethod(), EventEnhanceListener.class);        Assert.notNull(eventEnhanceListener, "method @CustomEventListener is null");        if (eventEnhanceListener.async()) {            return asyncSubmitTask(invocation, eventEnhanceListener);        } else {            return invocation.proceed();        }    }    private Object asyncSubmitTask(MethodInvocation invocation, EventEnhanceListener eventEnhanceListener) {        Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);        if (executor == null) {            throw new IllegalStateException(                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");        }        Object mainThreadData = getMainThreadData();        // 将执行的任务持久化        EventListenerAsyncTask eventListenerAsyncTask = buildAsyncTask(invocation, targetClass, userDeclaredMethod, mainThreadData);        String persistenceServiceName = eventEnhanceListener.persistenceService();        eventListenerAsyncTask.getData().setPersistenceService(persistenceServiceName);        AsyncTaskPersistenceService asyncTaskPersistenceService = applicationContext.getBean(persistenceServiceName, AsyncTaskPersistenceService.class);        asyncTaskPersistenceService.saveForTask(eventListenerAsyncTask);        Callable task = () -> {            try {                if (Objects.nonNull(asyncTaskDataTransfer)) {                    asyncTaskDataTransfer.setAsyncThreadData(mainThreadData);                }                Object result = invocation.proceed();                if (result instanceof Future) {                    return ((Future) result).get();                }                asyncTaskPersistenceService.removeForTask(eventListenerAsyncTask);            } catch (Throwable ex) {                handleError(ex, userDeclaredMethod, invocation.getArguments());                eventListenerAsyncTask.setExceptionMsg(ex.toString());                asyncTaskPersistenceService.saveForTask(eventListenerAsyncTask);            } finally {                if (Objects.nonNull(asyncTaskDataTransfer)) {                    asyncTaskDataTransfer.clear();                }            }            return null;        };        return doSubmit(task, executor, invocation.getMethod().getReturnType());    }    @Override    @Nullable    protected String getExecutorQualifier(Method method) {        EventEnhanceListener eventEnhanceListener = AnnotatedElementUtils.findMergedAnnotation(method, EventEnhanceListener.class);        if (eventEnhanceListener == null) {            eventEnhanceListener = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), EventEnhanceListener.class);        }        return (eventEnhanceListener != null ? eventEnhanceListener.threadPool() : null);    }}

通用的补偿接口

考虑对外提供通用的补偿http接口,如何调度由业务系统自行决定。

public class AsyncCompensateJob implements ApplicationContextAware {    private ApplicationContext applicationContext;    private AsyncTaskDataTransfer asyncTaskDataTransfer;    public void execute() {        Map asyncTaskPersistenceServiceMap = applicationContext.getBeansOfType(AsyncTaskPersistenceService.class);        if (CollectionUtils.isEmpty(asyncTaskPersistenceServiceMap)) {            return;        }        if (asyncTaskDataTransfer == null) {            ServiceLoader serviceLoader = ServiceLoader.load(AsyncTaskDataTransfer.class);            if (serviceLoader.findFirst().isPresent()) {                asyncTaskDataTransfer = serviceLoader.findFirst().get();            }        }        for (Map.Entry entry : asyncTaskPersistenceServiceMap.entrySet()) {            AsyncTaskPersistenceService asyncTaskPersistenceService = entry.getValue();            List taskList = asyncTaskPersistenceService.findTaskList();            for (EventListenerAsyncTask eventListenerAsyncTask : taskList) {                EventTaskInfo eventTaskInfo = eventListenerAsyncTask.getData();                Object threadContext = eventTaskInfo.getThreadContext();                String className = eventTaskInfo.getClassName();                String methodName = eventTaskInfo.getMethod();                String argumentString = eventTaskInfo.getArguments();                if (Objects.nonNull(asyncTaskDataTransfer)) {                    asyncTaskDataTransfer.setAsyncThreadData(threadContext);                }                try {                    Class clazz = Class.forName(className);                    Method method = getMethod(clazz, methodName);                    Assert.notNull(method, "根据类限定名以及方法名未反射找到对应的方法");                    Class[] parameterTypes = method.getParameterTypes();                    Object[] actualArgumentObjects = generateArguments(parameterTypes, argumentString);                    // 反射调用方法执行                    Object targetObject = applicationContext.getBean(clazz);                    ReflectionUtils.invokeMethod(method, targetObject, actualArgumentObjects);                    // 执行成功删除任务                    asyncTaskPersistenceService.removeForTask(eventListenerAsyncTask);                } catch (Exception ex) {                    log.error("异步任务错误补偿执行过程报错,异步任务信息:{}", eventListenerAsyncTask, ex);                } finally {                    if (Objects.nonNull(asyncTaskDataTransfer)) {                        asyncTaskDataTransfer.clear();                    }                }            }        }    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }}

待完善处

  1. 为了实现接口参数的序列化和反序列化,我们采用JSON格式进行数据的转换。这要求业务接口的参数对象必须具备无参构造函数,并提供GETTER和SETTER方法,以便能够正确地进行序列化和反序列化操作。然而,这会对业务系统带来一定程度的侵入性。
  2. 通用的补偿任务完成后,当前实现中使用了删除持久化记录的方式,但是接口命名不应该是删除,而应该是后置处理,补偿任务完成后的具体处理方式应由业务系统自行决定。此外,在调用接口发生异常时,应定义一个异常处理接口,并将其提供给业务系统,以使其了解补偿失败的情况,并能够进行适当的后续处理,例如设定补偿失败的阈值等。
  3. 在通用的补偿任务中,我们通过反射调用需要补偿的接口,由于业务接口的类通常都会有些依赖,为方便构造反射对象,我们通过Spring的容器直接获取相应的对象,然后反射调用相应的方法。这会带来一个问题,由于通过容器调用会触发我们在该类上创建的切面增强。后续优化考虑通过对从容器中获取的对象创建一个代理,通过代理调用容器对象中的方法,使用这种方式避免触发切面相应的业务
  4. 目前对于线程变量数据的传递,采用的是定义公共接口,由业务系统实现具体逻辑的方式,目前在拦截器中使用的是Java的SPI机制,后续考虑在自定义注解上增加一个属性,定义线程变量传递实现的name,通过Spring容器获取实现的方式实现,这样业务系统在使用时会更加方便一些,和持久化方案一致,逻辑上更加统一。

思考

代码写到最后发现,其实实现的功能解决的是在异步线程中处理任务所遇到的问题,与其说是对@EventListener的增强,还不如说是对异步线程任务的增强,后续考虑将功能的实现放在@Async上,这样既不破坏原有Spring的事件监听机制,也增大了功能的适用范围。

关键词: