背景:已知某后臺服務將日志存放在本地硬盤的日志文件中,該服務也支持代碼熱更新,并在完成熱更新后輸出一條日志。我們需要對服務日志進行監控,以確保文件熱更新后的錯誤能被第一時間發現。
我們提供 Python 程序模擬(https://pastebin.com/pZH8wruC,密碼pWXRFeSpwU)上述行為,該程序會不斷生成日志,并輸出到日志文件 1.log 中,日志格式參考源碼。
要求:
1、使用 Python 或者 Shell 對上述日志進行檢測,檢測方式可以是常駐進程實時檢測,也可以通過系統定時任務(需要說明如何配置)定時檢測日志;
2、當檢測到熱更新日志時(以"load file:"開頭,具體格式參考源碼),對錯誤日志(error級別)進行分類統計(分類按照日志內容進行);
3、如果熱更新后1分鐘內某一類別的錯誤日志比熱更新前1分鐘內的量級更多(增加50%),則在另一個文件 2.log 中記錄相關日志(附帶該量級超標的日志內容、熱更新前后的量級信息)。
?
//這是鏈接里面的代碼# 日志生成用例# - 程序隨機地生成 info 和 error 級別日志# - 每隔一段時間產生熱更新日志# - 模擬熱更新后出現更多的錯誤日志import osimport timeimport randomimport loggingdef init_logging():logger = logging.getLogger()logger.setLevel(logging.INFO)logger_format = logging.Formatter("[%(asctime)s][%(levelname)s][%(module)s:%(funcName)s:%(lineno)d] %(message)s")logger_console = logging.StreamHandler()logger_console.setLevel(logging.INFO)logger_console.setFormatter(logger_format)logger.addHandler(logger_console)logger_file = logging.FileHandler("1.log")logger_file.setLevel(logging.INFO)logger_file.setFormatter(logger_format)logger.addHandler(logger_file)STAGE_CONFIG = {"normal": {"duration": "70±10","info_base_prob": 0.01,"info_add_prob": 0,"error_base_prob": 0.01,"error_add_prob": 0,},"after_hotreload": {"duration": "80±20","info_base_prob": 0.1,"info_add_prob": 0.1,"error_base_prob": 0.1,"error_add_prob": 0.2,}}INFO_LOG_TEMPLATE = ["session connected","session closed","player login","player registered","player logout","buy item flow","sell item flow",]ERROR_LOG_TEMPLATE = ["function %d error occur","player login failed","incorrect password","establish connection failed","bad arguments",]INFO_HOTRELOAD_TEMPLATE = ["load file: %d.lua .......... [ok]"]def random_stage_time(duration):pos = duration.find('±')base_time = int(duration[:pos])offset_time = int(duration[pos + 1:])return base_time + (random.random() * 2 - 1) * offset_timedef random_select(l):e = random.choice(l)if e.find('%') >= 0:r = random.randint(0, 10)return e % rreturn eclass App:def __init__(self):self._stage = Noneself._stage_time = 0self._timer = 0self._init_state("normal")def _init_state(self, stage):self._stage = stageself._stage_time = random_stage_time(STAGE_CONFIG[stage]["duration"])self._timer = 0def on_tick(self, elapsed_time):self._timer += elapsed_timeif self._timer >= self._stage_time:if self._stage == "normal":logging.info(random_select(INFO_HOTRELOAD_TEMPLATE))self._init_state("after_hotreload")elif self._stage == "after_hotreload":self._init_state("normal")log_add_strength = 1 - min(1.0, (self._stage_time - self._timer) / self._stage_time)log_info_prob = STAGE_CONFIG[self._stage]["info_base_prob"] + \log_add_strength * STAGE_CONFIG[self._stage]["info_add_prob"]log_err_prob = STAGE_CONFIG[self._stage]["error_base_prob"] + \log_add_strength * STAGE_CONFIG[self._stage]["error_add_prob"]if random.random() < log_info_prob:logging.info(random_select(INFO_LOG_TEMPLATE))if random.random() < log_err_prob:logging.error(random_select(ERROR_LOG_TEMPLATE))def main():init_logging()app = App()last_time = time.time()while True:now_time = time.time()app.on_tick(now_time - last_time)last_time = now_timetime.sleep(0.01)if __name__ == "__main__":main()
解決方法:
為了使用Python來實現上述要求,我們可以編寫一個Python腳本來處理日志文件的讀取、解析、分類統計以及比較熱更新前后的錯誤日志數量。同時,我們將使用系統定時任務(如Linux下的cron
)來定期運行這個Python腳本。
步驟 1: 編寫Python腳本
首先,我們需要編寫一個Python腳本來處理日志文件。這個腳本將需要能夠:
- 讀取日志文件。
- 檢測熱更新日志(以"load file:"開頭)。
- 提取錯誤日志并根據內容進行分類統計。
- 比較熱更新前后的錯誤日志數量。
- 記錄超標的日志內容和統計信息到另一個文件。
下面是一個簡化的Python腳本示例:
import re
import os
from datetime import datetime, timedelta LOG_FILE = '/path/to/your/logfile.log'
OUTPUT_FILE = '/path/to/2.log'
TEMP_STATS_FILE = '/tmp/stats.tmp' # 模擬從日志文件中讀取行(實際中應使用文件操作)
def read_log_lines(file_path): # 這里只是模擬,實際中應打開文件并逐行讀取 with open(file_path, 'r') as file: return file.readlines() # 檢測熱更新并處理日志
def process_logs(): lines = read_log_lines(LOG_FILE) update_time = None before_counts = {} after_counts = {} # 遍歷日志行 for line in lines: if line.startswith('load file:'): # 檢測到熱更新,記錄時間并重置計數器 update_time = datetime.strptime(line.split(' ')[-1], '%Y-%m-%d %H:%M:%S') before_counts = {} after_counts = {} elif 'ERROR' in line: # 假設每行日志包含時間和級別 log_time = datetime.strptime(re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line).group(), '%Y-%m-%d %H:%M:%S') error_type = re.search(r'ERROR: (.+?)(?: \[|$)', line).group(1) if update_time and update_time - timedelta(minutes=1) <= log_time <= update_time + timedelta(minutes=1): # 在熱更新前后1分鐘內 if log_time < update_time: before_counts[error_type] = before_counts.get(error_type, 0) + 1 else: after_counts[error_type] = after_counts.get(error_type, 0) + 1 # 比較并記錄結果 for type, after_count in after_counts.items(): before_count = before_counts.get(type, 0) if after_count > before_count * 1.5: with open(OUTPUT_FILE, 'a') as file: file.write(f"Error type {type} increased significantly after update:\n") file.write(f"Before: {before_count}, After: {after_count}\n") # 這里可以添加更多邏輯來記錄具體的日志內容 # 定時任務將調用這個函數
if __name__ == '__main__': process_logs()
注意:上面的腳本有幾個簡化和假設的地方,比如直接讀取整個日志文件并假設時間戳和錯誤類型可以直接從日志行中提取。在實際應用中,你可能需要處理更復雜的日志格式和更大的日志文件。
步驟 2: 配置Cron定時任務
接下來,你需要在Linux系統上設置Cron定時任務來定期運行這個Python腳本。
- 打開終端。
- 輸入
crontab -e
命令來編輯當前用戶的Cron任務列表。 - 添加一個定時任務來每分鐘運行一次腳本(或者根據你的需求設置不同的頻率):
* * * * * /usr/bin/python3 /path/to/your/script.py
注意將/usr/bin/python3替換為你的Python解釋器的實際路徑,/path/to/your/script.py替換為你的Python腳本的實際路徑。
保存并關閉編輯器。Cron將自動加載新的任務列表。
現在,你的Python腳本將按照Cron任務的設置定期運行,檢測日志文件中的熱更新,并對錯誤