空色天绘/NEO TOKYO NOIR 03
1655 字
8 分钟
... 次访问
MQTT
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

相关概念
- Publisher(发布者):消息的发出者,负责发送消息。
- Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
- Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
- Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
- Payload(负载);可以理解为发送消息的内容。
- QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有
QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:- QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
- QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
- QoS 2(Exactly Once):只有一次,确保消息只到达一次。
RabbitMQ启用MQTT功能
RabbitMQ启用MQTT功能,需要先安装然后再启用插件。
rabbitmq-plugins enable rabbitmq_mqtt- 启用RabbitMQ的MQTT插件了,默认是不启用的,使用如下命令开启即可

- 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在
1883端口上了。

MQTT客户端




前端直接实现即时通讯
rabbitmq-plugins enable rabbitmq_web_mqtt- WEB端与MQTT服务进行通讯需要使用一个叫
MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js - 第一个订阅主题
testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA - 第二个订阅主题
testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB

在SpringBoot中使用
没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。
依赖
<!--Spring集成MQTT--><dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId></dependency>在application.yml中添加MQTT相关配置
# 应用服务 WEB 访问端口server: port: 8080# mqttrabbitmq: mqtt: url: tcp://localhost:1883 username: guest password: guest defaultTopic: testTopic代码
Java配置类
@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig { /** * RabbitMQ连接用户名 */ private String username; /** * RabbitMQ连接密码 */ private String password; /** * RabbitMQ的MQTT默认topic */ private String defaultTopic; /** * RabbitMQ的MQTT连接地址 */ private String url;}MQTT消息订阅者相关配置
使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息
@Slf4j@Configurationpublic class MqttInboundConfig { @Autowired private MqttConfig mqttConfig;
@Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
@Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //设置消息质量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; }
@Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() {
@Override public void handleMessage(Message<?> message) throws MessagingException { //处理订阅消息 log.info("handleMessage : {}",message.getPayload()); }
}; }}MQTT消息发布者相关配置
使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来发布消息
@Configurationpublic class MqttOutboundConfig {
@Autowired private MqttConfig mqttConfig;
@Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; }
@Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; }
@Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}添加MQTT网关,用于向主题中发送消息
@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { /** * 发送消息到默认topic */ void sendToMqtt(String payload);
/** * 发送消息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/** * 发送消息到指定topic并设置QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}测试接口
@RestController@RequestMapping("/mqtt")public class MqttController {
@Autowired private MqttGateway mqttGateway;
/** * sendToDefaultTopic * @param payload */ @PostMapping("/sendToDefaultTopic") public void sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); }
/** * sendToTopic * @param payload * @param topic */ @PostMapping("/sendToTopic") public void sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); }}

MQTT和Websocket的区别
MQTT(Message Queuing Telemetry Transport)和WebSocket都是在物联网(IoT)和实时Web应用中广泛使用的协议,但它们的设计目的、工作方式以及应用场景有所不同。
MQTT:
- 设计目的: MQTT最初是为低带宽、高延迟或不可靠的网络连接而设计的,特别适用于资源受限的设备之间的通信,比如传感器网络。
- 协议层级: MQTT运行在传输层之上,可以使用TCP/IP作为其传输协议。它是一个轻量级的消息协议,非常适合于机器对机器(M2M)和物联网通信。
- 发布/订阅模型: MQTT采用发布/订阅模式,允许设备(客户端)向一个主题(Topic)发布消息,其他订阅了该主题的设备可以接收到这些消息。这种模式减少了点对点连接的需求,提高了效率。
- 节电与带宽优化: MQTT支持QoS(Quality of Service)等级,确保消息的可靠传递,同时允许在不可靠的网络环境中优化带宽使用。
- 应用场景: 常用于远程监控、智能城市、工业自动化、农业环境监测等物联网领域。
WebSocket:
- 设计目的: WebSocket是一种在单个TCP连接上进行全双工通信的协议,旨在提供浏览器与服务器间的低延迟、双向实时通信,以实现实时Web应用。
- 协议层级: WebSocket建立在HTTP协议之上,通过一个HTTP握手升级为WebSocket连接,之后的数据交换不再使用HTTP,而是使用自己的帧格式。
- 双向通信: WebSocket支持服务器主动向客户端推送数据,无需客户端发起请求,实现了真正的双向通信。
- 灵活性: WebSocket本身不规定消息格式,开发者可以根据需要选择JSON、XML或其他格式来封装数据。
- 应用场景: 常用于在线聊天、协作编辑、实时游戏、股票交易、体育赛事直播等需要低延迟交互的Web应用。
总结:
- 如果你的应用场景侧重于低功耗、远程设备通信和需要在不稳定网络环境下运作,MQTT可能是更好的选择。
- 对于需要在Web浏览器和服务器之间实现高效、双向实时通信的应用,WebSocket则更为合适。
还在用WebSocket实现实时消息推送?试试MQTT吧,真香!
评论