搭建Node-RED + MQTT Broker實現AI大模型交互
- 搭建Node-RED + MQTT Broker實現AI大模型交互
- 一、系統架構
- 二、環境準備與安裝
- 1. 安裝Node.js
- 2. 安裝Mosquitto MQTT Broker
- 3. 配置Mosquitto
- 4. 安裝Node-RED
- 5. 配置Node-RED監聽所有網絡接口
- 6. 啟動Node-RED
- 三、Node-RED流程配置
- 1. 創建新流程
- 2. 添加并配置MQTT In節點
- 3. 添加并配置處理數據Function節點
- 4. 添加并配置HTTP Request節點
- 5. 添加并配置處理響應Function節點
- 6. 添加并配置MQTT Out節點
- 7. 添加錯誤處理
- 8. 連接節點
- 9. 部署流程
- 四、測試系統
- 1. 創建測試腳本
- 2. 安裝MQTT客戶端庫
- 3. 運行測試
- 五、在其他Linux客戶端使用MQTT與系統交互
- 1. 安裝MQTT客戶端工具
- 2. 訂閱消息(接收響應)
- 3. 發送消息(請求AI處理)
- 4. 使用Python進行交互(可選)
- 六、系統安全與擴展
- 安全配置
- 系統擴展
- 七、故障排除
- 八、總結
搭建Node-RED + MQTT Broker實現AI大模型交互
本文檔詳細記錄了使用Node-RED和MQTT Broker構建一個可與DeepSeek AI大模型交互的物聯網平臺的完整過程。
一、系統架構
[接收設備消息] --> [處理數據] --> [調用DeepSeek API] --> [處理API響應] --> [MQTT Out]
| ^
| |
v |
[錯誤消息] --> [錯誤消息接收] --> [MQTT err]
系統主要組件:
- 設備/客戶端:向MQTT Broker發送請求消息
- MQTT Broker:消息代理,處理發布/訂閱
- Node-RED:流程編排引擎,處理邏輯和API交互
- DeepSeek API:AI大模型服務
數據流向:
- 設備發布消息到
device/data
主題 - Node-RED訂閱并處理消息
- Node-RED調用DeepSeek API
- Node-RED將響應發布到
device/response
主題 - 設備接收響應
二、環境準備與安裝
1. 安裝Node.js
首先確保安裝了Node.js v18或更高版本:
curl -fsSL https://deb.nodesource.com/setup_18.x | bash -
apt-get install -y nodejs
驗證安裝:
node -v
2. 安裝Mosquitto MQTT Broker
apt-get update
apt-get install -y mosquitto mosquitto-clients
3. 配置Mosquitto
創建配置文件,允許匿名連接:
cat > /etc/mosquitto/conf.d/default.conf << EOF
listener 1883
allow_anonymous true
EOF
重啟Mosquitto服務:
systemctl restart mosquitto
4. 安裝Node-RED
npm install -g --unsafe-perm node-red
5. 配置Node-RED監聽所有網絡接口
修改配置文件:
cat > /root/.node-red/settings.js << EOF
module.exports = {uiPort: process.env.PORT || 1880,uiHost: "0.0.0.0",
}
EOF
6. 啟動Node-RED
nohup node-red > node-red.log 2>&1 &
三、Node-RED流程配置
訪問Node-RED界面:http://服務器IP:1880/
1. 創建新流程
- 點擊"+"按鈕創建新的流程
- 將流程命名為"DeepSeek AI交互"
2. 添加并配置MQTT In節點
- 從節點面板中拖動"mqtt in"節點到工作區
- 雙擊節點進行配置:
- 服務器:點擊編輯按鈕添加新的MQTT Broker
- 名稱:本地MQTT Broker
- 服務器:localhost
- 端口:1883
- 主題:
device/data
- QoS:2
- 輸出:自動檢測(JSON對象、字符串或buffer)
- 名稱:接收設備消息
- 服務器:點擊編輯按鈕添加新的MQTT Broker
3. 添加并配置處理數據Function節點
- 從節點面板中拖動"function"節點到工作區
- 雙擊節點進行配置:
- 名稱:處理數據
- 函數代碼:
// 處理接收到的設備數據
const deviceData = msg.payload;// 構建發送給DeepSeek API的請求
msg.payload = {"model": "deepseek-chat","messages": [{"role": "system", "content": "你是一個助手。"},{"role": "user", "content": deviceData.message}]
};// 設置請求頭
msg.headers = {"Content-Type": "application/json"
};// 設置超時時間為60秒
msg.requestTimeout = 60000;return msg;
4. 添加并配置HTTP Request節點
- 從節點面板中拖動"http request"節點到工作區
- 雙擊節點進行配置:
- 名稱:調用DeepSeek API
- 方法:POST
- URL:
https://api.deepseek.com/chat/completions
- 返回:解析為JSON對象
- 在認證選項卡中:
- 使用:Bearer Authentication
- Token:您的DeepSeek API Token (例如: sk-b30b58c4056e4149872d87eb9228ed54)
- 添加請求頭:
- Content-Type: application/json
5. 添加并配置處理響應Function節點
- 從節點面板中拖動"function"節點到工作區
- 雙擊節點進行配置:
- 名稱:處理API響應
- 函數代碼:
// 處理DeepSeek API的響應
const response = msg.payload;// 提取AI回復內容
let aiResponse = "";
if (response && response.choices && response.choices.length > 0) {aiResponse = response.choices[0].message.content;
} else {aiResponse = "無法獲取有效回復";node.warn("API響應格式不符合預期: " + JSON.stringify(response));
}// 構建回復消息
msg.payload = {"status": "success","response": aiResponse,"timestamp": new Date().toISOString()
};return msg;
6. 添加并配置MQTT Out節點
- 從節點面板中拖動"mqtt out"節點到工作區
- 雙擊節點進行配置:
- 服務器:選擇之前創建的本地MQTT Broker
- 主題:
device/response
- QoS:1
- 保留:否
- 名稱:MQTT Out
7. 添加錯誤處理
-
從節點面板中拖動"catch"節點到工作區
-
雙擊節點進行配置:
- 名稱:錯誤消息
-
添加處理錯誤的Function節點:
- 名稱:錯誤消息接收
- 函數代碼:
// 記錄錯誤
node.error("處理錯誤: " + JSON.stringify(msg.error));// 構建錯誤響應
msg.payload = {"status": "error","message": msg.error ? (msg.error.message || "未知錯誤") : "處理請求時發生錯誤","code": msg.statusCode || 500,"timestamp": new Date().toISOString()
};// 設置主題(確保錯誤消息發送到正確的主題)
msg.topic = "device/error";return msg;
- 添加用于錯誤的MQTT Out節點:
- 服務器:選擇之前創建的本地MQTT Broker
- 主題:
device/error
- QoS:1
- 保留:否
- 名稱:MQTT err
8. 連接節點
按照以下順序連接節點:
- 接收設備消息 → 處理數據
- 處理數據 → 調用DeepSeek API
- 調用DeepSeek API → 處理API響應
- 處理API響應 → MQTT Out
- 錯誤消息 → 錯誤消息接收
- 錯誤消息接收 → MQTT err
9. 部署流程
點擊右上角的"部署"按鈕使配置生效。
四、測試系統
1. 創建測試腳本
創建一個簡單的Node.js腳本來測試系統:
cat > /root/test-mqtt.js << EOF
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');client.on('connect', function () {console.log('已連接到MQTT Broker');client.subscribe('device/response');client.subscribe('device/error');const testMessage = { deviceId: 'test-001', message: '介紹一下Node-RED的基本功能', timestamp: new Date().toISOString() };console.log('發送測試消息:', testMessage);client.publish('device/data', JSON.stringify(testMessage));
});client.on('message', function (topic, message) {console.log('收到消息,主題:', topic);try { console.log(JSON.parse(message.toString())); } catch(e) { console.log(message.toString()); }
});setTimeout(function() { client.end(); console.log('測試完成,已斷開連接');
}, 120000);
EOF
2. 安裝MQTT客戶端庫
npm install mqtt
3. 運行測試
node /root/test-mqtt.js
輸出結果示例:
已連接到MQTT Broker
發送測試消息: { deviceId: 'test-001',message: '介紹一下Node-RED的基本功能',timestamp: '2025-05-15T06:51:24.069Z' }
收到消息,主題: device/response
{status: 'success',response: 'Node-RED 是一個基于 Node.js 開發的低代碼/可視化編程工具,主要用于連接硬件設備、API 和在線服務,構建物聯網(IoT)應用或自動化工作流。其核心特點是通過拖放節點(Nodes)和連線(Flows)快速實現數據流處理,無需深入編碼。以下是它的基本功能:\n\n1. 可視化流程編排\n - 節點(Nodes):預置了大量功能模塊\n - 連線(Flows):用連線將節點按邏輯順序連接\n\n2. 豐富的節點類型\n - 輸入節點:如 HTTP 請求、MQTT 訂閱等\n - 處理節點:函數、延遲、切換等\n - 輸出節點:數據庫、API 調用、郵件通知等\n\n3. 易于集成和擴展\n - 支持各種協議和服務的集成\n - 可通過npm安裝擴展節點',timestamp: '2025-05-15T06:51:42.361Z'
}
測試完成,已斷開連接
五、在其他Linux客戶端使用MQTT與系統交互
1. 安裝MQTT客戶端工具
# Debian/Ubuntu系統
sudo apt-get install mosquitto-clients# RHEL/CentOS系統
sudo yum install mosquitto-clients# Arch系統
sudo pacman -S mosquitto
2. 訂閱消息(接收響應)
# 訂閱響應主題
mosquitto_sub -h 服務器IP -t "device/response" -v# 訂閱錯誤主題
mosquitto_sub -h 服務器IP -t "device/error" -v
3. 發送消息(請求AI處理)
# 發送消息
mosquitto_pub -h 服務器IP -t "device/data" -m '{"deviceId":"linux-001","message":"什么是物聯網?","timestamp":"'$(date -Iseconds)'"}'
4. 使用Python進行交互(可選)
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime# MQTT服務器信息
broker_address = "服務器IP"
port = 1883
pub_topic = "device/data"
sub_topics = ["device/response", "device/error"]# 回調函數 - 連接成功
def on_connect(client, userdata, flags, rc):print("已連接到MQTT Broker")# 訂閱主題for topic in sub_topics:client.subscribe(topic)print(f"已訂閱主題: {topic}")# 回調函數 - 接收消息
def on_message(client, userdata, msg):print(f"\n收到消息 主題: {msg.topic}")try:payload = json.loads(msg.payload.decode())print(json.dumps(payload, indent=2, ensure_ascii=False))except:print(msg.payload.decode())# 創建客戶端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message# 連接到Broker
client.connect(broker_address, port, 60)# 啟動網絡循環
client.loop_start()# 發送測試消息
def send_message(message_text):msg = {"deviceId": "python-device-001","message": message_text,"timestamp": datetime.now().isoformat()}print(f"發送消息: {json.dumps(msg, ensure_ascii=False)}")client.publish(pub_topic, json.dumps(msg))# 等待連接建立
time.sleep(1)# 示例查詢
send_message("請解釋什么是物聯網?")# 保持腳本運行以接收響應
try:while True:user_input = input("\n輸入問題(輸入'exit'退出): ")if user_input.lower() == 'exit':breaksend_message(user_input)time.sleep(1)
except KeyboardInterrupt:print("程序被用戶中斷")# 斷開連接
client.loop_stop()
client.disconnect()
六、系統安全與擴展
安全配置
-
MQTT安全加強
- 添加用戶名密碼認證:修改
/etc/mosquitto/conf.d/default.conf
- 啟用TLS加密:配置證書
- 添加用戶名密碼認證:修改
-
Node-RED安全加強
- 添加登錄認證:修改
settings.js
- 部署HTTPS:配置證書
- 添加登錄認證:修改
系統擴展
-
添加Dashboard
- 安裝Node-RED Dashboard節點
- 創建可視化界面監控系統
-
添加數據存儲
- 連接數據庫(MySQL/MongoDB)存儲交互歷史
-
支持多種AI模型
- 擴展Function節點支持其他AI模型API
-
設備認證與管理
- 開發設備注冊和認證機制
七、故障排除
-
無法連接MQTT Broker
- 檢查防火墻是否開放1883端口
- 檢查Mosquitto服務狀態
-
無法訪問Node-RED
- 檢查防火墻是否開放1880端口
- 檢查Node-RED進程是否運行
-
API調用失敗
- 檢查API Token是否正確
- 檢查網絡連接和API地址
- 檢查HTTP Request節點配置
- 增加請求超時時間
-
消息格式錯誤
- 確保發送JSON格式消息
- 確保包含message字段
八、總結
通過本文檔,我們就完成了一個基于Node-RED和MQTT Broker的AI交互系統搭建,實現了:
- 接收來自設備的MQTT消息
- 處理消息并調用DeepSeek AI API
- 將AI響應通過MQTT返回給設備
- 錯誤處理和日志記錄
這個系統可以作為物聯網設備與AI大模型交互的基礎平臺,可根據實際需求進行擴展和優化。