BookKeeper的優勢
高吞吐量和低延遲
-
分布式架構: Apache BookKeeper采用分布式的架構設計,能夠支持高并發的寫入和讀取操作。
-
批量寫入: 支持批量寫入日志條目,顯著提高寫入效率。
-
異步I/O: 使用異步I/O操作,減少等待時間,提升整體性能。
數據一致性和持久性
-
強一致性保證: BookKeeper提供強一致性保證,確保所有寫入的數據都能被正確讀取。
-
多副本復制: 數據在多個Bookies(BookKeeper節點)上進行多副本復制,防止單點故障導致的數據丟失。
-
自動恢復: 在節點故障時,BookKeeper能夠自動檢測并恢復數據,確保系統的連續運行。
水平擴展能力
-
動態擴展: 可以通過增加Bookies來擴展集群規模,適應不斷增長的業務需求。
-
負載均衡: 自動分配負載,確保各節點之間的工作負載平衡,避免熱點問題。
-
靈活性: 支持多種部署方式,包括本地部署、云部署等。
數據加密和訪問控制
-
數據加密: 支持對存儲的日志數據進行加密處理,防止未授權訪問。
-
認證和授權: 提供細粒度的權限管理機制,限制不同角色的訪問權限。
-
審計日志: 記錄所有對系統的訪問和操作,便于追蹤和審計。
記得啟動ZooKeeper服務器
因為BookKeeper依賴于ZooKeeper來進行元數據管理和協調!!!
代碼實操
<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.5</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>bookkeeper-springboot-example</artifactId><version>0.0.1-SNAPSHOT</version><name>bookkeeper-springboot-example</name><description>Demo project for Spring Boot and Apache BookKeeper integration</description><properties><java.version>11</java.version></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache BookKeeper Client --><dependency><groupId>org.apache.bookkeeper</groupId><artifactId>bookkeeper-server</artifactId><version>4.18.0</version></dependency><!-- Jackson Databind for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok for reducing boilerplate code --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- Test dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
application.properties
# ZooKeeper 連接字符串
bookkeeper.zk.connectString=localhost:2181
server.port=8080
配置類
package?com.example.bookkeeperspringbootexample.config;import?org.apache.bookkeeper.client.BookKeeper;
import?org.apache.bookkeeper.client.LedgerHandle;
import?org.apache.bookkeeper.conf.ClientConfiguration;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Value;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;import?javax.annotation.PreDestroy;
import?java.io.IOException;@Configuration
publicclass?BookKeeperConfig?{privatestaticfinal?Logger logger = LoggerFactory.getLogger(BookKeeperConfig.class);@Value("${bookkeeper.zk.connectString}")private?String zkConnectString;private?BookKeeper bookKeeper;private?LedgerHandle ledgerHandle;/*** 初始化BookKeeper客戶端**?@return?BookKeeper實例*?@throws?IOException 如果初始化失敗*/@Beanpublic?BookKeeper?bookKeeper()?throws?IOException?{ClientConfiguration conf =?new?ClientConfiguration();conf.setZkServers(zkConnectString);bookKeeper =?new?BookKeeper(conf);logger.info("BookKeeper客戶端已初始化。");return?bookKeeper;}/*** 創建一個新的Ledger**?@param?bookKeeper BookKeeper實例*?@return?LedgerHandle實例*?@throws?Exception 如果創建Ledger失敗*/@Beanpublic?LedgerHandle?ledgerHandle(BookKeeper bookKeeper)?throws?Exception?{ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.CRC32,"password".getBytes());logger.info("Ledger已創建,ID: {}", ledgerHandle.getId());return?ledgerHandle;}/*** 關閉BookKeeper客戶端和Ledger*/@PreDestroypublic?void?shutdown()?throws?InterruptedException, BookKeeper.BKException?{if?(ledgerHandle !=?null) {ledgerHandle.close();logger.info("Ledger已關閉。");}if?(bookKeeper !=?null) {bookKeeper.close();logger.info("BookKeeper客戶端已關閉。");}}}
交易的數據模型
package?com.example.bookkeeperspringbootexample.model;import?lombok.Data;import?java.time.LocalDateTime;/*** 表示交易的數據模型*/
@Data
public?class?Transaction?{private?Long transactionId;?// 交易IDprivate?Double amount; ? ? ?// 交易金額private?LocalDateTime timestamp;?// 時間戳
}
服務類
package?com.example.bookkeeperspringbootexample.service;import?com.example.bookkeeperspringbootexample.model.Transaction;
import?com.fasterxml.jackson.databind.ObjectMapper;
import?org.apache.bookkeeper.client.BKException;
import?org.apache.bookkeeper.client.LedgerHandle;
import?org.apache.bookkeeper.proto.BookieProtocol;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Service;import?java.io.IOException;
import?java.util.concurrent.CompletableFuture;
import?java.util.concurrent.ExecutionException;@Service
publicclass?BookKeeperService?{privatestaticfinal?Logger logger = LoggerFactory.getLogger(BookKeeperService.class);@Autowiredprivate?LedgerHandle ledgerHandle;@Autowiredprivate?ObjectMapper objectMapper;/*** 異步添加交易到BookKeeper**?@param?transaction 交易對象*?@return?CompletableFuture<Long> 包含新條目的entryId*/public?CompletableFuture<Long>?addTransaction(Transaction transaction)?{try?{byte[] logData = objectMapper.writeValueAsBytes(transaction);?// 將交易對象轉換為字節數組return?CompletableFuture.supplyAsync(() -> {try?{long?entryId = ledgerHandle.addEntry(logData);?// 將字節數組添加到Ledgerlogger.info("已添加交易,entryId: {}", entryId);return?entryId;}?catch?(BKException | InterruptedException e) {thrownew?RuntimeException(e);}});}?catch?(IOException e) {thrownew?RuntimeException(e);}}/*** 異步從BookKeeper讀取交易**?@param?entryId 條目ID*?@return?CompletableFuture<Transaction> 包含讀取的交易對象*/public?CompletableFuture<Transaction>?readTransaction(long?entryId)?{return?CompletableFuture.supplyAsync(() -> {try?{LedgerSequence seq = ledgerHandle.readEntries(entryId, entryId);?// 讀取指定entryId的條目if?(seq.hasMoreElements()) {LedgerEntry entry = seq.nextElement();?// 獲取條目byte[] data = entry.getEntryBytes();?// 獲取條目的字節數組logger.info("已讀取交易,entryId: {}", entryId);return?objectMapper.readValue(data, Transaction.class);?// 將字節數組轉換為交易對象}thrownew?IllegalArgumentException("未找到ID為 "?+ entryId +?" 的交易");}?catch?(BKException | InterruptedException | ExecutionException | IOException e) {thrownew?RuntimeException(e);}});}}
Controller
package?com.example.bookkeeperspringbootexample.controller;import?com.example.bookkeeperspringbootexample.model.Transaction;
import?com.example.bookkeeperspringbootexample.service.BookKeeperService;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.http.ResponseEntity;
import?org.springframework.web.bind.annotation.*;import?java.util.concurrent.CompletableFuture;@RestController
@RequestMapping("/transactions")
publicclass?TransactionController?{@Autowiredprivate?BookKeeperService bookKeeperService;/*** 添加新的交易**?@param?transaction 交易對象*?@return?ResponseEntity<Long> 包含新條目的entryId*/@PostMapping("/")public?ResponseEntity<Long>?addTransaction(@RequestBody Transaction transaction)?{CompletableFuture<Long> futureEntryId = bookKeeperService.addTransaction(transaction);?// 異步添加交易try?{Long entryId = futureEntryId.get();?// 獲取結果return?ResponseEntity.ok(entryId);?// 返回成功的HTTP響應}?catch?(InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();?// 中斷線程return?ResponseEntity.internalServerError().build();?// 返回內部服務器錯誤}}/*** 根據entryId讀取交易**?@param?entryId 條目ID*?@return?ResponseEntity<Transaction> 包含讀取的交易對象*/@GetMapping("/{entryId}")public?ResponseEntity<Transaction>?getTransaction(@PathVariable?long?entryId)?{CompletableFuture<Transaction> futureTransaction = bookKeeperService.readTransaction(entryId);?// 異步讀取交易try?{Transaction transaction = futureTransaction.get();?// 獲取結果return?ResponseEntity.ok(transaction);?// 返回成功的HTTP響應}?catch?(InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();?// 中斷線程return?ResponseEntity.notFound().build();?// 返回未找到資源}}}
Application
package?com.example.bookkeeperspringbootexample;import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public?class?BookKeeperSpringBootExampleApplication?{public?static?void?main(String[] args)?{SpringApplication.run(BookKeeperSpringBootExampleApplication.class,?args);}}
測試
添加交易
curl -X POST http://localhost:8080/transactions/ \
-H?"Content-Type: application/json"?\
-d?'{"transactionId": 1, "amount": 100.50, "timestamp": "2025-03-19T21:36:06"}'
Respons:
1
讀取交易
curl -X GET http://localhost:8080/transactions/1
Respons:
{"transactionId":1,"amount":100.5,"timestamp":"2025-03-19T21:36:06"}