SpringCloud實用篇7——深入elasticsearch

目錄

  • 1 數據聚合
    • 1.1 聚合的種類
    • 1.2 DSL實現聚合
      • 1.2.1 Bucket聚合語法
      • 1.2.2 聚合結果排序
      • 1.2.3 限定聚合范圍
      • 1.2.4 Metric聚合語法
      • 1.2.5.小結
    • 1.3 RestAPI實現聚合
      • 1.3.1 API語法
      • 1.3.2 業務需求
      • 1.3.3 業務實現
  • 2 自動補全
    • 2.1 拼音分詞器
    • 2.2 自定義分詞器
    • 2.3 自動補全查詢
    • 2.4 實現酒店搜索框自動補全
      • 2.4.1 修改酒店映射結構
      • 2.4.2 修改HotelDoc實體類
      • 2.4.3 重新導入
      • 2.4.4 自動補全查詢的JavaAPI
      • 2.4.5 實現搜索框自動補全
  • 3 數據同步
    • 3.1 思路分析
      • 3.1.1 同步調用
      • 3.1.2 異步通知
      • 3.1.3 監聽binlog
      • 3.1.4 三種方式的優缺點
    • 3.2 實現數據同步
      • 3.2.1 思路
      • 3.2.2 導入demo
      • 3.2.3 聲明交換機、隊列
      • 3.2.4 發送MQ消息
      • 3.2.5 接收MQ消息
      • 3.2.5 啟動并測試數據同步功能
  • 4.集群
    • 4.1 搭建ES集群
      • 4.1.1 創建es集群
      • 4.1.2 集群狀態監控
      • 4.1.3 創建索引庫
      • 4.1.4 查看分片效果
    • 4.2 集群腦裂問題
      • 4.2.1 集群職責劃分
      • 4.2.2.腦裂問題
      • 4.2.3 小結
    • 4.3 集群分布式存儲
      • 4.3.1 分片存儲測試
      • 4.3.2 分片存儲原理
    • 4.4 集群分布式查詢
    • 4.5.集群故障轉移

1 數據聚合

聚合(aggregations) 可以讓我們極其方便的實現對數據的統計、分析、運算。例如:

  • 什么品牌的手機最受歡迎?
  • 這些手機的平均價格、最高價格、最低價格?
  • 這些手機每月的銷售情況如何?

實現這些統計功能的比數據庫的sql要方便的多,而且查詢速度非常快,可以實現近實時搜索效果。

1.1 聚合的種類

聚合常見的有三類:

  • 桶(Bucket) 聚合:用來對文檔做分組

    • TermAggregation:按照文檔字段值分組,例如按照品牌值分組、按照國家分組
    • Date Histogram:按照日期階梯分組,例如一周為一組,或者一月為一組
  • 度量(Metric) 聚合:用以計算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同時求max、min、avg、sum等
  • 管道(pipeline) 聚合:其它聚合的結果為基礎做聚合

注意: 參加聚合的字段必須是keyword、日期、數值、布爾類型,絕對不能是text類型

在這里插入圖片描述

1.2 DSL實現聚合

現在,我們要統計所有數據中的酒店品牌有幾種,其實就是按照品牌對數據分組。此時可以根據酒店品牌的名稱做聚合,也就是Bucket聚合。

1.2.1 Bucket聚合語法

語法如下:

GET /hotel/_search
{"size": 0,  // 設置size為0,結果中不包含文檔,只包含聚合結果"aggs": { // 定義聚合"brandAgg": { //給聚合起個名字"terms": { // 聚合的類型,按照品牌值聚合,所以選擇term"field": "brand", // 參與聚合的字段"size": 20 // 希望獲取的聚合結果數量}}}
}

結果如圖:

在這里插入圖片描述

1.2.2 聚合結果排序

默認情況下,Bucket聚合會統計Bucket內的文檔數量,記為 _count ,并且按照_count降序排序。

一般情況下我們使用默認的降序排序就可以了,但是我們可以指定order屬性,自定義聚合的排序方式

GET /hotel/_search
{"size": 0, "aggs": {"brandAgg": {"terms": {"field": "brand","order": {"_count": "asc" // 按照_count升序排列},"size": 20}}}
}

1.2.3 限定聚合范圍

默認情況下,Bucket聚合是對索引庫的所有文檔做聚合,但真實場景下,用戶會輸入搜索條件,因此聚合必須是對搜索結果聚合。那么聚合必須添加限定條件

我們可以限定要聚合的文檔范圍,只要添加query條件即可:

GET /hotel/_search
{"query": {"range": {"price": {"lte": 200 // 只對200元以下的文檔聚合}}}, "size": 0, "aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}}
}

這次,聚合得到的品牌明顯變少了:

在這里插入圖片描述

1.2.4 Metric聚合語法

我們對酒店按照品牌分組,形成了一個個桶。現在我們需要對桶內的酒店做運算,獲取每個品牌的用戶評分的min、max、avg等值。

這就要用到Metric聚合了,例如stat聚合:就可以獲取min、max、avg等結果。

語法如下:

GET /hotel/_search
{"size": 0, "aggs": {"brandAgg": { "terms": { "field": "brand", "size": 20},"aggs": { // 是brands聚合的子聚合,也就是分組后對每組分別計算"score_stats": { // 聚合名稱"stats": { // 聚合類型,這里stats可以計算min、max、avg等"field": "score" // 聚合字段,這里是score}}}}}
}

這次的score_stats聚合是在brandAgg的聚合內部嵌套的子聚合。因為我們需要在每個桶分別計算。

另外,我們還可以給聚合結果做個排序,例如按照每個桶的酒店平均分做排序:

在這里插入圖片描述

1.2.5.小結

aggs代表聚合,與query同級,此時query的作用是?

  • 限定聚合的的文檔范圍

聚合必須的三要素:

  • 聚合名稱
  • 聚合類型
  • 聚合字段

聚合可配置屬性有:

  • size:指定聚合結果數量
  • order:指定聚合結果排序方式
  • field:指定聚合字段

1.3 RestAPI實現聚合

1.3.1 API語法

聚合條件與query條件同級別,因此需要使用request.source()來指定聚合條件。

聚合條件的語法:

在這里插入圖片描述

聚合的結果也與查詢結果不同,API也比較特殊。不過同樣是JSON逐層解析:

在這里插入圖片描述

代碼:

@Test
void testAgg() throws IOException {// 1.準備請求SearchRequest request = new SearchRequest("hotel");// 2.請求參數// 2.1.sizerequest.source().size(0);// 2.2.聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));// 3.發出請求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析結果Aggregations aggregations = response.getAggregations();// 4.1.根據聚合名稱,獲取聚合結果Terms brandAgg = aggregations.get("brandAgg");// 4.2.獲取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍歷for (Terms.Bucket bucket : buckets) {String brandName = bucket.getKeyAsString();System.out.println("brandName = " + brandName);long docCount = bucket.getDocCount();System.out.println("docCount = " + docCount);}
}

注意:response.getAggregations().get(“brandAgg”)的返回結果要是Terms,注意包別導錯了,導es的包。

1.3.2 業務需求

需求:搜索頁面的品牌、城市等信息不應該是在頁面寫死,而是通過聚合索引庫中的酒店數據得來的

在這里插入圖片描述

分析:

目前,頁面的城市列表、星級列表、品牌列表都是寫死的,并不會隨著搜索結果的變化而變化。但是用戶搜索條件改變時,搜索結果會跟著變化。

例如:用戶搜索“東方明珠”,那搜索的酒店肯定是在上海東方明珠附近,因此,城市只能是上海,此時城市列表中就不應該顯示北京、深圳、杭州這些信息了。

也就是說,搜索結果中包含哪些城市,頁面就應該列出哪些城市;搜索結果中包含哪些品牌,頁面就應該列出哪些品牌。

如何得知搜索結果中包含哪些品牌?如何得知搜索結果中包含哪些城市?

使用聚合功能,利用Bucket聚合,對搜索結果中的文檔基于品牌分組、基于城市分組,就能得知包含哪些品牌、哪些城市了。

因為是對搜索結果聚合,因此聚合是限定范圍的聚合,也就是說聚合的限定條件跟搜索文檔的條件一致。

查看瀏覽器可以發現,前端其實已經發出了這樣的一個請求:

在這里插入圖片描述

請求參數與搜索文檔的參數完全一致

返回值類型就是頁面要展示的最終結果:

在這里插入圖片描述

結果是一個Map結構

key是字符串,城市、星級、品牌、價格

value是集合,例如多個城市的名稱

1.3.3 業務實現

在cn.itcast.hotel.web包的HotelController中添加一個方法,遵循下面的要求:

請求方式:POST

請求路徑:/hotel/filters

請求參數:RequestParams,與搜索文檔的參數一致

返回值類型:Map<String, List<String>>

代碼:

@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){return hotelService.getFilters(params);
}

這里調用了IHotelService中的getFilters方法,尚未實現。

在cn.itcast.hotel.service.IHotelService中定義新方法:

Map<String, List<String>> filters(RequestParams params);

在cn.itcast.hotel.service.impl.HotelService中實現該方法:

@Override
public Map<String, List<String>> filters(RequestParams params) {try {// 1.準備RequestSearchRequest request = new SearchRequest("hotel");// 2.準備DSL// 2.1.querybuildBasicQuery(params, request);// 2.2.設置sizerequest.source().size(0);// 2.3.聚合buildAggregation(request);// 3.發出請求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析結果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1.根據品牌名稱,獲取品牌結果List<String> brandList = getAggByName(aggregations, "brandAgg");result.put("品牌", brandList);// 4.2.根據品牌名稱,獲取品牌結果List<String> cityList = getAggByName(aggregations, "cityAgg");result.put("城市", cityList);// 4.3.根據品牌名稱,獲取品牌結果List<String> starList = getAggByName(aggregations, "starAgg");result.put("星級", starList);
?return result;} catch (IOException e) {throw new RuntimeException(e);}
}
?
private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));
}
?
private List<String> getAggByName(Aggregations aggregations, String aggName) {// 4.1.根據聚合名稱獲取聚合結果Terms brandTerms = aggregations.get(aggName);// 4.2.獲取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();// 4.3.遍歷List<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {// 4.4.獲取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;
}

2 自動補全

當用戶在搜索框輸入字符時,我們應該提示出與該字符有關的搜索項,如圖:

在這里插入圖片描述

這種根據用戶輸入的字母,提示完整詞條的功能,就是自動補全了。

因為需要根據拼音字母來推斷,因此要用到拼音分詞功能。

2.1 拼音分詞器

要實現根據字母做補全,就必須對文檔按照拼音分詞。在GitHub上有elasticsearch的拼音分詞插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

在這里插入圖片描述

安裝方式與IK分詞器一樣,分三步:

? ① 解壓

? ② 上傳到虛擬機中,elasticsearch的plugin目錄
在這里插入圖片描述

? ③ 重啟elasticsearch

? ④ 測試

詳細安裝步驟可以參考IK分詞器的安裝過程。

測試用法如下:

POST /_analyze
{"text": "如家酒店還不錯","analyzer": "pinyin"
}

結果:

在這里插入圖片描述

2.2 自定義分詞器

默認的拼音分詞器會將每個漢字單獨分為拼音,而我們希望的是每個詞條形成一組拼音,需要對拼音分詞器做個性化定制,形成自定義分詞器。

elasticsearch中分詞器(analyzer)的組成包含三部分:

  • character filters:在tokenizer之前對文本進行處理。例如刪除字符、替換字符
  • tokenizer:將文本按照一定的規則切割成詞條(term)。例如keyword,就是不分詞;還有ik_smart
  • tokenizer filter將tokenizer輸出的詞條做進一步處理。例如大小寫轉換、同義詞處理、拼音處理等

文檔分詞時會依次由這三部分來處理文檔:

在這里插入圖片描述

聲明自定義分詞器的語法如下:
我們可以在創建索引庫時,通過settings來配置自定義的analyzer(分詞器)

PUT /test
{"settings": {"analysis": {"analyzer": { // 自定義分詞器"my_analyzer": {  // 分詞器名稱"tokenizer": "ik_max_word","filter": "py"}},"filter": { // 自定義tokenizer filter"py": { // 過濾器名稱"type": "pinyin", // 過濾器類型,這里是pinyin"keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}

測試:

在這里插入圖片描述

  • 更多的自定義可以查看文檔使用
  • 拼音分詞器適合在創建索引時使用,不能在搜索時候用,如果搜索時用拼音分詞器,搜索"獅子愛跳舞",會搜出"虱子"等同音字。
    在這里插入圖片描述
    因此在使用ik+拼音過濾的分詞器時,建議創建的字段的索引分詞器設為自定義分詞器,搜索分詞器設為ik分詞器。防止搜索時搜出拼音諧音的情況。

總結:

如何使用拼音分詞器?

  • ① 下載pinyin分詞器

  • ② 解壓并放到elasticsearch的plugin目錄

  • ③ 重啟即可

如何自定義分詞器?

  • 創建索引庫時,在settings中配置,可以包含三部分:

    • character filter

    • tokenizer

    • filter

拼音分詞器注意事項?

  • 為了避免搜索到同音字,搜索時不要使用拼音分詞器

2.3 自動補全查詢

elasticsearch提供了Completion Suggester查詢來實現自動補全功能。這個查詢會匹配以用戶輸入內容開頭的詞條并返回。為了提高補全查詢的效率,對于文檔中字段的類型有一些約束:

  • 參與補全查詢的字段必須是completion類型。

  • 字段的內容一般是用來補全的多個詞條形成的數組。

比如,一個這樣的索引庫:

// 創建索引庫
PUT test
{"mappings": {"properties": {"title":{"type": "completion"}}}
}

然后插入下面的數據:

// 示例數據
POST test/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{"title": ["SK-II", "PITERA"]
}
POST test/_doc
{"title": ["Nintendo", "switch"]
}

查詢的DSL語句如下:

// 自動補全查詢
GET /test/_search
{"suggest": {"title_suggest": {"text": "s", // 關鍵字"completion": {"field": "title", // 補全查詢的字段"skip_duplicates": true, // 跳過重復的"size": 10 // 獲取前10條結果}}}
}
  • 參與補全查詢的字段必須是completion類型,數據是字符串數組。completion譯為完成
  • 字段的內容一般是用來補全的多個詞條形成的數組

2.4 實現酒店搜索框自動補全

現在,我們的hotel索引庫還沒有設置拼音分詞器,需要修改索引庫中的配置。但是我們知道索引庫是無法修改的,只能刪除然后重新創建

另外,我們需要添加一個字段,用來做自動補全,將brand、suggestion、city等都放進去,作為自動補全的提示。

因此,總結一下,我們需要做的事情包括:

  1. 修改hotel索引庫結構,設置自定義拼音分詞器

  2. 修改索引庫的name、all字段,使用自定義分詞器

  3. 索引庫添加一個新字段suggestion,類型為completion類型,使用自定義的分詞器

  4. 給HotelDoc類添加suggestion字段,內容包含brand、business

  5. 重新導入數據到hotel庫

2.4.1 修改酒店映射結構

代碼如下:

// 酒店數據索引庫
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}

2.4.2 修改HotelDoc實體類

HotelDoc中要添加一個字段,用來做自動補全,內容可以是酒店品牌、城市、商圈等信息。按照自動補全字段的要求,最好是這些字段的數組。

因此我們在HotelDoc中添加一個suggestion字段,類型為List<String>,然后將brand、city、business等信息放到里面。

代碼如下:

package cn.itcast.hotel.pojo;import lombok.Data;
import lombok.NoArgsConstructor;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;private String brand;private String city;private String starName;private String business;private String location;private String pic;private Object distance;private Boolean isAD;private List<String> suggestion;public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();// 組裝suggestionif(this.business.contains("/")){// business有多個值,需要切割String[] arr = this.business.split("/");// 添加元素this.suggestion = new ArrayList<>();this.suggestion.add(this.brand);Collections.addAll(this.suggestion, arr);}else {this.suggestion = Arrays.asList(this.brand, this.business);}}
}

2.4.3 重新導入

重新執行之前編寫的導入數據功能,可以看到新的酒店數據中包含了suggestion:

在這里插入圖片描述

測試一下:
在這里插入圖片描述

2.4.4 自動補全查詢的JavaAPI

之前我們學習了自動補全查詢的DSL,而沒有學習對應的JavaAPI,這里給出一個示例:

在這里插入圖片描述

而自動補全的結果也比較特殊,解析的代碼如下:

在這里插入圖片描述

2.4.5 實現搜索框自動補全

查看前端頁面,可以發現當我們在輸入框鍵入時,前端會發起ajax請求:
在這里插入圖片描述

返回值是補全詞條的集合,類型為List<String>

1)在cn.itcast.hotel.web包下的HotelController中添加新接口,接收新的請求:

@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix) {return hotelService.getSuggestions(prefix);
}

2)在cn.itcast.hotel.service包下的IhotelService中添加方法:

List<String> getSuggestions(String prefix);

3)在cn.itcast.hotel.service.impl.HotelService中實現該方法:

@Override
public List<String> getSuggestions(String prefix) {try {// 1.準備RequestSearchRequest request = new SearchRequest("hotel");// 2.準備DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));// 3.發起請求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析結果Suggest suggest = response.getSuggest();// 4.1.根據補全查詢名稱,獲取補全結果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");// 4.2.獲取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();// 4.3.遍歷List<String> list = new ArrayList<>(options.size());for (CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();list.add(text);}return list;} catch (IOException e) {throw new RuntimeException(e);}
}

3 數據同步

elasticsearch中的酒店數據來自于mysql數據庫,因此mysql數據發生改變時,elasticsearch也必須跟著改變,這個就是elasticsearch與mysql之間的數據同步
在這里插入圖片描述

3.1 思路分析

常見的數據同步方案有三種:

  • 同步調用
  • 異步通知
  • 監聽binlog

3.1.1 同步調用

方案一:同步調用

在這里插入圖片描述

基本步驟如下:

  • hotel-demo對外提供接口,用來修改elasticsearch中的數據
  • 酒店管理服務在完成數據庫操作后,直接調用hotel-demo提供的接口,

3.1.2 異步通知

方案二:異步通知

在這里插入圖片描述

流程如下:

  • hotel-admin對mysql數據庫數據完成增、刪、改后,發送MQ消息
  • hotel-demo監聽MQ,接收到消息后完成elasticsearch數據修改

3.1.3 監聽binlog

方案三:監聽binlog

在這里插入圖片描述

流程如下:

  • 給mysql開啟binlog功能
  • mysql完成增、刪、改操作都會記錄在binlog中
  • hotel-demo基于canal監聽binlog變化,實時更新elasticsearch中的內容

3.1.4 三種方式的優缺點

方式一:同步調用

  • 優點:實現簡單,粗暴
  • 缺點:業務耦合度高

方式二:異步通知

  • 優點:低耦合,實現難度一般
  • 缺點:依賴mq的可靠性

方式三:監聽binlog

  • 優點:完全解除服務間耦合
  • 缺點:開啟binlog增加數據庫負擔、實現復雜度高

3.2 實現數據同步

3.2.1 思路

資料提供的hotel-admin項目作為酒店管理的微服務。當酒店數據發生增、刪、改時,要求對elasticsearch中數據也要完成相同操作。

步驟:

  • 導入提供的hotel-admin項目,啟動并測試酒店數據的CRUD

  • 聲明exchange、queue、RoutingKey

  • 在hotel-admin中的增、刪、改業務中完成消息發送

  • 在hotel-demo中完成消息監聽,并更新elasticsearch中數據

  • 啟動并測試數據同步功能

3.2.2 導入demo

導入課前資料提供的hotel-admin項目,運行后,訪問 http://localhost:8099

在這里插入圖片描述

其中包含了酒店的CRUD功能:

在這里插入圖片描述

3.2.3 聲明交換機、隊列

MQ結構如圖:

在這里插入圖片描述

  1. 引入依賴,修改yml文件

在hotel-admin、hotel-demo中引入rabbitmq的依賴,修改yml文件:

<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 192.168.1.189port: 5672username: itcastpassword: 123321virtual-host: / #虛擬主機
  1. 聲明隊列交換機名稱

在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts包下新建一個類MqConstants

package cn.itcast.hotel.constatnts;public class MqConstants {/*** 交換機*/public final static String HOTEL_EXCHANGE = "hotel.topic";/*** 監聽新增和修改的隊列*/public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";/*** 監聽刪除的隊列*/public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public final static String HOTEL_INSERT_KEY = "hotel.insert";/*** 刪除的RoutingKey*/public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

在RestClient的API中,全量修改與新增的API完全一致,判斷依據是ID: 如果新增時,ID已經存在,則修改;如果新增時,ID不存在,則新增。因此這兒只需要兩個隊列

  1. 聲明隊列交換機

在hotel-demo中,定義配置類,聲明隊列、交換機:

package cn.itcast.hotel.config;import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);}@Beanpublic Queue insertQueue(){return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);}@Beanpublic Queue deleteQueue(){return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);}@Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);}@Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}
}

3.2.4 發送MQ消息

在hotel-admin中的增、刪、改業務中分別發送MQ消息:

消息內容為id,hotel-demo根據id增刪改
由于消息會保存到mq中,而mq是基于內存的,如果把整個hotel都發送過去,會比較消耗內存,對于mq就很容易將隊列占滿,所以建議發送消息時,消息體盡量小一點,因此這里我們可以只發送hotel的id,對方可以根據id查到這個數據

在這里插入圖片描述

3.2.5 接收MQ消息

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根據傳遞的hotel的id查詢hotel信息,然后新增一條數據到索引庫
  • 刪除消息:根據傳遞的hotel的id刪除索引庫中的一條數據

1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、刪除業務

void deleteById(Long id);void insertById(Long id);

2)給hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中實現業務:

@Override
public void deleteById(Long id) {try {// 1.準備RequestDeleteRequest request = new DeleteRequest("hotel", id.toString());// 2.發送請求client.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}@Override
public void insertById(Long id) {try {// 0.根據id查詢酒店數據Hotel hotel = getById(id);// 轉換為文檔類型HotelDoc hotelDoc = new HotelDoc(hotel);// 1.準備Request對象IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());// 2.準備Json文檔request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.發送請求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}

3)編寫監聽器

在hotel-demo中的cn.itcast.hotel.mq包新增一個類:

package cn.itcast.hotel.mq;import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 監聽酒店新增或修改的業務* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)public void listenHotelInsertOrUpdate(Long id){hotelService.insertById(id);}/*** 監聽酒店刪除的業務* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)public void listenHotelDelete(Long id){hotelService.deleteById(id);}
}

3.2.5 啟動并測試數據同步功能

運行管理端和用戶端服務后,打開rabbitmq服務端: http://192.168.1.189/15672,可以看到隊列、交換機創建成功;以及交換機綁定關系

在這里插入圖片描述
在管理端修改酒店信息后可以看到消息:
在這里插入圖片描述
也可以查看到用戶界面數據也已經修改。刪除功能一樣。

為了避免刪除數據后就找不到了,可以 vue插件實現快速拷貝數據到表單

4.集群

單機的elasticsearch做數據存儲,必然面臨兩個問題:海量數據存儲問題、單點故障問題。

  • 海量數據存儲問題:將索引庫從邏輯上拆分為N個分片(shard),存儲到多個節點
  • 單點故障問題:將分片數據在不同節點備份(replica )

ES集群相關概念:

  • 集群(cluster):一組擁有共同的 cluster name 的 節點。

  • 節點(node) :集群中的一個 Elasticearch 實例

  • 分片(shard):索引可以被拆分為不同的部分進行存儲,稱為分片。在集群環境下,一個索引的不同分片可以拆分到不同的節點中

    解決問題:數據量太大,單點存儲量有限的問題。

在這里插入圖片描述

此處,我們把數據分成3片:shard0、shard1、shard2

  • 主分片(Primary shard):相對于副本分片的定義。

  • 副本分片(Replica shard)每個主分片可以有一個或者多個副本,數據和主分片一樣。

    ?

數據備份可以保證高可用,但是每個分片備份一份,所需要的節點數量就會翻一倍,成本實在是太高了!

為了在高可用和成本間尋求平衡,我們可以這樣做:

  • 首先對數據分片,存儲到不同節點
  • 然后對每個分片進行備份,放到對方節點,完成互相備份

這樣可以大大減少所需要的服務節點數量,如圖,我們以3分片,每個分片備份一份為例:

在這里插入圖片描述

現在,每個分片都有1個備份,存儲在3個節點:

  • node0:保存了分片0和1
  • node1:保存了分片0和2
  • node2:保存了分片1和2

4.1 搭建ES集群

我們會在單機上利用docker容器運行多個es實例來模擬es集群。不過生產環境推薦大家每一臺服務節點僅部署一個es的實例。

部署es集群可以直接使用docker-compose來完成,但這要求你的Linux虛擬機至少有4G的內存空間

4.1.1 創建es集群

首先編寫一個docker-compose.yml文件,內容如下:

version: '2.2'
services:es01:image: elasticsearch:7.12.1container_name: es01environment:- node.name=es01- cluster.name=es-docker-cluster- discovery.seed_hosts=es02,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data01:/usr/share/elasticsearch/dataports:- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

es運行需要修改一些linux系統權限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的內容:

vm.max_map_count=262144

然后執行命令,讓配置生效:

sysctl -p

通過docker-compose啟動集群:

docker-compose up -d

在這里插入圖片描述

4.1.2 集群狀態監控

kibana可以監控es集群,不過新版本需要依賴es的x-pack 功能,配置比較復雜。

這里推薦使用cerebro來監控es集群狀態,官方網址:https://github.com/lmenezes/cerebro

資料已經提供了安裝包:
在這里插入圖片描述
解壓即可使用,非常方便。

解壓好的目錄如下:
在這里插入圖片描述

進入對應的bin目錄:

在這里插入圖片描述

雙擊其中的cerebro.bat文件即可啟動服務。

訪問http://localhost:9000 即可進入管理界面:

在這里插入圖片描述

輸入你的elasticsearch的任意節點的地址和端口,點擊connect即可:實心五角星是當前的主節點,空心五角星是候選節點

在這里插入圖片描述

綠色的條,代表集群處于綠色(健康狀態)。

4.1.3 創建索引庫

  1. 利用kibana的DevTools創建索引庫

在DevTools中輸入指令:

PUT /itcast
{"settings": {"number_of_shards": 3, // 分片數量"number_of_replicas": 1 // 副本數量},"mappings": {"properties": {// mapping映射定義 ...}}
}
  1. 利用cerebro創建索引庫

利用cerebro還可以創建索引庫:
在這里插入圖片描述

填寫索引庫信息:

在這里插入圖片描述

點擊右下角的create按鈕:

在這里插入圖片描述

4.1.4 查看分片效果

回到首頁,即可查看索引庫分片效果:

在這里插入圖片描述

4.2 集群腦裂問題

4.2.1 集群職責劃分

elasticsearch中集群節點有不同的職責劃分:

在這里插入圖片描述

默認情況下,集群中的任何一個節點都同時具備上述四種角色。

但是真實的集群一定要將集群職責分離:

  • master節點:對CPU要求高,但是內存要求第
  • data節點:對CPU和內存要求都高
  • coordinating節點:對網絡帶寬、CPU要求高

職責分離可以讓我們根據不同節點的需求分配不同的硬件去部署。而且避免業務之間的互相干擾。

一個典型的es集群職責劃分如圖:

在這里插入圖片描述

4.2.2.腦裂問題

腦裂是因為集群中的節點失聯導致的。

例如一個集群中,主節點與其它節點失聯:

在這里插入圖片描述

此時,node2和node3認為node1宕機,就會重新選主:

在這里插入圖片描述

當node3當選后,集群繼續對外提供服務,node2和node3自成集群,node1自成集群,兩個集群數據不同步,出現數據差異。

當網絡恢復后,因為集群中有兩個master節點,集群狀態的不一致,出現腦裂的情況:

在這里插入圖片描述

解決腦裂的方案:要求選票超過 ( eligible節點數量 + 1 )/ 2 才能當選為主,因此eligible節點數量最好是奇數。對應配置項是discovery.zen.minimum_master_nodes,在es7.0以后,已經成為默認配置,因此一般不會發生腦裂問題

例如:3個節點形成的集群,選票必須超過 (3 + 1) / 2 ,也就是2票。node3得到node2和node3的選票,當選為主。node1只有自己1票,沒有當選。集群中依然只有1個主節點,沒有出現腦裂。

4.2.3 小結

master eligible節點的作用是什么?

  • 參與集群選主
  • 主節點可以管理集群狀態、管理分片信息、處理創建和刪除索引庫的請求

data節點的作用是什么?

  • 數據的CRUD

coordinator節點的作用是什么?

  • 路由請求到其它節點

  • 合并查詢到的結果,返回給用戶

4.3 集群分布式存儲

當新增文檔時,應該保存到不同分片,保證數據均衡,那么coordinating node如何確定數據該存儲到哪個分片呢?

4.3.1 分片存儲測試

插入三條數據:

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

測試可以看到,三條數據分別在不同分片:

在這里插入圖片描述

結果:

在這里插入圖片描述

4.3.2 分片存儲原理

elasticsearch會通過hash算法來計算文檔應該存儲到哪個分片:

在這里插入圖片描述

說明:

  • _routing默認是文檔的id
  • 算法與分片數量有關,因此索引庫一旦創建,分片數量不能修改!

新增文檔的流程如下:

在這里插入圖片描述

解讀:

  • 1)新增一個id=1的文檔
  • 2)對id做hash運算,假如得到的是2,則應該存儲到shard-2
  • 3)shard-2的主分片在node3節點,將數據路由到node3
  • 4)保存文檔
  • 5)同步給shard-2的副本replica-2,在node2節點
  • 6)返回結果給coordinating-node節點

4.4 集群分布式查詢

elasticsearch的查詢分成兩個階段:

  • scatter phase:分散階段,coordinating node會把請求分發到每一個分片

  • gather phase:聚集階段,coordinating node匯總data node的搜索結果,并處理為最終結果集返回給用戶

在這里插入圖片描述

coordinating node可以是三個節點中的任意一個(因為默認情況下,每個節點都是協調節點),也可以單獨指定一個節點,無論訪問的nodo1還是node2或者node3都會把請求分發給每一個分片

總結:

  • 分布式新增如何確定分片?
    • coordinating node根據id做hash運算,得到結果對shard數量取余,余數就是對應的分片
  • 分布式查詢的兩個階段
    • 分散階段: coordinating node將查詢請求分發給不同分片
    • 收集階段:將查詢結果匯總到coordinating node ,整理并返回給用戶

4.5.集群故障轉移

集群的master節點會監控集群中的節點狀態,如果發現有節點宕機,會立即將宕機節點的分片數據遷移到其它節點,確保數據安全,這個叫做故障轉移。

1)例如一個集群結構如圖:
在這里插入圖片描述

現在,node1是主節點,其它兩個節點是從節點。

2)突然,node1發生了故障(模擬es01宕機:docker-compose stop es01):

在這里插入圖片描述

宕機后的第一件事,需要重新選主,例如選中了node2:

在這里插入圖片描述

node2成為主節點后,會檢測集群監控狀態,發現:shard-1、shard-0沒有副本節點。因此需要將node1上的數據遷移到node2、node3

在這里插入圖片描述

在這里插入圖片描述

此時重啟node1 docker-compose start es01,發現會重新分配出兩個分片到es01:
在這里插入圖片描述

總結:

故障轉移:

  • master宕機后,EligibleMaster選舉為新的主節點。
  • master節點監控分片、節點狀態,將故障節點上的分片轉移到正常節點,確保數據安全。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/42818.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/42818.shtml
英文地址,請注明出處:http://en.pswp.cn/news/42818.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

POJ 1995 Raising Modulo Numbers 快速冪

一、總結 我一開始擔心溢出&#xff0c;開了一個無符號的long long&#xff0c;但是直接超時&#xff0c;后來一看它的mod不是很大&#xff0c;于是改成int&#xff0c;直接過了。 二、代碼 #include <iostream> using namespace std; int H, Z; int M; int mulMod(in…

P1217 [USACO1.5] 回文質數 Prime Palindromes

P1217 [USACO1.5] 回文質數 Prime Palindromes - 洛谷 | 計算機科學教育新生態 (luogu.com.cn) # [USACO1.5] 回文質數 Prime Palindromes ## 題目描述 因為 $151$ 既是一個質數又是一個回文數&#xff08;從左到右和從右到左是看一樣的&#xff09;&#xff0c;所以 $151$ …

Python基礎教程:私有變量的訪問和賦值教程

嗨嘍~大家好呀&#xff0c;這里是魔王吶 ? ~! 首先我們這里先描述下&#xff1a; Python中&#xff0c;變量名類似__x__的&#xff0c;以雙下劃線開頭&#xff0c;并且以雙下劃線結尾的&#xff0c;是特殊變量&#xff0c;特殊變量是可以直接訪問的&#xff08;比如 doc, __i…

SpringBoot3集成ElasticSearch

標簽&#xff1a;ElasticSearch8.Kibana8&#xff1b; 一、簡介 Elasticsearch是一個分布式、RESTful風格的搜索和數據分析引擎&#xff0c;適用于各種數據類型&#xff0c;數字、文本、地理位置、結構化數據、非結構化數據&#xff1b; 在實際的工作中&#xff0c;歷經過Ela…

南大通用數據庫(gbase 8s) 在Centos7環境 集群安裝

國產數據庫-達夢 一、環境詳情二、Centos7 參數優化&#xff08;所有節點&#xff09;三、創建gbase用戶&#xff08;所有節點&#xff09;四、 安裝部署&#xff0c;只在node1 安裝即可同步五、數據庫卸載六、數據庫連接與使用 當前安裝 在指定版本環境下 測試&#xff0c;僅供…

QT的設計器介紹

設計器介紹 Qt制作 UI 界面&#xff0c;一般可以通過UI制作工具QtDesigner和純代碼編寫兩種方式來實現。純代碼實現暫時在這里不闡述了在后續布局章節詳細說明&#xff0c;QtDesigner已經繼承到開發環境中&#xff0c;在工程中直接雙擊ui文件就可以直接在QtDesigner設計器中打…

【100天精通python】Day39:GUI界面編程_PyQt 從入門到實戰(下)_圖形繪制和動畫效果,數據可視化,刷新交互

目錄 專欄導讀 6 圖形繪制與動畫效果 6.1 繪制基本圖形、文本和圖片 6.2 實現動畫效果和過渡效果 7 數據可視化 7.1 使用 Matplotlib繪制圖表 7.2 使用PyQtGraph繪制圖表 7.3 數據的實時刷新和交互操作 7.3.1 數據的實時刷新 7.3.2 交互操作 7.4 自定義數據可視化…

C# 窗體永遠在最前

C# 窗體永遠在最前 1、調用系統API public const int HWND_TOP 0;public const int HWND_BOTTOM 1;public const int HWND_TOPMOST -1;public const int HWND_NOTOPMOST -2;//設置此窗體為活動窗體&#xff1a;//將創建指定窗口的線程帶到前臺并激活該窗口。鍵盤輸入直接指…

【WPF】 本地化的最佳做法

【WPF】 本地化的最佳做法 資源文件英文資源文件 en-US.xaml中文資源文件 zh-CN.xaml 資源使用App.xaml主界面布局cs代碼 App.config輔助類語言切換操作類資源 binding 解析類 實現效果 應用程序本地化有很多種方式&#xff0c;選擇合適的才是最好的。這里只討論一種方式&#…

Unity制作一個簡單的登入注冊頁面

1.創建Canvas組件 首先我們創建一個Canvas畫布&#xff0c;我們再在Canvas畫布底下創建一個空物體&#xff0c;取名為Resgister。把空物體的錨點設置為全屏撐開。 2.我們在Resgister空物體底下創建一個Image組件&#xff0c;改名為bg。我們也把它 的錨點設置為全屏撐開狀態。接…

【深入理解ES6】字符串和正則表達式

概念 字符串&#xff08;String&#xff09;是JavaScript6大原始數據類型。其他幾個分別是Boolean、Null、Undefined、Number、Symbol&#xff08;es6新增&#xff09;。 更好的Unicode支持 1. UTF-16碼位 字符串里的字符有兩種&#xff1a; 前 個碼位均以16位的編碼單元…

每日一道面試題之session 和 cookie 有什么區別?

Session和Cookie是兩種在Web開發中用于跟蹤用戶狀態的機制&#xff1a; 它們之間的區別如下&#xff1a; 存儲位置&#xff1a;Cookie是存儲在用戶瀏覽器中的小型文本文件&#xff0c;而Session是存儲在服務器上的數據結構。 數據安全性&#xff1a;Cookie中的數據可以被用戶…

總結,由于順豐的問題,產生了電腦近期一個月死機問題集錦

由于我搬家&#xff0c;我媽搞順豐發回家&#xff0c;但是沒有檢查有沒有壞&#xff0c;并且我自己由于不可抗力因素&#xff0c;超過了索賠時間&#xff0c;反饋給順豐客服&#xff0c;說超過了造成了無法索賠的情況&#xff0c;現在總結發生了損壞配件有幾件&#xff0c;顯卡…

NLP | 論文摘要文本分類

基于論文摘要的文本分類與關鍵詞抽取挑戰賽??????2023 iFLYTEK A.I.開發者大賽-訊飛開放平臺 環境需求&#xff1a;Anaconda-JupyterNotebook&#xff0c;或者百度AIStudio 賽題解析&#xff1a; 【文本二分類任務】根據論文摘要等信息理解&#xff0c;將論文劃分為0-1兩…

文心一言最新重磅發布!

8月16日&#xff0c;由深度學習技術及應用國家工程研究中心主辦的WAVE SUMMIT深度學習開發者大會2023舉辦。百度首席技術官、深度學習技術及應用國家工程研究中心主任王海峰以《大語言模型為通用人工智能帶來曙光》為題&#xff0c;闡述了大語言模型具備理解、生成、邏輯、記憶…

【云原生】k8s存儲管理中ConfigMap Secret的使用

目錄 1 ConfigMap 1.1 簡介 1.2 優點 1.3 定義 ConfigMap 1.4 使用 2 Secret 2.1 簡介 2.1 定義 Secret 2.2 使用 1 ConfigMap 1.1 簡介 在 Kubernetes 中&#xff0c;ConfigMap 是一種用于存儲非敏感信息的 Kubernetes 對象。它用于存儲配置數據&#xff0c;如鍵值…

樹莓派系統入門教程(三)—— 使用Windows上的VSCode遠程連接樹莓派進行Python開發

樹莓派系統入門教程&#xff08;三&#xff09;—— 使用Windows上的VSCode遠程連接樹莓派進行Python開發 1. 安裝VSCode和SSH擴展2. SSH連接配置3. 連接到樹莓派4. 運行Python程序5. 建議和注意事項 很多開發者更喜歡在大屏幕和強大的開發環境中編寫代碼&#xff0c;但同時他們…

Vue 2 動態組件和異步組件

先閱讀 【Vue 2 組件基礎】中的初步了解動態組件。 動態組件與keep-alive 我們知道動態組件使用is屬性和component標簽結合來切換不同組件。 下面給出一個示例&#xff1a; <!DOCTYPE html> <html><head><title>Vue 動態組件</title><scri…

Typora 相對路徑保存圖片以及 Gitee 無法顯示圖片

目錄 Typora 相對路徑保存圖片 Gitee 無法顯示圖片 Typora 相對路徑保存圖片 Step1&#xff1a;修改 Typora 的偏好設置 自動在當前目錄創建名為 "./${filename}.assets" 的文件夾粘貼圖片到 md 中時&#xff0c;圖片會自動另存到 "./${filename}.assets&qu…