MQTT类 - C# - .NET Core

hmister
2025-02-10 / 0 评论 / 53 阅读 / 正在检测是否收录...
一个常用的 MQTT 操作类(基于 MQTTnet 3.x 客户端库),支持连接、断线自动重连、订阅与发布。
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Security.Cryptography.X509Certificates;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Ssl;

/// <summary>
/// Thin wrapper around an MQTTnet client that connects with fixed settings,
/// retries failed connections, reconnects after an unexpected connection loss,
/// and exposes received messages and connection-state changes as events.
/// </summary>
public class MqttClientService
{
    private IMqttClient _client;

    // MqttClientOptionsBuilder.Build() is assigned to the options interface,
    // which is also what IMqttClient.ConnectAsync accepts.
    private IMqttClientOptions _options;

    private readonly string _broker = "your-mqtt-broker";  // MQTT broker address
    private readonly int _port = 1883;                     // MQTT port (1883: TCP, 8883: TLS)
    private readonly string _clientId = "CSharpClient";    // MQTT client id
    private readonly string _username = "your-username";   // MQTT user name
    private readonly string _password = "your-password";   // MQTT password
    private readonly int _reconnectDelay = 5000;           // Delay between (re)connect attempts, in milliseconds

    // Guards the check-and-set of IsConnected so a state change is reported once.
    private readonly object _stateLock = new object();

    // Set by DisconnectAsync so that the disconnected handler and any running
    // connect loop stop trying to bring the connection back.
    private volatile bool _disconnectRequested;

    // 0 = no connect loop running, 1 = a connect loop owns (re)connection.
    private int _connectLoopActive;

    /// <summary>Raised for every received application message (topic, UTF-8 payload).</summary>
    public event Action<string, string> OnMessageReceived;

    /// <summary>
    /// Raised when the connection state actually changes
    /// (true = connected, false = disconnected). Repeated reports of the same
    /// state are suppressed.
    /// </summary>
    public event Action<bool> OnConnectionChanged;

    /// <summary>Last connection state reported by this service.</summary>
    public bool IsConnected { get; private set; } = false;

    /// <summary>Creates the underlying client, wires its handlers and builds the connection options.</summary>
    public MqttClientService()
    {
        InitializeMqttClient();
    }

    /// <summary>
    /// Creates the MQTT client, registers the connected / disconnected /
    /// message-received handlers and builds the connection options.
    /// </summary>
    private void InitializeMqttClient()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        _client.UseConnectedHandler(e =>
        {
            Console.WriteLine("MQTT 已连接到服务器");
            SetConnectionState(true);
        });

        _client.UseDisconnectedHandler(async e =>
        {
            if (_disconnectRequested)
            {
                // Intentional disconnect: report the state, but do not reconnect.
                SetConnectionState(false);
                return;
            }

            Console.WriteLine("MQTT 连接丢失,尝试重连...");
            SetConnectionState(false);

            await Task.Delay(_reconnectDelay);

            // Returns immediately if another connect loop is already retrying,
            // so a failed connect attempt never spawns a second, competing loop.
            await RunConnectLoopAsync();
        });

        _client.UseApplicationMessageReceivedHandler(e =>
        {
            string topic = e.ApplicationMessage.Topic;

            // A message may carry no payload; treat that as an empty string
            // instead of letting GetString throw on null.
            byte[] rawPayload = e.ApplicationMessage.Payload;
            string payload = rawPayload == null
                ? string.Empty
                : Encoding.UTF8.GetString(rawPayload);

            OnMessageReceived?.Invoke(topic, payload);
        });

        _options = new MqttClientOptionsBuilder()
            .WithTcpServer(_broker, _port)
            .WithClientId(_clientId)
            .WithCredentials(_username, _password)
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Plain (non SSL/TLS) connect. Keeps retrying every
    /// <c>_reconnectDelay</c> milliseconds until the client is connected or
    /// <see cref="DisconnectAsync"/> is called. If a reconnect loop is already
    /// in progress this call returns immediately and leaves the work to that loop.
    /// </summary>
    public async Task ConnectAsync()
    {
        // An explicit connect request re-enables automatic reconnection.
        _disconnectRequested = false;
        await RunConnectLoopAsync();
    }

    /// <summary>
    /// Subscribes to <paramref name="topic"/> if the client is connected;
    /// otherwise only logs that the subscription was not possible.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the options request a clean session, so the broker is not
    /// expected to keep subscriptions across a reconnect; callers that rely on
    /// auto-reconnect should subscribe again when
    /// <see cref="OnConnectionChanged"/> reports <c>true</c>.
    /// </remarks>
    /// <param name="topic">Topic filter to subscribe to.</param>
    public async Task SubscribeAsync(string topic)
    {
        if (_client.IsConnected)
        {
            await _client.SubscribeAsync(new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .Build());
            Console.WriteLine($"已订阅主题: {topic}");
        }
        else
        {
            Console.WriteLine("MQTT 未连接,无法订阅主题");
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> with
    /// QoS "at least once" if the client is connected; otherwise only logs
    /// that publishing was not possible.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message payload as text.</param>
    public async Task PublishAsync(string topic, string message)
    {
        if (_client.IsConnected)
        {
            var msg = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            await _client.PublishAsync(msg);
            Console.WriteLine($"已发布消息: 主题={topic}, 内容={message}");
        }
        else
        {
            Console.WriteLine("MQTT 未连接,无法发布消息");
        }
    }

    /// <summary>
    /// Disconnects from the broker and disables automatic reconnection until
    /// <see cref="ConnectAsync"/> is called again. Also stops a reconnect loop
    /// that is currently waiting between attempts.
    /// </summary>
    public async Task DisconnectAsync()
    {
        // Set before disconnecting so the disconnected handler sees it.
        _disconnectRequested = true;

        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
            SetConnectionState(false); // no-op if the handler already reported it
            Console.WriteLine("MQTT 已断开连接");
        }
    }

    /// <summary>
    /// Connects iteratively (no recursion) until the client is connected or a
    /// disconnect has been requested. Only one loop runs at a time; a second
    /// caller returns immediately instead of issuing a competing connect.
    /// </summary>
    private async Task RunConnectLoopAsync()
    {
        if (Interlocked.CompareExchange(ref _connectLoopActive, 1, 0) != 0)
        {
            return;
        }

        try
        {
            while (!_disconnectRequested && !_client.IsConnected)
            {
                try
                {
                    await _client.ConnectAsync(_options, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"MQTT 连接失败: {ex.Message}");
                    await Task.Delay(_reconnectDelay);
                }
            }
        }
        finally
        {
            Interlocked.Exchange(ref _connectLoopActive, 0);
        }
    }

    /// <summary>
    /// Updates <see cref="IsConnected"/> and raises
    /// <see cref="OnConnectionChanged"/> only when the value actually changes.
    /// </summary>
    /// <param name="connected">New connection state.</param>
    private void SetConnectionState(bool connected)
    {
        lock (_stateLock)
        {
            if (IsConnected == connected)
            {
                return;
            }

            IsConnected = connected;
        }

        // Raised outside the lock so subscribers cannot block state updates.
        OnConnectionChanged?.Invoke(connected);
    }
}
使用示例
using System;
using System.Threading.Tasks;

/// <summary>
/// Console sample: connects, subscribes to a test topic, publishes one message,
/// prints the connection state periodically, and disconnects when Enter is pressed.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        var mqttService = new MqttClientService();

        // Subscribe to the message-received event.
        mqttService.OnMessageReceived += HandleMqttMessage;

        // Subscribe to the connection-state-changed event.
        mqttService.OnConnectionChanged += HandleConnectionStatus;

        // Plain (non-TLS) connection.
        await mqttService.ConnectAsync();

        await mqttService.SubscribeAsync("test/topic");

        using (var monitorCts = new System.Threading.CancellationTokenSource())
        {
            // Periodic status output; kept as a task so it can be stopped and
            // awaited instead of running unobserved until the process exits.
            Task monitorTask = MonitorConnectionAsync(mqttService, monitorCts.Token);

            try
            {
                // Publish a message.
                await mqttService.PublishAsync("test/topic", "Hello MQTT Secure!");

                Console.ReadLine();
            }
            finally
            {
                monitorCts.Cancel();
                await monitorTask;
            }
        }

        await mqttService.DisconnectAsync();
    }

    /// <summary>
    /// Prints the service's connection state every five seconds until
    /// <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="service">Service whose <c>IsConnected</c> flag is reported.</param>
    /// <param name="token">Cancellation token that ends the loop.</param>
    static async Task MonitorConnectionAsync(MqttClientService service, System.Threading.CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine($"当前 MQTT 连接状态: {(service.IsConnected ? "已连接" : "未连接")}");
                await Task.Delay(5000, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the monitor is stopped during the delay.
        }
    }

    /// <summary>Standalone callback: prints a received message.</summary>
    static void HandleMqttMessage(string topic, string message)
    {
        Console.WriteLine($"收到消息: 主题={topic}, 内容={message}");
    }

    /// <summary>Standalone callback: prints a connection-state change.</summary>
    static void HandleConnectionStatus(bool isConnected)
    {
        Console.WriteLine($"MQTT 连接状态变化: {(isConnected ? "已连接" : "断开连接")}");
    }
}
0

评论 (0)

取消