MQTT(Message Queuing Telemetry Transport)是一种基于发布 / 订阅模式的轻量级消息传输协议,专为资源受限的设备和低带宽、高延迟或不可靠的网络设计。本文将介绍 MQTT 协议中的客户端概念、适用场景,并提供 Python、Java 和 Node.js 三种编程语言的实现示例。
MQTT 客户端概述
MQTT 客户端是使用 MQTT 协议与 MQTT 代理(Broker)进行通信的实体,可以是设备、应用程序或服务。客户端的主要功能包括:
连接到 MQTT 代理
发布消息到特定主题
订阅感兴趣的主题以接收消息
断开与代理的连接
MQTT 客户端具有以下特点:
轻量级,资源占用少
支持多种 QoS(服务质量)级别
支持保留消息和遗嘱消息
支持 TLS 加密
MQTT 客户端适用场景
MQTT 客户端广泛应用于以下场景:
物联网(IoT):连接传感器、智能设备和云平台,实现数据采集和远程控制
工业监控:实时监控生产设备状态和环境参数
智能家居:连接智能家电、门锁、摄像头等设备,实现家庭自动化
移动应用:实现即时通讯、推送通知等功能
远程医疗:传输患者生命体征数据,支持远程诊断
车联网:车辆状态监控、远程控制和自动驾驶数据传输
MQTT 客户端实现示例
下面分别使用 Python、Java 和 Node.js 实现 MQTT 客户端连接示例,broker代理端使用了上篇在本地搭建好的服务
1. Python 实现
使用了 conda 创建的python 3.12版本运行,需先安装对应mtqq包再运行下面脚本,pip install paho-mqtt
import paho.mqtt.client as mqtt
import time
# 连接成功回调
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# 订阅主题
client.subscribe("test/topic")
# 消息接收回调
def on_message(client, userdata, msg):
print(f"{msg.topic} {msg.payload.decode()}")
# 创建客户端实例
client = mqtt.Client()
# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接MQTT代理
client.connect("localhost", 1883, 60)
# 启动循环处理网络流量
client.loop_start()
try:
while True:
# 发布消息
client.publish("test/topic", "Hello, MQTT!")
time.sleep(2)
except KeyboardInterrupt:
print("Exiting...")
# 停止循环
client.loop_stop()
# 断开连接
client.disconnect()
2. Java 实现
下列代码我用了jdk21执行,首先创建个maven项目并引入相关依赖,之后便直接运行main方法。
<!-- 加入mqtt依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttJavaClient {
public static void main(String[] args) {
String broker = "tcp://localhost:1883";
String clientId = "JavaClient";
String topic = "test/topic";
String content = "Hello, Java MQTT!";
int qos = 1;
MemoryPersistence persistence = new MemoryPersistence();
try {
// 创建MQTT客户端实例
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 设置回调
sampleClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
try {
// 连接成功后订阅主题
sampleClient.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message received: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete");
}
});
// 连接选项,可设置账号密码或遗嘱消息等
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
// 连接到代理
sampleClient.connect(connOpts);
System.out.println("Connected");
// 发布消息
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
// 保持主线程运行一段时间,接收消息
Thread.sleep(5000);
// 断开连接
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
3. Node.js 实现
下列代码我用了node的22.16.0版本,运行前需先安装mqtt包,相关api参考文档
const mqtt = require('mqtt');
// 连接选项
const options = {
clean: true, // 设置为false将启用持久会话
connectTimeout: 4000, // 连接超时时间(毫秒)
clientId: 'nodejsClient',
};
// 创建客户端实例
const client = mqtt.connect('mqtt://localhost', options);
// 连接成功事件
client.on('connect', () => {
console.log('Connected');
// 订阅主题
client.subscribe('test/topic', (err) => {
if (!err) {
console.log('Subscribed to test/topic');
} else {
console.error('Subscribe error:', err);
}
});
// 发布消息
client.publish('test/topic', 'Hello, MQTT from Node.js!', { qos: 1 });
});
// 接收消息事件
client.on('message', (topic, message) => {
// message是Buffer类型
console.log(`Received message on ${topic}: ${message.toString()}`);
});
// 错误事件
client.on('error', (err) => {
console.error('Connection error:', err);
client.end();
});
// 断开连接事件
client.on('close', () => {
console.log('Connection closed');
});
// 5秒后断开连接
setTimeout(() => {
client.end();
}, 5000);
执行以上三种语言代码时,我同时还开了一个可视化MQTTX客户端,用于订阅它们发送的消息
总结
MQTT 协议因其轻量级、灵活性和可靠性,成为物联网和实时通信领域的首选协议。本文介绍了 MQTT 客户端的基本概念和适用场景,并提供了三种编程语言的实现示例。通过这些示例,你可以快速开始使用 MQTT 协议开发应用程序,实现设备间的高效通信。
在实际应用中,你可能需要根据具体需求调整 QoS 级别、添加认证和加密、处理重连逻辑等。MQTT 协议的更多高级特性,如遗嘱消息、保留消息和共享订阅等,也可以根据项目需求进行深入探索和应用。