加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

mosquitto的TLS功能(使用paho.mqtt.golang和JAVA版实现客户端)

发布时间:2020-12-16 19:15:15 所属栏目:大数据 来源:网络整理
导读:1、SSL/TLS简介 SSL(SecureSocket Layer)安全套接层,是网景公司提出的用于保证Server与client之间安全通信的一种协议,该协议位于TCP/IP协议与各应用层协议之间,即SSL独立于各应用层协议,因此各应用层协议可以透明地调用SSL来保证自身传输的安全性。目

1、SSL/TLS简介

  SSL(Secure Sockets Layer)安全套接层,是网景公司提出的用于保证Server与Client之间安全通信的一种协议,该协议位于TCP/IP协议与各应用层协议之间,即SSL独立于各应用层协议,因此各应用层协议可以透明地调用SSL来保证自身传输的安全性。目前,SSL被大量应用于HTTP的安全通信中,MQTT协议与HTTP协议同样属于应用层协议,因此也可以像HTTP协议一样使用SSL为自己的通信提供安全保证。

  SSL与TLS(Transport Layer Security Protocol)之间的关系:TLS(Transport Layer Security,传输层安全协议)是IETF(Internet Engineering Task Force,Internet工程任务组)制定的一种新的协议,它建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本。在TLS与SSL 3.0之间存在着显著的差别,主要是它们所支持的加密算法不同,所以TLS与SSL 3.0不能互操作。

2、使用Openssl创建tls证书

  SSL在身份认证过程中需要有一个双方都信任的CA签发的证书,CA签发证书是需要收费的,但是在测试过程中,可以自己产生一个CA,然后用自己产生的CA签发证书,下面的mosquitto的ssl功能的测试过程就是采用这一方式,其过程如下:

步骤一:产生自己的CA

# Create the self-signed CA once: ca.key is the CA private key, ca.crt its certificate.
openssl req -new -x509 -days 36500 -extensions v3_ca -keyout ca.key -out ca.crt
# Re-encode the same certificate as ca.pem for the clients. Running "req -new -x509"
# a second time would overwrite ca.key and issue an unrelated CA, so ca.crt would no
# longer match the key that signs the server and client certificates below.
openssl x509 -in ca.crt -out ca.pem -outform PEM

步骤二:产生服务端证书

# Generate a 2048-bit RSA server key; -des3 encrypts it, so openssl asks for a passphrase.
openssl genrsa -des3 -out server.key 2048
# Create a certificate signing request (server.csr) for that key.
openssl req -out server.csr -key server.key -new
# Sign the request with ca.crt/ca.key; the resulting server.crt is valid for 36500 days.
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 36500

步骤三:产生客户端证书

# Generate an unencrypted 2048-bit RSA client key (no -des3, so no passphrase).
openssl genrsa -out client-key.pem 2048
# Create a certificate signing request (client.csr) for that key.
openssl req -out client.csr -key client-key.pem -new
# Sign the request with ca.crt/ca.key; the resulting client-crt.pem is valid for 36500 days.
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client-crt.pem -days 36500

经过上面8条命令后,即可生成所需的所有证书文件,其中:

客户端使用:ca.pem、client-crt.pem、client-key.pem

服务端使用:ca.crt、server.crt、server.key

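3、mosquitto.conf配置如下:

(原文此处的配置内容已缺失。以下为与上文证书文件名、客户端端口8883相对应的示例配置,证书路径请按实际部署位置调整。)

listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true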

4、golang客户端测试代码

  1 package cmd
  2 
  3 import (
  4     "crypto/tls"
  5     "crypto/x509"
  6     fmt "fmt"
  7     "io/ioutil"
  8     "os"
  9     "time"
 10 
 11     "github.com/apex/log"
 12     MQTT "github.com/eclipse/paho.mqtt.golang"
 13 )
 14 
 15 var ctx log.Interface
 16 
 17 const QoS = 0x02
 18 
 19 func init() {
 20     fmt.Printf("init mqtt testn")
 21 
 22 }
 23 
 24 func RunMqttClient() {
 25     fmt.Printf("Run mqtt testn")
 26     var logLevel = log.InfoLevel
 27     ctx = &log.Logger{
 28         Level:   logLevel, 29         Handler: NewLogHanler(os.Stdout),128);line-height:1.5;"> 30     }
 31 
 32     mqttClient := NewClient(
 33         ctx,128);line-height:1.5;"> 34         "ttnhdl",128);line-height:1.5;"> 35         "",128);line-height:1.5;"> 36         "",128);line-height:1.5;"> 37         fmt.Sprintf("ssl://%s","192.168.195.201:8883"),128);line-height:1.5;"> 38     )
 39 
 40     var err = mqttClient.Connect()
 41     if err != nil {
 42         ctx.WithError(err).Fatal("Could not connect to MQTT")
 43         fmt.Printf("Could not connect to MQTTn")
 44     } else {
 45         fmt.Printf("Success connect to MQTTn")
 46     }
 47 
 48     mqttClient.PublishUplink("test","hello mqtt!")
 49     mqttClient.SubscribeUplink("test")
 50 
 51     for true {
 52 
 53     }
 54 }
 55 
 56 // Client connects to the MQTT server and can publish/subscribe on uplink,downlink and activations from devices
 57 type Client interface {
 58     Connect() error
 59     Disconnect()
 60 
 61     IsConnected() bool
 62 
 63      Uplink pub/sub
 64     PublishUplink(topic string,msg string) Token
 65     SubscribeUplink(topic string) Token
 66 }
 67 
 68 type Token  69     Wait() bool
 70     WaitTimeout(time.Duration) bool
 71     Error() error
 72 }
 73 
 74 type simpleToken struct {
 75     err error
 76 }
 77 
 78  Wait always returns true
 79 func (t *simpleToken) Wait() bool {
 80     return true
 81 }
 82 
 83  WaitTimeout always returns true
 84 func (t *simpleToken) WaitTimeout(_ time.Duration) bool {
 85      86 }
 87 
 88  Error contains the error if present
 89 func (t *simpleToken) Error() error {
 90     return t.err
 91 }
 92 
 93 type defaultClient struct {
 94     mqtt MQTT.Client
 95     ctx  log.Interface
 96 }
 97 
 98 func NewClient(ctx log.Interface,id,username,password string,brokers ...string) Client {
 99     tlsconfig := NewTLSConfig()
100 
101     mqttOpts := MQTT.NewClientOptions()
102 
103     for _,broker := range brokers {
104         mqttOpts.AddBroker(broker)
105     }
106 
107     mqttOpts.SetClientID("ypf_dewqfvcdeqfcdqwcdq")
108     mqttOpts.SetUsername(username)
109     mqttOpts.SetPassword(password)
110 
111      TODO: Some tuning of these values probably won't hurt:
112     mqttOpts.SetKeepAlive(30 * time.Second)
113     mqttOpts.SetPingTimeout(10 * time.Second)
114 
115      Usually this setting should not be used together with random ClientIDs,but
116      we configured The Things Network's MQTT servers to handle this correctly.
117     mqttOpts.SetCleanSession(false)
118 
119     mqttOpts.SetDefaultPublishHandler(func(client MQTT.Client,msg MQTT.Message) {
120         ctx.WithField("message",msg).Warn("Received unhandled message")
121     })
122 
123     mqttOpts.SetConnectionLostHandler(func(client MQTT.Client,err error) {
124         ctx.WithError(err).Warn("Disconnected,reconnecting...")
125     })
126 
127     mqttOpts.SetOnConnectHandler(func(client MQTT.Client) {
128         ctx.Debug("Connected")
129     })
130 
131     mqttOpts.SetTLSConfig(tlsconfig)
132 
133     return &defaultClient{
134         mqtt: MQTT.NewClient(mqttOpts),128);line-height:1.5;">135         ctx:  ctx,128);line-height:1.5;">136     }
137 }
138 
var (
	// ConnectRetries says how many times the client should retry a failed connection
	ConnectRetries = 10
	// ConnectRetryDelay says how long the client should wait between retries
	ConnectRetryDelay = time.Second
)
145 
146 func (c *defaultClient) Connect() error {
147     if c.mqtt.IsConnected() {
148         return nil
149     }
150     var err error
151     for retries := 0; retries < ConnectRetries; retries++ {
152         token := c.mqtt.Connect()
153         token.Wait()
154         err = token.Error()
155         if err == nil {
156             break
157         }
158         <-time.After(ConnectRetryDelay)
159     }
160     161         return fmt.Errorf("Could not connect: %s",err)
162     }
163     164 }
165 
166 func (c *defaultClient) Disconnect() {
167     if !c.mqtt.IsConnected() {
168         return
169     }
170     c.mqtt.Disconnect(25)
171 }
172 
173 func (c *defaultClient) IsConnected() bool {
174     return c.mqtt.IsConnected()
175 }
176 
177 func (c *defaultClient) PublishUplink(topic string,msg string) Token {
178     return c.mqtt.Publish(topic,QoS,false,msg)
179 }
180 
181 func (c *defaultClient) SubscribeUplink(topic string) Token {
182     return c.mqtt.Subscribe(topic,func(mqtt MQTT.Client,128);line-height:1.5;">183          Determine the actual topic
184         fmt.Printf("Success SubscribeUplink with msg:%sn",msg.Payload())
185     })
186 }
187 
// NewTLSConfig builds the client-side TLS configuration from the files in
// samplecerts/: ca.pem becomes the trusted root pool and
// client-crt.pem/client-key.pem the certificate presented to the broker.
//
// A missing or unusable ca.pem is reported on stdout but is not fatal. A
// missing or invalid client certificate/key pair makes the function panic.
func NewTLSConfig() *tls.Config {
	// Import trusted certificates from ca.pem.
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile("samplecerts/ca.pem")
	switch {
	case err != nil:
		// Previously a failed read was silently reported as success.
		fmt.Printf("0. read pemCerts failed: %v\n", err)
	case !certpool.AppendCertsFromPEM(pemCerts):
		fmt.Println("0. no CA certificate found in samplecerts/ca.pem")
	default:
		fmt.Println("0. resd pemCerts Success")
	}

	// Import client certificate/key pair
	cert, err := tls.LoadX509KeyPair("samplecerts/client-crt.pem", "samplecerts/client-key.pem")
	if err != nil {
		panic(err)
	}
	fmt.Println("1. resd cert Success")

	// Parse the leaf so the client certificate can be inspected or printed.
	cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		panic(err)
	}
	fmt.Println("2. resd cert.Leaf Success")

	// ClientAuth and ClientCAs are only consulted by TLS servers, so they are
	// left at their zero values (the original set exactly those values).
	return &tls.Config{
		// RootCAs = certs used to verify server cert.
		RootCAs: certpool,
		// SECURITY: InsecureSkipVerify disables verification of the broker's
		// certificate chain and host name, so RootCAs is effectively unused
		// and the connection is open to man-in-the-middle attacks. It is kept
		// because the test certificates above carry no matching host/IP
		// entry; set it to false once the server certificate does.
		InsecureSkipVerify: true,
		// Certificates = list of certs client sends to server.
		Certificates: []tls.Certificate{cert},
	}
}

5、测试效果

服务端启动:

客户端运行:

6、JAVA版客户端实现

依赖:org.eclipse.paho.client.mqttv3、bcprov-jdk16-1.45.jar

MqttServiceClient代码:

package com.ypf.main; import java.util.Properties; 4 5 import org.eclipse.paho.client.mqttv3.MqttCallback; 6 import org.eclipse.paho.client.mqttv3.MqttClient; 7 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 8 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 9 import org.eclipse.paho.client.mqttv3.MqttException; 10 import org.eclipse.paho.client.mqttv3.MqttMessage; 11 import org.eclipse.paho.client.mqttv3.MqttSecurityException; 12 import org.eclipse.paho.client.mqttv3.MqttTopic; 13 import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; 15 import com.ypf.mqtt.SslUtil; /** 18 * 19 * @author LP by 2014-04-24 20 * 21 */ 22 public class MqttServiceClient implements MqttCallback { 24 private static final String MQTT_HOST = "ssl://192.168.195.201:8884"; 25 final String MQTT_CLIENT = "Test_"; 26 static String caFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/ca.crt"; 27 static String clientCrtFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.crt"; 28 static String clientKeyFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.key"; 29 30 static MqttServiceClient mqttServiceClient = null; 31 32 private MqttClient client = 33 private MqttConnectOptions options = 34 35 /** 36 * 单例模式构造类 37 38 static MqttServiceClient getInstance() { 39 if (mqttServiceClient == null) { 40 mqttServiceClient = new MqttServiceClient(); 41 } 42 return mqttServiceClient; 43 } 44 45 private MqttServiceClient() { 46 System.out.println("init MQTTClientService"); 47 init(); 48 } 49 The major API implementation follows :- 52 * 初始化 53 54 void init() { 55 try { 56 57 host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 58 client = new MqttClient(MQTT_HOST,MQTT_CLIENT,255);line-height:1.5;">new MemoryPersistence()); 59 MQTT的连接设置 60 options = new MqttConnectOptions(); 61 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 62 options.setCleanSession(true); 63 设置连接的用户名 64 options.setUserName("ypf"); 65 
设置连接的密码 66 options.setPassword("ruijie".toCharArray()); 67 设置超时时间 单位为秒 68 options.setConnectionTimeout(50); 69 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 70 options.setKeepAliveInterval(30); 71 TLS连接配置 72 options.setSocketFactory( 73 SslUtil.getSocketFactory(caFilePath,clientCrtFilePath,clientKeyFilePath,"cs123456")); 74 75 设置回调 76 client.setCallback(this); 77 78 } catch (Exception e) { 79 e.printStackTrace(); 80 } 81 } 82 * 连接到MQTT 84 void connect() { 86 System.out.println("Start connect----------"); 87 88 client.connect(options); 89 订阅主题的方法,2为消息的质量 90 client.subscribe("+/#",2); 91 发送消息 92 publish("test","撒打发水电费水电费"); 93 } 94 e.printStackTrace(); 95 } 96 } 97 98 99 * 断开连接到MQTT 100 101 void disconnect() { 102 System.out.println("Start disconnect----------"); 103 104 client.disconnect(); 105 } catch (MqttSecurityException e) { 106 e.printStackTrace(); 107 } catch (MqttException e) { 108 e.printStackTrace(); 109 } 110 } 111 112 113 * 发布消息 114 * @param topic 主题 115 msg 消息 116 117 void publish(String topic,String msg) { 118 System.out.println("Start publish----------"); 119 120 MqttTopic mqttTopic = client.getTopic(topic); 121 2为消息的质量 122 MqttDeliveryToken messageToken = mqttTopic.publish(msg.getBytes(),2,128);line-height:1.5;">123 System.out.println("publish success==>"+messageToken.getMessage()); 124 client.publish(topic,msg); 125 } 126 e.printStackTrace(); 127 } 128 } 129 130 131 -------------------------------------------------回调方法------------------------------------------------------------// 132 134 * 连接断开触发此方法 135 136 @Override 137 void connectionLost(Throwable cause) { 138 System.out.println("Connection Lost---------->" + cause.getMessage()); 139 } 140 141 142 * 消息达到触发此方法 143 144 @Override 145 void messageArrived(MqttTopic topic,MqttMessage message) 146 throws Exception { 147 System.out.println(topic + ":" + message.toString()); 148 } 149 150 151 * 消息发送成功触发此方法 152 153 @Override 154 void deliveryComplete(MqttDeliveryToken token) { 156 
System.out.println("deliveryComplete---------" + token.getMessage()); 157 } 158 e.printStackTrace(); 159 } 160 } 161 162 void main(String[] args)164 165 MqttServiceClient.getInstance().disconnect(); 166 MqttServiceClient.getInstance().connect(); 167 new Thread() { 169 void run() { 170 int count = 0; 171 while(true && count < 3) { 172 173 Thread.sleep(1000*3); 174 } catch (InterruptedException e) { 175 e.printStackTrace(); 176 } 177 MqttServiceClient.getInstance().publish("test1/ypf","hello world ! count=" + count); 178 count ++; 179 } 180 }; 181 }.start(); 182 } 183 184 }

SslUtil代码:
package com.ypf.mqtt;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.*;

/**
 * Builds an {@link SSLSocketFactory} for mutual-TLS MQTT connections from
 * PEM-encoded certificate and key files.
 */
public class SslUtil {

    /**
     * Creates a socket factory that trusts the given CA and presents the given
     * client certificate.
     *
     * @param caCrtFile path of the PEM file holding the CA certificate
     * @param crtFile   path of the PEM file holding the client certificate
     * @param keyFile   path of the PEM file holding the client private key
     * @param password  passphrase offered for the private key file; also used
     *                  to protect the key inside the in-memory key store
     * @return a socket factory configured with the trust and key material
     * @throws Exception if a file cannot be read or parsed, or the TLS context
     *                   cannot be initialised
     */
    public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile,
            final String keyFile, final String password) throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        // load CA certificate (try-with-resources closes the reader even on failure)
        X509Certificate caCert;
        try (PEMReader reader = new PEMReader(
                new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))))) {
            caCert = (X509Certificate) reader.readObject();
        }

        // load client certificate
        X509Certificate cert;
        try (PEMReader reader = new PEMReader(
                new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))))) {
            cert = (X509Certificate) reader.readObject();
        }

        // load client private key
        KeyPair key;
        try (PEMReader reader = new PEMReader(
                new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
                new PasswordFinder() {
                    @Override
                    public char[] getPassword() {
                        return password.toCharArray();
                    }
                })) {
            key = (KeyPair) reader.readObject();
        }

        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);

        // client key and certificates are sent to server so it can authenticate us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[] { cert });
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // finally, create SSL socket factory; TLS 1.0 ("TLSv1") is obsolete and
        // disabled by default in current JDKs, so request TLS 1.2
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读