物聯網數據安全區塊鏈服務
下面是一個專為物聯網數據安全設計的區塊鏈服務實現,使用Python編寫并封裝為RESTful API。該服務確保物聯網設備數據的不可篡改性、可追溯性和安全性。
import hashlib
import json
import time
from datetime import datetime
from uuid import uuid4
from flask import Flask, jsonify, request, Response
import requests
from urllib.parse import urlparse
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import sqlite3
import threading# --------------------- 區塊鏈核心實現 ---------------------
class IoTBlockchain:def __init__(self):self.chain = []self.pending_transactions = []self.nodes = set()self.device_keys = {} # 存儲設備公鑰 {device_id: public_key}# 創建創世區塊self.create_genesis_block()# 創建數據庫連接self.db_conn = sqlite3.connect('iot_blockchain.db', check_same_thread=False)self.init_db()# 自動清理線程self.cleanup_thread = threading.Thread(target=self.periodic_cleanup, daemon=True)self.cleanup_thread.start()def init_db(self):"""初始化數據庫"""cursor = self.db_conn.cursor()cursor.execute('''CREATE TABLE IF NOT EXISTS device_data (data_id TEXT PRIMARY KEY,device_id TEXT,data_hash TEXT,timestamp REAL,block_index INTEGER,public_key TEXT)''')cursor.execute('''CREATE TABLE IF NOT EXISTS revoked_tokens (token TEXT PRIMARY KEY,revocation_time REAL)''')self.db_conn.commit()def periodic_cleanup(self):"""定期清理舊數據"""while True:time.sleep(3600) # 每小時清理一次try:cursor = self.db_conn.cursor()# 保留最近48小時的數據cutoff = time.time() - 48 * 3600cursor.execute("DELETE FROM revoked_tokens WHERE revocation_time < ?", (cutoff,))self.db_conn.commit()except Exception as e:print(f"清理錯誤: {e}")def create_genesis_block(self):"""創建創世區塊"""genesis_block = {'index': 0,'timestamp': time.time(),'transactions': [],'proof': 100,'previous_hash': '0','merkle_root': '0'}self.chain.append(genesis_block)def register_device(self, device_id, public_key):"""注冊物聯網設備"""if device_id in self.device_keys:return Falseself.device_keys[device_id] = public_keyreturn Truedef verify_signature(self, device_id, data, signature):"""驗證設備簽名"""if device_id not in self.device_keys:return Falsepublic_key = self.device_keys[device_id]try:# 在實際應用中應使用更安全的驗證方法# 這里簡化實現pub_key = serialization.load_pem_public_key(public_key.encode(),backend=default_backend())# 在實際應用中應使用pub_key.verify()方法# 這里簡化驗證過程calculated_hash = self.hash_data(data)return calculated_hash == signatureexcept:return Falsedef hash_data(self, data):"""計算數據的哈希值"""if isinstance(data, dict):data_str = json.dumps(data, sort_keys=True)else:data_str = str(data)return hashlib.sha256(data_str.encode()).hexdigest()def create_merkle_root(self, transactions):"""創建Merkle樹根哈希"""if not transactions:return "0"# 計算所有交易的哈希hashes = [self.hash_data(tx) for tx in transactions]while len(hashes) > 1:new_hashes = []# 兩兩配對計算哈希for i in range(0, len(hashes), 2):if i + 1 < len(hashes):combined = hashes[i] + hashes[i + 1]else:combined = hashes[i] + hashes[i] # 奇數個時復制最后一個new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())hashes = new_hashesreturn hashes[0]def new_transaction(self, device_id, data, signature):"""創建新的物聯網數據交易"""# 驗證簽名if not self.verify_signature(device_id, data, signature):return None# 生成唯一數據IDdata_id = str(uuid4())# 存儲到數據庫(在實際應用中應使用更安全的存儲)cursor = self.db_conn.cursor()cursor.execute("INSERT INTO device_data (data_id, device_id, data_hash, timestamp, public_key) VALUES (?, ?, ?, ?, ?)",(data_id, device_id, self.hash_data(data), time.time(), self.device_keys[device_id])self.db_conn.commit()# 添加到待處理交易transaction = {'data_id': data_id,'device_id': device_id,'data_hash': self.hash_data(data),'timestamp': time.time(),'signature': signature}self.pending_transactions.append(transaction)return data_iddef mine(self):"""挖礦創建新區塊"""if not self.pending_transactions:return Nonelast_block = self.last_blocklast_proof = last_block['proof']proof = self.proof_of_work(last_proof)# 創建Merkle根merkle_root = self.create_merkle_root(self.pending_transactions)# 創建新區塊block = {'index': len(self.chain),'timestamp': time.time(),'transactions': self.pending_transactions,'proof': proof,'previous_hash': self.hash_block(last_block),'merkle_root': merkle_root}# 重置待處理交易self.pending_transactions = []# 添加到區塊鏈self.chain.append(block)return blockdef proof_of_work(self, last_proof):"""簡單的工作量證明算法"""proof = 0while not self.valid_proof(last_proof, proof):proof += 1return proofdef valid_proof(self, last_proof, proof):"""驗證工作量證明"""guess = f'{last_proof}{proof}'.encode()guess_hash = hashlib.sha256(guess).hexdigest()# 調整難度:要求前3位為0(可根據需求調整)return guess_hash[:3] == "000"def hash_block(self, block):"""計算區塊的哈希值"""block_string = json.dumps(block, sort_keys=True).encode()return hashlib.sha256(block_string).hexdigest()@propertydef last_block(self):"""獲取最后一個區塊"""return self.chain[-1]def validate_chain(self):"""驗證區塊鏈的完整性"""for i in range(1, len(self.chain)):current_block = self.chain[i]previous_block = self.chain[i-1]# 檢查區塊哈希是否正確if current_block['previous_hash'] != self.hash_block(previous_block):return False# 檢查工作量證明if not self.valid_proof(previous_block['proof'], current_block['proof']):return False# 驗證Merkle根calculated_merkle = self.create_merkle_root(current_block['transactions'])if calculated_merkle != current_block['merkle_root']:return False# 驗證交易簽名for tx in current_block['transactions']:if tx['device_id'] not in self.device_keys:return False# 在實際應用中應使用公鑰驗證簽名# 這里簡化驗證過程stored_data = self.get_data_by_id(tx['data_id'])if not stored_data or stored_data['data_hash'] != tx['data_hash']:return Falsereturn Truedef get_data_by_id(self, data_id):"""根據ID獲取數據記錄"""cursor = self.db_conn.cursor()cursor.execute("SELECT * FROM device_data WHERE data_id = ?", (data_id,))row = cursor.fetchone()if row:return {'data_id': row[0],'device_id': row[1],'data_hash': row[2],'timestamp': row[3],'block_index': row[4],'public_key': row[5]}return Nonedef revoke_token(self, token):"""撤銷JWT令牌"""cursor = self.db_conn.cursor()cursor.execute("INSERT OR REPLACE INTO revoked_tokens (token, revocation_time) VALUES (?, ?)",(token, time.time()))self.db_conn.commit()return Truedef is_token_revoked(self, token):"""檢查令牌是否已被撤銷"""cursor = self.db_conn.cursor()cursor.execute("SELECT token FROM revoked_tokens WHERE token = ?", (token,))return cursor.fetchone() is not None# --------------------- Flask應用和API ---------------------
app = Flask(__name__)
blockchain = IoTBlockchain()# 生成RSA密鑰對用于API認證
private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend()
)
public_key = private_key.public_key()# 序列化密鑰
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.PKCS8,encryption_algorithm=serialization.NoEncryption()
).decode()public_pem = public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()# API密鑰(僅用于演示)
API_KEYS = {"admin": "supersecretkey123","device_manager": "devicekey456"
}# JWT認證裝飾器
def jwt_required(f):def decorated_function(*args, **kwargs):token = Noneif 'Authorization' in request.headers:token = request.headers['Authorization'].split(" ")[1]if not token:return jsonify({'message': 'Token is missing'}), 401if blockchain.is_token_revoked(token):return jsonify({'message': 'Token has been revoked'}), 401try:# 在實際應用中應驗證簽名# 這里簡化驗證過程data = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})request.current_user = data['sub']except:return jsonify({'message': 'Token is invalid'}), 401return f(*args, **kwargs)return decorated_function@app.route('/api/auth', methods=['POST'])
def authenticate():"""獲取JWT令牌"""auth_data = request.get_json()if not auth_data or 'api_key' not in auth_data:return jsonify({'message': 'Missing API key'}), 400api_key = auth_data['api_key']if api_key not in API_KEYS.values():return jsonify({'message': 'Invalid API key'}), 401# 確定用戶角色role = "user"for k, v in API_KEYS.items():if v == api_key:role = kbreak# 生成JWT令牌(在實際應用中應設置合理過期時間)token = jwt.encode({'sub': role,'iat': datetime.utcnow(),# 'exp': datetime.utcnow() + timedelta(hours=1) # 設置過期時間}, 'secret', algorithm='HS256') # 在實際應用中應使用更安全的密鑰return jsonify({'token': token})@app.route('/api/devices/register', methods=['POST'])
@jwt_required
def register_device():"""注冊物聯網設備"""if request.current_user != 'admin':return jsonify({'message': 'Unauthorized'}), 403data = request.get_json()if not data or 'device_id' not in data or 'public_key' not in data:return jsonify({'message': 'Missing device ID or public key'}), 400device_id = data['device_id']public_key = data['public_key']if blockchain.register_device(device_id, public_key):return jsonify({'message': f'Device {device_id} registered successfully'}), 201else:return jsonify({'message': f'Device {device_id} already registered'}), 400@app.route('/api/data/submit', methods=['POST'])
@jwt_required
def submit_data():"""提交物聯網數據"""data = request.get_json()if not data or 'device_id' not in data or 'data' not in data or 'signature' not in data:return jsonify({'message': 'Missing required fields'}), 400device_id = data['device_id']sensor_data = data['data']signature = data['signature']data_id = blockchain.new_transaction(device_id, sensor_data, signature)if data_id:return jsonify({'message': 'Data submitted successfully','data_id': data_id}), 201else:return jsonify({'message': 'Invalid device signature'}), 400@app.route('/api/chain/mine', methods=['POST'])
@jwt_required
def mine_block():"""挖礦創建新區塊"""if request.current_user != 'admin':return jsonify({'message': 'Unauthorized'}), 403block = blockchain.mine()if block:return jsonify({'message': 'New block mined','block': block}), 201else:return jsonify({'message': 'No transactions to mine'}), 400@app.route('/api/chain', methods=['GET'])
@jwt_required
def get_full_chain():"""獲取完整區塊鏈"""return jsonify({'chain': blockchain.chain,'length': len(blockchain.chain)}), 200@app.route('/api/data/<data_id>', methods=['GET'])
@jwt_required
def get_data(data_id):"""根據ID獲取數據記錄"""data_record = blockchain.get_data_by_id(data_id)if data_record:# 在實際應用中應返回更多信息return jsonify({'data_id': data_record['data_id'],'device_id': data_record['device_id'],'timestamp': data_record['timestamp'],'block_index': data_record['block_index']}), 200else:return jsonify({'message': 'Data not found'}), 404@app.route('/api/validate', methods=['GET'])
@jwt_required
def validate_chain():"""驗證區塊鏈完整性"""is_valid = blockchain.validate_chain()if is_valid:return jsonify({'message': 'Blockchain is valid'}), 200else:return jsonify({'message': 'Blockchain is invalid'}), 400@app.route('/api/auth/revoke', methods=['POST'])
@jwt_required
def revoke_token():"""撤銷當前令牌"""token = request.headers['Authorization'].split(" ")[1]if blockchain.revoke_token(token):return jsonify({'message': 'Token revoked successfully'}), 200else:return jsonify({'message': 'Failed to revoke token'}), 400@app.route('/api/public_key', methods=['GET'])
def get_public_key():"""獲取API公鑰(用于客戶端驗證)"""return Response(public_pem, mimetype='text/plain')# --------------------- 設備模擬器 ---------------------
class IoTDeviceSimulator:def __init__(self, device_id):self.device_id = device_id# 生成設備密鑰對self.private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend())self.public_key = self.private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo).decode()def sign_data(self, data):"""簽名數據(簡化實現)"""# 在實際應用中應使用私鑰簽名# 這里返回數據的哈希作為簡化簽名if isinstance(data, dict):data_str = json.dumps(data, sort_keys=True)else:data_str = str(data)return hashlib.sha256(data_str.encode()).hexdigest()def generate_data(self):"""生成模擬傳感器數據"""return {'temperature': round(20 + 10 * (time.time() % 1), 2),'humidity': round(40 + 30 * ((time.time() + 0.3) % 1), 2),'pressure': round(980 + 40 * ((time.time() + 0.7) % 1), 2),'timestamp': time.time()}# --------------------- 主程序 ---------------------
if __name__ == '__main__':import argparseparser = argparse.ArgumentParser(description='IoT Blockchain API Server')parser.add_argument('-p', '--port', type=int, default=5000, help='Port to run the server on')parser.add_argument('-d', '--debug', action='store_true', help='Run in debug mode')args = parser.parse_args()# 注冊一個模擬設備sim_device = IoTDeviceSimulator("sensor-001")blockchain.register_device(sim_device.device_id, sim_device.public_key)# 啟動Flask應用app.run(host='0.0.0.0', port=args.port, debug=args.debug)
功能說明
1. 核心區塊鏈功能
- 設備注冊:物聯網設備使用公鑰注冊到系統
- 數據提交:設備提交帶簽名的傳感器數據
- 區塊挖礦:將待處理數據打包進新區塊
- Merkle樹:用于高效驗證區塊中的交易
- 工作量證明:簡單的PoW共識機制
2. 安全特性
- 設備身份驗證:每個設備使用公私鑰對進行身份驗證
- 數據簽名:設備對提交的數據進行簽名
- API認證:使用JWT令牌保護API端點
- 令牌撤銷:支持撤銷已發放的JWT令牌
3. 數據管理
- SQLite數據庫:存儲設備數據和撤銷令牌
- 數據檢索:通過數據ID查詢數據記錄
- 自動清理:定期清理舊數據
4. RESTful API 端點
端點 | 方法 | 描述 | 認證 |
---|---|---|---|
/api/auth | POST | 獲取JWT令牌 | API密鑰 |
/api/devices/register | POST | 注冊新設備 | admin令牌 |
/api/data/submit | POST | 提交傳感器數據 | 有效JWT |
/api/chain/mine | POST | 挖礦創建新區塊 | admin令牌 |
/api/chain | GET | 獲取完整區塊鏈 | 有效JWT |
/api/data/<data_id> | GET | 獲取數據記錄 | 有效JWT |
/api/validate | GET | 驗證區塊鏈完整性 | 有效JWT |
/api/auth/revoke | POST | 撤銷當前令牌 | 有效JWT |
/api/public_key | GET | 獲取API公鑰 | 無 |
部署和使用指南
1. 啟動服務
python iot_blockchain.py
2. 獲取API令牌
curl -X POST http://localhost:5000/api/auth \-H "Content-Type: application/json" \-d '{"api_key": "supersecretkey123"}'
3. 注冊物聯網設備
curl -X POST http://localhost:5000/api/devices/register \-H "Authorization: Bearer <JWT_TOKEN>" \-H "Content-Type: application/json" \-d '{"device_id": "sensor-002", "public_key": "-----BEGIN PUBLIC KEY-----\n..."}'
4. 提交傳感器數據
curl -X POST http://localhost:5000/api/data/submit \-H "Authorization: Bearer <JWT_TOKEN>" \-H "Content-Type: application/json" \-d '{"device_id": "sensor-001","data": {"temperature": 25.5, "humidity": 60},"signature": "<DATA_SIGNATURE>"}'
5. 挖礦創建新區塊
curl -X POST http://localhost:5000/api/chain/mine \-H "Authorization: Bearer <JWT_TOKEN>"
6. 查看區塊鏈
curl http://localhost:5000/api/chain \-H "Authorization: Bearer <JWT_TOKEN>"
系統架構圖
實際應用建議
-
增強安全性:
- 使用硬件安全模塊(HSM)管理密鑰
- 實現真正的數字簽名驗證
- 添加傳輸層加密(HTTPS)
-
性能優化:
- 使用更高效的共識算法(如PoS)
- 實現分片技術處理大量設備
- 使用分布式數據庫(如Cassandra)
-
擴展功能:
- 添加設備管理面板
- 實現數據分析和告警功能
- 支持設備固件驗證
-
存儲優化:
- 使用IPFS存儲大型傳感器數據
- 實現數據壓縮和聚合
- 添加時間序列數據庫支持
-
隱私保護:
- 實現零知識證明驗證
- 添加數據脫敏功能
- 支持差分隱私
這個區塊鏈服務為物聯網數據提供了強大的安全保障,確保數據的完整性和不可篡改性,同時通過API接口提供了靈活的集成方式。