公司的企微聊天數據存儲在 ES 中。雖然已按企業分別存儲在不同的 ES 索引中,但某些常用企微主體的數據量仍然很大:4 年下來,單個索引已存儲 46 億多條數據,占用存儲 3.1 TB。
ES 配置
由於每多一個副本,存儲就要翻倍,出於成本考慮,我們沒有設置副本分片(不建議你們這麼做)。
索引拆分後,只需要修改寫入數據的代碼邏輯;設置好別名後,查詢代碼不需要改動。
進入正題:拆分索引,數據按年拆分。
1.設置索引模板
PUT _template/chat_message_template
{
  "index_patterns": ["chat-message-*"],
  "settings": {
    "number_of_shards": 15,      // 這裡和集群的節點對應,需要是節點數的整數倍
    "number_of_replicas": 0,     // 副本分片數,生產環境最好設置 >= 1
    "refresh_interval": "20s",
    "codec": "best_compression",
    "max_result_window": "100000000"
  },
  "mappings": {
    "properties": {
      "msg_id": {"type": "keyword"},
      "msg_time": {"type": "long"},
      "msg_type": {"type": "keyword"},
      "text": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "term_vector": "with_positions_offsets"
      },
      "image": {"type": "text"},
      .........
    }
  }
}
2.根據模板新建索引
PUT chat-message-2021
3.reindex 到新索引
!!重要!!
# reindex 前先確認磁盤是否夠用:索引拆分後,各年度索引的總磁盤占用會比原來單個索引大一些
# 盡量給新索引多留一些空間,reindex 開始前磁盤占用在 40% 左右比較穩妥,請提前進行擴容
# 我們中途遇到了空間不夠用的情況,只能臨時擴容。雖然是滾動擴容,客服也提示 reindex 任務可能會被取消,但實際擴容後任務還在;不過還是盡量提前把空間擴容好
# 查看各節點的分片數量及磁盤使用
GET /_cat/nodes?v&h=name,shards,disk.used_percent
# 結果
name disk.used_percent
es-cn-**-data-g4-3 86.12
es-cn-**-data-g4-2 81.75
es-cn-**-data-g4-0 85.94
es-cn-**-data-g4-4 85.73
es-cn-**-data-g4-1 85.67
reindex命令
# 限流保護:添加 requests_per_second=1000 參數,避免集群過載
# 異步執行:加上 wait_for_completion=false 後會直接返回 taskId
# 執行完後,修改一下 msg_time 的範圍再執行,就可以並行遷移每一年的數據
# (示例只遷移了 2021 年的一小段;整年的範圍是 gte 1609430400000、lt 1640966400000,即北京時間 2021-01-01 00:00 至 2022-01-01 00:00)
POST _reindex?requests_per_second=1000&wait_for_completion=false
{
  "conflicts": "proceed",   // 默認情況下,發生 version conflict 時 _reindex 會被 abort;設置為 "proceed" 則跳過衝突繼續執行
  "source": {
    "index": "chat-message-0613",
    "size": 5000,
    "query": {
      "range": {
        "msg_time": {
          "gte": 1609430400000,
          "lt": 1610380800111
        }
      }
    }
  },
  "dest": {
    "index": "chat-message-2021",
    "op_type": "create"     // op_type 設置為 create 時,_reindex 只在 dest index 中添加不存在的 documents;如果相同的 documents 已經存在,則會報 version conflict 錯誤
  }
}
4.查看進度
GET _tasks?detailed=true&actions=*reindex
# 返回結果
{"nodes" : {"z5VL_HJ2Qn****AMQ" : {"name" : "es-cn-**-data-g4-3","transport_address" : "121.**.114.80:9300","host" : "121.**.114.80","ip" : "121.**.114.80:9300","roles" : ["data","ingest","master","ml","remote_cluster_client","transform"],"attributes" : {"zone_id" : "cn-shanghai-g","ml.machine_memory" : "64887980032","ml.max_open_jobs" : "20","xpack.installed" : "true","zone" : "cn-shanghai-g","transform.node" : "true"},"tasks" : {"z5VL_HJ2Qn****YhoAMQ:6597302" : {"node" : "z5VL_HJ2Qn****YhoAMQ","id" : 6597302,"type" : "transport","action" : "indices:data/write/reindex","status" : {"total" : 484234,"updated" : 0,"created" : 0,"deleted" : 0,"batches" : 11,"version_conflicts" : 55000,"noops" : 0,"retries" : {"bulk" : 0,"search" : 0},"throttled_millis" : 99999,"requests_per_second" : 1000.0,"throttled_until_millis" : 2410},"description" : "reindex from [chat-message-0613] to [chat-message-2021][_doc]","start_time_in_millis" : 1742189556123,"running_time_in_nanos" : 109710051531,"cancellable" : true,"headers" : {"trace_id" : "ywiWopUBbHoVT9Iz8TeX"}}}}}
}
5.查看文檔數量是否相同
把查詢結果和 kibana 中新索引的文檔數比對即可
例:查看 2021 年的數據量(北京時間 2021-01-01 00:00 至 2022-01-01 00:00,時間範圍需與 reindex 時使用的保持一致)
GET chat-message-0613/_count
{"query": {"range": {"msg_time": {"gte": 1609430400000,"lt": 1640966400000}}}
}
# 所有年份的數量都對得上,並且完成第 6 步的別名切換後,再把老索引刪除以釋放磁盤,在 kibana 中操作即可
# 不要在切換別名之前刪除老索引:否則在此期間查詢會查不到數據,而且第 6 步中針對老索引的 remove 動作會因找不到索引而報錯
6.修改別名
先從老索引上移除別名,再把同一個別名加到新建的各年度索引上(同一個 _aliases 請求中的動作是原子執行的,查詢不會中斷)
POST _aliases
{"actions" : [{"remove" : {"index" : "chat-message-0613" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2021" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2022" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2023" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2024" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2025" , "alias" : "chat-message"}}]
}
修改別名後,ES 查詢部分的代碼不用修改
取消reindex命令
POST _tasks/z5VL_HJ2Qn****YhoAMQ:6597302/_cancel
插入文檔數據的部分邏輯代碼
// CreateIndex makes sure index exists with the given mapping and that alias
// is attached to it.
//
// Coordination goes through model.Factory.Lock:
//   - "mapping:ext:"+index marks an index already known to exist. Bulk checks
//     this key and only calls CreateIndex when it is absent. It is written with
//     a TTL argument of 370*24*3600 (presumably seconds, ~370 days — confirm
//     against the lock implementation).
//   - "mapping:create:"+index (TTL argument 120) lets only one worker create
//     the index at a time. A worker that does not obtain it gets an error and
//     is expected to retry (Bulk retries on error).
//
// It returns nil once the index exists and the alias add has been acknowledged.
// It returns an error if any of these fails or is not acknowledged: the
// existence check, lock acquisition, index creation, or alias creation.
func (s *ElasticService) CreateIndex(index, alias, mapping string) (err error) {
	ctx := context.Background()

	// checkExists asks the cluster whether index is already present.
	checkExists := func() (bool, error) {
		found, checkErr := sys.Elastic().IndexExists(index).Do(ctx)
		if checkErr != nil {
			sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
		}
		return found, checkErr
	}

	// attachAlias adds alias to index. Elasticsearch accepts adding an alias
	// that is already attached, so this is safe to repeat.
	attachAlias := func() error {
		putAlias, aliasErr := sys.Elastic().Alias().Add(index, alias).Do(ctx)
		if aliasErr != nil {
			sys.Log().Error(sys.NewProjectErr(1000402).Error() + alias)
			return aliasErr
		}
		if !putAlias.Acknowledged {
			sys.Log().Error("Alias creation not acknowledged: " + alias)
			return errors.New("alias creation not acknowledged: " + alias)
		}
		return nil
	}

	// finish attaches the alias and only then records that the index exists.
	// Attaching here also repairs an index left without its alias by an
	// earlier call whose alias step failed. Without this, the existence marker
	// would keep that index out of the alias permanently.
	finish := func() error {
		if aliasErr := attachAlias(); aliasErr != nil {
			return aliasErr
		}
		// Best effort: if the marker is not written, the next Bulk call simply
		// comes back here and finds the index through checkExists.
		_, _ = model.Factory.Lock.AddLock(model.LockSync, "mapping:ext:"+index, 370*24*3600)
		return nil
	}

	exists, err := checkExists()
	if err != nil {
		return err
	}
	if exists {
		return finish()
	}

	// Several workers may try to create the same index concurrently.
	ok, err := model.Factory.Lock.AddLock(model.LockSync, "mapping:create:"+index, 120)
	if err != nil {
		return err
	}
	if !ok {
		return errors.New("未搶占到鎖")
	}
	// Schedule the unlock only once we actually hold the lock. Deferring it
	// before AddLock would delete the lock owned by the worker that won the race.
	defer func() {
		model.Factory.Lock.UnLocK(model.LockSync, "mapping:create:"+index)
	}()

	// Re-check under the lock: another worker may have created the index
	// between the first check and our acquiring the lock.
	if exists, err = checkExists(); err != nil {
		return err
	}
	if !exists {
		createIndex, createErr := sys.Elastic().CreateIndex(index).Body(mapping).Do(ctx)
		if createErr != nil {
			sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
			return createErr
		}
		if !createIndex.Acknowledged {
			// Surface this as an error so the caller does not go on as if the
			// index were ready.
			sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
			return errors.New("create index not acknowledged: " + index)
		}
	}
	return finish()
}

// Bulk is meant to be called once the data has been split into per-year indices.
// Bulk writes chatMsg into index through the Elasticsearch bulk API. The code
// that builds and sends the bulk request is elided ("...") in this excerpt.
//
// Before writing, it looks up the "mapping:ext:"+index key with
// model.Factory.Lock.ExistLock (the label's inline comment reads "distributed
// lock"). A lookup error is returned as is. When the key is absent, it calls
// CreateIndex(index, alias, ChatMessageMapping). If that call fails, it sleeps
// one second and jumps back to the CreateIndex label. After 10 retries it gives
// up and returns the last CreateIndex error.
//
// NOTE(review): in this paste, every statement after the first line comment
// shares one physical line with it, so as written it would be parsed as part of
// the comment. The real source presumably has line breaks here — confirm.
func (s *ElasticService) Bulk(index, alias string, chatMsg []model.ChatMessage) (bulkResponseItem []*elastic.BulkResponseItem, err error) {tryTime := 0
CreateIndex:// 分布鎖ok, err := model.Factory.Lock.ExistLock(model.LockSync, "mapping:ext:"+index)if err != nil {return}if !ok {errCreate := s.CreateIndex(index, alias, ChatMessageMapping)if errCreate != nil {if tryTime < 10 {tryTime++time.Sleep(time.Second)goto CreateIndex} else {err = errCreatereturn}}}bulkRequest := sys.Elastic().Bulk().........}
最后
整個遷移非常耗時。讓 Grok 幫我估算了一下(ps. 同樣的提問,國內大模型都沒算出來,還是 Musk 家的強啊)