EMQX
EMQX 是业界领先的 MQTT Broker,深受开发者喜爱,目前在 GitHub 上已获得超过 12k 个 Star。EMQX 项目始于 2012 年,遵循 Apache 2.0 开源协议。它基于 Erlang/OTP 开发,这是一种能够构建大规模可扩展软实时系统的编程语言。
EMQX 是全球最具扩展性的 MQTT Broker,支持 MQTT 5.0、MQTT-SN 和 MQTT over QUIC 等协议和其他先进功能。它采用无主集群架构,实现了高可用性和水平扩展性。自 5.0 版本开始,EMQX 能够在一个由 23 个节点组成的集群中创建高达 1 亿个并发 MQTT 连接。
Mosquitto、NanoMQ 使用简单,特别是Mosquitto是最简单,但是只是支撑了基本的Broker,我希望有一个前端页面,可以看到在线的MQTT设备、对MQTT消息进行过滤或者存储、对MQTT Broker进行用户权限管理,这就只能找别的项目了,比如这个EMQX。
bashmkdir EMQX && cd EMQX
docker run -d \
--user root \
-p 1883:1883 -p 8083:8083 \
-p 8084:8084 -p 8883:8883 \
-p 18083:18083 \
-v $PWD/data:/opt/emqx/data \
-v $PWD/log:/opt/emqx/log \
-v $PWD/etc:/opt/emqx/etc \
emqx/emqx
端口 | 协议 | 用途 |
---|---|---|
1883 | MQTT over TCP | 非加密的 MQTT 通信 |
8083 | MQTT over WebSocket | 非加密的 MQTT over WebSocket 通信 |
8084 | MQTT over WebSocket (TLS) | 加密的 MQTT over WebSocket 通信 |
8883 | MQTT over TLS | 加密的 MQTT 通信 |
18083 | HTTP | EMQX 管理控制台 |
非加密 MQTT 连接:mqtt://broker.example.com:1883
加密 MQTT 连接: mqtts://broker.example.com:8883 (mqtts 表示使用 TLS/SSL 加密的 MQTT 协议)
默认用户和密码:
admin
public
paho-mqtt
库paho-mqtt
库首先,你需要安装 paho-mqtt
库。你可以使用以下命令安装它:
bashpip install paho-mqtt
以下是一个订阅消息的示例:
pythonimport paho.mqtt.client as mqtt
# 定义MQTT服务器地址和端口
broker = "198.12.95.182"
port = 1883
# 定义回调函数,当接收到消息时会调用这个函数
def on_message(client, userdata, msg):
print(f"Received message: {msg.payload.decode()} on topic {msg.topic}")
# 创建MQTT客户端
client = mqtt.Client()
# 设置回调函数
client.on_message = on_message
# 连接到MQTT服务器
client.connect(broker, port, 60)
# 订阅主题
topic = "test/topic"
client.subscribe(topic)
# 开始循环,等待接收消息
client.loop_forever()
以下是一个发布消息的示例:
pythonimport paho.mqtt.client as mqtt
# 定义MQTT服务器地址和端口
broker = "198.12.95.182"
port = 1883
# 创建MQTT客户端
client = mqtt.Client()
# 连接到MQTT服务器
client.connect(broker, port, 60)
# 发布消息
topic = "test/topic"
message = "Hello, EMQX!"
client.publish(topic, message)
# 断开连接
client.disconnect()
使用 WebSocket 作为通信协议只是改变了传输层,而 MQTT Broker 的处理机制保持不变。提供 WebSocket 接口主要是为了方便 JavaScript 开发和在浏览器中使用 MQTT。
pythonimport paho.mqtt.client as mqtt
# 定义回调函数,当连接成功时会调用这个函数
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe("test/topic")
# 定义回调函数,当接收到消息时会调用这个函数
def on_message(client, userdata, msg):
print(f"Received message: {msg.payload.decode()} on topic {msg.topic}")
# 创建MQTT客户端
client = mqtt.Client(transport="websockets")
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到MQTT服务器
broker = "198.12.95.182"
port = 8083
client.connect(broker, port, 60)
# 发布消息
topic = "test/topic"
message = "Hello, EMQX over WebSocket!"
client.publish(topic, message)
# 开始循环,等待接收消息
client.loop_forever()
pythonimport paho.mqtt.client as mqtt
# MQTT Broker地址
broker = "198.12.95.182"
port = 1883
username = "testuser" # 使用您在Dashboard中创建的用户名
password = "testpassword" # 使用您在Dashboard中创建的密码
# 创建MQTT客户端
client = mqtt.Client()
# 设置用户名和密码
client.username_pw_set(username, password)
# 连接到MQTT Broker
client.connect(broker, port)
# 发布消息
topic = "test/topic"
message = "Hello, EMQX! USER"
client.publish(topic, message)
# 断开连接
client.disconnect()
https://juejin.cn/post/7113430766305935367
集群概览: 集群概览是指 EMQX 集群的总体状态和健康状况。通过集群概览,您可以查看集群中各个节点的运行状态、负载情况、连接数等信息。这对于监控和管理整个集群的运行情况非常重要。
客户端: 客户端是指连接到 EMQX 服务器的 MQTT 客户端设备。每个客户端都有一个唯一的客户端标识符(Client ID),并且可以发布消息、订阅主题以及接收消息。EMQX 服务器会管理这些客户端的连接状态、会话信息等。
订阅管理: 订阅管理是指对客户端订阅的主题进行管理。客户端可以订阅一个或多个主题,以接收发布到这些主题的消息。EMQX 服务器会维护一个订阅列表,记录每个客户端订阅的主题,并在有消息发布到这些主题时,将消息转发给相应的客户端。
保留消息: 保留消息是指在某个主题上保留的最新消息。当一个新的客户端订阅该主题时,EMQX 服务器会立即将该保留消息发送给客户端。保留消息的目的是确保新订阅的客户端能够立即获得主题的最新状态,而不必等待下一个消息发布。
客户端认证: 客户端认证是指验证连接到 EMQX 服务器的客户端的身份。EMQX 支持多种认证机制,包括用户名和密码认证、客户端证书认证、JWT(JSON Web Token)认证等。通过客户端认证,EMQX 可以确保只有经过验证的客户端才能连接到服务器,从而提高系统的安全性。
客户端授权: 客户端授权是指控制经过认证的客户端可以执行哪些操作(如发布、订阅等)以及可以访问哪些主题。授权机制可以基于角色、ACL(访问控制列表)等方式实现。通过客户端授权,EMQX 可以确保客户端只能访问其被允许的资源,从而保护消息的安全性和隐私。
黑名单: 黑名单是指阻止特定客户端连接到 EMQX 服务器的机制。被列入黑名单的客户端将无法连接到服务器或进行任何操作。黑名单可以基于客户端 ID、IP 地址等进行配置。通过黑名单,EMQX 可以防止恶意客户端或不受信任的客户端访问服务器,从而提高系统的安全性和稳定性。
Webhooks: Webhooks 是一种允许外部系统接收 EMQX 事件通知的机制。当特定事件(如客户端连接、断开连接、消息发布等)发生时,EMQX 可以通过 HTTP 请求将事件数据发送到预定义的 URL。Webhooks 使得 EMQX 可以与其他系统(如数据库、监控系统、通知服务等)进行集成,实时传递事件信息。
规则: 规则引擎是 EMQX 中用于处理和转发消息的强大工具。通过规则引擎,您可以定义复杂的规则来过滤、转换和转发消息到不同的目标(如数据库、消息队列、HTTP 服务等)。规则引擎使得 EMQX 能够灵活地处理消息流,并与其他系统进行集成。
连接器: 连接器是 EMQX 用于与外部系统进行数据交换的组件。通过连接器,EMQX 可以将消息发布到外部系统,或从外部系统接收数据。连接器支持多种协议和数据源,如数据库、消息队列、HTTP 服务等,使得 EMQX 能够与各种异构系统进行无缝集成。
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!