【搭建Node-RED + MQTT Broker實現AI大模型交互】

搭建Node-RED + MQTT Broker實現AI大模型交互

  • 搭建Node-RED + MQTT Broker實現AI大模型交互
    • 一、系統架構
    • 二、環境準備與安裝
      • 1. 安裝Node.js
      • 2. 安裝Mosquitto MQTT Broker
      • 3. 配置Mosquitto
      • 4. 安裝Node-RED
      • 5. 配置Node-RED監聽所有網絡接口
      • 6. 啟動Node-RED
    • 三、Node-RED流程配置
      • 1. 創建新流程
      • 2. 添加并配置MQTT In節點
      • 3. 添加并配置處理數據Function節點
      • 4. 添加并配置HTTP Request節點
      • 5. 添加并配置處理響應Function節點
      • 6. 添加并配置MQTT Out節點
      • 7. 添加錯誤處理
      • 8. 連接節點
      • 9. 部署流程
    • 四、測試系統
      • 1. 創建測試腳本
      • 2. 安裝MQTT客戶端庫
      • 3. 運行測試
    • 五、在其他Linux客戶端使用MQTT與系統交互
      • 1. 安裝MQTT客戶端工具
      • 2. 訂閱消息(接收響應)
      • 3. 發送消息(請求AI處理)
      • 4. 使用Python進行交互(可選)
    • 六、系統安全與擴展
      • 安全配置
      • 系統擴展
    • 七、故障排除
    • 八、總結

搭建Node-RED + MQTT Broker實現AI大模型交互

本文檔詳細記錄了使用Node-RED和MQTT Broker構建一個可與DeepSeek AI大模型交互的物聯網平臺的完整過程。

一、系統架構

[接收設備消息] --> [處理數據] --> [調用DeepSeek API] --> [處理API響應] --> [MQTT Out]
| ^
| |
v |
[錯誤消息] --> [錯誤消息接收] --> [MQTT err]

系統主要組件:

  1. 設備/客戶端:向MQTT Broker發送請求消息
  2. MQTT Broker:消息代理,處理發布/訂閱
  3. Node-RED:流程編排引擎,處理邏輯和API交互
  4. DeepSeek API:AI大模型服務

數據流向:

  1. 設備發布消息到device/data主題
  2. Node-RED訂閱并處理消息
  3. Node-RED調用DeepSeek API
  4. Node-RED將響應發布到device/response主題
  5. 設備接收響應

二、環境準備與安裝

1. 安裝Node.js

首先確保安裝了Node.js v18或更高版本:

curl -fsSL https://deb.nodesource.com/setup_18.x | bash -
apt-get install -y nodejs

驗證安裝:

node -v

2. 安裝Mosquitto MQTT Broker

apt-get update
apt-get install -y mosquitto mosquitto-clients

3. 配置Mosquitto

創建配置文件,允許匿名連接:

cat > /etc/mosquitto/conf.d/default.conf << EOF
listener 1883
allow_anonymous true
EOF

重啟Mosquitto服務:

systemctl restart mosquitto

4. 安裝Node-RED

npm install -g --unsafe-perm node-red

5. 配置Node-RED監聽所有網絡接口

修改配置文件:

cat > /root/.node-red/settings.js << EOF
module.exports = {uiPort: process.env.PORT || 1880,uiHost: "0.0.0.0",
}
EOF

6. 啟動Node-RED

nohup node-red > node-red.log 2>&1 &

三、Node-RED流程配置

訪問Node-RED界面:http://服務器IP:1880/

1. 創建新流程

  1. 點擊"+"按鈕創建新的流程
  2. 將流程命名為"DeepSeek AI交互"

2. 添加并配置MQTT In節點

  1. 從節點面板中拖動"mqtt in"節點到工作區
  2. 雙擊節點進行配置:
    • 服務器:點擊編輯按鈕添加新的MQTT Broker
      • 名稱:本地MQTT Broker
      • 服務器:localhost
      • 端口:1883
    • 主題:device/data
    • QoS:2
    • 輸出:自動檢測(JSON對象、字符串或buffer)
    • 名稱:接收設備消息

3. 添加并配置處理數據Function節點

  1. 從節點面板中拖動"function"節點到工作區
  2. 雙擊節點進行配置:
    • 名稱:處理數據
    • 函數代碼:
// 處理接收到的設備數據
const deviceData = msg.payload;// 構建發送給DeepSeek API的請求
msg.payload = {"model": "deepseek-chat","messages": [{"role": "system", "content": "你是一個助手。"},{"role": "user", "content": deviceData.message}]
};// 設置請求頭
msg.headers = {"Content-Type": "application/json"
};// 設置超時時間為60秒
msg.requestTimeout = 60000;return msg;

4. 添加并配置HTTP Request節點

  1. 從節點面板中拖動"http request"節點到工作區
  2. 雙擊節點進行配置:
    • 名稱:調用DeepSeek API
    • 方法:POST
    • URL:https://api.deepseek.com/chat/completions
    • 返回:解析為JSON對象
    • 在認證選項卡中:
      • 使用:Bearer Authentication
      • Token:您的DeepSeek API Token (例如: sk-b30b58c4056e4149872d87eb9228ed54)
    • 添加請求頭:
      • Content-Type: application/json

5. 添加并配置處理響應Function節點

  1. 從節點面板中拖動"function"節點到工作區
  2. 雙擊節點進行配置:
    • 名稱:處理API響應
    • 函數代碼:
// 處理DeepSeek API的響應
const response = msg.payload;// 提取AI回復內容
let aiResponse = "";
if (response && response.choices && response.choices.length > 0) {aiResponse = response.choices[0].message.content;
} else {aiResponse = "無法獲取有效回復";node.warn("API響應格式不符合預期: " + JSON.stringify(response));
}// 構建回復消息
msg.payload = {"status": "success","response": aiResponse,"timestamp": new Date().toISOString()
};return msg;

6. 添加并配置MQTT Out節點

  1. 從節點面板中拖動"mqtt out"節點到工作區
  2. 雙擊節點進行配置:
    • 服務器:選擇之前創建的本地MQTT Broker
    • 主題:device/response
    • QoS:1
    • 保留:否
    • 名稱:MQTT Out

7. 添加錯誤處理

  1. 從節點面板中拖動"catch"節點到工作區

  2. 雙擊節點進行配置:

    • 名稱:錯誤消息
  3. 添加處理錯誤的Function節點:

    • 名稱:錯誤消息接收
    • 函數代碼:
// 記錄錯誤
node.error("處理錯誤: " + JSON.stringify(msg.error));// 構建錯誤響應
msg.payload = {"status": "error","message": msg.error ? (msg.error.message || "未知錯誤") : "處理請求時發生錯誤","code": msg.statusCode || 500,"timestamp": new Date().toISOString()
};// 設置主題(確保錯誤消息發送到正確的主題)
msg.topic = "device/error";return msg;
  1. 添加用于錯誤的MQTT Out節點:
    • 服務器:選擇之前創建的本地MQTT Broker
    • 主題:device/error
    • QoS:1
    • 保留:否
    • 名稱:MQTT err

8. 連接節點

按照以下順序連接節點:

  1. 接收設備消息 → 處理數據
  2. 處理數據 → 調用DeepSeek API
  3. 調用DeepSeek API → 處理API響應
  4. 處理API響應 → MQTT Out
  5. 錯誤消息 → 錯誤消息接收
  6. 錯誤消息接收 → MQTT err

9. 部署流程

點擊右上角的"部署"按鈕使配置生效。

四、測試系統

1. 創建測試腳本

創建一個簡單的Node.js腳本來測試系統:

cat > /root/test-mqtt.js << EOF
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');client.on('connect', function () {console.log('已連接到MQTT Broker');client.subscribe('device/response');client.subscribe('device/error');const testMessage = { deviceId: 'test-001', message: '介紹一下Node-RED的基本功能', timestamp: new Date().toISOString() };console.log('發送測試消息:', testMessage);client.publish('device/data', JSON.stringify(testMessage));
});client.on('message', function (topic, message) {console.log('收到消息,主題:', topic);try { console.log(JSON.parse(message.toString())); } catch(e) { console.log(message.toString()); }
});setTimeout(function() { client.end(); console.log('測試完成,已斷開連接'); 
}, 120000);
EOF

2. 安裝MQTT客戶端庫

npm install mqtt

3. 運行測試

node /root/test-mqtt.js

輸出結果示例:

已連接到MQTT Broker
發送測試消息: { deviceId: 'test-001',message: '介紹一下Node-RED的基本功能',timestamp: '2025-05-15T06:51:24.069Z' }
收到消息,主題: device/response
{status: 'success',response: 'Node-RED 是一個基于 Node.js 開發的低代碼/可視化編程工具,主要用于連接硬件設備、API 和在線服務,構建物聯網(IoT)應用或自動化工作流。其核心特點是通過拖放節點(Nodes)和連線(Flows)快速實現數據流處理,無需深入編碼。以下是它的基本功能:\n\n1. 可視化流程編排\n   - 節點(Nodes):預置了大量功能模塊\n   - 連線(Flows):用連線將節點按邏輯順序連接\n\n2. 豐富的節點類型\n   - 輸入節點:如 HTTP 請求、MQTT 訂閱等\n   - 處理節點:函數、延遲、切換等\n   - 輸出節點:數據庫、API 調用、郵件通知等\n\n3. 易于集成和擴展\n   - 支持各種協議和服務的集成\n   - 可通過npm安裝擴展節點',timestamp: '2025-05-15T06:51:42.361Z'
}
測試完成,已斷開連接

五、在其他Linux客戶端使用MQTT與系統交互

1. 安裝MQTT客戶端工具

# Debian/Ubuntu系統
sudo apt-get install mosquitto-clients# RHEL/CentOS系統
sudo yum install mosquitto-clients# Arch系統
sudo pacman -S mosquitto

2. 訂閱消息(接收響應)

# 訂閱響應主題
mosquitto_sub -h 服務器IP -t "device/response" -v# 訂閱錯誤主題
mosquitto_sub -h 服務器IP -t "device/error" -v

3. 發送消息(請求AI處理)

# 發送消息
mosquitto_pub -h 服務器IP -t "device/data" -m '{"deviceId":"linux-001","message":"什么是物聯網?","timestamp":"'$(date -Iseconds)'"}'

4. 使用Python進行交互(可選)

import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime# MQTT服務器信息
broker_address = "服務器IP"
port = 1883
pub_topic = "device/data"
sub_topics = ["device/response", "device/error"]# 回調函數 - 連接成功
def on_connect(client, userdata, flags, rc):print("已連接到MQTT Broker")# 訂閱主題for topic in sub_topics:client.subscribe(topic)print(f"已訂閱主題: {topic}")# 回調函數 - 接收消息
def on_message(client, userdata, msg):print(f"\n收到消息 主題: {msg.topic}")try:payload = json.loads(msg.payload.decode())print(json.dumps(payload, indent=2, ensure_ascii=False))except:print(msg.payload.decode())# 創建客戶端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message# 連接到Broker
client.connect(broker_address, port, 60)# 啟動網絡循環
client.loop_start()# 發送測試消息
def send_message(message_text):msg = {"deviceId": "python-device-001","message": message_text,"timestamp": datetime.now().isoformat()}print(f"發送消息: {json.dumps(msg, ensure_ascii=False)}")client.publish(pub_topic, json.dumps(msg))# 等待連接建立
time.sleep(1)# 示例查詢
send_message("請解釋什么是物聯網?")# 保持腳本運行以接收響應
try:while True:user_input = input("\n輸入問題(輸入'exit'退出): ")if user_input.lower() == 'exit':breaksend_message(user_input)time.sleep(1)
except KeyboardInterrupt:print("程序被用戶中斷")# 斷開連接
client.loop_stop()
client.disconnect()

六、系統安全與擴展

安全配置

  1. MQTT安全加強

    • 添加用戶名密碼認證:修改/etc/mosquitto/conf.d/default.conf
    • 啟用TLS加密:配置證書
  2. Node-RED安全加強

    • 添加登錄認證:修改settings.js
    • 部署HTTPS:配置證書

系統擴展

  1. 添加Dashboard

    • 安裝Node-RED Dashboard節點
    • 創建可視化界面監控系統
  2. 添加數據存儲

    • 連接數據庫(MySQL/MongoDB)存儲交互歷史
  3. 支持多種AI模型

    • 擴展Function節點支持其他AI模型API
  4. 設備認證與管理

    • 開發設備注冊和認證機制

七、故障排除

  1. 無法連接MQTT Broker

    • 檢查防火墻是否開放1883端口
    • 檢查Mosquitto服務狀態
  2. 無法訪問Node-RED

    • 檢查防火墻是否開放1880端口
    • 檢查Node-RED進程是否運行
  3. API調用失敗

    • 檢查API Token是否正確
    • 檢查網絡連接和API地址
    • 檢查HTTP Request節點配置
    • 增加請求超時時間
  4. 消息格式錯誤

    • 確保發送JSON格式消息
    • 確保包含message字段

八、總結

通過本文檔,我們就完成了一個基于Node-RED和MQTT Broker的AI交互系統搭建,實現了:

  1. 接收來自設備的MQTT消息
  2. 處理消息并調用DeepSeek AI API
  3. 將AI響應通過MQTT返回給設備
  4. 錯誤處理和日志記錄

這個系統可以作為物聯網設備與AI大模型交互的基礎平臺,可根據實際需求進行擴展和優化。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/80475.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/80475.shtml
英文地址,請注明出處:http://en.pswp.cn/web/80475.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

算法第21天 | 第77題. 組合、216. 組合總和 III、17. 電話號碼的字母組合

回溯基礎概念 什么是回溯&#xff1f; 如何實現回溯&#xff1f; 第77題. 組合 題目 思路與解法 carl的講解&#xff1a; 回溯搜索法 class Solution:def combine(self, n: int, k: int) -> List[List[int]]:self.path []self.res []self.backtracking(n, k, 1)retu…

嵌入式硬件篇---拓展板

文章目錄 前言 前言 本文簡單介紹了拓展板的原理以及使用。

【深度學習基礎】從感知機到多層神經網絡:模型原理、結構與計算過程全解析

【深度學習基礎】從感知機到多層神經網絡&#xff1a;模型原理、結構與計算過程全解析 1. 引言 神經網絡的重要性&#xff1a; 作為人工智能的核心技術之一&#xff0c;神經網絡通過模擬人腦神經元的工作機制&#xff0c;成為解決復雜模式識別、預測和決策任務的利器。從圖像分…

sparkSQL讀入csv文件寫入mysql(2)

&#xff08;二&#xff09;創建數據庫和表 接下來&#xff0c;我們去創建一個新的數據庫&#xff0c;數據表&#xff0c;并插入一條數據。 -- 創建數據庫 CREATE DATABASE spark; -- 使用數據庫 USE spark;-- 創建表 create table person(id int, name char(20), age int);-- …

JVM如何處理多線程內存搶占問題

目錄 1、堆內存結構 2、運行時數據 3、內存分配機制 3.1、堆內存結構 3.2、內存分配方式 1、指針碰撞 2、空閑列表 4、jvm內存搶占方案 4.1、TLAB 4.2、CAS 4.3、鎖優化 4.4、逃逸分析與棧上分配 5、問題 5.1、內存分配競爭導致性能下降 5.2、偽共享&#xff08…

Ubuntu---omg又出bug了

自用遇到問題的合集 250518——桌面文件突然消失 ANS&#xff1a;參考博文

正則表達式與文本處理的藝術

引言 在前端開發領域&#xff0c;文本處理是一項核心技能。正則表達式作為一種強大的模式匹配工具&#xff0c;能夠幫助我們高效地處理各種復雜的文本操作任務。 正則表達式基礎 什么是正則表達式&#xff1f; 正則表達式是一種用于匹配字符串中字符組合的模式。它由一系列…

初學c語言15(字符和字符串函數)

一.字符串分類函數 頭文件&#xff1a;ctype.h 作用&#xff1a;判斷是什么類型的字符 函數舉例&#xff1a; 函數 符合條件就為真 islower判斷是否為小寫字符&#xff08;a~z&#xff09;isupper判斷是否為大寫字符&#xff08;A~Z&#xff09;isdigit十進制數字&#xf…

12-串口外設

一、串口外設的基本概述 1、基本定義 串口通信&#xff0c;通過在通信雙方之間以比特位&#xff08;bit&#xff09;的形式逐一發送或接收數據&#xff0c;實現了信息的有效傳遞。其通信方式不僅簡單可靠&#xff0c;而且成本很低。 2、stm32的串口 下面是兩個MCU的數據交互&…

NE555雙音門鈴實驗

1腳為地。通常被連接到電路共同接地。 2腳為觸發輸入端。 3腳為輸出端&#xff0c;輸出的電平狀態受觸發器的控制&#xff0c;而觸發器受上比較器6腳和下比較器2腳的控制。當觸發器接受上比較器A1從R腳輸入的高電平時&#xff0c;觸發器被置于復位狀態&#xff0c;3腳輸出低電…

Redis分布式鎖實現

概述 為什么要要分布式鎖 在并發編程中&#xff0c;我們通過鎖&#xff0c;來避免由于競爭而造成的數據不一致問題。 通常&#xff0c;我們以synchronized 、Lock來使用它。Java中的鎖&#xff0c;只能保證在同一個JVM進程內中執行 如果需要在分布式集群環境下的話&#xff0…

軟件設計師-錯題筆記-網絡基礎知識

1. 解析&#xff1a; 1.子網劃分相關知識&#xff1a; 在IPv4地址中&#xff0c;/27表示子網掩碼為255.255.255.224&#xff0c;它將一個C類網絡&#xff08;默認子網掩碼255.255.255.0&#xff09;進一步劃分 對于子網掩碼255.255.255.224&#xff0c;其對應的二進制為111…

Fine-Tuning Llama2 with LoRA

Fine-Tuning Llama2 with LoRA 1. What is LoRA?2. How does LoRA work?3. Applying LoRA to Llama2 models4. LoRA finetuning recipe in torchtune5. Trading off memory and model performance with LoRAModel ArgumentsReferences https://docs.pytorch.org/torchtune/ma…

python打卡day29

類的裝飾器 知識點回顧 類的裝飾器裝飾器思想的進一步理解&#xff1a;外部修改、動態類方法的定義&#xff1a;內部定義和外部定義 回顧一下&#xff0c;函數的裝飾器是 &#xff1a;接收一個函數&#xff0c;返回一個修改后的函數。類也有修飾器&#xff0c;類裝飾器本質上確…

十一、STM32入門學習之FREERTOS移植

目錄 一、FreeRTOS1、源碼下載&#xff1a;2、解壓源碼 二、移植步驟一&#xff1a;在需要移植的項目中新建myFreeRTOS的文件夾&#xff0c;用于存放FREERTOS的相關源碼步驟二&#xff1a;keil中包含相關文件夾和文件引用路徑步驟三&#xff1a;修改FreeRTOSConfig.h文件的相關…

2025 年十大網絡安全預測

隨著我們逐步邁向 2026 年&#xff0c;網絡安全領域正處于一個關鍵的轉折點&#xff0c;技術創新與數字威脅以前所未有的復雜態勢交織在一起。 地緣政治環境進一步加劇了這些網絡安全挑戰&#xff0c;國際犯罪組織利用先進的技術能力來追求戰略目標。 人工智能在這一不斷演變…

Mac 環境下 JDK 版本切換全指南

概要 在 macOS 上安裝了多個 JDK 后&#xff0c;可以通過系統自帶的 /usr/libexec/java_home 工具來查詢并切換不同版本的 Java。只需在終端中執行 /usr/libexec/java_home -V 列出所有已安裝的 JDK&#xff0c;然后將你想使用的版本路徑賦值給環境變量 JAVA_HOME&#xff0c;…

中級網絡工程師知識點6

1.堆疊方式可以共享使用交換機背板帶寬&#xff1b;級聯方式可以使用雙絞線將交換機連接在一起 2.光功率計是專門測量光功率大小的儀器&#xff0c;在對光纜進行檢測時&#xff0c;通過在光纜的發送端和接收端分別測量光功率&#xff0c;進而計算出光衰情況。 3.光時域反射計…

動態規劃——烏龜棋

題目描述 解題思路 首先這是一個很明顯的線性dp的題目&#xff0c;很容易發現規律 數據輸入 我們用 h[ N ] 數組存儲每一個格子的分數 用 cnt [ ]&#xff0c;數組表示每一中卡片的數目 1&#xff0c;狀態表示 因為這里一個有4種跳躍方式可以選擇 f[ i ][ a ][ b ][ c ][ d…

C#自定義控件-實現了一個支持平移、縮放、雙擊重置的圖像顯示控件

1. 控件概述 這是一個繼承自 Control 的自定義控件&#xff0c;主要用于圖像的顯示和交互操作&#xff0c;具有以下核心功能&#xff1a; 圖像顯示與縮放&#xff08;支持鼠標滾輪縮放&#xff09;圖像平移&#xff08;支持鼠標拖拽&#xff09;視圖重置&#xff08;雙擊重置…