cxl
Published on 2025-06-09 / 18 Visits
0
0

MQTT 协议-客户端

MQTT(Message Queuing Telemetry Transport)是一种基于发布 / 订阅模式的轻量级消息传输协议,专为资源受限的设备和低带宽、高延迟或不可靠的网络设计。本文将介绍 MQTT 协议中的客户端概念、适用场景,并提供 Python、Java 和 Node.js 三种编程语言的实现示例。

MQTT 客户端概述

MQTT 客户端是使用 MQTT 协议与 MQTT 代理(Broker)进行通信的实体,可以是设备、应用程序或服务。客户端的主要功能包括:

  • 连接到 MQTT 代理

  • 发布消息到特定主题

  • 订阅感兴趣的主题以接收消息

  • 断开与代理的连接

MQTT 客户端具有以下特点:

  • 轻量级,资源占用少

  • 支持多种 QoS(服务质量)级别

  • 支持保留消息和遗嘱消息

  • 支持 TLS 加密

MQTT 客户端适用场景

MQTT 客户端广泛应用于以下场景:

  1. 物联网(IoT):连接传感器、智能设备和云平台,实现数据采集和远程控制

  2. 工业监控:实时监控生产设备状态和环境参数

  3. 智能家居:连接智能家电、门锁、摄像头等设备,实现家庭自动化

  4. 移动应用:实现即时通讯、推送通知等功能

  5. 远程医疗:传输患者生命体征数据,支持远程诊断

  6. 车联网:车辆状态监控、远程控制和自动驾驶数据传输

MQTT 客户端实现示例

下面分别使用 Python、Java 和 Node.js 实现 MQTT 客户端连接示例,broker代理端使用了上篇在本地搭建好的服务

1. Python 实现

使用了 conda 创建的python 3.12版本运行,需先安装对应mtqq包再运行下面脚本,pip install paho-mqtt

  import paho.mqtt.client as mqtt
  import time
  
  # 连接成功回调
  def on_connect(client, userdata, flags, rc):
      print(f"Connected with result code {rc}")
      # 订阅主题
      client.subscribe("test/topic")
  
  # 消息接收回调
  def on_message(client, userdata, msg):
      print(f"{msg.topic} {msg.payload.decode()}")
  
  # 创建客户端实例
  client = mqtt.Client()
  
  # 指定回调函数
  client.on_connect = on_connect
  client.on_message = on_message
  
  # 连接MQTT代理
  client.connect("localhost", 1883, 60)
  
  # 启动循环处理网络流量
  client.loop_start()
  
  try:
      while True:
          # 发布消息
          client.publish("test/topic", "Hello, MQTT!")
          time.sleep(2)
  except KeyboardInterrupt:
      print("Exiting...")
      # 停止循环
      client.loop_stop()
      # 断开连接
      client.disconnect()

2. Java 实现

下列代码我用了jdk21执行,首先创建个maven项目并引入相关依赖,之后便直接运行main方法。

  <!-- 加入mqtt依赖 -->
  <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
  </dependency>
  import org.eclipse.paho.client.mqttv3.*;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  
  public class MqttJavaClient {
      public static void main(String[] args) {
          String broker = "tcp://localhost:1883";
          String clientId = "JavaClient";
          String topic = "test/topic";
          String content = "Hello, Java MQTT!";
          int qos = 1;
          MemoryPersistence persistence = new MemoryPersistence();
  
          try {
              // 创建MQTT客户端实例
              MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
              
              // 设置回调
              sampleClient.setCallback(new MqttCallbackExtended() {
                  @Override
                  public void connectComplete(boolean reconnect, String serverURI) {
                      System.out.println("Connected to " + serverURI);
                      try {
                          // 连接成功后订阅主题
                          sampleClient.subscribe(topic);
                      } catch (MqttException e) {
                          e.printStackTrace();
                      }
                  }
  
                  @Override
                  public void connectionLost(Throwable cause) {
                      System.out.println("Connection lost");
                  }
  
                  @Override
                  public void messageArrived(String topic, MqttMessage message) throws Exception {
                      System.out.println("Message received: " + new String(message.getPayload()));
                  }
  
                  @Override
                  public void deliveryComplete(IMqttDeliveryToken token) {
                      System.out.println("Delivery complete");
                  }
              });
              
              // 连接选项,可设置账号密码或遗嘱消息等
              MqttConnectOptions connOpts = new MqttConnectOptions();
              connOpts.setCleanSession(true);
              System.out.println("Connecting to broker: " + broker);
              
              // 连接到代理
              sampleClient.connect(connOpts);
              System.out.println("Connected");
              
              // 发布消息
              MqttMessage message = new MqttMessage(content.getBytes());
              message.setQos(qos);
              sampleClient.publish(topic, message);
              System.out.println("Message published");
              
              // 保持主线程运行一段时间,接收消息
              Thread.sleep(5000);
              
              // 断开连接
              sampleClient.disconnect();
              System.out.println("Disconnected");
              System.exit(0);
          } catch (MqttException me) {
              System.out.println("reason " + me.getReasonCode());
              System.out.println("msg " + me.getMessage());
              System.out.println("loc " + me.getLocalizedMessage());
              System.out.println("cause " + me.getCause());
              System.out.println("excep " + me);
              me.printStackTrace();
          }catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
      }
  }

3. Node.js 实现

下列代码我用了node的22.16.0版本,运行前需先安装mqtt包,相关api参考文档

  const mqtt = require('mqtt');

  // 连接选项
  const options = {
    clean: true, // 设置为false将启用持久会话
    connectTimeout: 4000, // 连接超时时间(毫秒)
    clientId: 'nodejsClient',
  };
  
  // 创建客户端实例
  const client = mqtt.connect('mqtt://localhost', options);
  
  // 连接成功事件
  client.on('connect', () => {
    console.log('Connected');
    
    // 订阅主题
    client.subscribe('test/topic', (err) => {
      if (!err) {
        console.log('Subscribed to test/topic');
      } else {
        console.error('Subscribe error:', err);
      }
    });
    
    // 发布消息
    client.publish('test/topic', 'Hello, MQTT from Node.js!', { qos: 1 });
  });
  
  // 接收消息事件
  client.on('message', (topic, message) => {
    // message是Buffer类型
    console.log(`Received message on ${topic}: ${message.toString()}`);
  });
  
  // 错误事件
  client.on('error', (err) => {
    console.error('Connection error:', err);
    client.end();
  });
  
  // 断开连接事件
  client.on('close', () => {
    console.log('Connection closed');
  });
  
  // 5秒后断开连接
  setTimeout(() => {
    client.end();
  }, 5000);

执行以上三种语言代码时,我同时还开了一个可视化MQTTX客户端,用于订阅它们发送的消息

总结

MQTT 协议因其轻量级、灵活性和可靠性,成为物联网和实时通信领域的首选协议。本文介绍了 MQTT 客户端的基本概念和适用场景,并提供了三种编程语言的实现示例。通过这些示例,你可以快速开始使用 MQTT 协议开发应用程序,实现设备间的高效通信。

在实际应用中,你可能需要根据具体需求调整 QoS 级别、添加认证和加密、处理重连逻辑等。MQTT 协议的更多高级特性,如遗嘱消息、保留消息和共享订阅等,也可以根据项目需求进行深入探索和应用。


Comment