您确定要退出登录吗?

修改密码

原密码
新密码
确认新密码

mica-mqtt

如梦技术开源组织


物联网 框架/工具 t-io tio

一、产品介绍

mica-mqtt 基于 t-io 实现的简单低延迟高性能 的 mqtt 物联网开源组件。使用详见mica-mqtt-example 模块。

二、功能

  •  支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
  •  MQTT client 客户端。
  •  MQTT server 服务端。
  • 支持 MQTT 遗嘱消息。
  • 支持 MQTT 保留消息。
  •  MQTT 客户端 topic sub,添加阿里云 mqtt demo。
  • 支持自定义消息(mq)处理转发实现集群。
  •  支持 GraalVM 编译成本机可执行程序。

三、文档

四、使用

经过压测和测试对比 netty 同类产品响应速度更快,t-io 确实很稳mica-mqtt-example 中有 mqtt 服务端和客户端测试代码, main 方法运行即可。

4.1 添加依赖

<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-core</artifactId>
  <version>${最新版本}</version>
</dependency>

4.2 客户端

// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)						// 默认:1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5) 	// 默认:3_1_1
    .clientId("xxxxxx")				// 默认:MICA-MQTT- 前缀和 36进制的纳秒数
    .bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存
    .readBufferSize(512) 			// 消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度)
    .keepAliveSecs(120// 默认:60s
    .timeout(10)					// 超时时间,t-io 配置,可为 null,为 null 时,t-io 默认为 5
    .reconnect(true)				// 是否重连,默认:true
    .reInterval(5000)				// 重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000
    .willMessage(builder -> {
    builder.topic("/test/offline").messageText("hello");	// 遗嘱消息
    })
    // .properties()				// mqtt5 properties
    .connect();						// 连接


    // 消息订阅,同类方法 subxxx
    client.subQos0("/test/#", (topic, payload) -> {
        logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
    });
    // 取消订阅
    client.unSubscribe("/test/#");


    // 发送消息
    client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));


    // 断开连接
    client.disconnect();
    // 重连
    client.reconnect();
    // 停止
    client.stop();

4.3 服务端

// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
	// 默认:127.0.0.1
	.ip("127.0.0.1")
	// 默认:1883
	.port(1883)
	// 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
	.readBufferSize(512)
	// 自定义认证
	.authHandler((clientId, userName, password) -> true)
	// 消息监听
	.messageListener((clientId, topic, mqttQoS, payload) -> {
		logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
	})
	// 堆内存和堆外内存选择,默认:堆内存
	.bufferAllocator(ByteBufferAllocator.HEAP)
	// 心跳超时时间,默认:120s
	.heartbeatTimeout(120_1000L)
	// ssl 配置
	.useSsl("", "", "")
	// 自定义客户端上下线监听
	.connectStatusListener(new IMqttConnectStatusListener() {
		@Override
		public void online(String clientId) {


		}


		@Override
		public void offline(String clientId) {


		}
	})
	// 自定义消息转发,可用 mq 广播实现集群化处理
	.messageDispatcher(new IMqttMessageDispatcher() {
		@Override
		public void config(MqttServer mqttServer) {


		}


		@Override
		public boolean send(Message message) {
			return false;
		}


		@Override
		public boolean send(String clientId, Message message) {
			return false;
		}
	})
	.debug() // 开启 t-io debug 信息日志
	.start();


// 发送给某个客户端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()), MqttQoS.EXACTLY_ONCE);


// 发送给所有在线监听这个 topic 的客户端
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()), MqttQoS.EXACTLY_ONCE);


// 停止服务
mqttServer.stop();

五、参考vs借鉴

六、工具

七、微信公众号

八、对t-io的评价

t-io确实很稳



登录 和大家一起讨论吧!

重置 发表评论


确定要删除该条评论吗?

案例不存在!