侧边栏壁纸
博主头像
术业有道之编程博主等级

亦是三月纷飞雨,亦是人间惊鸿客。亦是秋霜去叶多,亦是风华正当时。

  • 累计撰写 99 篇文章
  • 累计创建 50 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

使用vertx-mqtt组件的一个问题

Administrator
2021-08-16 / 0 评论 / 0 点赞 / 568 阅读 / 3392 字

写在前面

别嫌我啰嗦,我想聊几句vertx

使用vertx框架已经好多年了,其本身是基于大名鼎鼎的netty,所以为什么我一直很看好vertx想必不用过多解释了。

IO设计角度考虑vertx是比springboot tomcat-embed性能更加高效的。重点是vertxcore是单实例,有没有想到redis?殊途同归。

问题

我在使用vertx-mqtt作为mqtt客户端,发现当字节超过一定长度就会发生阻塞,导致当前客户端实例必须重启才能重新使用。于是我给其开发团队提了一个bug,最终在一个大兄弟的指点下,研究了其数据编码的代码

 private void initChannel(ChannelPipeline pipeline) {

    // add into pipeline netty's (en/de)coder
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    if (this.options.getMaxMessageSize() > 0) {
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
    } else {
      // max message size not set, so the default from Netty MQTT codec is used
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
    }

这个 new MqttDecoder() 默认private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092。也就是说你不指定mqttClientOptions.setMaxMessageSize时,默认值就是-1,到了上面这个initChannel方法,最终最大长度都是8092,这里我仍然认为是框架的code错误,这个默认值太不合理了。

解决

查看了mqtt服务器MaxMessageSize有这么一段说明:

# This option sets the maximum publish payload size that the broker will allow.
# Received messages that exceed this size will not be accepted by the broker.
# The default value is 0, which means that all valid MQTT messages are
# accepted. MQTT imposes a maximum payload size of 268435455 bytes. 
#message_size_limit 0

于是在vertx-mqtt组件声明的时候,加上mqttClientOptions.setMaxMessageSize(268435455);就行了

完整例子:

MqttClientOptions mqttClientOptions = new MqttClientOptions();
    mqttClientOptions.setMaxMessageSize(268435455);
    mqttClient = MqttClient.create(applicationContext.getVertx(), mqttClientOptions);
    mqttClient.connect(config.getMqttPort(), config.getMqttIp(), c -> {
      if (c.succeeded()) {
        mqttClient.subscribe(config.getMqttSubscribe(), 2);
        log.info("Connected to a server");
      } else {
        log.error("Failed to connect to a server");
        log.error("error", c.cause());
      }
    })
      .publishHandler(pub -> {
        Buffer buffer = pub.payload();
        log.info("Content(as string) of the message: " + buffer.toString());
        applicationContext.getHandleAction().handle(buffer.toJsonObject());
      });

个人公众号

0

评论区