加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Spring Boot @Async 异步任务执行

发布时间:2020-12-15 01:16:50 所属栏目:大数据 来源:网络整理
导读:1、任务执行和调度 Spring 用 TaskExecutor 和 TaskScheduler 接口提供了异步执行和调度任务的抽象。 Spring 的 TaskExecutor 和 java.util.concurrent.Executor 接口时一样的,这个接口只有一个方法 execute(Runnable task) 。 1.1、TaskExecutor类型 Sprin

1、任务执行和调度

SpringTaskExecutorTaskScheduler接口提供了异步执行和调度任务的抽象。

SpringTaskExecutorjava.util.concurrent.Executor接口时一样的,这个接口只有一个方法execute(Runnable task)

1.1、TaskExecutor类型

Spring已经内置了许多TaskExecutor的实现,你没有必要自己去实现:

  • SimpleAsyncTaskExecutor ?这种实现不会重用任何线程,每次调用都会创建一个新的线程。
  • SyncTaskExecutor ?这种实现不会异步的执行
  • ConcurrentTaskExecutor ?这种实现是java.util.concurrent.Executor的一个adapter
  • SimpleThreadPoolTaskExecutor ?这种实现实际上是Quartz的SimpleThreadPool的一个子类,它监听Spring的声明周期回调。
  • ThreadPoolTaskExecutor ?这是最常用最通用的一种实现。它包含了java.util.concurrent.ThreadPoolExecutor的属性,并且用TaskExecutor进行包装。

1.2、注解支持调度和异步执行

To enable support for @Scheduled and @Async annotations add @EnableScheduling and @EnableAsync to one of your @Configuration classes:

@Configuration
@EnableAsync
@EnableScheduling
public class AppConfig {
}

特别注意

The default advice mode for processing @Async annotations is "proxy" which allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way. For a more advanced mode of interception,consider switching to "aspectj" mode in combination with compile-time or load-time weaving.

默认是用代理去处理@Async的,因此,相同类中的方法调用带@Async的方法是无法异步的,这种情况仍然是同步。

举个例子:下面这种,在外部直接调用sayHi()是可以异步执行的,而调用sayHello()时sayHi()仍然是同步执行

 A {
  
    void sayHello() {
        sayHi();
    }

    @Async
     sayHi() {

    }   
  
}

1.3、@Async注解

在方法上加@Async注解表示这是一个异步调用。换句话说,方法的调用者会立即得到返回,并且实际的方法执行是想Spring的TaskExecutor提交了一个任务。

In other words,the caller will return immediately upon invocation and the actual execution of the method will occur in a task that has been submitted to a Spring TaskExecutor.

@Async
 doSomething() {
    // this will be executed asynchronously
}
 doSomething(String s) {
    @Async
Future<String> returnSomething(int i) {
     this will be executed asynchronously
}

注意:

@Async methods may not only declare a regular java.util.concurrent.Future return type but also Spring’s org.springframework.util.concurrent.ListenableFuture or,as of Spring 4.2,JDK 8’s java.util.concurrent.CompletableFuture: for richer interaction with the asynchronous task and for immediate composition with further processing steps.

1.4、@Async限定Executor

默认情况下,当在方法上加@Async注解时,将会使用一个支持注解驱动的Executor。然而,@Async注解的value值可以指定一个别的Executor

@Async("otherExecutor")
 this will be executed asynchronously by "otherExecutor"
}

这里,otherExecutor是Spring容器中任意Executor bean的名字。

1.5、@Async异常管理

当一个@Async方法有一个Future类型的返回值时,就很容易管理在调Future的get()方法获取任务的执行结果时抛出的异常。如果返回类型是void,那么异常是不会被捕获到的。

class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
     handleUncaughtException(Throwable ex,Method method,Object... params) {
         handle exception
    }
}

?2、线程池配置

 1
 3 import org.springframework.context.annotation.Bean;
 4  org.springframework.context.annotation.Configuration;
 5  org.springframework.scheduling.annotation.EnableAsync;
 6  org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 7 
 8 @Configuration
 9 @EnableAsync
10  TaskExecutorConfig {
11 
12     private Integer corePoolSize = 30;
13 
14     private Integer maxPoolSize = 5015 
16     private Integer keepAliveSeconds = 30017 
18     private Integer queueCapacity = 2000;
19 
20     @Bean("myThreadPoolTaskExecutor"21     public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
22         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
23         executor.setCorePoolSize(corePoolSize);
24         executor.setMaxPoolSize(maxPoolSize);
25         executor.setKeepAliveSeconds(keepAliveSeconds);
26         executor.setQueueCapacity(queueCapacity);
27         executor.setWaitForTasksToCompleteOnShutdown(true);
28         executor.initialize();
29         return executor;
30     }
31 
32 }

调用

 1     @Async("myThreadPoolTaskExecutor" 2     @Override
 3      present(CouponPresentLogEntity entity) {
 4         try {
 5             CouponBaseResponse rst = couponSendRpcService.send(entity.getUserId(),entity.getCouponBatchKey(),"1",entity.getVendorId());
 6             if (null != rst && rst.isSuccess()) {
 7                 entity.setStatus(PresentStatusEnum.SUCCESS.getType());
 8             }else 9                 String reason = (null == rst) ? "响应异常" : rst.getMsg();
                entity.setFailureReason(reason);
11                 entity.setStatus(PresentStatusEnum.FAILURE.getType());
12             }
13         }catch (Exception ex) {
14             log.error(ex.getMessage(),ex);
15             entity.setFailureReason(ex.getMessage());
16             entity.setStatus(PresentStatusEnum.FAILURE.getType());
17         }
        couponPresentLogDao.update(entity);
19     }

结果

[INFO ] 2018-05-09 16:27:39.887 [myThreadPoolTaskExecutor-1] [com.ourhours.coupon.rpc.dubbo.ReceiveLogFilter] - receive method-name:send; arguments:[10046031,"4d7cc32f8f7e4b00bca56f6bf4b3b658","1",10001]
[INFO ] 2018-05-09 16:27:39.889 [myThreadPoolTaskExecutor-2] [com.ourhours.coupon.rpc.dubbo.ReceiveLogFilter] - receive method-name:send; arguments:[10046031,10001]

?

?

?参考:

Spring Framework Reference Documentation 4.3.17.RELEASE

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读