Python實例題:基于邊緣計算的智能物聯網系統

目錄

Python實例題

題目

問題描述

解題思路

關鍵代碼框架

難點分析

擴展方向

Python實例題

題目

基于邊緣計算的智能物聯網系統

問題描述

開發一個基于邊緣計算的智能物聯網系統,包含以下功能:

  • 邊緣設備管理:連接和管理大量物聯網設備
  • 邊緣計算節點:在邊緣處理數據,減少云傳輸
  • 實時數據分析:實時處理和分析傳感器數據
  • 智能決策:基于 AI 模型的自動化決策
  • 云邊協同:邊緣節點與云端的協同工作

解題思路

  • 設計邊緣設備與邊緣節點的通信協議
  • 開發邊緣計算框架處理本地數據
  • 訓練和部署輕量級 AI 模型到邊緣節點
  • 實現云邊數據同步和協同機制
  • 構建可視化界面監控系統狀態

關鍵代碼框架

# 邊緣節點主程序
import asyncio
import json
import time
import logging
from typing import Dict, List, Any
import numpy as np
from sklearn.ensemble import IsolationForest
import paho.mqtt.client as mqtt
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import datetime# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 配置
CONFIG = {'mqtt_broker': 'localhost','mqtt_port': 1883,'mqtt_username': 'edge_node','mqtt_password': 'edge_password','data_topic': 'iot/data/#','control_topic': 'iot/control/','edge_id': 'edge-node-01','db_url': 'sqlite:///edge_data.db','anomaly_detection_window': 100,'sync_interval': 60  # 數據同步到云端的間隔(秒)
}# 數據庫模型
Base = declarative_base()class SensorData(Base):__tablename__ = 'sensor_data'id = Column(Integer, primary_key=True)device_id = Column(String(50))sensor_type = Column(String(50))value = Column(Float)timestamp = Column(DateTime)processed = Column(Boolean, default=False)is_anomaly = Column(Boolean, default=False)class AnomalyAlert(Base):__tablename__ = 'anomaly_alerts'id = Column(Integer, primary_key=True)device_id = Column(String(50))sensor_type = Column(String(50))value = Column(Float)timestamp = Column(DateTime)confidence = Column(Float)resolved = Column(Boolean, default=False)# 初始化數據庫
engine = create_engine(CONFIG['db_url'])
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)# 異常檢測模型
class AnomalyDetector:def __init__(self, window_size=100):self.window_size = window_sizeself.models = {}  # 每個傳感器一個模型def train(self, device_id, sensor_type, data):"""訓練異常檢測模型"""if len(data) < 10:  # 至少需要10個數據點return# 重塑數據以適應模型X = np.array(data).reshape(-1, 1)# 創建或更新模型model_key = f"{device_id}_{sensor_type}"if model_key not in self.models:self.models[model_key] = IsolationForest(contamination=0.1, random_state=42)# 訓練模型self.models[model_key].fit(X)def detect(self, device_id, sensor_type, value):"""檢測異常"""model_key = f"{device_id}_{sensor_type}"if model_key not in self.models:return False, 0.0# 預測X = np.array([value]).reshape(1, -1)prediction = self.models[model_key].predict(X)decision_function = self.models[model_key].decision_function(X)# 轉換為異常分數 (越負越異常)anomaly_score = -decision_function[0]is_anomaly = prediction[0] == -1return is_anomaly, anomaly_score# MQTT客戶端
class EdgeMQTTClient:def __init__(self, config, data_processor, cloud_sync):self.config = configself.data_processor = data_processorself.cloud_sync = cloud_syncself.client = mqtt.Client(client_id=config['edge_id'])# 設置回調函數self.client.on_connect = self.on_connectself.client.on_message = self.on_messageself.client.on_disconnect = self.on_disconnect# 設置認證self.client.username_pw_set(config['mqtt_username'], config['mqtt_password'])def connect(self):"""連接到MQTT代理"""try:self.client.connect(self.config['mqtt_broker'], self.config['mqtt_port'], 60)self.client.loop_start()logger.info(f"已連接到MQTT代理: {self.config['mqtt_broker']}:{self.config['mqtt_port']}")except Exception as e:logger.error(f"連接MQTT代理失敗: {e}")raisedef disconnect(self):"""斷開MQTT連接"""self.client.loop_stop()self.client.disconnect()logger.info("已斷開MQTT連接")def on_connect(self, client, userdata, flags, rc):"""連接成功回調"""if rc == 0:logger.info("MQTT連接成功")# 訂閱數據主題client.subscribe(self.config['data_topic'])logger.info(f"已訂閱主題: {self.config['data_topic']}")else:logger.error(f"MQTT連接失敗,錯誤碼: {rc}")def on_message(self, client, userdata, msg):"""消息接收回調"""try:topic = msg.topicpayload = msg.payload.decode('utf-8')data = json.loads(payload)logger.debug(f"收到消息 - 主題: {topic}, 數據: {data}")# 處理數據self.data_processor.process_data(data)except json.JSONDecodeError as e:logger.error(f"JSON解析錯誤: {e}, 數據: {msg.payload}")except Exception as e:logger.error(f"處理消息時出錯: {e}")def on_disconnect(self, client, userdata, rc):"""斷開連接回調"""logger.warning(f"MQTT連接斷開,錯誤碼: {rc}")# 嘗試重新連接try:time.sleep(5)self.client.reconnect()except Exception as e:logger.error(f"重新連接失敗: {e}")def publish_control_message(self, device_id, command, payload=None):"""發布控制消息"""topic = f"{self.config['control_topic']}{device_id}"message = {'command': command,'payload': payload,'timestamp': datetime.datetime.now().isoformat()}try:self.client.publish(topic, json.dumps(message))logger.info(f"發布控制消息 - 主題: {topic}, 消息: {message}")except Exception as e:logger.error(f"發布控制消息失敗: {e}")# 數據處理器
class DataProcessor:def __init__(self, config, db_session, anomaly_detector):self.config = configself.db_session = db_sessionself.anomaly_detector = anomaly_detectorself.sensor_data_buffers = {}  # 存儲傳感器數據緩沖區def process_data(self, data):"""處理傳感器數據"""try:# 提取數據device_id = data.get('device_id')sensor_type = data.get('sensor_type')value = data.get('value')timestamp = data.get('timestamp')if not device_id or not sensor_type or value is None:logger.warning("無效的傳感器數據,缺少必要字段")return# 轉換時間戳if timestamp:try:timestamp = datetime.datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.datetime.now()else:timestamp = datetime.datetime.now()# 檢測異常buffer_key = f"{device_id}_{sensor_type}"if buffer_key not in self.sensor_data_buffers:self.sensor_data_buffers[buffer_key] = []self.sensor_data_buffers[buffer_key].append(value)# 保持緩沖區大小if len(self.sensor_data_buffers[buffer_key]) > self.config['anomaly_detection_window']:self.sensor_data_buffers[buffer_key] = self.sensor_data_buffers[buffer_key][-self.config['anomaly_detection_window']:]# 訓練異常檢測模型self.anomaly_detector.train(device_id, sensor_type, self.sensor_data_buffers[buffer_key])# 檢測異常is_anomaly, confidence = self.anomaly_detector.detect(device_id, sensor_type, value)# 保存數據到數據庫sensor_data = SensorData(device_id=device_id,sensor_type=sensor_type,value=value,timestamp=timestamp,is_anomaly=is_anomaly)self.db_session.add(sensor_data)# 如果是異常,記錄警報if is_anomaly:alert = AnomalyAlert(device_id=device_id,sensor_type=sensor_type,value=value,timestamp=timestamp,confidence=confidence)self.db_session.add(alert)logger.warning(f"檢測到異常 - 設備: {device_id}, 傳感器: {sensor_type}, 值: {value}, 置信度: {confidence}")# 提交事務self.db_session.commit()logger.info(f"處理數據 - 設備: {device_id}, 傳感器: {sensor_type}, 值: {value}, 是否異常: {is_anomaly}")except Exception as e:self.db_session.rollback()logger.error(f"處理數據時出錯: {e}")# 云同步服務
class CloudSyncService:def __init__(self, config, db_session, mqtt_client):self.config = configself.db_session = db_sessionself.mqtt_client = mqtt_clientself.cloud_topic = "cloud/data"async def start_periodic_sync(self):"""啟動定期數據同步"""while True:try:logger.info("開始云同步...")self.sync_data_to_cloud()logger.info("云同步完成")# 等待下一次同步await asyncio.sleep(self.config['sync_interval'])except Exception as e:logger.error(f"云同步失敗: {e}")await asyncio.sleep(self.config['sync_interval'])def sync_data_to_cloud(self):"""將數據同步到云端"""try:# 獲取未同步的數據unsynced_data = self.db_session.query(SensorData).filter(SensorData.processed == False).limit(100).all()if not unsynced_data:logger.info("沒有需要同步的數據")return# 準備數據發送data_to_send = []for data in unsynced_data:data_to_send.append({'id': data.id,'device_id': data.device_id,'sensor_type': data.sensor_type,'value': data.value,'timestamp': data.timestamp.isoformat(),'is_anomaly': data.is_anomaly})# 發送數據到云端payload = {'edge_id': self.config['edge_id'],'data': data_to_send,'timestamp': datetime.datetime.now().isoformat()}self.mqtt_client.publish(self.cloud_topic, json.dumps(payload))# 標記數據為已同步for data in unsynced_data:data.processed = Trueself.db_session.commit()logger.info(f"已同步 {len(unsynced_data)} 條數據到云端")except Exception as e:self.db_session.rollback()logger.error(f"同步數據到云端時出錯: {e}")# 主應用
async def main():# 初始化數據庫會話session = Session()# 初始化異常檢測器anomaly_detector = AnomalyDetector(window_size=CONFIG['anomaly_detection_window'])# 初始化數據處理器data_processor = DataProcessor(CONFIG, session, anomaly_detector)# 初始化云同步服務cloud_sync = CloudSyncService(CONFIG, session, None)  # MQTT客戶端稍后設置# 初始化MQTT客戶端mqtt_client = EdgeMQTTClient(CONFIG, data_processor, cloud_sync)mqtt_client.connect()# 設置MQTT客戶端到云同步服務cloud_sync.mqtt_client = mqtt_client# 啟動云同步任務sync_task = asyncio.create_task(cloud_sync.start_periodic_sync())# 保持應用運行try:await sync_taskexcept KeyboardInterrupt:logger.info("應用被用戶中斷")finally:# 清理資源mqtt_client.disconnect()session.close()logger.info("應用已停止")if __name__ == "__main__":asyncio.run(main())

難點分析

  • 邊緣節點資源限制:在資源受限的設備上運行復雜算法
  • 實時數據處理:處理高頻率、大量的傳感器數據
  • 網絡可靠性:在不穩定網絡環境下保持系統正常運行
  • 安全與隱私:保護邊緣設備和數據的安全
  • 云邊協同機制:設計高效的云邊數據同步和協同策略

擴展方向

  • 添加更多邊緣智能功能(如計算機視覺、自然語言處理)
  • 實現邊緣節點的自動擴展和負載均衡
  • 開發邊緣設備管理平臺
  • 添加預測性維護功能
  • 集成區塊鏈技術保證數據完整性

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/910317.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/910317.shtml
英文地址,請注明出處:http://en.pswp.cn/news/910317.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一,python語法教程.內置API

一&#xff0c;字符串相關API string.strip([chars])方法&#xff1a;移除字符串開頭和結尾的空白字符&#xff08;如空格、制表符、換行符等&#xff09;&#xff0c;它不會修改原始字符串&#xff0c;而是返回一個新的處理后的字符串 chars&#xff08;可選&#xff09;&…

私有 Word 文件預覽轉 PDF 實現方案

私有 Word 文件在線預覽方案&#xff08;.doc/.docx 轉 PDF&#xff09; 前言 由于 .doc 和 .docx Word 文件 無法在瀏覽器中直接預覽&#xff08;尤其在私有 API 場景下&#xff09;&#xff0c;常見的 Content-Disposition: inline 并不能生效。因此&#xff0c;本方案通過…

Alpine Docker 容器中安裝包緩存與 C/C++ 運行問題

在使用 Docker 容器部署應用時&#xff0c;基于 Alpine 鏡像能帶來輕量化的優勢&#xff0c;但過程中也會遇到不少問題。今天就來分享下我在 Alpine 容器中解決安裝包緩存與 C/C 程序運行問題的經驗。 一、Alpine 安裝包緩存到本地目錄 Alpine Linux 默認使用apk作為包管理工…

[2-02-02].第59節:功能函數 - 函數基礎

服務器端操作學習大綱 一、函數基礎 需求場景 在shell腳本的編寫過程中&#xff0c;我們經常會遇到一些功能代碼場景&#xff1a;多條命令組合在一起&#xff0c;實現一個特定的功能場景邏輯、一些命令在腳本內部的多個位置頻繁出現。在這些場景的代碼量往往不多&#xff0c;…

RA4M2開發涂鴉模塊CBU(6)----RA4M2驅動涂鴉CBU模組

RA4M2開發涂鴉模塊CBU.6--RA4M2驅動涂鴉CBU模組 概述視頻教學樣品申請參考程序硬件準備接口生成UARTUART屬性配置R_SCI_UART_Open()函數原型回調函數user_uart_callback0 ()變量定義按鍵回調更新按鍵狀態DP-LED 同步長按進入配網涂鴉協議解析主循環任務調度 概述 本方案基于瑞…

MiniMax-M1: Scaling Test-TimeCompute Efficiently with I Lightning Attention

我們推出了MiniMax-M1&#xff0c;這是全球首個開源權重、大規模混合注意力推理模型。MiniMax-M1采用了混合專家系統&#xff08;Mixture-of-Experts&#xff0c;簡稱MoE&#xff09;架構&#xff0c;并結合了閃電注意力機制。該模型是在我們之前的MiniMax-Text-01模型&#xf…

Appium+python自動化(二十六) -Toast提示

在日常使用App過程中&#xff0c;經常會看到App界面有一些彈窗提示&#xff08;如下圖所示&#xff09;這些提示元素出現后等待3秒左右就會自動消失&#xff0c;那么我們該如何獲取這些元素文字內容呢&#xff1f; Toast簡介 Android中的Toast是一種簡易的消息提示框。 當視圖…

【信號與系統三】離散時間傅里葉變換

上一講我們講述了連續時間傅里葉變換&#xff0c;這一講同理來個離散時間傅里葉變換。 和上講模塊類似 5.1離散時間傅里葉變換 這一式子就是離散時間傅里葉變換對 5.2周期信號的傅里葉變換 同理&#xff0c;由于之前第一講講到&#xff1a; 可以推出&#xff1a; 舉個例子&am…

Python應用石頭剪刀布練習初解

大家好!作為 Python 初學者&#xff0c;尋找一個既簡單又有趣的項目來練習編程技能是至關重要的。今天&#xff0c;我將向大家介紹一個經典的編程練習——石頭剪刀布游戲&#xff0c;它可以幫助你掌握 Python 的基本概念&#xff0c;如條件語句、隨機數生成和用戶輸入處理等。 …

私有規則庫:企業合規與安全的終極防線

2.1 為什么企業需要私有規則庫?——合規與安全的最后防線 真實案例:2023年某跨境電商因員工泄露內部檢測規則,導致黑產繞過風控系統,損失1200萬+ 企業規則庫的三大剛需: 行業合規: 金融行業需符合《個人金融信息保護技術規范》 醫療行業需滿足HIPAA患者數據脫敏要求 業…

長尾關鍵詞優化SEO核心策略

內容概要 本文旨在系統解析長尾關鍵詞在搜索引擎優化中的核心地位&#xff0c;為讀者提供從理論到實踐的全面指南。文章首先探討長尾關鍵詞的基礎作用&#xff0c;幫助理解其在提升網站流量質量中的價值。接著&#xff0c;深入介紹精準定位低搜索量、高轉化率關鍵詞的策略&…

騰訊云事件總線:構建毫秒級響應的下一代事件驅動架構

摘要 事件總線&#xff08;EventBridge&#xff09;作為云原生架構的核心樞紐&#xff0c;其性能與可靠性直接影響企業系統彈性。騰訊云事件總線基于TGW云網關底層能力重構&#xff0c;實現單節點吞吐量提升125%、故障恢復時間降至4秒級&#xff08;行業平均>30秒&#xff0…

PyTorch 中mm和bmm函數的使用詳解

torch.mm 是 PyTorch 中用于 二維矩陣乘法&#xff08;matrix-matrix multiplication&#xff09; 的函數&#xff0c;等價于數學中的 A B 矩陣乘積。 一、函數定義 torch.mm(input, mat2) → Tensor執行的是兩個 2D Tensor&#xff08;矩陣&#xff09;的標準矩陣乘法。 in…

Qt 解析復雜對象構成

Qt 解析復雜對象構成 dumpStructure 如 QComboBox / QCalendarWidget / QSpinBox … void Widget::Widget(QWidget* parent){auto c new QCalendarWidget(this);dumpStructure(c,4); }void Widget::dumpStructure(const QObject *obj, int spaces) {qDebug() << QString…

山姆·奧特曼:從YC到OpenAI,硅谷創新之星的崛起

名人說&#xff1a;路漫漫其修遠兮&#xff0c;吾將上下而求索。—— 屈原《離騷》 創作者&#xff1a;Code_流蘇(CSDN)&#xff08;一個喜歡古詩詞和編程的Coder&#x1f60a;&#xff09; 山姆奧特曼&#xff1a;從YC到OpenAI&#xff0c;硅谷創新之星的崛起 在人工智能革命…

PHP語法基礎篇(五):流程控制

任何 PHP 腳本都是由一系列語句構成的。一條語句可以是一個賦值語句&#xff0c;一個函數調用&#xff0c;一個循環&#xff0c;一個條件語句或者甚至是一個什么也不做的語句&#xff08;空語句&#xff09;。語句通常以分號結束。此外&#xff0c;還可以用花括號將一組語句封裝…

怎么隱藏關閉或恢復顯示輸入法的懸浮窗

以搜狗輸入法為例&#xff0c;隱藏輸入法懸浮窗 懸浮窗在輸入法里的官方叫法為【狀態欄】。 假設目前大家的輸入法相關顯示呈現如下狀態&#xff1a; 那我們只需在輸入法懸浮窗&#xff08;狀態欄&#xff09;的任意位置鼠標右鍵單擊&#xff0c;調出輸入法菜單&#xff0c;就…

Electron (02)集成 SpringBoot:服務與桌面程序協同啟動方案

本篇是關于把springboot生成的jar打到electron里&#xff0c;在生成的桌面程序啟動時springboot服務就會自動啟動。 雖然之后并不需要這種方案&#xff0c;更好的是部署[一套服務端&#xff0c;多個客戶端]...但是既然搭建成功了&#xff0c;也記錄一下。 前端文件 1、main.js…

2025年計算機應用與神經網絡國際會議(CANN 2025)

2025 International Conference on Computer Applications and Neural Networks &#xff08;一&#xff09;會議信息 會議簡稱&#xff1a;CANN 2025 大會地點&#xff1a;中國重慶 收錄檢索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 &#xff08;二&#x…

振動分析中的低頻噪聲問題:從理論到實踐的完整解決方案

前言 在振動監測和結構健康監測領域&#xff0c;我們經常需要從加速度信號計算速度和位移。然而&#xff0c;許多工程師在實際應用中都會遇到一個令人困擾的問題&#xff1a;通過積分計算得到的速度和位移頻譜中低頻噪聲異常放大。 本文將深入分析這個問題的根本原因&#xf…