加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Spring Boot 事件和监听

发布时间:2020-12-15 01:16:43 所属栏目:大数据 来源:网络整理
导读:Application Events and Listeners。1、自定义事件和监听;1.1、定义事件:package com.cjs.boot.event; import lombok.Data; import org.springframework.context.ApplicationEvent; @Data public class BlackListEvent extends ApplicationEvent { …… }

Application Events and Listeners

1、自定义事件和监听

1.1、定义事件

 1 package com.cjs.boot.event;
 2 
 3 import lombok.Data;
 4  org.springframework.context.ApplicationEvent;
 5 
 6 @Data
 7 public class BlackListEvent extends ApplicationEvent {
 8 
 9     private String address;
10 
11     public BlackListEvent(Object source,String address) {
12         super(source);
13         this.address = address;
14     }
15 }

1.2、定义监听

 org.springframework.context.ApplicationListener;
 org.springframework.context.event.EventListener;
 5  org.springframework.stereotype.Component;
 6 
 7 8 class BlackListListener implements ApplicationListener<BlackListEvent> {
 9 
10     @Override
void onApplicationEvent(BlackListEvent event) {
12         System.out.println("监听到BlackListEvent事件: " + event.getAddress());
try14             Thread.sleep(2000);
15         } catch (InterruptedException e) {
16             e.printStackTrace();
17         }
18 19 }

1.3、注册监听

 com.cjs.boot;
 com.cjs.boot.event.BlackListListener;
 org.springframework.boot.SpringApplication;
 org.springframework.boot.autoconfigure.SpringBootApplication;
 org.springframework.boot.web.server.ErrorPage;
 org.springframework.boot.web.server.ErrorPageRegistrar;
 org.springframework.boot.web.server.ErrorPageRegistry;
 9  org.springframework.cache.annotation.EnableCaching;
 org.springframework.context.annotation.Bean;
11  org.springframework.http.HttpStatus;
12  org.springframework.scheduling.annotation.EnableAsync;
13 
@SpringBootApplication
15 class CjsSpringbootExampleApplication {
16 
17     static  main(String[] args) {
18 
19         SpringApplication springApplication = new SpringApplication(CjsSpringbootExampleApplication.20         springApplication.addListeners(new BlackListListener());
21         springApplication.run(args);
22 
23     }

1.4、发布事件

 com.cjs.boot.controller;
 com.cjs.boot.event.BlackListEvent;
 org.springframework.beans.factory.annotation.Autowired;
 org.springframework.context.ApplicationContext;
 org.springframework.context.ApplicationEventPublisher;
 org.springframework.web.bind.annotation.GetMapping;
 org.springframework.web.bind.annotation.RequestMapping;
 org.springframework.web.bind.annotation.RestController;
@RestController
12 @RequestMapping("/activity")
13  ActivityController {
14 
//    @Autowired
    private ApplicationEventPublisher publisher;
17 
19      ApplicationContext publisher;
20 
21     @GetMapping("/sayHello.json"22      sayHello() {
23 
24         /**
25          * You may register as many event listeners as you wish,but note that by default event listeners receive events synchronously.
26 This means the publishEvent() method blocks until all listeners have finished processing the event.
27          */
28 
29         BlackListEvent event = new BlackListEvent(this,"abc@126.com"30         publisher.publishEvent(event);
31         System.out.println("事件发布成功"32 33 
34 }

2、基于注解的事件监听

 com.cjs.boot.event;

 org.springframework.stereotype.Component;

@Component
 BlackListListener {

    @EventListener
     processBlackListEvent(BlackListEvent event) {
        System.out.println(123);
    }
}

---

 com.cjs.boot;

 org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
 CjsSpringbootExampleApplication {

     main(String[] args) {
        SpringApplication.run(CjsSpringbootExampleApplication.,args);
    }

}

3、异步监听

1 @EventListener
2 @Async
3  processBlackListEvent(BlackListEvent event) {
4      BlackListEvent is processed in a separate thread
5 }

4、应用

 lombok.extern.slf4j.Slf4j;
 org.springframework.scheduling.annotation.Async;
11 
 java.util.ArrayList;
 java.util.List;
 java.util.concurrent.ExecutionException;
 java.util.concurrent.Future;
 java.util.concurrent.atomic.AtomicInteger;
19  * 批量送券
20  @Slf4j
22 @Component
23  BatchSendCouponListener {
24 
26      CouponPresentLogService couponPresentLogService;
27 
28     @Async
29     @EventListener
30      processBatchSendCouponEvent(BatchSendCouponEvent batchSendCouponEvent) {
31         Long cpId = batchSendCouponEvent.getCouponPresentId();
32         log.info("收到BatchSendCouponEvent,cpId={}"33         List<CouponPresentLogEntity> list = couponPresentLogService.selectByPid(cpId);
34 
35         handle(cpId,list,036 37 
38     private void handle(Long cpId,List<CouponPresentLogEntity> list,int times) {
39         if (times >= 2) {
40             log.info("超过重试次数退出,cpId: {},剩余: {}"41             return;
42 43 
44         List<Future<CouponPresentLogEntity>> futureList = new ArrayList<>();
45 
46         for (CouponPresentLogEntity entity : list) {
47             futureList.add(couponPresentLogService.present(entity));
48 49 
50         AtomicInteger count = new AtomicInteger(051           收集失败的
52         List<CouponPresentLogEntity> failList = 53         for (Future<CouponPresentLogEntity> future : futureList) {
54             55                 CouponPresentLogEntity couponPresentLogEntity = future.get();
56                 if (couponPresentLogEntity.getStatus() != PresentStatusEnum.SUCCESS.getType().intValue()) {
57                     failList.add(couponPresentLogEntity);
58                 }
59                 count.getAndIncrement();
60                 if (count.intValue() >= list.size()) {
61                     List<CouponPresentLogEntity> failPresentLogList = couponPresentLogService.selectFailLogByPid(cpId);
62                     if (null != failPresentLogList && failPresentLogList.size() > 063                         times++64                         log.info("第{}次重试,CPID: {},总计: {},失败: {}"65                         handle(cpId,failPresentLogList,times);
66                     }
67 68             } 69                 log.error(e.getMessage(),e);
70             }  (ExecutionException e) {
71 72             }
73 74 75 
76 }
 2  org.springframework.scheduling.annotation.AsyncResult;
 org.springframework.stereotype.Service;
 javax.annotation.Resource;
import java.util.concurrent.*@Service
class CouponPresentLogServiceImpl implements CouponPresentLogService {
15      CouponPresentLogDao couponPresentLogDao;
    @Resource
 CouponSendRpcService couponSendRpcService;
19     @Async("myThreadPoolTaskExecutor"20 21     public Future<CouponPresentLogEntity> present(CouponPresentLogEntity entity) {
22         23             CouponBaseResponse rst = couponSendRpcService.send(entity.getUserId(),entity.getCouponBatchKey(),"1"24             null != rst && rst.isSuccess()) {
                entity.setStatus(PresentStatusEnum.SUCCESS.getType());
                entity.setFailureReason(PresentStatusEnum.SUCCESS.getName());
27             }else28                 String reason = (null == rst) ? "响应异常" : rst.getMsg();
29                 entity.setFailureReason(reason);
30                 entity.setStatus(PresentStatusEnum.FAILURE.getType());
31 32         } (Exception ex) {
33             log.error(ex.getMessage(),ex);
34             entity.setFailureReason(ex.getMessage());
35             entity.setStatus(PresentStatusEnum.FAILURE.getType());
37         couponPresentLogDao.update(entity);
38 
return new AsyncResult<CouponPresentLogEntity>(entity);
40 41 
42 }

5、统计异步任务执行的进度

利用Future获取执行结果,比如上面的例子中,由于不是直接提交的任务,所以用AsyncResult来返回结果

上面的例子中,一个大任务,然后下面有许多子任务。在主任务中,统计各子任务的执行情况,是成功还是失败,然后统计成功多少,失败多少

也可以这样写:

?

@Autowired
ThreadPoolTaskExecutor taskExecutor;

Future<Object> future = taskExecutor.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        null;
    }
});

?

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读