Goal: sync the data of one index from the production environment to the test environment.
1. Export the data from the production environment as JSON
curl -u "adims_user:xkR%cHwR5I9g" -X GET "http://172.18.251.132:9200/unify_info_mb_sp_aggregatetb_0004/_search?scroll=1m" -H 'Content-Type: application/json' -d'{"size": 100000,"query": {"bool": {"must": [{ "term": { "categoryId": 30 }},{ "term": { "factoryType": "煤炭電廠" }},{ "term": { "isDelete": 0 }},{ "term": { "countryName": "中國" }}]}}}' > initial_batch.json
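The command above returns only the first scroll batch together with a `_scroll_id`; any remaining matches have to be fetched through the scroll API until an empty batch comes back. Note also that the size of a scroll batch is capped by the index setting `index.max_result_window` (10000 by default), so `"size": 100000` is only accepted if that setting was raised on the production index. The following is a minimal sketch, assuming Python 3 with only the standard library (urllib) and HTTP basic auth; `make_sender` and `scroll_all` are hypothetical helpers, not part of any Elasticsearch client. It returns every hit in the same shape as a search response, so `convert_to_bulk_simple.py` can read the saved file unchanged.

```python
import base64
import json
import urllib.request


def make_sender(base_url, user, password):
    """Return send(method, path, body) -> dict that calls ES over HTTP with basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")

    def send(method, path, body=None):
        data = None if body is None else json.dumps(body).encode("utf-8")
        req = urllib.request.Request(base_url + path, data=data, method=method)
        req.add_header("Authorization", "Basic " + token)
        req.add_header("Content-Type", "application/json")
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read().decode("utf-8"))

    return send


def scroll_all(send, index, query, batch_size=1000, keep_alive="1m"):
    """Follow the scroll cursor until an empty batch is returned and collect all hits."""
    page = send("POST", f"/{index}/_search?scroll={keep_alive}",
                {"size": batch_size, "query": query})
    hits = []
    while True:
        scroll_id = page.get("_scroll_id")
        batch = page["hits"]["hits"]
        if not batch:
            break
        hits.extend(batch)
        page = send("POST", "/_search/scroll",
                    {"scroll": keep_alive, "scroll_id": scroll_id})
    if scroll_id:
        # Release the scroll context now instead of waiting for keep_alive to expire
        send("DELETE", "/_search/scroll", {"scroll_id": scroll_id})
    # Same shape as a search response, so convert_to_bulk_simple.py accepts it
    return {"hits": {"hits": hits}}
```

For example, pass `make_sender("http://172.18.251.132:9200", "adims_user", password)` as `send`, the `bool` query from the curl command as `query`, and write the result to initial_batch.json with `json.dump(result, f, ensure_ascii=False)`. A batch size of 1000 keeps each response well under the `max_result_window` limit.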
2. Convert the exported JSON into bulk format with Python
Python script convert_to_bulk_simple.py:
import json
import sys


def convert_search_to_bulk(input_file, output_file, target_index):
    """Convert an ES search response into bulk (NDJSON) format."""
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if 'hits' not in data or 'hits' not in data['hits']:
        print("Error: not a valid ES search response")
        return False

    hits = data['hits']['hits']
    print(f"Found {len(hits)} documents")

    with open(output_file, 'w', encoding='utf-8') as f:
        for hit in hits:
            # Action line: index into target_index, keeping the original _id
            action = {"index": {"_index": target_index, "_id": hit.get('_id')}}
            f.write(json.dumps(action) + '\n')
            # Document line: the original _source
            f.write(json.dumps(hit.get('_source', {})) + '\n')
    # Every line written above already ends with '\n', which satisfies the
    # bulk API's requirement that the body end with a newline

    print(f"Conversion complete: {output_file}")
    return True


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: python convert_to_bulk_simple.py input.json output.json target_index")
        sys.exit(1)
    ok = convert_search_to_bulk(sys.argv[1], sys.argv[2], sys.argv[3])
    sys.exit(0 if ok else 1)
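The script sends every document to `target_index`. If that index does not yet exist on the test cluster, Elasticsearch (with the default `action.auto_create_index` setting) creates it with dynamically inferred mappings, which may differ from production; for instance, a `term` query on a field such as `factoryType` may stop matching if the test cluster maps it as analyzed `text` rather than `keyword`. A hedged sketch, assuming Elasticsearch 7 or later (typeless mappings) and that the target index has not been created on the test cluster yet: it turns the production responses of `GET /<index>/_mapping` and `GET /<index>/_settings` into a body for `PUT /<index>` on the test cluster. `build_create_index_body` is a hypothetical helper; only the `analysis` section of the settings is copied, because the rest includes cluster-assigned values (such as `uuid`) that should not be carried over.

```python
def build_create_index_body(index, mapping_resp, settings_resp=None):
    """Build a PUT /<index> body from production _mapping and _settings responses."""

    def entry(resp):
        # Responses are keyed by the concrete index name; fall back to the
        # first entry (e.g. when `index` is an alias)
        return resp[index] if index in resp else next(iter(resp.values()))

    body = {"mappings": entry(mapping_resp)["mappings"]}
    if settings_resp is not None:
        analysis = entry(settings_resp)["settings"].get("index", {}).get("analysis")
        if analysis:
            # Custom analyzers referenced by the mapping must exist on the new index
            body["settings"] = {"index": {"analysis": analysis}}
    return body
```

The resulting dict, serialized with `json.dumps`, can be sent with `PUT /unify_info_mb_sp_aggregatetb_0004` to the test cluster before running step 3.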
Run the conversion:
python convert_to_bulk_simple.py initial_batch.json bulk_data.json unify_info_mb_sp_aggregatetb_0004
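A single `_bulk` request is limited by the `http.max_content_length` setting (100mb by default), and very large requests also put memory pressure on the cluster, so an export of up to 100000 documents may need to be sent in several parts. A minimal sketch, assuming the file was produced by convert_to_bulk_simple.py, so it contains only `index` actions, each followed by exactly one document line; `split_bulk_file` and the `bulk_part_` prefix are hypothetical names.

```python
def split_bulk_file(path, docs_per_chunk=5000, prefix="bulk_part_"):
    """Split an NDJSON bulk file into parts of at most docs_per_chunk action/document pairs."""
    with open(path, encoding="utf-8") as f:
        # Skip blank lines and make sure every kept line ends with a newline
        lines = [line if line.endswith("\n") else line + "\n"
                 for line in f if line.strip()]
    if len(lines) % 2:
        raise ValueError("expected pairs of action and document lines")
    step = docs_per_chunk * 2
    parts = []
    for n, start in enumerate(range(0, len(lines), step)):
        part_path = f"{prefix}{n:04d}.json"
        with open(part_path, "w", encoding="utf-8") as out:
            # Each part ends with "\n", as the bulk API requires
            out.writelines(lines[start:start + step])
        parts.append(part_path)
    return parts
```

Each part can then be sent with the curl command from step 3, replacing `@bulk_data.json` with the part's file name.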
3. Import the converted data into the test environment
curl -u "adims_user:j0SMMmI+Rwfv" -X POST "http://192.168.168.243:9200/_bulk" -H "Content-Type: application/json" --data-binary @bulk_data.json
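The bulk API reports results per document: the HTTP status can be 200 even when some documents were rejected, in which case the response contains `"errors": true` and each failing entry in `items` carries an `error` field. A small sketch for checking this, assuming the curl output above was saved to a file (for example by appending `> bulk_result.json`; that file name and the helpers `bulk_failures` and `report` are hypothetical):

```python
import json


def bulk_failures(response):
    """Return (_id, status, error type, reason) for each rejected bulk item."""
    failures = []
    for item in response.get("items", []):
        # Each item has a single key naming its action: index, create, update or delete
        for result in item.values():
            err = result.get("error")
            if not err:
                continue
            if isinstance(err, dict):
                failures.append((result.get("_id"), result.get("status"),
                                 err.get("type"), err.get("reason")))
            else:
                failures.append((result.get("_id"), result.get("status"), None, str(err)))
    return failures


def report(path):
    """Print a short summary of a saved _bulk response and return the failure count."""
    with open(path, encoding="utf-8") as f:
        response = json.load(f)
    failures = bulk_failures(response)
    print(f"items: {len(response.get('items', []))}, failed: {len(failures)}")
    for failure in failures[:10]:
        print(failure)
    return len(failures)
```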
4. Check the document count before and after the import to verify that it succeeded
curl -u "adims_user:j0SMMmI+Rwfv" -X GET "http://192.168.168.243:9200/unify_info_mb_sp_aggregatetb_0004/_count"
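The before/after difference matches the number of exported documents only if none of their `_id`s already existed in the test index: because the converter keeps the original `_id`, re-importing overwrites documents instead of adding new ones. A stricter check runs the same query against both clusters, since `_count` accepts the same `query` body as `_search`. Newly indexed documents become visible to `_count` only after a refresh (every 1s by default, or on demand with `POST /<index>/_refresh`). A hedged sketch using only the standard library; `es_count` and `compare_counts` are hypothetical helpers.

```python
import base64
import json
import urllib.request


def es_count(base_url, user, password, index, query=None):
    """Return the document count of `index`, optionally restricted by `query`."""
    body = {"query": query if query is not None else {"match_all": {}}}
    req = urllib.request.Request(f"{base_url}/{index}/_count",
                                 data=json.dumps(body).encode("utf-8"), method="POST")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    req.add_header("Authorization", "Basic " + token)
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))["count"]


def compare_counts(prod_count, test_count):
    """Return a one-line verdict comparing production and test counts."""
    if prod_count == test_count:
        return f"OK: {test_count} documents on both sides"
    return f"MISMATCH: production {prod_count}, test {test_count} (diff {prod_count - test_count})"
```

For example, call `es_count` once with `http://172.18.251.132:9200` and once with `http://192.168.168.243:9200`, both with the index `unify_info_mb_sp_aggregatetb_0004` and the `bool` query from step 1, and pass the two numbers to `compare_counts`.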