@EventPublisher+@Async异步事件流详解
- 游戏开发
- 2025-08-21 09:57:01

本文主要介绍Spring事件流和@Async异步线程池处理,以及@Async默认线程池可能会导致的问题及解决方法。
事件流Spring可以使用以观察者模式实现的事件流操作,将业务逻辑解耦,达到职责分离的效果
Spring事件流的详解
发布事件:
public class EmailService implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; public void sendEmail(String address, String content) { publisher.publishEvent(new BlackListEvent(this, address, content)); // send email... } }监听事件:
@EventListener(condition = "#blEvent.content == 'foo'") public void processBlackListEvent(BlackListEvent blEvent) { // notify appropriate parties via notificationAddress... }注意在默认情况下,事件监听器会同步接收事件。这意味着publishEvent()方法将阻塞,直到所有侦听器都已完成对事件的处理为止。
@Async用@Async注解bean的一个方法,就会让它在一个单独的线程中执行。换句话说,调用者不会等待被调用方法的完成
@Async有两个限制:
它必须仅应用于public方法自调用(从同一个类中调用异步方法)将不起作用原因:该方法需要为public才可以被代理。而自调用是不生效的,因为它绕过了代理,直接调用了底层方法。
异步返回参数可以通过将实际返回包装在Future中,将@Async应用于具有返回类型的方法
示例详见How To Do @Async in Spring
@Async public Future<String> asyncMethodWithReturnType() { System.out.println("Execute method asynchronously - " + Thread.currentThread().getName()); try { Thread.sleep(5000); return new AsyncResult<String>("hello world !!!!"); } catch (InterruptedException e) { // } return null; }Spring 还提供了一个实现Future的AsyncResult类。我们可以使用它来跟踪异步方法执行的结果。
现在让我们调用上述方法并使用Future对象检索异步过程的结果。
public void testAsyncAnnotationForMethodsWithReturnType() throws InterruptedException, ExecutionException { System.out.println("Invoking an asynchronous method. " + Thread.currentThread().getName()); Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType(); while (true) { if (future.isDone()) { System.out.println("Result from asynchronous process - " + future.get()); break; } System.out.println("Continue doing something else. "); Thread.sleep(1000); } } 异步监听器如果要特定的侦听器异步处理事件,只需重用常规@Async支持:
@EventListener @Async public void processBlackListEvent(BlackListEvent event) { // BlackListEvent is processed in a separate thread }使用异步事件时,请注意以下限制:
如果事件监听器抛出Exception,它将不会传播给调用者,详见AsyncUncaughtExceptionHandler此类事件监听器无法发送答复事件。如果您需要发送另一个事件作为处理结果,请注入ApplicationEventPublisher以手动发送事件。 @EventPublisher + @Async 阻塞在@Async注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor线程池。
默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果
spring默认线程池配置参数org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/** * Configuration properties for task execution. * * @author Stephane Nicoll * @since 2.1.0 */ @ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final Pool pool = new Pool(); /** * Prefix to use for the names of newly created threads. */ private String threadNamePrefix = "task-"; public static class Pool { /** * Queue capacity. An unbounded capacity does not increase the pool and therefore * ignores the "max-size" property. */ private int queueCapacity = Integer.MAX_VALUE; /** * Core number of threads. */ private int coreSize = 8; /** * Maximum allowed number of threads. If tasks are filling up the queue, the pool * can expand up to that size to accommodate the load. Ignored if the queue is * unbounded. */ private int maxSize = Integer.MAX_VALUE; /** * Whether core threads are allowed to time out. This enables dynamic growing and * shrinking of the pool. */ private boolean allowCoreThreadTimeout = true; /** * Time limit for which threads may remain idle before being terminated. */ private Duration keepAlive = Duration.ofSeconds(60); //getter/setter } } 自定义线程池在@Async注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 @Configuration public class ThreadConfig { @Bean("msgThread") public ThreadPoolTaskExecutor getMsgSendTaskExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(25); taskExecutor.setQueueCapacity(800); taskExecutor.setAllowCoreThreadTimeOut(false); taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.setThreadNamePrefix("msg-thread-"); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } }监听事件异步处理
@EventListener(value = MsgEvent.class, condition = "#root.args[0].type == 0") @Async("msgThread") public void commonEvent(MsgEvent event) { //logic }@Async使用自定义线程池的其他方式
参考资料:
Spring事件流@Async优化How To Do @Async in SpringSpring使用@Async注解@EventPublisher+@Async异步事件流详解由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“@EventPublisher+@Async异步事件流详解”