xxljob分片廣播+多線程實現高效定時同步elasticsearch索引庫

需求:為了利用elasticsearch實現高效搜索,需要將mysql中的數據查出來,再定時同步到es里,同時在同步過程中通過分片廣播+多線程提高同步數據的效率。

1. 添加映射

  • 使用kibana添加映射
PUT  /app_info_article
{"mappings":{"properties":{"id":{"type":"long"},"publishTime":{"type":"date"},"layout":{"type":"integer"},"images":{"type":"keyword","index": false},"staticUrl":{"type":"keyword","index": false},"authorId": {"type": "long"},"authorName": {"type": "keyword"},"title":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"content":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"all":{"type": "text","analyzer": "ik_max_word"}}}
}
  • 使用http請求
    PUT請求添加映射:http://192.168.200.130:9200/app_info_article
    GET請求查詢映射:http://192.168.200.130:9200/app_info_article
    DELETE請求,刪除索引及映射:http://192.168.200.130:9200/app_info_article
    GET請求,查詢所有文檔:http://192.168.200.130:9200/app_info_article/_search
    在這里插入圖片描述

2. springboot測試

引入依賴

<!--elasticsearch--><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.12.1</version><exclusions><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-smile</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></exclusion></exclusions></dependency>

elasticsearch配置

#自定義elasticsearch連接配置
elasticsearch:host: 192.168.200.131port: 9200

elasticsearch配置類

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {private String host;private int port;@Beanpublic RestHighLevelClient client(){return new RestHighLevelClient(RestClient.builder(new HttpHost(host,port,"http")));}
}

實體類

@Data
public class SearchArticleVo {// 文章idprivate Long id;// 文章標題private String title;// 文章發布時間private Date publishTime;// 文章布局private Integer layout;// 封面private String images;// 作者idprivate Long authorId;// 作者名詞private String authorName;//靜態urlprivate String staticUrl;//文章內容private String content;}

測試。測試成功后別忘了刪除app_info_article的所有文檔

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {@Autowiredprivate ApArticleMapper apArticleMapper;@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 注意:數據量的導入,如果數據量過大,需要分頁導入*  1)查詢數據庫數據*  2)將數據寫入到ES中即可*     創建BulkRequest*            ================================*            ||A:創建XxxRequest*            ||B:向XxxRequest封裝DSL語句數據*            ||                             X C:使用RestHighLevelClient執行遠程請求*            ================================*            將XxxRequest添加到BulkRequest*       使用RestHighLevelClient將BulkRequest添加到索引庫* @throws Exception*/@Testpublic void init() throws Exception {//1)查詢數據庫數據List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();//2)創建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:創建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封裝DSL語句數據.id(searchArticleVo.getId().toString()).source(JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)將XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient將BulkRequest添加到索引庫restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);}}

3. 核心代碼

3.1 xxljob配置

xxl:job:accessToken: default_tokenadmin:addresses: http://127.0.0.1:8080/xxl-job-adminexecutor:address: ''appname: hmttip: ''logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30port: 9998
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}

3.2 elasticsearch配置

和前面一樣

3.3 任務編寫

@Component
public class SyncIndexTask {@Autowiredprivate IArticleClient articleClient;@Autowiredprivate RestHighLevelClient restHighLevelClient;//線程池public static ExecutorService pool = Executors.newFixedThreadPool(10);/**** 同步索引任務*  1)當數量大于100條的時候,才做分片導入,否則只讓第1個導入即可*      A:查詢所有數據量 ->searchTotal total>100 [判斷當前分片不是第1個分片]*      第N個分片執行數據處理范圍-要計算   確定當前分片處理的數據范圍  limit #{index},#{size}*                                                                [index-范圍]**      B:執行分頁查詢-需要根據index判斷是否超過界限,如果沒有超過界限,則并開啟多線程,分頁查詢,將當前分頁數據批量導入到ES**      C:在xxl-job中配置作業-策略:分片策略**/@XxlJob("syncIndex")public void syncIndex()  {//1、獲取任務傳入的參數   {"minSize":100,"size":10}String jobParam = XxlJobHelper.getJobParam();Map<String,Integer> jobData = JSON.parseObject(jobParam,Map.class);int minSize = jobData.get("minSize"); //分片處理的最小總數據條數int size =  jobData.get("size"); //分頁查詢的每頁條數   小分頁//2、查詢需要處理的總數據量  total=IArticleClient.searchTotal()Long total = articleClient.searchTotal();//3、判斷當前分片是否屬于第1片,不屬于,則需要判斷總數量是否大于指定的數據量[minSize],大于,則執行任務處理,小于或等于,則直接結束任務int cn = XxlJobHelper.getShardIndex(); //當前節點的下標if(total<=minSize && cn!=0){//結束return;}//4、執行任務   [index-范圍]   大的分片分頁處理//4.1:節點個數int n = XxlJobHelper.getShardTotal();//4.2:當前節點處理的數據量int count = (int) (total % n==0? total/n :  (total/n)+1);//4.3:確定當前節點處理的數據范圍//從下標為index的數據開始處理  limit #{index},#{count}int indexStart = cn*count;int indexEnd = cn*count+count-1; //最大的范圍的最后一個數據的下標//5.小的分頁查詢和批量處理int index =indexStart; //第1頁的indexSystem.out.println("分片個數是【"+n+"】,當前分片下標【"+cn+"】,處理的數據下標范圍【"+indexStart+"-"+indexEnd+"】");do {//=============================================小分頁================================//5.1:分頁查詢//5.2:將數據導入ESpush(index,size,indexEnd);//5.3:是否要查詢下一頁 index+sizeindex = index+size;}while (index<=indexEnd);}/*** 數據批量導入* @param index* @param size* @param indexEnd* @throws IOException*/public void push(int index,int size,int indexEnd)  {pool.execute(()->{System.out.println("當前線程處理的分頁數據是【index="+index+",size="+(index+size>indexEnd? indexEnd-index+1 : size)+"】");//1)查詢數據庫數據List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, index+size>indexEnd? indexEnd-index+1 : size);  //size可能越界//2)創建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:創建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封裝DSL語句數據.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)將XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient將BulkRequest添加到索引庫if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}});}
}

3.4 xxl-admin新增任務

  • 新增執行器 這里的appName要和XxlJobConfig 中的appname保持一致。也可以不用新增執行器,執行器就相當于一個分組的作用。
    在這里插入圖片描述
  • 新增任務
    在這里插入圖片描述

4. 模擬集群,運行項目

  • 運行3個SearchApplication項目,設置xxl.job.executor.port分別為9998,9997,9996
    在這里插入圖片描述
  • 執行一次任務
    在這里插入圖片描述
  • 查看結果,3個application都成功了
    在這里插入圖片描述
  • kibana查看是否有數據,發現有18條,mysql的數據全部被導入了

在這里插入圖片描述

5. 總結

  • xxljob分片廣播:假如一共有1000條數據,有3個節點上運行著SearchApplication服務。那么每個節點需要同步的數據總條數為334,334,332條。
    在這里插入圖片描述

  • 分頁查詢:節點0的任務總條數為334條,那么需要做分頁(假設分頁size為20)查詢的次數為17次,每查1次后,將查到的數據通過restHighLevelClient發送到es中。

do {//5.1:分頁查詢//5.2:將數據導入ESpush(index,size,indexEnd);  //分頁查詢+導入es的操作//5.3:是否要查詢下一頁 index+sizeindex = index+size;
}while (index<=indexEnd);
  • 多線程:上述代碼,push方法包括了分頁查詢+導入es的操作,是低效的。并且push方法不結束的話,下一頁的操作不會開始。這里可以用多線程,每次push的時候都開啟一個新線程,這樣每一頁的操作都是獨立的,可以同時查詢,同時導入到es,不會互相影響。這里使用了線程池。
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index,int size,int indexEnd)  {pool.execute(()->{//分頁查詢+導入到es});}
  • elasticsearch的相關api:參考我另一篇博客 項目中使用Elasticsearch的API相關介紹
//2)創建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:創建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封裝DSL語句數據.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)將XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient將BulkRequest添加到索引庫if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/12720.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/12720.shtml
英文地址,請注明出處:http://en.pswp.cn/web/12720.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

HL7協議

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 1.介紹2.傳輸協議規范2.1. MLLP2.1.1. 數據頭定義2.1.2. 轉義字符集 2.2. 規范說明2.3. 消息格式說明 3.HL7結構介紹3.1. 患者建檔&#xff08;ADT^A28&#xff09;…

linux c++獲取當前程序的運行路徑

比如我的程序名為:aaa 存放路徑是:/homo/code/ 我在/home/ccc 目錄執行shell文件。shell文件的內容為 #!/bin/bash /homo/code/aaa我希望獲取的路徑是 /homo/code/ 而不是腳本的路徑 給出完整接口代碼 #include <iostream> #include <string> #include <stri…

人工智能領域向量化技術加速多模態大模型訓練與應用

目錄 前言1、TextIn文檔解析技術1.1、文檔解析技術1.2、目前存在的問題1.2.1、不規則的文檔信息示例 1.3、合合信息的文檔解析1.3.1、合合信息的TextIn文檔解析技術架構1.3.2、版面分析關鍵技術 Layout-engine1.3.3、文檔樹提取關鍵技術 Catalog-engine1.3.4、雙欄1.3.5、非對稱…

matlab實現馬爾科夫鏈

在MATLAB中實現馬爾科夫鏈算法通常涉及定義狀態轉移矩陣、初始化狀態向量以及迭代狀態轉移過程。以下是一個簡單的步驟和示例代碼&#xff0c;用于演示如何在MATLAB中實現馬爾科夫鏈。 步驟 定義狀態轉移矩陣&#xff1a;狀態轉移矩陣P描述了從一個狀態轉移到另一個狀態的概率…

注冊海外公司為什么?

注冊海外公司通常是為了實現以下目標之一&#xff1a; 國際化業務擴張&#xff1a; 一些企業可能希望在海外注冊子公司&#xff0c;以便在國際市場上開展業務。這樣的公司可能是跨國企業&#xff0c;已經在多個國家有業務&#xff0c;或者是希望進入新的國際市場的企業。 稅收…

計算機服務器中了locked勒索病毒怎么解決,locked勒索病毒解密恢復工具

在網絡技術飛速發展的時代&#xff0c;通過網絡開展各項工作業務成為眾多企業的首選&#xff0c;網絡也為企業的生產運營提供了極大便利&#xff0c;大大提升了企業辦公效率&#xff0c;但是利用網絡避免不了網絡威脅的存在&#xff0c;數據安全問題一直是企業關心的主要話題。…

不知道代理IP怎么挑?一文帶你了解挑選的關鍵點!

IP代理在如今的網絡環境中扮演者至關重要的角色。通過使用代理IP&#xff0c;可以增強用戶個人信息和網絡的安全。但想要挑選到適合自己的代理IP&#xff0c;并非是一件易事。今天就為大家帶來挑選代理IP的關鍵注意點&#xff0c;幫你輕松篩選出最佳的選擇。 穩定性與速度&…

TikTok機房ip好還是住宅ip好?

住宅ip比較好&#xff0c;機房數據中心IP高效、低價&#xff0c;所以使用的人多且用處復雜&#xff0c;這類ip極大可能存在濫用的黑歷史&#xff0c;通過此類ip訪問tiktok&#xff0c;被禁止的可能性更高&#xff0c;更容易被拉入黑名單。所以我們推薦tiktok獨享原生ip搭建節點…

CC工具箱使用指南:【界線導出Excel(一橫)】

一、簡介 群友定制工具。 這個工具的目的是將面要素的邊界線的屬性導出Excel。 給定的Excel模板如下&#xff1a; 結果需要輸出每一段界一的起點、終點的坐標&#xff0c;這里以度分秒的方法表達。 每段界線的方位角以及方向&#xff0c;方向按16位方位角描述&#xff1a; …

高通QCS6490開發(六):連接使用攝像頭

本文將會介紹如何在FV01開發板上連接攝像頭和顯示預覽。 所用硬件有&#xff1a; 1. FV01開發板 2.Raspberry 攝像頭 操作步驟如下&#xff1a; 通過FPC線和杜邦線將FV01板和攝像頭連接起來&#xff0c;接線如下&#xff1a; 1、Camera設備連接&#xff0c;通過22pin轉15pi…

togaf培訓簡介2

1.定義 2.ADM 業務下降期不要瞎折騰&#xff0c;上升期配合業務做一些改革&#xff1f; 項目交付物不能是聊天記錄、PPT什么的&#xff0c;最起碼是郵件。 3.架構內容框架 或者叫&#xff1a;企業統一體。 包括&#xff1a;企業連續性和解決方案連續性 方案和工具的解耦很大程…

【回溯】1255. 得分最高的單詞集合

本文涉及知識點 回溯 力扣難道&#xff1a;1881 LeetCode1255. 得分最高的單詞集合 你將會得到一份單詞表 words&#xff0c;一個字母表 letters &#xff08;可能會有重復字母&#xff09;&#xff0c;以及每個字母對應的得分情況表 score。 請你幫忙計算玩家在單詞拼寫游戲…

Mysql常見數據類型探索

Mysql常見數據類型探索 數值類型 MySQL 支持所有標準 SQL 數值數據類型。 這些類型包括嚴格數值數據類型(INTEGER、SMALLINT、DECIMAL 和 NUMERIC)&#xff0c;以及近似數值數據類型(FLOAT、REAL 和 DOUBLE PRECISION)。 關鍵字INT是INTEGER的同義詞&#xff0c;關鍵字DEC是…

K8s 二進制部署 上篇

一 K8S按裝部署方式&#xff1a; ① Minikube Minikube是一個工具&#xff0c;可以在本地快速運行一個單節點微型K8S&#xff0c;僅用于學習、預覽K8S的一些特 性使用。 部署地址&#xff1a;https://kubernetes.io/docs/setup/minikube ② Kubeadmin Kubeadmin也是一個工…

vue網頁端控制臺展示獨有標記

效果展示 實現步驟 1. 新建js文件 定義一個類 用于提供控制臺打印日志顯示樣式的方法 src\libs\util.log.js class Logger {// 定義靜態方法static typeColor(type "default") {let color "";switch (type) {case "default":color "#3…

后臺菜單數據遞歸展示

后臺菜單數據遞歸展示 效果示例圖aslide.vueaslideItem.vuemenu 效果示例圖 aslide.vue <script setup>import {ref} from vue;const props defineProps({isCollapse: {type: Boolean,default: false}});import AslideItem from "./aslideItem.vue"const def…

MIRO時,修改頁簽“采購訂單參考”的數量時,金額不自動計算

MIRO 發票校驗時&#xff0c;進入到如下界面&#xff0c;系統參考采購訂單自動帶出已經收貨的金額和數量。 此時如果想要修改數量時&#xff0c;有些用戶賬號下&#xff0c;金額不自動計算&#xff0c;但是有些用戶賬號下&#xff0c;數量更改時&#xff0c;系統自動計算和建議…

“普惠門診保”24年升級回歸! 您醫保的有效商業補充!

2024年5月15日&#xff0c; “普惠門診保如意版”正式官宣發布&#xff01; 2023年&#xff0c;中國人民財產保險股份有限公司湖南省分公司積極創新的惠民型商業補充醫療保險&#xff0c;推出湖南省內首款互聯網門診醫療保險“普惠門診保” 2024年&#xff0c;在去年保障內容…

窮人翻身的秘訣!2024年普通人如何創業賺錢?窮人如何逆襲翻身?普通人創業新風口?

窮人的思維有一個致命的缺陷&#xff0c;就是追求確定性&#xff0c;進而失去了可能性。而賺錢的真相實際上非常殘酷。世界上能夠賺錢的事情必定是不確定的&#xff0c;能夠賺取巨額財富的事情更是極度不確定的。只有面對不確定性&#xff0c;才能讓你把競爭對手攔在門外&#…

如何在 Linux 上檢查 CPU 和硬盤溫度

為了更好地監測您的Linux系統的硬件健康狀況&#xff0c;如CPU與硬盤溫度、風扇轉速等關鍵指標&#xff0c;采用lm_sensors與hddtemp這兩款強大工具是明智之選。以下是關于這些工具的詳盡指南&#xff0c;包括它們的功能介紹、安裝步驟以及如何配置lm_sensors&#xff0c;旨在為…