您的位置:首页 >聚焦 >

快播:mica-mqtt client,最好用的 java mqtt 客户端

2022-07-27 06:28:33    来源:程序员客栈
简介

mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。

mica-mqtt server 更加易于集成到已有服务和二次开发,降低自研物联网平台开发成本。

mica-mqtt client 是简单、易用的 java mqtt 客户端,更加容易集成到自己的业务代码中。今天笔者主要要介绍的就是 mica-mqtt client 的使用。


(资料图片仅供参考)

使用mica-mqtt-client Spring boot starter

添加依赖

net.dreamlumica-mqtt-client-spring-boot-starter1.3.7

配置项说明

mqtt:client:enabled:true#是否开启客户端,默认:false 使用到的场景有限,非必要请不要启用ip:127.0.0.1#连接的服务端 ip ,默认:127.0.0.1port:1883#端口:默认:1883name:Mica-Mqtt-Client#名称,默认:Mica-Mqtt-ClientclientId:000001#客户端Id(非常重要,一般为设备sn,不可重复)user-name:mica#认证的用户名password:123456#认证的密码timeout:5#超时时间,单位:秒,默认:5秒reconnect:true#是否重连,默认:truere-interval:5000#重连时间,默认5000毫秒version:MQTT_5# mqtt 协议版本,默认:3.1.1read-buffer-size:8KB#接收数据的 buffer size,默认:8kmax-bytes-in-message:10MB#消息解析最大 bytes 长度,默认:10Mbuffer-allocator:heap#堆内存和堆外内存,默认:堆内存keep-alive-secs:60# keep-alive 时间,单位:秒clean-session:true# mqtt clean session,默认:trueuse-ssl:false#是否启用 ssl,默认:false

连接状态监听

@ServicepublicclassMqttClientConnectListener{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MqttClientConnectListener.class);@AutowiredprivateMqttClientCreatormqttClientCreator;@EventListenerpublicvoidonConnected(MqttConnectedEventevent){logger.info("MqttConnectedEvent:{}",event);}@EventListenerpublicvoidonDisconnect(MqttDisconnectEventevent){//离线时更新重连时的密码,适用于类似阿里云mqttclientId连接带时间戳的方式logger.info("MqttDisconnectEvent:{}",event);//在断线时更新 clientId、username、passwordmqttClientCreator.clientId("newClient"+System.currentTimeMillis()).username("newUserName").password("newPassword");}}

自定义配置java(可选)

@Configuration(proxyBeanMethods=false)publicclassMqttClientCustomizerConfiguration{@BeanpublicMqttClientCustomizermqttClientCustomizer(){returnnewMqttClientCustomizer(){@Overridepublicvoidcustomize(MqttClientCreatorcreator){//此处可自定义配置creator,会覆盖yml中的配置System.out.println("----------------MqttServerCustomizer-----------------");}};}}

订阅示例

@ServicepublicclassMqttClientSubscribeListener{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MqttClientSubscribeListener.class);@MqttClientSubscribe("/test/#")publicvoidsubQos0(Stringtopic,ByteBufferpayload){logger.info("topic:{}payload:{}",topic,ByteBufferUtil.toString(payload));}@MqttClientSubscribe(value="/qos1/#",qos=MqttQoS.AT_LEAST_ONCE)publicvoidsubQos1(Stringtopic,ByteBufferpayload){logger.info("topic:{}payload:{}",topic,ByteBufferUtil.toString(payload));}}

MqttClientTemplate 使用示例

@ServicepublicclassMainService{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MainService.class);@AutowiredprivateMqttClientTemplateclient;publicbooleanpublish(){//发布消息示例client.publish("/test/client",ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));returntrue;}publicbooleansub(){//订阅消息示例client.subQos0("/test/#",(topic,payload)->{logger.info(topic+"\t"+ByteBufferUtil.toString(payload));});returntrue;}}

共享订阅 topic 说明

mica-mqtt client 支持两种共享订阅方式:

共享订阅:订阅前缀 $queue/,多个客户端订阅了 $queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。

分组订阅:订阅前缀 $share//,组客户端订阅了$share/group1/topic、$share/group2/topic..,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

jfinal mica-mqtt client(1.3.7 开始支持)

添加依赖

net.dreamlujfinal-mica-mqtt-client1.3.7

删除 jfinal-demo 中的 slf4j-nop 依赖

添加 slf4j-log4j12

org.slf4jslf4j-log4j121.7.33

在 jfinal Config configPlugin 中添加 mica-mqtt client 插件

MqttClientPluginmqttClientPlugin=newMqttClientPlugin();mqttClientPlugin.config(mqttClientCreator->{//设置mqtt连接配置信息mqttClientCreator.clientId("clientId")//按需配置,相同的会互踢.ip("mqtt.dreamlu.net").port(1883).connectListener(Aop.get(MqttClientConnectListener.class));});me.add(mqttClientPlugin);

在 jfinal Config onStart 启动完成之后添加 mqtt 订阅

@OverridepublicvoidonStart(){IMqttClientMessageListenerclientMessageListener=Aop.get(TestMqttClientMessageListener.class);MqttClientKit.subQos0("#",clientMessageListener);}

使用 MqttClientKit 发送消息

MqttClientKit.publish("mica","hello".getBytes(StandardCharsets.UTF_8));

示例代码 MqttClientConnectListener

publicclassMqttClientConnectListenerimplementsIMqttClientConnectListener{@OverridepublicvoidonConnected(ChannelContextchannelContext,booleanisReconnect){if(isReconnect){System.out.println("重连mqtt服务器重连成功...");}else{System.out.println("连接mqtt服务器成功...");}}@OverridepublicvoidonDisconnect(ChannelContextchannelContext,Throwablethrowable,Stringremark,booleanisRemove){System.out.println("mqtt链接断开remark:"+remark+"isRemove:"+isRemove);}}

示例 TestMqttClientMessageListener

publicclassTestMqttClientMessageListenerimplementsIMqttClientMessageListener{@OverridepublicvoidonMessage(Stringtopic,MqttPublishMessagemqttPublishMessage,ByteBufferbyteBuffer){System.out.println("收到消息topic:"+topic+"内容:\n"+ByteBufferUtil.toString(byteBuffer));}}

其它 java 项目

添加依赖

net.dreamlumica-mqtt-core1.3.7org.t-iotio-websocket-servernet.dreamlumica-mqtt-modelcom.alibabafastjson

使用

//初始化mqtt客户端MqttClientclient=MqttClient.create().ip("127.0.0.1")//mqtt服务端ip地址.port(1883)//默认:1883.username("admin")//账号.password("123456")//密码.version(MqttVersion.MQTT_5)//默认:3_1_1.clientId("xxxxxx")//非常重要务必手动设置,一般设备 sn 号,默认:MICA-MQTT-前缀和 36进制的纳秒数.bufferAllocator(ByteBufferAllocator.DIRECT)//堆内存和堆外内存,默认:堆内存.readBufferSize(512)//消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度).maxBytesInMessage(1024*10)//最大包体长度,如果包体过大需要设置此参数,默认为:10M (10*1024*1024).keepAliveSecs(120)//默认:60s.timeout(10)//超时时间,t-io配置,可为null,为null时,t-io默认为5.reconnect(true)//是否重连,默认:true.reInterval(5000)//重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000.willMessage(builder->{builder.topic("/test/offline").messageText("down");//遗嘱消息}).connectListener(newIMqttClientConnectListener(){@OverridepublicvoidonConnected(ChannelContextcontext,booleanisReconnect){logger.info("链接服务器成功...");}@OverridepublicvoidonDisconnect(ChannelContextchannelContext,Throwablethrowable,Stringremark,booleanisRemove){logger.info("与链接服务器断开连接...");}}).properties()//mqtt5properties.connect();//异步连接,还支持同步connectSync()//消息订阅,同类方法subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+"\t"+ByteBufferUtil.toString(payload));});//取消订阅client.unSubscribe("/test/#");//发送消息client.publish("/test/client",ByteBuffer.wrap("mica-mqtt牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重连client.reconnect();

鸣谢

mica-mqtt 从一个试验性的项目逐渐完善,目前 gitee 上已有 800 多颗星。

mica-mqtt 的成长也离不开大伙使用和积极反馈,感谢 @冷月宫主、@willianfu、@hjkJOJO、@Symous、@hongfeng11、@胡萝博、@杨钊、@一醉化千愁、@toskeyfine、@亡羊补牛等同学,谢谢大家!!!

关键词: 发送消息 非常重要 断开连接

相关阅读