我們使用ElasticJob需要以下步驟: 1. 添加依賴 2. 配置任務(可以使用Spring命名空間配置或Java配置) 3. 實現任務邏輯(實現SimpleJob、DataflowJob等接口) 4. 啟動任務 下面是一個詳細的示例,包括Spring Boot集成ElasticJob的步驟。 注意:ElasticJob有兩個版本:ElasticJob-Lite和ElasticJob-Cloud。我們這里使用ElasticJob-Lite(輕量級無中心化解決方案)。 步驟1:添加Maven依賴 在pom.xml中添加以下依賴:```xml<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>3.0.1</version> <!-- 請使用最新版本 --></dependency>``` 步驟2:配置ElasticJob(這里使用Spring Boot的application.properties配置) 在application.properties中配置注冊中心(Zookeeper)和任務參數:```properties# 注冊中心配置elasticjob.reg-center.server-lists=localhost:2181elasticjob.reg-center.namespace=elasticjob-demo# 任務配置(這里配置一個簡單任務)elasticjob.jobs.mySimpleJob.elastic-job-class=com.example.demo.job.MySimpleJobelasticjob.jobs.mySimpleJob.cron=0/5 * * * * ?elasticjob.jobs.mySimpleJob.sharding-total-count=3elasticjob.jobs.mySimpleJob.overwrite=true``` 步驟3:編寫任務類(實現SimpleJob接口)```javapackage com.example.demo.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { // 根據分片項執行任務 System.out.println("分片項: " + shardingContext.getShardingItem() + ", 總分片數: " + shardingContext.getShardingTotalCount() + ", 任務參數: " + shardingContext.getJobParameter()); // 實際業務邏輯 switch (shardingContext.getShardingItem()) { case 0: // 處理第一片數據 break; case 1: // 處理第二片數據 break; case 2: // 處理第三片數據 break; } }}``` 步驟4:啟動Spring Boot應用 如果需要使用Java配置方式(代替properties配置),可以創建一個配置類:```javaimport org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ElasticJobConfig { @Autowired private ZookeeperRegistryCenter regCenter; @Bean(initMethod = "schedule") public ScheduleJobBootstrap mySimpleJob(MySimpleJob mySimpleJob) { return new ScheduleJobBootstrap(regCenter, mySimpleJob, JobConfiguration.newBuilder("mySimpleJob", 3) // 任務名稱和分片總數 .cron("0/5 * * * * ?") // cron表達式 .jobParameter("參數") // 任務參數 .overwrite(true) // 是否覆蓋配置 .build()); }}``` 注意:如果使用Java配置,需要同時配置Zookeeper注冊中心:```java@Beanpublic ZookeeperRegistryCenter regCenter() { // 配置Zookeeper ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elasticjob-demo"); return new ZookeeperRegistryCenter(zkConfig);}``` 步驟5:分片策略 ElasticJob可以將任務拆分成多個分片,每個分片由不同的服務器執行。你可以在任務中根據分片項處理不同的數據。 例如,處理數據庫數據時,可以這樣分片:```java@Overridepublic void execute(ShardingContext shardingContext) { // 獲取分片項和總分片數 int shardingItem = shardingContext.getShardingItem(); int shardingTotalCount = shardingContext.getShardingTotalCount(); // 根據分片項查詢數據(例如:按ID取模) List<Long> data = getDataFromDatabase(); for (Long id : data) { if (id % shardingTotalCount == shardingItem) { // 處理這個數據 process(id); } }}``` 步驟6:監聽器(可選) 你可以添加任務監聽器來監控任務執行情況:```javapublic class MyJobListener implements ElasticJobListener { @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { System.out.println("任務開始"); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { System.out.println("任務結束"); }}``` 然后在任務配置中添加監聽器:```java@Bean(initMethod = "schedule")public ScheduleJobBootstrap mySimpleJob(MySimpleJob mySimpleJob) { return new ScheduleJobBootstrap(regCenter, mySimpleJob, JobConfiguration.newBuilder("mySimpleJob", 3) .cron("0/5 * * * * ?") .jobListenerTypes("myJobListener") // 監聽器名稱(需要將監聽器注冊為Bean) .overwrite(true) .build());}// 注冊監聽器Bean@Beanpublic MyJobListener myJobListener() { return new MyJobListener();}``` 步驟7:錯誤處理 任務執行過程中出現異常,ElasticJob會進行重試(可配置)并記錄錯誤日志。你也可以在任務中自己捕獲異常處理。 總結: 使用ElasticJob的關鍵步驟: 1. 引入依賴 2. 配置注冊中心(Zookeeper) 3. 定義任務(實現SimpleJob等接口) 4. 配置任務(cron表達式、分片數等) 5. 啟動任務(通過Spring Boot自動啟動或手動調度) 更多詳細配置請參考官方文檔:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
ElasticJob 使用詳解
ElasticJob 是一個分布式調度解決方案,提供分布式任務調度、彈性擴容縮容、作業分片等功能。下面我將詳細介紹如何在 Spring Boot 中使用 ElasticJob。
環境準備
1. 添加 Maven 依賴
xml
<dependencies><!-- ElasticJob 核心依賴 --><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.1</version> <!-- 使用最新版本 --></dependency><!-- Zookeeper 注冊中心 --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency> </dependencies>
2. 配置文件 (application.yml)
yaml
elasticjob:reg-center:server-lists: localhost:2181 # Zookeeper 地址namespace: elasticjob-demo # 命名空間
基礎使用
1. 創建簡單任務
java
import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.stereotype.Component;@Component public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {// 獲取分片信息int shardingItem = context.getShardingItem();int totalShards = context.getShardingTotalCount();System.out.printf("執行分片任務: %d/%d, 參數: %s%n", shardingItem, totalShards,context.getJobParameter());// 業務邏輯處理switch (shardingItem) {case 0: processShard0();break;case 1:processShard1();break;// ... 其他分片}}private void processShard0() {// 處理分片0的任務}private void processShard1() {// 處理分片1的任務} }
2. 配置任務
java
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class ElasticJobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate MySimpleJob mySimpleJob;@Bean(initMethod = "schedule")public ScheduleJobBootstrap scheduleMySimpleJob() {return new ScheduleJobBootstrap(regCenter, mySimpleJob, createJobConfiguration());}private JobConfiguration createJobConfiguration() {// 創建作業配置return JobConfiguration.newBuilder("mySimpleJob", 3) // 作業名稱,分片總數.cron("0/5 * * * * ?") // 每5秒執行一次.shardingItemParameters("0=A,1=B,2=C") // 分片參數.jobParameter("globalParam") // 作業自定義參數.overwrite(true) // 本地配置是否覆蓋注冊中心配置.build();} }
核心功能詳解
1. 分片機制
ElasticJob 的核心功能是將任務拆分成多個分片,由不同節點并行處理:
java
public void execute(ShardingContext context) {// 獲取當前分片項int shardingItem = context.getShardingItem();// 獲取所有待處理數據IDList<Long> allIds = fetchAllDataIds();// 根據分片規則處理數據for (Long id : allIds) {if (id % context.getShardingTotalCount() == shardingItem) {processData(id);}} }
2. 彈性擴容
當增加或減少服務實例時,ElasticJob 會自動重新分配分片:
text
初始狀態(2節點): 節點1:分片0,1 節點2:分片2擴容后(3節點): 節點1:分片0 節點2:分片1 節點3:分片2
3. 任務類型
3.1 簡單任務 (SimpleJob)
java
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {// 實現業務邏輯} }
3.2 數據流任務 (DataflowJob)
java
public class MyDataflowJob implements DataflowJob<Order> {@Overridepublic List<Order> fetchData(ShardingContext context) {// 獲取待處理數據return fetchUnprocessedOrders(context);}@Overridepublic void processData(ShardingContext context, List<Order> data) {// 處理數據processOrders(data);} }
3.3 腳本任務 (ScriptJob)
yaml
elasticjob:jobs:myScriptJob:job-type: SCRIPTprops:script.command-line: "echo Hello ElasticJob"
4. 事件追蹤
java
@Bean public ScheduleJobBootstrap scheduleJob() {return new ScheduleJobBootstrap(regCenter,myJob,JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").eventTraceRdbDataSource("dataSource") // 配置數據源.build()); }
5. 故障轉移
java
JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").failover(true) // 開啟故障轉移.build()
高級配置
1. 作業監聽器
java
public class MyJobListener implements ElasticJobListener {@Overridepublic void beforeJobExecuted(ShardingContexts contexts) {System.out.println("作業開始執行: " + contexts.getJobName());}@Overridepublic void afterJobExecuted(ShardingContexts contexts) {System.out.println("作業執行完成: " + contexts.getJobName());} }// 注冊監聽器 @Bean public ElasticJobListener myJobListener() {return new MyJobListener(); }// 配置到任務 JobConfiguration.newBuilder("myJob", 3).jobListenerTypes("myJobListener") // 監聽器Bean名稱.build()
2. 作業錯誤處理策略
java
JobConfiguration.newBuilder("myJob", 3).jobErrorHandlerType("LOG") // 錯誤處理策略.build()
支持策略:
-
LOG
:記錄日志不中斷作業 -
THROW
:拋出異常中斷作業 -
IGNORE
:忽略異常 -
EMAIL
:發送郵件(需自定義)
3. 自定義錯誤處理器
java
public class MyErrorHandler implements JobErrorHandler {@Overridepublic void handleException(String jobName, Throwable cause) {// 自定義錯誤處理邏輯sendAlert(jobName, cause);} }// 注冊為Bean @Bean public JobErrorHandler myErrorHandler() {return new MyErrorHandler(); }// 配置到任務 JobConfiguration.newBuilder("myJob", 3).jobErrorHandlerType("myErrorHandler").build()
Spring Boot 自動配置
1. 使用?@ElasticJobScheduler
?注解
java
@ElasticJobScheduler(name = "myAutoJob", cron = "0/10 * * * * ?",shardingTotalCount = 3,jobParameters = "autoParam",overwrite = true ) public class MyAutoJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {System.out.println("自動配置任務執行");} }
2. 配置文件配置
yaml
elasticjob:jobs:myConfigJob:elastic-job-class: com.example.MySimpleJobcron: 0/5 * * * * ?sharding-total-count: 3sharding-item-parameters: 0=A,1=B,2=Cjob-parameters: configParamoverwrite: truejob-error-handler-type: LOG
運維管理
1. 控制臺安裝
-
下載?ElasticJob-Lite-UI
-
啟動:
bash
tar -zxvf apache-shardingsphere-elasticjob-${version}-lite-ui-bin.tar.gz cd apache-shardingsphere-elasticjob-${version}-lite-ui-bin bin/start.sh
2. 常用 REST API
-
查看作業狀態:
GET /api/jobs/{jobName}/status
-
觸發作業執行:
POST /api/jobs/{jobName}/trigger
-
禁用作業:
POST /api/jobs/{jobName}/disable
-
啟用作業:
POST /api/jobs/{jobName}/enable
最佳實踐
1. 分片策略設計
java
public void execute(ShardingContext context) {// 1. 獲取所有分片數據List<Long> allIds = getAllDataIds();// 2. 計算分片處理的數據范圍List<Long> shardIds = allIds.stream().filter(id -> id % context.getShardingTotalCount() == context.getShardingItem()).collect(Collectors.toList());// 3. 處理分片數據processShardData(shardIds); }
2. 數據庫分片示例
java
@Autowired private OrderRepository orderRepository;public void execute(ShardingContext context) {// 根據分片查詢數據Pageable pageable = PageRequest.of(0, 100);int mod = context.getShardingTotalCount();int remainder = context.getShardingItem();List<Order> orders = orderRepository.findByStatusAndShard("PENDING",mod,remainder,pageable);// 處理訂單processOrders(orders); }// Repository 接口 @Query("SELECT o FROM Order o WHERE o.status = :status " + "AND MOD(o.id, :mod) = :remainder") List<Order> findByStatusAndShard(@Param("status") String status,@Param("mod") int mod,@Param("remainder") int remainder,Pageable pageable);
3. 冪等性設計
java
public void processOrder(Order order) {// 1. 檢查是否已處理if (order.isProcessed()) {return;}// 2. 加分布式鎖try (Lock lock = lockService.acquireLock("order_" + order.getId())) {// 3. 再次檢查(雙重檢查)if (orderRepository.isProcessed(order.getId())) {return;}// 4. 處理業務process(order);// 5. 更新狀態order.markAsProcessed();orderRepository.save(order);} }
常見問題解決
1. Zookeeper 連接問題
yaml
# 增加連接超時配置 elasticjob:reg-center:server-lists: localhost:2181namespace: elasticjob-demobase-sleep-time-milliseconds: 1000max-sleep-time-milliseconds: 3000max-retries: 3
2. 任務不執行檢查
-
確認 Zookeeper 是否正常運行
-
檢查作業配置是否同步到注冊中心
-
查看節點日志是否有異常
-
確認作業是否被禁用
3. 分片不均問題
使用自定義分片策略:
java
public class CustomShardingStrategy implements JobShardingStrategy {@Overridepublic Map<String, List<Integer>> sharding(List<String> servers, String jobName, int shardingTotalCount) {// 自定義分片邏輯Map<String, List<Integer>> result = new LinkedHashMap<>();for (int i = 0; i < servers.size(); i++) {result.put(servers.get(i), new ArrayList<>());}for (int i = 0; i < shardingTotalCount; i++) {int index = i % servers.size();result.get(servers.get(index)).add(i);}return result;} }// 配置使用 JobConfiguration.newBuilder("myJob", 3).jobShardingStrategyType("com.example.CustomShardingStrategy").build()
總結
ElasticJob 使用核心步驟:
-
添加依賴:引入 ElasticJob 和 Zookeeper 依賴
-
配置注冊中心:在配置文件中設置 Zookeeper 地址
-
實現任務:根據需求選擇 SimpleJob/DataflowJob/ScriptJob
-
配置任務:通過 Java Config 或注解配置任務參數
-
分片設計:合理設計分片策略處理分布式數據
-
運維管理:使用控制臺或 API 管理任務
關鍵優勢:
-
分布式調度:支持任務在集群中分片執行
-
彈性擴容:自動感知節點變化并重新分片
-
故障轉移:自動轉移失敗任務分片
-
作業治理:提供豐富的運維管理功能
生產環境建議:
-
為 Zookeeper 配置集群(至少3節點)
-
重要任務配置事件追蹤和監控報警
-
定期檢查控制臺的任務執行情況
-
設計任務時考慮冪等性和事務一致性
通過合理使用 ElasticJob,可以構建高可靠、可擴展的分布式任務調度系統,有效解決傳統定時任務在分布式環境中的各種問題。