加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

java – Spring AMQP(Rabbit)监听器在出现异常时进入循环

发布时间:2020-12-15 01:46:39 所属栏目:大数据 来源:网络整理
导读:@BeanRabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory()); template.setMessageConverter(messageConverter); template.setExchange(amqpProperties.getRabbitMqTopicExchangeName()); return tem

@Bean
RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setMessageConverter(messageConverter);
    template.setExchange(amqpProperties.getRabbitMqTopicExchangeName());
    return template;
}

@Bean
@Conditional (OperationsCondition.class)
 SimpleMessageListenerContainer opsMessageListenerContainer() {
    return listenerContainer(amqpProperties.getRabbitMqOperationsQueue(),amqpProperties.getInitialRabbitOperationsConsumerCount(),amqpProperties.getMaximumRabbitOperationsConsumerCount(),opsReceiver());
}

@Bean
@Conditional (OperationsCondition.class)
OperationsListener opsReceiver() {
    return new OperationsListener();
}

private SimpleMessageListenerContainer listenerContainer(String queue,int initConsumers,int maxConsumers,MessageListener listener)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueueNames(queue);
    container.setMessageListener(listener);
    container.setConcurrentConsumers(initConsumers);
    container.setMaxConcurrentConsumers(maxConsumers);
    container.setMessageConverter(messageConverter);
    return container;
}

消息监听器是:

public class OperationsListener  implements MessageListener
{
    public static final Logger logger = Logger.getInstance(OperationsListener.class);

    @Autowired (required=true)
    private OperationsProcessor processor;
    @Autowired (required=true)
    private ObjectMapper objectMapper;

    public void onMessage(Message message)
    {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();        
        converter.setJsonObjectMapper(objectMapper);
        OperationsMessage request = (OperationsMessage)converter.fromMessage(message);
        processor.createMessage(request);
        //This is throwing a JPA database exception
        processor.createOperation(request);
    }
}

processor.createOperation()因数据库问题而抛出异常.问题是消息监听器进入循环并且消息不断回来.

我的处理器类:

@Component
@Transactional (propagation = Propagation.REQUIRES_NEW) 
public class OperationsProcessor
{
...............

    public void createOperation(OperationsMessage message)
    {
            try
            {
                .............
                .............
                //this call throws exception.
                opsRepo.create(operation,null);
            }
            catch (Exception e)
            {
                logger.error(e);
            }

    }
}

opsRepo.create抛出异常.即使我正在捕捉错误,我希望春天amqp不再发送该消息.不确定为什么同样的消息不断回来.

编辑:

我想我找到了一些关于如何处理这个问题的建议.原因是春天在失败时重新发生事件,这是默认的性质.
找到了一个有用的主题here和here.

最佳答案
要确认您找到了什么,这是clearly documented in the reference manual section “Exception Handling”.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读