1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 【开源物联网】MQTT物联网网关Broker与Java开源实现

【开源物联网】MQTT物联网网关Broker与Java开源实现

时间:2019-05-08 19:10:21

相关推荐

【开源物联网】MQTT物联网网关Broker与Java开源实现

1、概述

MQTT是一种面向物联网通信的主流协议,源和目标之间通过中间代理实现面向主题的发布/订阅通信方式,如下图所示:

发布者发布某个主题的消息,通过MQTT代理Broker处理,已订阅该主题的接收者即可收到消息。为了控制消息的转发流程,需要对MQTT代理Broker逻辑进行定制。因此,需要研究MQTT代理Broker的工作原理。本项目研发了一套开源框架奇辰Open-API,物联网网关系统作为其中一个组件集成了MQTT代理Broker功能,实现MQTT开箱即用。

2、实现原理

MQTT底层是通过TCP/IP协议实现的,采用Java的非阻塞的通信框架Netty库实现。Netty开发的基础概念是Channel通道,Channel封装隐藏好了底层Socket的工作。物联网网关在实现MQTT代理Broker时为接入Broker的发布者、订阅者建立Channel,通过对Channel的管理及Channel消息内容的处理实现应用所需的网关业务功能。

3、开源实现

3.1程序入口

基于Java Netty库实现的物联网网关入口如下:

class ChThread extends Thread {private Server server;public ChThread(Server server) {this.server = server;}public void run() {server.startup();}}@SpringBootApplicationpublic class GatewayApplication {public static void main(String[] args) throws InterruptedException {SpringApplication.run(GatewayApplication.class, args);MqttBroker mqttBroker = new MqttBroker();ChThread mqtt_t = new ChThread(mqttBroker);mqtt_t.start();WSServer wSServer = new WSServer();ChThread ws_t = new ChThread(wSServer);ws_t.start();}}

考虑到物联网设备可能采用多种协议接入物联网网关,为不同协议开启不同线程进行运行,如第18-20行和第22-24行分别开启了原生MQTT协议服务和基于WebSocket的MQTT协议服务。本文重点介绍原生MQTT协议Broker实现。

3.2Broker启动

Broker的启动如下:

public class MqttBroker extends Server {private int port = 1883;private NioEventLoopGroup bossGroup;private NioEventLoopGroup workGroup;/*** 启动服务* * @throws InterruptedException*/public void startup() {try {bossGroup = new NioEventLoopGroup(1);workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workGroup);bootstrap.channel(NioServerSocketChannel.class);bootstrap.option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_RCVBUF, 10485760);bootstrap.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {ChannelPipeline channelPipeline = ch.pipeline();// 设置读写空闲超时时间channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);channelPipeline.addLast("decoder", new MqttDecoder());channelPipeline.addLast(new MqttHandler());}});ChannelFuture f = bootstrap.bind(port).sync();f.channel().closeFuture().sync();} catch (Exception e) {System.out.println("start exception" + e.toString());}}/*** 关闭服务*/public void shutdown() throws InterruptedException {if (workGroup != null && bossGroup != null) {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();System.out.println("shutdown success");}}}

关键属性

Broker的运行实例化了Netty的ServerBootstrap服务,服务的两个关键属性bossGroup和workGroup分别管理了Broker的核心线程组与工作线程组。其中:

bossGroup核心线程组负责Broker的核心业务,包括监听客户端的连接并进行相应处理;workGroup承载了Broker为每个客户端连接建立的Channel的运行。

参数设置

调用ServerBootstrap的option和childOption分别为Channel创建时的属性进行设置。

3.3Channel消息处理

Broker在处理Channel的消息时采用Pipeline管道的方式,Pipeline由一系列Handler处理器构成,Handler分为Input输入和Output处理器,分别继承Netty的ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。顾名思义,当消息流入网关Broker是依次调用Input Handler进行处理;当消息从网关Broker流出时依次调用Output Handler进行处理。

MQTT协议处理

为了实现对MQTT协议的处理,为Broker添加MQTT解码MqttDecoder和MQTT编码MqttEncoder分别作用于输入MQTT消息和以MQTT协议输出的内容,见Broker启动代码的第35、36行。

3.4MQTT网关业务

前面工作基于Netty框架实现了基础的消息处理框架,针对不同场景需要自定义网关处理逻辑。在MQTT的Channel处理Pipeline里添加自定义MqttHandler,见Broker启动代码第37行。

MqttHandler的具体处理逻辑如下:

public class MqttHandler extends ChannelInboundHandlerAdapter {private Logger log = LoggerFactory.getLogger(this.getClass());private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);private RestTemplate restTemplate = new RestTemplate();private ReceiveHandler receiveHandler = new ReceiveHandler();// @Autowired// KafkaProducer kafkaProducer;/*** 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);channelSupervise.addChannel(ctx.channel());}/*** 客户端与服务端 断连时执行 channelInactive方法之后执行*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {super.channelUnregistered(ctx);channelSupervise.removeChannel(ctx.channel());}/*** 从客户端收到新的数据时,这个方法会在收到消息时被调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {MqttMessage mqttMessage = (MqttMessage) msg;log.info("info--" + mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接// to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息MqttConnectPayload payload = (MqttConnectPayload) mqttMessage.payload();IotChannel iotChannel = channelSupervise.findChannel(channel.id().asLongText());if (iotChannel != null) {iotChannel.set_connected(true);iotChannel.setClient_id(payload.clientIdentifier());iotChannel.setUsername(payload.userName());iotChannel.setPassword(payload.passwordInBytes());try {// RestTemplate restTemplate = RestTemplate.getBean("restTemplate");// kafkaProducer.send(mqttMessage);ResponseEntity<String> responseEntity = restTemplate.getForEntity("/auth/device",String.class);HttpStatus statusCode = responseEntity.getStatusCode(); // 获取响应码if (statusCode == HttpStatus.OK) {iotChannel.set_registered(true);} else {iotChannel.set_registered(true);}} catch (Exception e) {iotChannel.set_registered(false);}}MqttMsgBack.connack(channel, mqttMessage);}switch (mqttFixedHeader.messageType()) {case PUBLISH: // 客户端发布消息// PUBACK报文是对QoS 1等级的PUBLISH报文的响应// System.out.println("123");// KafkaProducer kafkaProducer = KafkaProducer.getBean("kafkaProducer");// kafkaProducer.send(mqttMessage);receiveHandler.receiveMessage(mqttMessage);MqttMsgBack.puback(channel, mqttMessage);break;case PUBREL: // 发布释放// PUBREL报文是对PUBREC报文的响应// to doMqttMsgBack.pubcomp(channel, mqttMessage);break;case SUBSCRIBE: // 客户端订阅主题// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端// to doMqttSubscribePayload payload = (MqttSubscribePayload) mqttMessage.payload();List<MqttTopicSubscription> topicSubscriptions = payload.topicSubscriptions();for (MqttTopicSubscription mqttTopicSubscription : topicSubscriptions) {channelSupervise.subTopic(mqttTopicSubscription.topicName(), channel);}MqttMsgBack.suback(channel, mqttMessage);break;case UNSUBSCRIBE: // 客户端取消订阅// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题// to doMqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage;channelSupervise.unsubTopic(mqttUnsubscribeMessage.payload().topics(), channel);MqttMsgBack.unsuback(channel, mqttMessage);break;case PINGREQ: // 客户端发起心跳// 客户端发送PINGREQ报文给服务端的// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开MqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT: // 客户端主动断开连接// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0// to dobreak;default:break;}}/*** 从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);ctx.close();}/*** 客户端与服务端第一次建立连接时执行*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 客户端与服务端 断连时执行*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelInactive(ctx);}/*** 服务端 当读超时时 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {super.userEventTriggered(ctx, evt);ctx.close();}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {super.channelWritabilityChanged(ctx);}}

MqttHandler继承自ChannelInboundHandlerAdapter实现对输入Mqtt消息进行自定义处理。核心业务代码见第36行,当检测到Channel有数据后读取Mqtt消息内容类型进行相应处理。定义MqttMsgBack和ReceiveHandler分别对收到Mqtt消费进行消息反馈和消息进一步处理。

Mqtt消息订阅

收到Mqtt消息识别出消息主题后,为了将消息转发给订阅者,在ReceiveHandler里面实现此功能如下:

public class ReceiveHandler {private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);public void receiveMessage(MqttMessage mqttMessage) {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;String topic = mqttPublishMessage.variableHeader().topicName();ChannelGroup channelGroup = channelSupervise.subChannels(topic);if ((!(channelGroup == null)) && channelGroup.size() > 0) {MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false,mqttPublishMessage.fixedHeader().qosLevel(), false,mqttPublishMessage.fixedHeader().remainingLength());// 构建返回报文, 可变报头MqttPublishVariableHeader mqttMessageIdVariableHeader = new MqttPublishVariableHeader(topic,mqttPublishMessage.variableHeader().packetId());MqttPublishMessage mqttMessagePublish = new MqttPublishMessage(mqttFixedHeader, mqttMessageIdVariableHeader,mqttPublishMessage.payload());channelGroup.writeAndFlush(mqttMessagePublish);}}}

在第8行获取所有订阅过当前处理主题的Channel,将消息已Mqtt协议方法发送出去。

4、更多

开源项目:Open-Api

更多信息:

5、后续完善

到此已实现一个开源MQTT网关基础框架。后续可以继续完善的地方主要有:考虑到前端技术大量使用http、web方式,研究在Websocket之上如何传输MQTT消息,这样可以打通前端主流技术和底层设备的MQTT接入。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。