功能簡介
該監控系統具備如下主要功能:
📁 目錄監控:實時監聽指定主目錄及其所有子目錄內文件的變動情況。
🔒 文件哈希校驗:對文件內容生成 SHA256 哈希,確保變更檢測基于內容而非時間戳。
🚫 排除機制:支持設置需排除的子目錄或特定文件,避免頻繁改動的目錄帶來干擾。
🧾 日志記錄:所有非排除目錄中的新增或被篡改的文件將自動寫入 update.txt 日志文件中。
🧠 基線建立:系統首次運行時自動建立哈希基線作為“可信狀態”,便于后續變更比對。
python代碼
import os
import hashlib
import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler# === 配置路徑(可改為環境變量支持更靈活)===
# Root directory that is watched recursively.
MONITOR_FOLDER = "D:\\WorkRoot\\xxxx\\fms"
# JSON file that stores the trusted SHA-256 baseline.
HASH_DB_FILE = "D:\\hash_baseline.json"
# Text file that detected changes are appended to.
LOG_FILE = "D:\\update.txt"

# Paths excluded from monitoring, relative to MONITOR_FOLDER
# (e.g. "UploadFiles" stands for D:\WorkRoot\xxxx\fms\UploadFiles).
EXCLUDE_PATHS = [
    "UploadFiles",
    "areweb\\database",
]

# === Utility functions ===
def is_excluded(path, root=None, exclude_paths=None):
    """Return True when *path* is an excluded entry or lies inside one.

    Args:
        path: File or directory path to test.
        root: Directory the exclusions are relative to. Defaults to
            ``MONITOR_FOLDER``.
        exclude_paths: Iterable of paths relative to *root*. Defaults to
            ``EXCLUDE_PATHS``.

    Returns:
        True if *path* equals an excluded entry or is nested below one,
        otherwise False.
    """
    if root is None:
        root = MONITOR_FOLDER
    if exclude_paths is None:
        exclude_paths = EXCLUDE_PATHS
    try:
        rel_path = os.path.relpath(path, root)
    except ValueError:
        # On Windows relpath() raises when the two paths are on different
        # drives; such a path cannot be inside an excluded sub-tree.
        return False
    # normcase() folds case and slash direction on Windows (no-op on POSIX),
    # so the comparison follows the file system's own matching rules.
    rel_path = os.path.normcase(rel_path)
    for entry in exclude_paths:
        if not entry:
            continue  # an empty entry would otherwise normalise to "."
        # normpath() drops trailing separators and redundant segments so that
        # "UploadFiles/" is treated the same as "UploadFiles".
        excluded = os.path.normcase(os.path.normpath(entry))
        if rel_path == excluded or rel_path.startswith(excluded + os.sep):
            return True
    return False


def calculate_hash(file_path):
    """Return the SHA-256 hex digest of a file's content.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hex digest string, or None when the file cannot be read
        (missing, locked, permission denied, invalid path).
    """
    hasher = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            # Read in fixed-size chunks so large files are never loaded whole.
            while chunk := f.read(8192):
                hasher.update(chunk)
    except (OSError, ValueError):
        return None
    return hasher.hexdigest()


def log_change(message):
    """Append *message*, prefixed with a local timestamp, to LOG_FILE."""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")


def build_baseline():
    """Hash every non-excluded file under MONITOR_FOLDER and save the result.

    Writes a JSON object to HASH_DB_FILE that maps each file's path
    (relative to MONITOR_FOLDER) to its SHA-256 hex digest. Files that
    cannot be read are left out of the baseline.
    """
    hash_dict = {}
    for root, dirs, files in os.walk(MONITOR_FOLDER):
        # Prune excluded directories in place so os.walk() does not descend
        # into them at all.
        dirs[:] = [d for d in dirs if not is_excluded(os.path.join(root, d))]
        for file in files:
            full_path = os.path.join(root, file)
            # Individual files can be excluded too, so check each one.
            if is_excluded(full_path):
                continue
            file_hash = calculate_hash(full_path)
            if file_hash:
                rel_path = os.path.relpath(full_path, MONITOR_FOLDER)
                hash_dict[rel_path] = file_hash
    with open(HASH_DB_FILE, 'w', encoding='utf-8') as f:
        json.dump(hash_dict, f, indent=4)
    print("? 哈希基線已建立")


def load_baseline():
    """Return the baseline mapping stored in HASH_DB_FILE.

    Returns:
        The decoded JSON object, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        with open(HASH_DB_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


# === File-monitoring event handler ===
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that logs new, tampered and deleted files.

    Directory events and paths matched by ``is_excluded`` are ignored.
    Tampering is judged by comparing a file's current SHA-256 digest with
    the digest recorded in the baseline.
    """

    def __init__(self, baseline):
        """Keep the trusted baseline for later comparisons.

        Args:
            baseline: Mapping of path relative to MONITOR_FOLDER to the
                SHA-256 hex digest recorded for that file.
        """
        super().__init__()
        self.baseline = baseline

    def on_modified(self, event):
        """Log a tamper entry when a baseline file's content has changed."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_if_tampered(event.src_path)

    def on_created(self, event):
        """Log every newly created, non-excluded file."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_created(event.src_path)

    def on_deleted(self, event):
        """Log the removal of a file that is part of the baseline."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_deleted(event.src_path)

    def on_moved(self, event):
        """Handle a rename as a removal of the source plus an arrival.

        Many programs save by writing a temporary file and renaming it over
        the target, which produces a move event and no modify event.
        """
        if event.is_directory:
            return
        if not is_excluded(event.src_path):
            self._report_deleted(event.src_path)
        if is_excluded(event.dest_path):
            return
        dest_rel = os.path.relpath(event.dest_path, MONITOR_FOLDER)
        if dest_rel in self.baseline:
            self._report_if_tampered(event.dest_path)
        else:
            self._report_created(event.dest_path)

    def _report_if_tampered(self, path):
        """Log *path* when its digest differs from the baseline digest."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        current_hash = calculate_hash(path)
        old_hash = self.baseline.get(rel_path)
        # Unknown files (no baseline entry) and unreadable files are skipped.
        if old_hash and current_hash and current_hash != old_hash:
            log_change(f"🚨 文件被篡改:{rel_path}")

    def _report_created(self, path):
        """Log *path* as a new file."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        log_change(f"🆕 新增文件:{rel_path}")

    def _report_deleted(self, path):
        """Log *path* as deleted, but only if the baseline knows it."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        if rel_path in self.baseline:
            log_change(f"? 文件被刪除:{rel_path}")


# === Start the monitor ===
def start_monitor():
    """Create the baseline if needed, then watch MONITOR_FOLDER until stopped.

    The baseline is built only when HASH_DB_FILE does not exist yet. The
    function blocks until Ctrl+C is pressed or the observer thread ends.
    """
    if not os.path.exists(HASH_DB_FILE):
        build_baseline()
    baseline = load_baseline()
    print(f"🚀 正在監控文件夾:{MONITOR_FOLDER}")
    print(f"🚫 排除路徑:{EXCLUDE_PATHS}")
    print(f"📄 日志寫入文件:{LOG_FILE}")

    event_handler = FileChangeHandler(baseline)
    observer = Observer()
    observer.schedule(event_handler, MONITOR_FOLDER, recursive=True)
    observer.start()
    try:
        # Poll the observer instead of sleeping blindly: if its thread dies
        # the loop ends rather than idling forever with nothing monitored.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop the monitor.
    finally:
        # Always shut the observer down, whatever ended the loop.
        observer.stop()
        observer.join()


# === Program entry point ===
if __name__ == "__main__":
    start_monitor()
打包為 exe 程序(--icon=logo.ico 用于指定程序圖標,可刪除):
pyinstaller --noconsole --onefile --icon=logo.ico 篡改腳本.py
優化:
可配置路徑config.json讀取路徑地址
{"monitor_folder": "D:\\WX\\fms_bak","hash_db_file": "D:\\hash_baseline.json","log_file": "D:\\update.txt","exclude_paths": ["areweb"]
}
Python 代碼
import os
import hashlib
import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler# === 加載配置 ===
# Location of the JSON configuration file.
CONFIG_FILE = "C:\\config.json"


def load_config(path=None):
    """Read the monitor settings from a JSON file.

    Args:
        path: Location of the JSON file. Defaults to ``CONFIG_FILE``.

    Returns:
        The decoded JSON object.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    if path is None:
        path = CONFIG_FILE
    # "utf-8-sig" reads plain UTF-8 and also strips a leading byte-order
    # mark, which some Windows editors add and json.load() rejects.
    with open(path, 'r', encoding='utf-8-sig') as f:
        return json.load(f)


config = load_config()
MONITOR_FOLDER = config['monitor_folder']
HASH_DB_FILE = config['hash_db_file']
LOG_FILE = config['log_file']
# Exclusions are optional; a config without the key excludes nothing.
EXCLUDE_PATHS = config.get('exclude_paths', [])

# === Utility functions ===
def is_excluded(path, root=None, exclude_paths=None):
    """Return True when *path* is an excluded entry or lies inside one.

    Args:
        path: File or directory path to test.
        root: Directory the exclusions are relative to. Defaults to
            ``MONITOR_FOLDER``.
        exclude_paths: Iterable of paths relative to *root*. Defaults to
            ``EXCLUDE_PATHS``.

    Returns:
        True if *path* equals an excluded entry or is nested below one,
        otherwise False.
    """
    if root is None:
        root = MONITOR_FOLDER
    if exclude_paths is None:
        exclude_paths = EXCLUDE_PATHS
    try:
        rel_path = os.path.relpath(path, root)
    except ValueError:
        # On Windows relpath() raises when the two paths are on different
        # drives; such a path cannot be inside an excluded sub-tree.
        return False
    # normcase() folds case and slash direction on Windows (no-op on POSIX),
    # so the comparison follows the file system's own matching rules.
    rel_path = os.path.normcase(rel_path)
    for entry in exclude_paths:
        if not entry:
            continue  # an empty entry would otherwise normalise to "."
        # normpath() drops trailing separators and redundant segments so that
        # "areweb/" is treated the same as "areweb".
        excluded = os.path.normcase(os.path.normpath(entry))
        if rel_path == excluded or rel_path.startswith(excluded + os.sep):
            return True
    return False


def calculate_hash(file_path):
    """Return the SHA-256 hex digest of a file's content.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hex digest string, or None when the file cannot be read
        (missing, locked, permission denied, invalid path).
    """
    hasher = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            # Read in fixed-size chunks so large files are never loaded whole.
            while chunk := f.read(8192):
                hasher.update(chunk)
    except (OSError, ValueError):
        return None
    return hasher.hexdigest()


def log_change(message):
    """Append *message*, prefixed with a local timestamp, to LOG_FILE."""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")


def build_baseline():
    """Hash every non-excluded file under MONITOR_FOLDER and save the result.

    Writes a JSON object to HASH_DB_FILE that maps each file's path
    (relative to MONITOR_FOLDER) to its SHA-256 hex digest. Files that
    cannot be read are left out of the baseline.
    """
    hash_dict = {}
    for root, dirs, files in os.walk(MONITOR_FOLDER):
        # Prune excluded directories in place so os.walk() does not descend
        # into them at all.
        dirs[:] = [d for d in dirs if not is_excluded(os.path.join(root, d))]
        for file in files:
            full_path = os.path.join(root, file)
            # Individual files can be excluded too, so check each one.
            if is_excluded(full_path):
                continue
            file_hash = calculate_hash(full_path)
            if file_hash:
                rel_path = os.path.relpath(full_path, MONITOR_FOLDER)
                hash_dict[rel_path] = file_hash
    with open(HASH_DB_FILE, 'w', encoding='utf-8') as f:
        json.dump(hash_dict, f, indent=4)
    print("? 哈希基線已建立")


def load_baseline():
    """Return the baseline mapping stored in HASH_DB_FILE.

    Returns:
        The decoded JSON object, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        with open(HASH_DB_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


# === File-monitoring event handler ===
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that logs new, tampered and deleted files.

    Directory events and paths matched by ``is_excluded`` are ignored.
    Tampering is judged by comparing a file's current SHA-256 digest with
    the digest recorded in the baseline.
    """

    def __init__(self, baseline):
        """Keep the trusted baseline for later comparisons.

        Args:
            baseline: Mapping of path relative to MONITOR_FOLDER to the
                SHA-256 hex digest recorded for that file.
        """
        super().__init__()
        self.baseline = baseline

    def on_modified(self, event):
        """Log a tamper entry when a baseline file's content has changed."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_if_tampered(event.src_path)

    def on_created(self, event):
        """Log every newly created, non-excluded file."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_created(event.src_path)

    def on_deleted(self, event):
        """Log the removal of any non-excluded file."""
        if event.is_directory or is_excluded(event.src_path):
            return
        self._report_deleted(event.src_path)

    def on_moved(self, event):
        """Handle a rename as a removal of the source plus an arrival.

        Many programs save by writing a temporary file and renaming it over
        the target, which produces a move event and no modify event.
        """
        if event.is_directory:
            return
        if not is_excluded(event.src_path):
            self._report_deleted(event.src_path)
        if is_excluded(event.dest_path):
            return
        dest_rel = os.path.relpath(event.dest_path, MONITOR_FOLDER)
        if dest_rel in self.baseline:
            self._report_if_tampered(event.dest_path)
        else:
            self._report_created(event.dest_path)

    def _report_if_tampered(self, path):
        """Log *path* when its digest differs from the baseline digest."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        current_hash = calculate_hash(path)
        old_hash = self.baseline.get(rel_path)
        # Unknown files (no baseline entry) and unreadable files are skipped.
        if old_hash and current_hash and current_hash != old_hash:
            log_change(f"🚨 文件被篡改:{rel_path}")

    def _report_created(self, path):
        """Log *path* as a new file."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        log_change(f"🆕 新增文件:{rel_path}")

    def _report_deleted(self, path):
        """Log *path* as deleted."""
        rel_path = os.path.relpath(path, MONITOR_FOLDER)
        log_change(f"? 文件被刪除:{rel_path}")


# === Start the monitor ===
def start_monitor():
    """Create the baseline if needed, then watch MONITOR_FOLDER until stopped.

    The baseline is built only when HASH_DB_FILE does not exist yet. The
    function blocks until Ctrl+C is pressed or the observer thread ends.
    """
    if not os.path.exists(HASH_DB_FILE):
        build_baseline()
    baseline = load_baseline()
    print(f"🚀 正在監控文件夾:{MONITOR_FOLDER}")
    print(f"🚫 排除路徑:{EXCLUDE_PATHS}")
    print(f"📄 日志寫入文件:{LOG_FILE}")

    event_handler = FileChangeHandler(baseline)
    observer = Observer()
    observer.schedule(event_handler, MONITOR_FOLDER, recursive=True)
    observer.start()
    try:
        # Poll the observer instead of sleeping blindly: if its thread dies
        # the loop ends rather than idling forever with nothing monitored.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop the monitor.
    finally:
        # Always shut the observer down, whatever ended the loop.
        observer.stop()
        observer.join()


# === Program entry point ===
if __name__ == "__main__":
    start_monitor()