文章目錄
- **一、前期準備**
- **1. 集群健康檢查**
- **2. 備份數據**
- **3. 監控系統準備**
- **二、創建新索引并配置**
- **1. 設計新索引映射**
- **2. 創建讀寫別名**
- **三、全量數據遷移**
- **1. 執行初始 Reindex**
- **2. 監控 Reindex 進度**
- **四、增量數據同步**
- **1. 方案選擇**
- **五、雙寫切換**
- **1. 修改應用代碼實現雙寫**
- **2. 驗證雙寫一致性**
- **六、流量切換**
- **1. 只讀流量切換**
- **2. 驗證查詢結果一致性**
- **3. 寫入流量切換**
- **七、收尾工作**
- **1. 恢復新索引配置**
- **2. 驗證性能和穩定性**
- **3. 刪除舊索引(可選)**
- **八、回滾策略**
- **九、優化建議**
- **十、風險控制**
一、前期準備
1. 集群健康檢查
GET /_cluster/health
確保:
status
為green
number_of_nodes
符合預期unassigned_shards
為 0
2. 備份數據
# 注冊快照倉庫
PUT /_snapshot/my_backup
{"type": "fs","settings": {"location": "/path/to/snapshots"}
}# 創建全量快照
PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true
3. 監控系統準備
- 開啟 ES 性能監控(如使用 Elastic APM、Prometheus + Grafana)
- 設置關鍵指標告警(如集群負載、JVM 內存、磁盤使用率)
二、創建新索引并配置
1. 設計新索引映射
PUT /new_products
{"settings": {"index.number_of_shards": 5, // 與舊索引保持一致"index.number_of_replicas": 1,"index.refresh_interval": "30s", // 臨時調大,提升寫入性能"index.translog.durability": "async", // 臨時使用異步持久化"index.translog.sync_interval": "30s"},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text"},"price": {"type": "double"}, // 假設原字段為 integer"create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"},"tags": {"type": "keyword"}}}
}
2. 創建讀寫別名
POST /_aliases
{"actions": [{ "add": { "alias": "products_read", "index": "old_products" } },{ "add": { "alias": "products_write", "index": "old_products" } }]
}
三、全量數據遷移
1. 執行初始 Reindex
POST /_reindex?wait_for_completion=false
{"source": {"index": "old_products","size": 5000, // 每次查詢 5000 條"sort": ["_doc"] // 按文檔順序處理,避免遺漏},"dest": {"index": "new_products","op_type": "create"},"script": {"source": """// 類型轉換邏輯if (ctx._source.containsKey("price")) {ctx._source.price = Double.parseDouble(ctx._source.price.toString());}// 日期格式轉換if (ctx._source.containsKey("create_time")) {try {ctx._source.create_time = new Date(ctx._source.create_time).getTime();} catch (Exception e) {// 處理異常日期格式ctx._source.create_time = System.currentTimeMillis();}}"""}
}
2. 監控 Reindex 進度
GET /_tasks?detailed=true&actions=*reindex
四、增量數據同步
1. 方案選擇
- 方案 A:基于時間戳的定時同步(適合有
update_time
字段的場景)
POST /_reindex?wait_for_completion=false
{"source": {"index": "old_products","query": {"range": {"update_time": {"gte": "{{last_sync_time}}", // 上次同步時間"lt": "now"}}}},"dest": {"index": "new_products"}
}
- 方案 B:基于 Canal 的 binlog 訂閱(適合 ES 作為 MySQL 從庫的場景)
# 部署 Canal 客戶端
canal.deployer-1.1.5/bin/startup.sh# 配置 canal.instance.filter.regex=.*\\..* 監聽全量變更
五、雙寫切換
1. 修改應用代碼實現雙寫
在應用層同時寫入新舊索引:
// 偽代碼示例
public void indexProduct(Product product) {// 寫入舊索引esClient.index("products_write", product);// 寫入新索引(帶類型轉換)Product newProduct = convertProduct(product);esClient.index("new_products", newProduct);
}
2. 驗證雙寫一致性
// 對比同一文檔在新舊索引中的差異
GET old_products/_doc/1
GET new_products/_doc/1
六、流量切換
1. 只讀流量切換
POST /_aliases
{"actions": [{ "remove": { "alias": "products_read", "index": "old_products" } },{ "add": { "alias": "products_read", "index": "new_products" } }]
}
2. 驗證查詢結果一致性
// 對比相同查詢在新舊索引中的結果
GET products_read/_search?q=name:iphone
GET old_products/_search?q=name:iphone
3. 寫入流量切換
POST /_aliases
{"actions": [{ "remove": { "alias": "products_write", "index": "old_products" } },{ "add": { "alias": "products_write", "index": "new_products" } }]
}
七、收尾工作
1. 恢復新索引配置
PUT /new_products/_settings
{"index.refresh_interval": "1s","index.translog.durability": "request","index.translog.sync_interval": "5s"
}
2. 驗證性能和穩定性
- 監控集群負載
- 驗證業務查詢性能
- 驗證寫入吞吐量
3. 刪除舊索引(可選)
DELETE /old_products
八、回滾策略
若出現問題,可快速回滾:
POST /_aliases
{"actions": [{ "remove": { "alias": "products_read", "index": "new_products" } },{ "add": { "alias": "products_read", "index": "old_products" } },{ "remove": { "alias": "products_write", "index": "new_products" } },{ "add": { "alias": "products_write", "index": "old_products" } }]
}
九、優化建議
- 分批次遷移:對百萬級數據,按時間或 ID 范圍分批 Reindex,避免單次任務過大
- 限流控制:
POST /_reindex?wait_for_completion=false
{"source": { "index": "old_products" },"dest": { "index": "new_products" },"requests_per_second": 100 // 每秒處理 100 個請求
}
- 臨時擴容:遷移期間增加專用協調節點,減輕數據節點壓力
- 預熱緩存:遷移后對熱點數據執行預熱查詢
- 自動化腳本:使用 Python 腳本編排整個流程:
import requests
import timedef reindex_with_progress():# 啟動 reindexresponse = requests.post("http://localhost:9200/_reindex?wait_for_completion=false",json={"source": {"index": "old_products"},"dest": {"index": "new_products"}})task_id = response.json()["task"]# 監控進度while True:status = requests.get(f"http://localhost:9200/_tasks/{task_id}").json()completed = status["task"]["status"]["completed"]total = status["task"]["status"]["total"]print(f"進度: {completed}/{total} ({completed/total*100:.2f}%)")if status["completed"]:breaktime.sleep(5)reindex_with_progress()
十、風險控制
- 灰度發布:先遷移部分數據進行驗證
- 熔斷機制:設置錯誤率閾值,超過則自動停止遷移
- 預留資源:確保集群有 30% 以上的空閑資源
- 夜間執行:選擇業務低峰期執行核心操作