Elastic-Job-Lite 是一款由 Apache 開源的輕量級分布式任務調度框架,屬于 ShardingSphere 生態體系的一部分。它專注于分布式任務調度,支持彈性伸縮、分片處理、高可用等特性,且不依賴中心化架構。
一、基礎
(一)核心特性
-
分布式協調
通過 ZooKeeper 實現作業的分布式調度和協調,確保任務在集群環境中不重復、不遺漏地執行。 -
分片機制
支持將任務拆分為多個分片(Sharding)并行執行,提升處理效率。例如:// 根據分片參數處理不同數據 int shardIndex = context.getShardingItem(); // 分片索引(0,1,2...) String shardParam = context.getShardingParameter(); // 分片參數
-
彈性伸縮
動態感知集群節點變化,自動重新分配分片。新增節點時,分片會被均勻分配到新節點;節點下線時,其分片會被其他節點接管。 -
多種作業類型
SimpleJob
:簡單任務,實現SimpleJob
接口即可。DataflowJob
:數據流任務,支持數據抓取(fetch)和處理(process)。ScriptJob
:腳本任務,支持 Shell、Python 等腳本語言。
-
失效轉移
當作業節點崩潰時,正在執行的分片會被轉移到其他節點繼續執行。 -
冪等性保障
通過 ZooKeeper 實現分布式鎖,確保同一分片在同一時間只被一個節點執行。
(二)架構設計
Elastic-Job-Lite 采用去中心化架構:
- 作業節點:直接部署在應用中,既是執行節點也是調度節點。
- 注冊中心:依賴 ZooKeeper 存儲作業元數據和運行狀態。
- 無中心化調度器:每個節點通過注冊中心協調,無需單獨的調度中心。
(三)核心概念
-
作業(Job)
任務的抽象,支持 Simple、Dataflow、Script 三種類型。 -
分片(Sharding)
將任務拆分為多個獨立的子任務,每個分片由不同的節點執行。例如:elasticjob:jobs:myJob:sharding-total-count: 3 # 總分片數sharding-item-parameters: "0=北京,1=上海,2=廣州" # 分片參數
-
注冊中心(Registry Center)
ZooKeeper 作為協調服務,存儲作業配置、運行狀態和分片信息。 -
作業實例(Job Instance)
每個作業節點啟動時會向注冊中心注冊自己,成為一個作業實例。
二、在springboot中使用Elastic-Job-Lite
(一)添加依賴
在 pom.xml
中添加 Elastic-Job-Lite 和 ZooKeeper 客戶端依賴:
<!-- Elastic-Job-Lite Spring Boot Starter -->
<dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.3</version> <!-- 最新穩定版本 -->
</dependency><!-- ZooKeeper 客戶端 -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
(二) 配置 ZooKeeper 注冊中心
你提到的沒錯!Elastic-Job-Lite 配置 ZooKeeper 確實有三種主要方式,我之前的回答集中在 Java 代碼配置 上。現在我補充完整另外兩種方式:
1.YAML 配置(Spring Boot 自動配置)
最簡潔的方式,通過 application.yml
配置:
elasticjob:reg-center:server-lists: localhost:2181 # ZooKeeper 地址namespace: elastic-job # 命名空間base-sleep-time-milliseconds: 1000 # 初始重試等待時間max-sleep-time-milliseconds: 3000 # 最大重試等待時間max-retries: 3 # 最大重試次數digest: "" # 認證信息(可選)jobs:mySimpleJob:type: SIMPLEclass: com.example.job.MySimpleJob # 作業類路徑cron: "0/10 * * * * ?" # Cron 表達式sharding-total-count: 3 # 分片總數sharding-item-parameters: "0=A,1=B,2=C" # 分片參數overwrite: true # 覆蓋注冊中心配置
關鍵點:
elasticjob.reg-center
配置 ZooKeeper 連接信息。elasticjob.jobs
下定義具體作業,支持SIMPLE
、DATAFLOW
、SCRIPT
等類型。
2.Java 代碼配置(手動構建 Bean)
前面示例中使用的方式,適合需要靈活控制配置的場景:
@Configuration
public class JobConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter() {ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job");return new ZookeeperRegistryCenter(zkConfig);}@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler(MySimpleJob mySimpleJob, ZookeeperRegistryCenter regCenter) {JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob", "0/10 * * * * ?", 3).shardingItemParameters("0=A,1=B,2=C").build();SimpleJobConfiguration jobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());return new SpringJobScheduler(mySimpleJob, regCenter, LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build());}
}
關鍵點:
- 手動創建
ZookeeperRegistryCenter
和SpringJobScheduler
Bean。 - 通過
JobCoreConfiguration
和SimpleJobConfiguration
構建作業配置。
(三)創建簡單作業類
實現 SimpleJob
接口,定義作業邏輯:
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;@Component
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {// 獲取分片信息int shardIndex = shardingContext.getShardingItem();String shardParam = shardingContext.getShardingParameter();// 作業邏輯(根據分片參數處理不同數據)System.out.printf("分片項: %d, 參數: %s, 時間: %s%n", shardIndex, shardParam, System.currentTimeMillis());// 示例:根據分片處理不同的數據// if (shardIndex == 0) { processGroupA(); }// else if (shardIndex == 1) { processGroupB(); }}
}
(四)配置作業
使用 @ElasticSimpleJob
注解配置作業:
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.api.ElasticSimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class JobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate MySimpleJob mySimpleJob;@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler() {// 定義作業核心配置JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob", // 作業名稱"0/10 * * * * ?", // Cron 表達式3 // 分片總數).shardingItemParameters("0=A,1=B,2=C") // 分片參數.build();// 定義 Simple 作業配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 定義 Lite 作業配置LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true) // 允許覆蓋注冊中心配置.build();// 創建作業調度器return new SpringJobScheduler(mySimpleJob, regCenter, jobConfig);}
}
(五)配置說明
參數 | 說明 |
---|---|
reg-center.server-lists | ZooKeeper 服務器地址,多個地址用逗號分隔(如 host1:2181,host2:2181 ) |
reg-center.namespace | 命名空間,用于隔離不同項目的作業配置 |
coreConfig.cron | Cron 表達式,定義作業執行時間規則 |
coreConfig.shardingTotalCount | 分片總數,決定作業拆分為多少個并行執行單元 |
coreConfig.shardingItemParameters | 分片參數,格式為 0=A,1=B,2=C ,為每個分片指定參數 |