2025-03-04
单片机
00

目录

各种Broker的比较
比较
启动EMQX
访问dashboard
Python 的 paho-mqtt 库
安装 paho-mqtt 库
订阅消息
发布消息
8083 端口非加密的 MQTT over WebSocket 通信
基于用户密码发布消息
关闭匿名访问
EMQX dashboard介绍

各种Broker的比较

EMQX

EMQX 是业界领先的 MQTT Broker,深受开发者喜爱,目前在 GitHub 上已获得超过 12k 个 Star。EMQX 项目始于 2012 年,遵循 Apache 2.0 开源协议。它基于 Erlang/OTP 开发,这是一种能够构建大规模可扩展软实时系统的编程语言。

EMQX 是全球最具扩展性的 MQTT Broker,支持 MQTT 5.0、MQTT-SN 和 MQTT over QUIC 等协议和其他先进功能。它采用无主集群架构,实现了高可用性和水平扩展性。自 5.0 版本开始,EMQX 能够在一个由 23 个节点组成的集群中创建高达 1 亿个并发 MQTT 连接。

image.png

image.png

比较

Mosquitto、NanoMQ 使用简单,特别是Mosquitto是最简单,但是只是支撑了基本的Broker,我希望有一个前端页面,可以看到在线的MQTT设备、对MQTT消息进行过滤或者存储、对MQTT Broker进行用户权限管理,这就只能找别的项目了,比如这个EMQX。

启动EMQX

bash
mkdir EMQX && cd EMQX docker run -d \ --user root \ -p 1883:1883 -p 8083:8083 \ -p 8084:8084 -p 8883:8883 \ -p 18083:18083 \ -v $PWD/data:/opt/emqx/data \ -v $PWD/log:/opt/emqx/log \ -v $PWD/etc:/opt/emqx/etc \ emqx/emqx
端口协议用途
1883MQTT over TCP非加密的 MQTT 通信
8083MQTT over WebSocket非加密的 MQTT over WebSocket 通信
8084MQTT over WebSocket (TLS)加密的 MQTT over WebSocket 通信
8883MQTT over TLS加密的 MQTT 通信
18083HTTPEMQX 管理控制台

非加密 MQTT 连接:mqtt://broker.example.com:1883

加密 MQTT 连接: mqtts://broker.example.com:8883 (mqtts 表示使用 TLS/SSL 加密的 MQTT 协议)

访问dashboard

http://198.12.95.182:18083/

默认用户和密码:

admin

public

Python 的 paho-mqtt

安装 paho-mqtt

首先,你需要安装 paho-mqtt 库。你可以使用以下命令安装它:

bash
pip install paho-mqtt

订阅消息

以下是一个订阅消息的示例:

python
import paho.mqtt.client as mqtt # 定义MQTT服务器地址和端口 broker = "198.12.95.182" port = 1883 # 定义回调函数,当接收到消息时会调用这个函数 def on_message(client, userdata, msg): print(f"Received message: {msg.payload.decode()} on topic {msg.topic}") # 创建MQTT客户端 client = mqtt.Client() # 设置回调函数 client.on_message = on_message # 连接到MQTT服务器 client.connect(broker, port, 60) # 订阅主题 topic = "test/topic" client.subscribe(topic) # 开始循环,等待接收消息 client.loop_forever()

image.png

发布消息

以下是一个发布消息的示例:

python
import paho.mqtt.client as mqtt # 定义MQTT服务器地址和端口 broker = "198.12.95.182" port = 1883 # 创建MQTT客户端 client = mqtt.Client() # 连接到MQTT服务器 client.connect(broker, port, 60) # 发布消息 topic = "test/topic" message = "Hello, EMQX!" client.publish(topic, message) # 断开连接 client.disconnect()

8083 端口非加密的 MQTT over WebSocket 通信

使用 WebSocket 作为通信协议只是改变了传输层,而 MQTT Broker 的处理机制保持不变。提供 WebSocket 接口主要是为了方便 JavaScript 开发和在浏览器中使用 MQTT。

python
import paho.mqtt.client as mqtt # 定义回调函数,当连接成功时会调用这个函数 def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client.subscribe("test/topic") # 定义回调函数,当接收到消息时会调用这个函数 def on_message(client, userdata, msg): print(f"Received message: {msg.payload.decode()} on topic {msg.topic}") # 创建MQTT客户端 client = mqtt.Client(transport="websockets") # 设置回调函数 client.on_connect = on_connect client.on_message = on_message # 连接到MQTT服务器 broker = "198.12.95.182" port = 8083 client.connect(broker, port, 60) # 发布消息 topic = "test/topic" message = "Hello, EMQX over WebSocket!" client.publish(topic, message) # 开始循环,等待接收消息 client.loop_forever()

基于用户密码发布消息

python
import paho.mqtt.client as mqtt # MQTT Broker地址 broker = "198.12.95.182" port = 1883 username = "testuser" # 使用您在Dashboard中创建的用户名 password = "testpassword" # 使用您在Dashboard中创建的密码 # 创建MQTT客户端 client = mqtt.Client() # 设置用户名和密码 client.username_pw_set(username, password) # 连接到MQTT Broker client.connect(broker, port) # 发布消息 topic = "test/topic" message = "Hello, EMQX! USER" client.publish(topic, message) # 断开连接 client.disconnect()

关闭匿名访问

https://juejin.cn/post/7113430766305935367

EMQX dashboard介绍

  1. 集群概览: 集群概览是指 EMQX 集群的总体状态和健康状况。通过集群概览,您可以查看集群中各个节点的运行状态、负载情况、连接数等信息。这对于监控和管理整个集群的运行情况非常重要。

  2. 客户端: 客户端是指连接到 EMQX 服务器的 MQTT 客户端设备。每个客户端都有一个唯一的客户端标识符(Client ID),并且可以发布消息、订阅主题以及接收消息。EMQX 服务器会管理这些客户端的连接状态、会话信息等。

  3. 订阅管理: 订阅管理是指对客户端订阅的主题进行管理。客户端可以订阅一个或多个主题,以接收发布到这些主题的消息。EMQX 服务器会维护一个订阅列表,记录每个客户端订阅的主题,并在有消息发布到这些主题时,将消息转发给相应的客户端。

  4. 保留消息: 保留消息是指在某个主题上保留的最新消息。当一个新的客户端订阅该主题时,EMQX 服务器会立即将该保留消息发送给客户端。保留消息的目的是确保新订阅的客户端能够立即获得主题的最新状态,而不必等待下一个消息发布。

  5. 客户端认证: 客户端认证是指验证连接到 EMQX 服务器的客户端的身份。EMQX 支持多种认证机制,包括用户名和密码认证、客户端证书认证、JWT(JSON Web Token)认证等。通过客户端认证,EMQX 可以确保只有经过验证的客户端才能连接到服务器,从而提高系统的安全性。

    • 用户名和密码认证:客户端在连接时提供用户名和密码,EMQX 服务器会验证这些凭据是否正确。
    • 客户端证书认证:使用 SSL/TLS 证书进行双向认证,客户端和服务器都需要提供有效的证书。
    • JWT 认证:客户端在连接时提供 JWT 令牌,EMQX 服务器会验证令牌的有效性和签名。
  6. 客户端授权: 客户端授权是指控制经过认证的客户端可以执行哪些操作(如发布、订阅等)以及可以访问哪些主题。授权机制可以基于角色、ACL(访问控制列表)等方式实现。通过客户端授权,EMQX 可以确保客户端只能访问其被允许的资源,从而保护消息的安全性和隐私。

    • 基于角色的访问控制:为不同的客户端分配不同的角色,每个角色具有不同的权限。
    • ACL(访问控制列表):定义详细的规则,指定哪些客户端可以发布或订阅哪些主题。
  7. 黑名单: 黑名单是指阻止特定客户端连接到 EMQX 服务器的机制。被列入黑名单的客户端将无法连接到服务器或进行任何操作。黑名单可以基于客户端 ID、IP 地址等进行配置。通过黑名单,EMQX 可以防止恶意客户端或不受信任的客户端访问服务器,从而提高系统的安全性和稳定性。

    • 基于客户端 ID 的黑名单:指定某些客户端 ID 不允许连接。
    • 基于 IP 地址的黑名单:阻止来自特定 IP 地址的连接请求。
  8. Webhooks: Webhooks 是一种允许外部系统接收 EMQX 事件通知的机制。当特定事件(如客户端连接、断开连接、消息发布等)发生时,EMQX 可以通过 HTTP 请求将事件数据发送到预定义的 URL。Webhooks 使得 EMQX 可以与其他系统(如数据库、监控系统、通知服务等)进行集成,实时传递事件信息。

    • 事件类型:可以配置 EMQX 触发 Webhooks 的事件类型,如客户端连接、断开连接、消息发布、消息订阅等。
    • 请求格式:Webhooks 通常使用 HTTP POST 请求,事件数据以 JSON 格式发送到目标 URL。
    • 安全性:可以配置认证和加密机制,确保 Webhooks 请求的安全性。
  9. 规则: 规则引擎是 EMQX 中用于处理和转发消息的强大工具。通过规则引擎,您可以定义复杂的规则来过滤、转换和转发消息到不同的目标(如数据库、消息队列、HTTP 服务等)。规则引擎使得 EMQX 能够灵活地处理消息流,并与其他系统进行集成。

    • SQL 语法:规则引擎使用类似 SQL 的语法来定义规则,您可以根据消息的内容、属性等进行过滤和选择。
    • 动作:规则可以定义多个动作,如将消息转发到外部系统、存储到数据库、触发 Webhooks 等。
    • 条件:规则可以包含条件语句,只有满足条件的消息才会触发相应的动作。
  10. 连接器: 连接器是 EMQX 用于与外部系统进行数据交换的组件。通过连接器,EMQX 可以将消息发布到外部系统,或从外部系统接收数据。连接器支持多种协议和数据源,如数据库、消息队列、HTTP 服务等,使得 EMQX 能够与各种异构系统进行无缝集成。

    • 数据库连接器:支持将消息存储到关系型数据库(如 MySQL、PostgreSQL)或 NoSQL 数据库(如 MongoDB)。
    • 消息队列连接器:支持与消息队列系统(如 Kafka、RabbitMQ)进行集成,将消息发布到队列或从队列中接收消息。
    • HTTP 连接器:支持通过 HTTP 协议与外部服务进行通信,发送或接收消息。
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!