加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

springboot集成mqtt的实践开发

发布时间:2020-12-14 19:54:18 所属栏目:Java 来源:网络整理
导读:序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。 maven dependency groupIdorg.springframework.boot/groupId artif


MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。

maven

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

配置client factory

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://demo:1883");
//    factory.setUserName("guest");
//    factory.setPassword("guest");
    return factory;
  }

配置consumer

  @Bean
  public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
        .transform(p -> p + ",received from MQTT")
        .handle(logger())
        .get();
  }

  private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("siSample");
    return loggingHandler;
  }

  @Bean
  public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",mqttClientFactory(),"siSampleTopic");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
  }

配置producer

@Bean
  public IntegrationFlow mqttOutFlow() {
    //console input
//    return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),//        e -> e.poller(Pollers.fixedDelay(1000)))
//        .transform(p -> p + " sent to MQTT")
//        .handle(mqttOutbound())
//        .get();
    return IntegrationFlows.from(outChannel())
        .handle(mqttOutbound())
        .get();
  }
  
  @Bean
  public MessageChannel outChannel() {
    return new DirectChannel();
  }

  @Bean
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher",mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("siSampleTopic");
    return messageHandler;
  }

配置MessagingGateway

@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MsgWriter {
  void write(String note);
}

这样就大功告成了

doc

spring-integration-mqtt 

spring-integration-samples-mqtt

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。

您可能感兴趣的文章:

  • vue使用stompjs实现mqtt消息推送通知

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读