一个常用的 MQTT 操作类
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Security.Cryptography.X509Certificates;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Ssl;
/// <summary>
/// Small wrapper around an MQTTnet (v3 handler-style API) client that connects over plain TCP,
/// retries failed connections, reconnects after an unexpected connection loss, and surfaces
/// incoming messages and connection-state changes as events.
/// </summary>
/// <remarks>
/// The options are built with a clean session, so the broker does not keep subscriptions across
/// connections. After an automatic reconnect, callers that still need their subscriptions should
/// call <see cref="SubscribeAsync"/> again, e.g. from an <see cref="OnConnectionChanged"/>
/// handler when the argument is <c>true</c>.
/// </remarks>
public class MqttClientService
{
    private IMqttClient _client;
    // The v3 options builder returns the interface type, so the field must be declared as such.
    private IMqttClientOptions _options;
    private readonly string _broker;         // MQTT broker address
    private readonly int _port;              // MQTT port (1883: TCP, 8883: TLS)
    private readonly string _clientId;       // MQTT client id
    private readonly string _username;       // MQTT user name
    private readonly string _password;       // MQTT password
    private readonly int _reconnectDelay;    // delay between reconnect attempts (milliseconds)

    private readonly object _stateLock = new object();
    private volatile bool _isConnected;
    // Set by DisconnectAsync so that the disconnected handler and the retry loop stop reconnecting.
    private volatile bool _manualDisconnect;
    // 0 = no connect loop running, 1 = a connect loop is running (guards against parallel loops).
    private int _connectLoopActive;

    /// <summary>Raised for every received application message: (topic, UTF-8 decoded payload).</summary>
    public event Action<string, string> OnMessageReceived;

    /// <summary>Raised when the tracked connection state changes (true = connected, false = disconnected).</summary>
    public event Action<bool> OnConnectionChanged;

    /// <summary>Connection state as last reported by the client's connected/disconnected handlers.</summary>
    public bool IsConnected
    {
        get { return _isConnected; }
        private set { _isConnected = value; }
    }

    /// <summary>
    /// Creates a service with the built-in placeholder settings
    /// (broker "your-mqtt-broker", port 1883, client id "CSharpClient").
    /// </summary>
    public MqttClientService()
        : this("your-mqtt-broker", 1883, "CSharpClient", "your-username", "your-password")
    {
    }

    /// <summary>
    /// Creates a service for a specific broker.
    /// </summary>
    /// <param name="broker">Broker host name or address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">MQTT client id.</param>
    /// <param name="username">User name; when null or empty no credentials are sent.</param>
    /// <param name="password">Password used together with <paramref name="username"/>.</param>
    /// <param name="reconnectDelay">Delay between connection attempts, in milliseconds.</param>
    /// <exception cref="ArgumentException"><paramref name="broker"/> is null or empty.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="reconnectDelay"/> is negative.</exception>
    public MqttClientService(string broker, int port, string clientId, string username, string password, int reconnectDelay = 5000)
    {
        if (string.IsNullOrEmpty(broker))
        {
            throw new ArgumentException("Broker address must not be null or empty.", nameof(broker));
        }
        if (reconnectDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(reconnectDelay), "Reconnect delay must not be negative.");
        }

        _broker = broker;
        _port = port;
        _clientId = clientId;
        _username = username;
        _password = password;
        _reconnectDelay = reconnectDelay;

        InitializeMqttClient();
    }

    /// <summary>
    /// Creates the client, wires the connected / disconnected / message handlers and builds the
    /// connection options.
    /// </summary>
    private void InitializeMqttClient()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        _client.UseConnectedHandler(e =>
        {
            Console.WriteLine("MQTT 已连接到服务器");
            SetConnectionState(true);
        });

        _client.UseDisconnectedHandler(async e =>
        {
            SetConnectionState(false);

            // A disconnect requested through DisconnectAsync must not trigger a reconnect.
            if (_manualDisconnect)
            {
                return;
            }

            Console.WriteLine("MQTT 连接丢失,尝试重连...");
            await Task.Delay(_reconnectDelay);

            // Re-check: the caller may have asked to disconnect while we were waiting.
            if (!_manualDisconnect)
            {
                await ConnectWithRetryAsync();
            }
        });

        _client.UseApplicationMessageReceivedHandler(e =>
        {
            string topic = e.ApplicationMessage.Topic;
            // An empty message can carry a null payload; GetString(null) would throw.
            byte[] payloadBytes = e.ApplicationMessage.Payload;
            string payload = payloadBytes == null ? string.Empty : Encoding.UTF8.GetString(payloadBytes);
            OnMessageReceived?.Invoke(topic, payload);
        });

        var builder = new MqttClientOptionsBuilder()
            .WithTcpServer(_broker, _port)
            .WithClientId(_clientId)
            .WithCleanSession();

        if (!string.IsNullOrEmpty(_username))
        {
            builder = builder.WithCredentials(_username, _password);
        }

        _options = builder.Build();
    }

    /// <summary>
    /// Plain (non SSL/TLS) connect. Keeps retrying every reconnect delay until the client is
    /// connected or <see cref="DisconnectAsync"/> is called.
    /// </summary>
    /// <remarks>
    /// If a connect loop is already running (for example the automatic reconnect), this call
    /// returns immediately instead of starting a second, competing loop.
    /// </remarks>
    public async Task ConnectAsync()
    {
        // An explicit connect request re-enables automatic reconnection.
        _manualDisconnect = false;
        await ConnectWithRetryAsync();
    }

    /// <summary>
    /// Runs the connect-with-retry loop; at most one loop is active at a time.
    /// </summary>
    private async Task ConnectWithRetryAsync()
    {
        if (Interlocked.CompareExchange(ref _connectLoopActive, 1, 0) != 0)
        {
            return;
        }

        try
        {
            // Iterative retry: avoids the ever-growing chain of nested async calls that a
            // recursive retry creates while the broker stays unreachable.
            while (!_manualDisconnect && !_client.IsConnected)
            {
                try
                {
                    await _client.ConnectAsync(_options, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"MQTT 连接失败: {ex.Message}");
                    await Task.Delay(_reconnectDelay);
                }
            }
        }
        finally
        {
            Interlocked.Exchange(ref _connectLoopActive, 0);
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="topic"/> if the client is connected; otherwise only logs
    /// that the subscription could not be made.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    public async Task SubscribeAsync(string topic)
    {
        if (_client.IsConnected)
        {
            await _client.SubscribeAsync(new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .Build());
            Console.WriteLine($"已订阅主题: {topic}");
        }
        else
        {
            Console.WriteLine("MQTT 未连接,无法订阅主题");
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> with QoS "at least once"
    /// if the client is connected; otherwise only logs that the message could not be published.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message text used as the payload.</param>
    public async Task PublishAsync(string topic, string message)
    {
        if (_client.IsConnected)
        {
            var msg = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();
            await _client.PublishAsync(msg);
            Console.WriteLine($"已发布消息: 主题={topic}, 内容={message}");
        }
        else
        {
            Console.WriteLine("MQTT 未连接,无法发布消息");
        }
    }

    /// <summary>
    /// Disconnects from the broker and suppresses automatic reconnection until
    /// <see cref="ConnectAsync"/> is called again. Also stops a retry loop that is still
    /// waiting for the broker to become reachable.
    /// </summary>
    public async Task DisconnectAsync()
    {
        // Set before disconnecting so the disconnected handler sees it and does not reconnect.
        _manualDisconnect = true;

        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
            SetConnectionState(false);
            Console.WriteLine("MQTT 已断开连接");
        }
    }

    /// <summary>
    /// Records the new connection state and raises <see cref="OnConnectionChanged"/> only when
    /// the state actually changed, so a disconnect reported by both the handler and
    /// <see cref="DisconnectAsync"/> produces a single notification.
    /// </summary>
    private void SetConnectionState(bool connected)
    {
        lock (_stateLock)
        {
            if (_isConnected == connected)
            {
                return;
            }
            _isConnected = connected;
        }

        // Invoked outside the lock so subscriber code cannot block state updates.
        OnConnectionChanged?.Invoke(connected);
    }
}
使用示例
using System;
using System.Threading.Tasks;
/// <summary>
/// Console sample: connects, subscribes to "test/topic", publishes one message, prints the
/// connection state every five seconds, and disconnects when the user presses Enter.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        var mqttService = new MqttClientService();

        // Subscribe to the message-received event.
        mqttService.OnMessageReceived += HandleMqttMessage;
        // Subscribe to the connection-state-changed event.
        mqttService.OnConnectionChanged += HandleConnectionStatus;

        // Plain (non-TLS) connection.
        await mqttService.ConnectAsync();
        await mqttService.SubscribeAsync("test/topic");

        // Types are fully qualified so the sample does not depend on a `using System.Threading;`.
        using (var monitorCts = new System.Threading.CancellationTokenSource())
        {
            // Periodic connection-state report; the task is kept so it can be stopped and awaited
            // on shutdown instead of running unobserved forever.
            Task monitorTask = Task.Run(() => MonitorConnectionAsync(mqttService, monitorCts.Token));

            // Publish a message.
            await mqttService.PublishAsync("test/topic", "Hello MQTT Secure!");

            Console.ReadLine();

            monitorCts.Cancel();
            await monitorTask;
            await mqttService.DisconnectAsync();
        }
    }

    /// <summary>
    /// Prints the current connection state every five seconds until <paramref name="token"/> is cancelled.
    /// </summary>
    static async Task MonitorConnectionAsync(MqttClientService mqttService, System.Threading.CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine($"当前 MQTT 连接状态: {(mqttService.IsConnected ? "已连接" : "未连接")}");
                await Task.Delay(5000, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the delay is cancelled during shutdown.
        }
    }

    /// <summary>Callback: prints a received message.</summary>
    static void HandleMqttMessage(string topic, string message)
    {
        Console.WriteLine($"收到消息: 主题={topic}, 内容={message}");
    }

    /// <summary>Callback: prints a connection-state change.</summary>
    static void HandleConnectionStatus(bool isConnected)
    {
        Console.WriteLine($"MQTT 连接状态变化: {(isConnected ? "已连接" : "断开连接")}");
    }
}
评论 (0)