赞
踩
上一篇RabbitMQ 搭建MQTT服务 中,创建了admin-test 用户,并创建 test-host 虚拟主机 专门处理MQTT业务,接着我们自己开发一个连接RabbitMQ MQTT服务的客户端
- var mqttFactory = new MqttFactory();
- mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
- var options = new MqttClientOptions
- {
- ClientId = clientId,
- ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
- ChannelOptions = new MqttClientTcpOptions
- {
- Server = tcpServer,
- Port = tcpPort
- },
- WillDelayInterval = 10,
- WillMessage = new MqttApplicationMessage
- {
- Topic = $"LastWill/{clientId}",
- Payload = Encoding.UTF8.GetBytes("I Lost the Connection!"),
- QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce
- }
- };
-
- if (options.ChannelOptions == null)
- {
- throw new InvalidOperationException();
- }
-
- if (!string.IsNullOrWhiteSpace(mqttUser))
- {
- options.Credentials = new MqttClientCredentials
- {
- Username = mqttUser,
- Password = Encoding.UTF8.GetBytes(mqttPassword)
- };
- }
-
- // 设置为false,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
- // 设置为true,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
- options.CleanSession = true;
-
- // 连接保活心跳
- options.KeepAlivePeriod = TimeSpan.FromSeconds(10);

- mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Action<MqttClientConnectedEventArgs>(e =>
- {
- LogManager.WriteLogEx(LOGLEVEL.INFO, "客户端已连接");
- }));
-
- mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Action<MqttClientDisconnectedEventArgs>(e =>
- {
- LogManager.WriteLogEx(LOGLEVEL.INFO, "客户端已断开连接");
- }));
-
- mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(e =>
- {
- string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
- string topic = e.ApplicationMessage.Topic;
- string qos = e.ApplicationMessage.QualityOfServiceLevel.ToString();
- string retained = e.ApplicationMessage.Retain.ToString();
-
- LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端接收消息 >>Topic:{topic}; Qos:{qos}; Retained:{retained}");
- LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端接收消息 >>Msg:{text}");
- }));

- await mqttClient.ConnectAsync(options);
- LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端{options.ClientId}尝试连接...");
await mqttClient.SubscribeAsync(topic);
- var message = new MqttApplicationMessage
- {
- Topic = topic,
- Payload = Encoding.UTF8.GetBytes(payload),
- QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce,
- Retain = true
- };
-
- await mqttClient.PublishAsync(message, CancellationToken.None);
https://docs.emqx.cn/broker/v4.3/development/protocol.html#mqtt%E5%8D%8F%E8%AE%AE
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。