写在前面
别嫌我啰嗦,我想聊几句vertx
使用vertx
框架已经好多年了,其本身是基于大名鼎鼎的netty
,所以为什么我一直很看好vertx
想必不用过多解释了。
从IO
设计角度考虑vertx
是比springboot tomcat-embed
性能更加高效的。重点是vertx
的core
是单实例,有没有想到redis
?殊途同归。
问题
我在使用vertx-mqtt
作为mqtt
客户端,发现当字节超过一定长度就会发生阻塞,导致当前客户端实例必须重启才能重新使用。于是我给其开发团队提了一个bug,最终在一个大兄弟的指点下,研究了其数据编码的代码
private void initChannel(ChannelPipeline pipeline) {
// add into pipeline netty's (en/de)coder
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
// max message size not set, so the default from Netty MQTT codec is used
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
这个 new MqttDecoder()
默认private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092
。也就是说你不指定mqttClientOptions.setMaxMessageSize
时,默认值就是-1
,到了上面这个initChannel
方法,最终最大长度都是8092
,这里我仍然认为是框架的code
错误,这个默认值太不合理了。
解决
查看了mqtt
服务器MaxMessageSize
有这么一段说明:
# This option sets the maximum publish payload size that the broker will allow.
# Received messages that exceed this size will not be accepted by the broker.
# The default value is 0, which means that all valid MQTT messages are
# accepted. MQTT imposes a maximum payload size of 268435455 bytes.
#message_size_limit 0
于是在vertx-mqtt
组件声明的时候,加上mqttClientOptions.setMaxMessageSize(268435455);
就行了
完整例子:
MqttClientOptions mqttClientOptions = new MqttClientOptions();
mqttClientOptions.setMaxMessageSize(268435455);
mqttClient = MqttClient.create(applicationContext.getVertx(), mqttClientOptions);
mqttClient.connect(config.getMqttPort(), config.getMqttIp(), c -> {
if (c.succeeded()) {
mqttClient.subscribe(config.getMqttSubscribe(), 2);
log.info("Connected to a server");
} else {
log.error("Failed to connect to a server");
log.error("error", c.cause());
}
})
.publishHandler(pub -> {
Buffer buffer = pub.payload();
log.info("Content(as string) of the message: " + buffer.toString());
applicationContext.getHandleAction().handle(buffer.toJsonObject());
});
评论区