腳本
import logging
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch# 配置日志記錄
logging.basicConfig(filename='delete_uat_indices.log',level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s'
)# Elasticsearch 集群的連接信息
ELASTICSEARCH_HOST = "http://192.178.18.209:9200" # 修改為你的 Elasticsearch 地址
USERNAME = "elastic" # 修改為你的用戶名
PASSWORD = "7F9L%mYWjWtb" # 修改為你的密碼
INDEX_PATTERN = "*uat*" # 匹配帶有 "uat" 的索引def delete_old_indices():# 連接到 Elasticsearch 集群es = Elasticsearch([ELASTICSEARCH_HOST], basic_auth=(USERNAME, PASSWORD))# 檢查連接是否成功if not es.ping():logging.error("無法連接到 Elasticsearch 集群")returnlogging.info("成功連接到 Elasticsearch 集群")# 計算 5 天前的日期five_days_ago = datetime.now() - timedelta(days=2)date_str = five_days_ago.strftime('%Y.%m.%d')# 獲取所有索引(確保使用關鍵字參數)try:indices = es.indices.get(index="*uat*") # 使用關鍵字參數except Exception as e:logging.error(f"獲取索引列表失敗: {e}")return# 過濾出符合條件的索引indices_to_delete = []for index_name in indices.keys():if not index_name.startswith(".") and "uat" in index_name:# 提取索引名稱中的日期部分try:index_date_str = index_name.split("-")[-1] # 假設日期在索引名稱的最后部分index_date = datetime.strptime(index_date_str, '%Y.%m.%d')if index_date < five_days_ago:indices_to_delete.append(index_name)except Exception as e:logging.warning(f"無法解析索引名稱 {index_name} 的日期部分: {e}")if not indices_to_delete:logging.info(f"未找到符合條件的索引: 包含 'uat' 且日期為 {date_str}")return# 刪除符合條件的索引for index in indices_to_delete:try:es.indices.delete(index=index)logging.info(f"成功刪除索引: {index}")except Exception as e:logging.error(f"刪除索引 {index} 時失敗: {e}")if __name__ == "__main__":delete_old_indices()
定時任務
crontab -e
#每天23點30定時刪除5天前的uat環境日志
30 23 * * * /usr/local/python-3.9/bin/python3.9 /data/delete_old_indices/delete_uat_old_indices.py >/dev/null 2>&1