文檔遷移
- 1.為什么要進行 reindex 操作
- 2.Reindex 操作的本質
- 3.實際案例
- 3.1 同集群索引之間的全量數據遷移
- 3.2 同集群索引之間基于特定條件的數據遷移
- 3.2.1 源索引設置檢索條件
- 3.2.2 基于 script 腳本的索引遷移
- 3.2.3 基于預處理管道的數據遷移
- 3.3 不同集群之間的索引遷移
- 3.4 查看及取消 reindex 任務
- 3.4.1 查看 reindex 任務
- 3.4.2 取消 reindex 任務
- 4.注意事項
1.為什么要進行 reindex 操作
Reindex 是 Elasticsearch 中一種將數據從一個索引復制到另一個索引的操作,主要用途包括:
- 索引結構變更:當需要修改映射設置,但無法直接更新現有索引時。
- 數據遷移:將數據從一個索引/集群遷移到另一個索引/集群。
- 數據轉換:在遷移過程中對數據進行修改或過濾。
- 分片優化:調整分片數量或分片策略。
- 版本升級:跨大版本升級時重建索引。
2.Reindex 操作的本質
Reindex 本質上是 Elasticsearch 內部的一個 數據復制 過程,它:
- 從源索引讀取文檔。
- 可選地對文檔進行轉換。
- 將文檔寫入目標索引。
- 不是簡單的文件復制,而是重新索引文檔的過程。
3.實際案例
3.1 同集群索引之間的全量數據遷移
場景:將 old_index
的所有數據遷移到新建的 new_index
,因為需要修改分片數量。
生成測試數據。
// 創建源索引 old_index
PUT old_index
{"mappings": {"properties": {"name": { "type": "text" },"age": { "type": "integer" },"email": { "type": "keyword" }}}
}// 批量插入測試數據
POST old_index/_bulk
{"index":{}}
{"name":"John Doe","age":28,"email":"john@example.com"}
{"index":{}}
{"name":"Jane Smith","age":32,"email":"jane@example.com"}
{"index":{}}
{"name":"Bob Johnson","age":45,"email":"bob@example.com"}
{"index":{}}
{"name":"Alice Brown","age":23,"email":"alice@example.com"}
{"index":{}}
{"name":"Tom Wilson","age":37,"email":"tom@example.com"}
執行遷移操作。
POST _reindex
{"source": {"index": "old_index"},"dest": {"index": "new_index"}
}
說明:這是最基本的 reindex 操作,將源索引所有文檔復制到目標索引。
3.2 同集群索引之間基于特定條件的數據遷移
3.2.1 源索引設置檢索條件
場景:只遷移 old_index
中 status
字段為 active
的文檔。
生成測試數據。
// 創建帶 status 字段的索引
PUT status_index
{"mappings": {"properties": {"name": { "type": "text" },"status": { "type": "keyword" },"value": { "type": "integer" }}}
}// 批量插入測試數據
POST status_index/_bulk
{"index":{}}
{"name":"Item 1","status":"active","value":100}
{"index":{}}
{"name":"Item 2","status":"inactive","value":200}
{"index":{}}
{"name":"Item 3","status":"active","value":150}
{"index":{}}
{"name":"Item 4","status":"pending","value":300}
{"index":{}}
{"name":"Item 5","status":"active","value":250}
執行遷移操作。
POST _reindex
{"source": {"index": "status_index","query": {"term": {"status": "active"}}},"dest": {"index": "status_index_new"}
}
3.2.2 基于 script 腳本的索引遷移
場景:遷移時修改字段,例如將 price
字段值增加 10 % 10\% 10%。
生成測試數據。
// 創建產品索引
PUT products
{"mappings": {"properties": {"name": { "type": "text" },"price": { "type": "double" },"category": { "type": "keyword" }}}
}// 批量插入測試數據
POST products/_bulk
{"index":{}}
{"name":"Laptop","price":1000.00,"category":"electronics"}
{"index":{}}
{"name":"Smartphone","price":700.00,"category":"electronics"}
{"index":{}}
{"name":"Desk Chair","price":150.00,"category":"furniture"}
{"index":{}}
{"name":"Coffee Mug","price":10.00,"category":"kitchen"}
{"index":{}}
{"name":"Notebook","price":5.00,"category":"stationery"}
執行遷移操作。
POST _reindex
{"source": {"index": "products"},"dest": {"index": "products_new"},"script": {"source": "ctx._source.price *= 1.10"}
}
3.2.3 基于預處理管道的數據遷移
場景:在遷移過程中使用預處理管道處理數據,例如添加時間戳。
生成測試數據。
// 創建原始日志索引
PUT raw_logs
{"mappings": {"properties": {"message": { "type": "text" },"level": { "type": "keyword" },"source": { "type": "keyword" }}}
}// 批量插入測試日志數據
POST raw_logs/_bulk
{"index":{}}
{"message":"User logged in","level":"INFO","source":"auth-service"}
{"index":{}}
{"message":"Failed authentication attempt","level":"WARN","source":"auth-service"}
{"index":{}}
{"message":"Database connection lost","level":"ERROR","source":"db-service"}
{"index":{}}
{"message":"Cache refreshed successfully","level":"INFO","source":"cache-service"}
{"index":{}}
{"message":"High memory usage detected","level":"WARN","source":"monitoring-service"}
執行遷移操作。
PUT _ingest/pipeline/add_timestamp
{"description": "Adds a timestamp to documents","processors": [{"set": {"field": "@timestamp","value": "{{_ingest.timestamp}}"}}]
}POST _reindex
{"source": {"index": "raw_logs"},"dest": {"index": "raw_logs_new","pipeline": "add_timestamp"}
}
3.3 不同集群之間的索引遷移
場景:將集群 A 的 cluster_a_index
遷移到集群 B 的 cluster_b_index
。
生成測試數據。
// 創建模擬跨集群遷移的源索引
PUT cluster_a_index
{"mappings": {"properties": {"title": { "type": "text" },"views": { "type": "integer" },"published": { "type": "date" }}}
}// 批量插入測試數據
POST cluster_a_index/_bulk
{"index":{}}
{"title":"Introduction to Elasticsearch","views":1500,"published":"2023-01-15"}
{"index":{}}
{"title":"Kibana Dashboard Tutorial","views":3200,"published":"2023-02-20"}
{"index":{}}
{"title":"Advanced Logstash Pipelines","views":875,"published":"2023-03-10"}
{"index":{}}
{"title":"Elasticsearch Performance Tuning","views":2100,"published":"2023-04-05"}
{"index":{}}
{"title":"Machine Learning with ELK","views":1800,"published":"2023-05-12"}
在集群 B 上執行遷移操作。
// 在集群 B 上執行
POST _reindex
{"source": {"remote": {"host": "http://clusterA:9200","username": "admin","password": "xxxxxx"},"index": "cluster_a_index"},"dest": {"index": "cluster_b_index"}
}
注意事項(非常重要):
- ? 需要配置遠程集群白名單。
elasticsearch.yml
中的reindex.remote.whitelist
配置項。
- ? 網絡連接必須暢通,比如:
- 是否在同一 VPC 下;
- 以何種方式訪問,HTTP 還是 HTTPS(如果是 HTTPS,可能需要配置證書)。
- ? 大數據量遷移建議使用快照/恢復方式更高效。
3.4 查看及取消 reindex 任務
生成測試數據。
// 創建一個大索引用于測試長時間運行的 reindex 任務
PUT large_source_index
{"mappings": {"properties": {"data": { "type": "text" },"counter": { "type": "integer" }}}
}// 批量插入大量測試數據(1000條)
POST _scripts/generate_large_data
{"script": {"lang": "painless","source": """def bulk = new StringBuilder();for (int i = 0; i < 1000; i++) {bulk.append('{"index":{}}\n');bulk.append('{"data":"Test data ' + i + '","counter":' + i + '}\n');}return bulk.toString();"""}
}POST large_source_index/_bulk
{ "script": { "id": "generate_large_data" } }
3.4.1 查看 reindex 任務
GET _tasks?detailed=true&actions=*reindex
3.4.2 取消 reindex 任務
POST _tasks/{task_id}/_cancel
示例:
// 先查看任務
GET _tasks?detailed=true&actions=*reindex// 返回結果中獲取任務ID后取消
POST _tasks/oTUltX4IQMOUUVeiohTt8A:12345/_cancel
4.注意事項
- Reindex 會占用大量資源,建議在低峰期執行
- 大數據量 reindex 建議使用
slices
并行處理 - 可以使用
wait_for_completion=false
異步執行 - 監控任務進度,避免影響集群性能
- 考慮版本兼容性問題,特別是跨大版本遷移時
Reindex 是 Elasticsearch 數據管理的重要工具,合理使用可以解決許多數據遷移和結構調整問題。