这里有一个很好的例子:https://thingsboard.io/use-cases/fleet-tracking/
这篇文章想完成这些事情:
如之前一样,启动 thingsboard :
bashmkdir -p ~/.mytb-data && sudo chown -R 799:799 ~/.mytb-data
mkdir -p ~/.mytb-logs && sudo chown -R 799:799 ~/.mytb-logs
docker run -it -p 8080:9090 -p 7070:7070 -p 1883:1883 -p 5683-5688:5683-5688/udp -v ~/.mytb-data:/data \
-v ~/.mytb-logs:/var/log/thingsboard --name mytb --restart always thingsboard/tb-postgres
添加设备后,网页会弹出这个页面,让我们手动检查连通性:
可以在Linux执行这个指令,使用HTTP往设备里推送温度为35度,可以多推送几个温度。
bashcurl -v -X POST http://10.100.80.98:8080/api/v1/sXXZEPAjK2AeMgEzaXBQ/telemetry --header Content-Type:application/json --data "{temperature:35}"
也可以使用MQTT往这个设备推送数据,推送方法是:
sudo apt-get install curl mosquitto-clients # 安装mosquitto客户端 mosquitto_pub -d -q 1 -h 10.100.80.98 -p 1883 -t v1/devices/me/telemetry -u "sXXZEPAjK2AeMgEzaXBQ" -m "{temperature:25}" # mosquitto mqtt通信发送
这条指令是使用 mosquitto_pub
命令通过 MQTT 协议向指定的 MQTT 代理(服务器)发布一条消息。
参数说明:
-d
• 启用调试模式(debug),会显示更多连接和发布的日志信息。
-q 1
• 设置消息的 QoS(服务质量)级别为 1
。
• QoS 级别:
◦ 0
:最多一次(可能丢失)。
◦ 1
:至少一次(确保送达,但可能重复)。
◦ 2
:恰好一次(确保送达且不重复)。
-h 10.100.80.98
• 指定 MQTT 代理(服务器)的 IP 地址为 10.100.80.98
。
-p 1883
• 指定 MQTT 代理的端口号为 1883
(MQTT 默认端口)。
-t v1/devices/me/telemetry
• 设置消息的主题(Topic)为 v1/devices/me/telemetry
。
• 订阅此主题的客户端将收到这条消息。
-u "sXXZEPAjK2AeMgEzaXBQ"
• 提供用户名(用于身份验证),这里是 sXXZEPAjK2AeMgEzaXBQ
。
• 如果服务器需要密码,需通过 -P
参数指定(本例未使用)。
-m "{temperature:25}"
• 设置消息内容(payload)为 JSON 格式的字符串 {temperature:25}
,表示上报温度值 25。
运行 gps_simulator.py
这个代码是一个GPS模拟器,通过MQTT协议向ThingsBoard平台发送模拟的GPS位置数据。它能够生成一个正方形路径的GPS坐标点,并按照设定的时间间隔循环发送这些坐标点,模拟设备沿着特定路径移动的情况。
点击仪表板,新建仪表板。
点击添加部件,添加腾讯地图时序部件:
进一步,选择数据源为设备,数据键也选出来,这个名字和腾讯地图是对应的,不然在外观里还需要进一步设置:
完成后,可以看到地图里在移动的设备:
电子围栏要借助规则链,太复杂,这里不研究。
GPS设备数据mqtt给ThingsBoard 之后,ThingsBoard会把数据存入自己的 postgres 数据库。
客户端可以通过以下几种方式获取设备通过 MQTT 上传到postgres数据库的数据:
ThingsBoard 提供了一套完整的 REST API,允许客户端通过 HTTP 请求获取设备数据:
• API 文档:参考官方 REST API 文档。
自己的文档:
http://10.100.80.98:8080/swagger-ui.html
获取JWT_TOKEN:
# curl -X POST "http://10.100.80.98:8080/api/auth/login" \ -H "Content-Type: application/json" \ -d '{"username":"[email protected]", "password":"tenant"}' {"token":"eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJ0ZW5hbnRAdGhpbmdzYm9hcmQub3JnIiwidXNlcklkIjoiMmVkZDI0OTAtMDg4Ni0xMWYwLWFlODctMWQ0NGEwMTYyNjE4Iiwic2NvcGVzIjpbIlRFTkFOVF9BRE1JTiJdLCJzZXNzaW9uSWQiOiIyYTk4NWMzMi0yODg2LTRkNDItOWMwOC0wMWZiOTg2ODMzZWQiLCJleHAiOjE3NDUwNTYxOTMsImlzcyI6InRoaW5nc2JvYXJkLmlvIiwiaWF0IjoxNzQ1MDQ3MTkzLCJlbmFibGVkIjp0cnVlLCJpc1B1YmxpYyI6ZmFsc2UsInRlbmFudElkIjoiMmU4M2I5ZjAtMDg4Ni0xMWYwLWFlODctMWQ0NGEwMTYyNjE4IiwiY3VzdG9tZXJJZCI6IjEzODE0MDAwLTFkZDItMTFiMi04MDgwLTgwODA4MDgwODA4MCJ9.JvWVR0QqwICHImxW4l4pPAxwhX6v4tygmIqOYFC9GZYalge6fQwWLssGWtZhxgUxVkd9RESf7TVXBzAFO9lUew","refreshToken":"eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJ0ZW5hbnRAdGhpbmdzYm9hcmQub3JnIiwidXNlcklkIjoiMmVkZDI0OTAtMDg4Ni0xMWYwLWFlODctMWQ0NGEwMTYyNjE4Iiwic2NvcGVzIjpbIlJFRlJFU0hfVE9LRU4iXSwic2Vzc2lvbklkIjoiMmE5ODVjMzItMjg4Ni00ZDQyLTljMDgtMDFmYjk4NjgzM2VkIiwiZXhwIjoxNzQ1NjUxOTkzLCJpc3MiOiJ0aGluZ3Nib2FyZC5pbyIsImlhdCI6MTc0NTA0NzE5MywiaXNQdWJsaWMiOmZhbHNlLCJqdGkiOiIzYzlhNGUzYS02ODMyLTQ4NTQtYTJlYy0xY2E3NjViZjY2YjgifQ.nO0lZAuMZP2UhBp7oe6TxSZqOrJMsGQynONMg4ttlQlA1VKHOZWPOn71qs-QxHbT6bTXP1sr9lrZdRJ1WMqNBQ"}
调用这个接口获取数据:
bashPOST /api/v1/{deviceToken}/telemetry Post time series data (postTelemetry)
适用于需要实时数据推送的场景(如仪表盘):
• 连接方式:
• WebSocket URL: wss://{your-thingsboard-url}/api/ws/plugins/telemetry?token={JWT_TOKEN}
• 订阅指定设备的遥测数据:
```json { "tsSubCmds": [ { "entityType": "DEVICE", "entityId": "{deviceId}", "scope": "LATEST_TELEMETRY", "cmdId": 1 } ] } ```
• 文档:参考 WebSocket API。
设备可以通过 MQTT 订阅服务端下发的数据:
• 主题示例:
• 订阅共享属性更新:v1/devices/me/sharedattributes
• 订阅客户端属性:v1/devices/me/attributes
• 适用场景:设备需要接收服务端下发的指令或配置。
• 直接嵌入:通过 ThingsBoard 仪表盘的 iframe
嵌入到第三方页面。
• 自定义组件:使用 Widgets API 开发定制化组件,通过 JavaScript 调用数据。
• 仅适用于私有化部署:直接访问 ThingsBoard 的底层数据库(PostgreSQL/Cassandra/TimescaleDB)。
• 风险:绕过 ThingsBoard 的安全机制,需谨慎操作。
deviceId
。pythonimport requests
# 配置参数
TB_URL = "https://your-thingsboard-url"
DEVICE_ID = "your-device-id"
JWT_TOKEN = "your-jwt-token"
# 获取设备温度数据
response = requests.get(
f"{TB_URL}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries?keys=temperature",
headers={"X-Authorization": f"Bearer {JWT_TOKEN}"}
)
print(response.json()) # 输出:{"temperature": [{"ts": 1620000000000, "value": 25.5}]}
• 权限控制:确保客户端用户有对应设备的访问权限(通过租户/客户角色分配)。
• 数据过滤:使用 startTs
/endTs
参数按时间范围查询。
• 性能优化:高频数据建议使用 WebSocket 避免轮询。
在 ThingsBoard 接收设备通过 MQTT 上传的数据后,若需将数据同步到自己的 MySQL 数据库,可以通过以下方案实现:
适用场景:实时同步数据,无需额外开发中间服务。
步骤:
配置 Rule Chain:
• 进入 ThingsBoard 规则链(Rule Chains) → 编辑根规则链(Root Rule Chain)。
• 添加一个 "Save Timeseries" 节点(保留原始数据到 ThingsBoard 数据库)。
• 添加 "REST API Call" 或 "External Script" 节点,将数据转发到你的 MySQL 接口。
编写外部接口(接收数据并写入 MySQL):
• 示例(Node.js + Express):
javascriptconst express = require('express');
const mysql = require('mysql2');
const app = express();
app.use(express.json());
// MySQL 连接配置
const db = mysql.createConnection({
host: 'localhost',
user: 'your_user',
password: 'your_password',
database: 'your_db'
});
// 接收 ThingsBoard 转发数据的接口
app.post('/api/telemetry', (req, res) => {
const { deviceId, temperature, humidity } = req.body; // 根据实际字段调整
const sql = `INSERT INTO device_data (device_id, temperature, humidity) VALUES (?, ?, ?)`;
db.query(sql, [deviceId, temperature, humidity], (err) => {
if (err) return res.status(500).send(err);
res.status(200).send('OK');
});
});
app.listen(3000, () => console.log('Server running on port 3000'));
配置 REST API 调用节点:
• 在 Rule Chain 中添加 "REST API Call" 节点,指向你的接口(如 http://your-server:3000/api/telemetry
)。
• 设置请求方法为 POST
,并配置消息体模板(示例):
json{
"deviceId": "${deviceId}",
"temperature": "${temperature}",
"humidity": "${humidity}"
}
适用场景:允许少量延迟,适合批量同步。
步骤:
通过 ThingsBoard REST API 定时拉取数据: • 使用脚本定时调用 ThingsBoard 的遥测数据 API(如每小时一次):
bashGET /api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature,humidity
• 将返回的数据插入到 MySQL。
示例脚本(Python):
pythonimport requests
import mysql.connector
from datetime import datetime, timedelta
# ThingsBoard 配置
TB_URL = "http://your-thingsboard-url"
DEVICE_ID = "your-device-id"
JWT_TOKEN = "your-jwt-token"
# MySQL 配置
db = mysql.connector.connect(
host="localhost",
user="your_user",
password="your_password",
database="your_db"
)
cursor = db.cursor()
# 获取最近1小时的数据
end_ts = int(datetime.now().timestamp() * 1000)
start_ts = end_ts - 3600000 # 1小时前
response = requests.get(
f"{TB_URL}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries",
params={"keys": "temperature,humidity", "startTs": start_ts, "endTs": end_ts},
headers={"X-Authorization": f"Bearer {JWT_TOKEN}"}
)
data = response.json()
# 写入 MySQL
for key, values in data.items():
for item in values:
ts = datetime.fromtimestamp(item['ts'] / 1000)
sql = "INSERT INTO device_data (device_id, metric, value, timestamp) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, (DEVICE_ID, key, item['value'], ts))
db.commit()
适用场景:需要完全绕过 ThingsBoard,直接处理原始 MQTT 数据。
步骤:
v1/devices/me/telemetry
。
• 使用 MQTT 客户端库(如 paho-mqtt
)监听消息:
pythonimport paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
payload = msg.payload.decode()
print("Received:", payload) # 示例: {"temperature":25, "humidity":60}
# 解析并写入 MySQL...
client = mqtt.Client()
client.username_pw_set("your_device_token") # 使用设备访问令牌
client.connect("your-thingsboard-mqtt-host", 1883)
client.subscribe("v1/devices/me/telemetry")
client.on_message = on_message
client.loop_forever()
INSERT INTO ... VALUES (...), (...), ...
)。方案 | 实时性 | 开发复杂度 | 依赖 ThingsBoard |
---|---|---|---|
Rule Chain 转发 | 高 | 中 | 是 |
定时拉取 API | 低 | 低 | 是 |
直接监听 MQTT | 高 | 高 | 否 |
根据你的业务需求(实时性 vs. 维护成本)选择合适方案。
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!