ShardingSphere in Practice
Table of Contents
- ShardingSphere in Practice
  - Sharding in Practice
    - Creating the Tables
      - Table-creation SQL
      - Creating tables with a stored procedure
    - Sharding-JDBC configuration
    - Choosing a sharding key based on the business
      - Order id
      - User id
    - Sharding strategy
    - Design and implementation of the order id
      - **Design goal**:
      - Design approach:
    - Implementing the sharding algorithms
    - Inserting test data
      - Merchants
      - Products
      - Users
      - Orders and order items
    - How to insert data in bulk
      - MyBatis batch insert
      - Multi-threaded insertion with a thread pool
    - Queries
      - Query by user id
      - Query by order id
    - Simulating paginated queries
      - With a sharding key
      - Without a sharding key
        - Global query method
        - Forbidding page jumps
    - Merchant-side queries
      - Index-table method (redundancy)
      - ES heterogeneous index
The system's daily data increment is huge. How do we deal with the database performance bottleneck that this volume creates?

Suppose the system is initially designed for 100,000 new rows per day and data is archived every six months. Within half a year the data grows to 180 × 100,000 = 18 million rows. Once the system has been running for a while and the daily increment reaches 1,000,000 rows, half a year of data amounts to 180 × 1,000,000 = 180 million rows.

A single table cannot withstand that kind of pressure, so it is time to split databases and tables. If we cap each table at roughly 1,000,000 rows, about 180 tables are needed to hold half a year of data.
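The capacity math above can be sketched as a quick calculation. The figures (rows per day, archive window, per-table cap) are the article's illustrative assumptions, not fixed requirements:

```java
public class CapacityPlan {

    // Smallest number of tables that keeps every table under the row cap
    // (ceiling division of total rows by the per-table cap).
    static long tablesNeeded(long rowsPerDay, int days, long rowsPerTableCap) {
        long totalRows = rowsPerDay * days;
        return (totalRows + rowsPerTableCap - 1) / rowsPerTableCap;
    }

    public static void main(String[] args) {
        // 100,000 rows/day over 180 days, capped at 1,000,000 rows per table
        System.out.println(tablesNeeded(100_000L, 180, 1_000_000L));   // 18
        // 1,000,000 rows/day over the same window
        System.out.println(tablesNeeded(1_000_000L, 180, 1_000_000L)); // 180
    }
}
```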
Sharding in Practice
Creating the Tables
To demonstrate broadcast tables and binding tables, MySQL is used. Five tables are designed:

- User table `user`: configured as a broadcast table here; in a real system the user count is huge and it would need sharding too
- Merchant table `business`: configured as a broadcast table
- Product table `product`: configured as a broadcast table
- Order table `orders`: the main data table; sharded, with the sharding strategy driven by user id and order id
- Order-item table `order_items`: the main detail table; sharded, bound to `orders` as a binding table, using the same user-id/order-id sharding strategy
Three databases are prepared for the demo; `orders` and `order_items` are each split into tables with suffixes 0–31 per database, i.e. 3 × 32 = 96 physical tables for each logic table.
Table-creation SQL
The tables and their columns are as follows:

```sql
-- Product table, simulated broadcast table
CREATE TABLE product (
    product_id   BIGINT PRIMARY KEY,
    product_name VARCHAR(255)
);

-- User table, not sharded for now, simulated broadcast table
CREATE TABLE user (
    user_id   BIGINT PRIMARY KEY,
    user_name VARCHAR(55) NOT NULL
);

-- Merchant table, simulated broadcast table
CREATE TABLE business (
    business_id   BIGINT PRIMARY KEY,
    business_name VARCHAR(255) NOT NULL
);

-- Order table, sharded across databases and tables, bound to order_items
CREATE TABLE orders (
    order_id    BIGINT PRIMARY KEY,
    user_id     BIGINT,
    business_id BIGINT,
    order_date  DATE
);

-- Order-item table, sharded across databases and tables
CREATE TABLE order_items (
    item_id    BIGINT PRIMARY KEY,
    order_id   BIGINT,
    product_id BIGINT
);
```
Creating tables with a stored procedure
Run the following SQL in each of the three databases:

```sql
DROP DATABASE IF EXISTS shardingdb;
CREATE DATABASE shardingdb;
USE shardingdb;

-- Product table, simulated broadcast table
CREATE TABLE product (
    product_id   BIGINT PRIMARY KEY,
    product_name VARCHAR(255)
);

-- User table, not sharded for now, simulated broadcast table
CREATE TABLE user (
    user_id   BIGINT PRIMARY KEY,
    user_name VARCHAR(55) NOT NULL
);

-- Merchant table, not sharded for now, simulated broadcast table
CREATE TABLE business (
    business_id   BIGINT PRIMARY KEY,
    business_name VARCHAR(255) NOT NULL
);

-- Drop the stored procedure if it already exists
DROP PROCEDURE IF EXISTS `createTables`;
CREATE PROCEDURE `createTables`()
BEGIN
    DECLARE `@i` INT(11);
    DECLARE `@createSql` VARCHAR(2560);
    SET `@i` = 0;
    WHILE `@i` <= 31 DO
        SET @createSql = CONCAT('CREATE TABLE IF NOT EXISTS orders_', `@i`,
            '(`order_id` BIGINT NOT NULL COMMENT \'order id\',
              `user_id` BIGINT COMMENT \'user id\',
              `business_id` BIGINT COMMENT \'merchant id\',
              `order_date` DATE,
              PRIMARY KEY (`order_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin');
        PREPARE stmt FROM @createSql;
        EXECUTE stmt;
        SET `@i` = `@i` + 1;
    END WHILE;
END;

-- Drop the stored procedure if it already exists
DROP PROCEDURE IF EXISTS `createTables2`;
CREATE PROCEDURE `createTables2`()
BEGIN
    DECLARE `@i` INT(11);
    DECLARE `@createSql` VARCHAR(2560);
    SET `@i` = 0;
    WHILE `@i` <= 31 DO
        SET @createSql = CONCAT('CREATE TABLE IF NOT EXISTS order_items_', `@i`,
            '(`item_id` BIGINT NOT NULL COMMENT \'order item id\',
              `order_id` BIGINT COMMENT \'order id\',
              `user_id` BIGINT COMMENT \'user id\',
              `product_id` BIGINT COMMENT \'product id\',
              PRIMARY KEY (`item_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin');
        PREPARE stmt FROM @createSql;
        EXECUTE stmt;
        SET `@i` = `@i` + 1;
    END WHILE;
END;

-- Inspect and call the stored procedures to create the tables
SHOW CREATE PROCEDURE `createTables`;
CALL createTables();
SHOW CREATE PROCEDURE `createTables2`;
CALL createTables2();
```
Note: I ran this directly in Navicat, so no delimiter statements are needed. To run it from the mysql command line, a custom delimiter must be defined with DELIMITER $$, replacing MySQL's default semicolon. This prevents MySQL from treating each semicolon inside the procedure body as the end of a statement and executing it immediately.
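For reference, the command-line shape would look roughly like this (the procedure body is shortened to a placeholder statement):

```sql
DELIMITER $$
CREATE PROCEDURE `createTables`()
BEGIN
    -- semicolons inside the body no longer terminate the outer statement
    SELECT 1;
END$$
DELIMITER ;
```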
The tables are now created in all three databases (screenshot of the result omitted).
Sharding-JDBC configuration
- Create a Spring Boot application and add the following dependencies:

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.23</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <version>4.1.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
```
- Create the entities and their corresponding mappers; that code is omitted here.
- Sharding configuration:
  - configure the three data sources m1, m2, m3
  - configure the broadcast tables user, business, product
  - configure the sharded tables orders and order_items, and bind them together
```yaml
spring:
  shardingsphere:
    datasource:
      names: m1,m2,m3
      m1:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://47.109.94.124:3306/shardingdb?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
        username: root
        password: 123456
      m2:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://47.109.188.99:3306/shardingdb?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
        username: root
        password: 123456
      m3:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://192.168.56.102:3306/shardingdb?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
        username: root
        password: 123456
    sharding:
      tables:
        user:
          actual-data-nodes: m1.user,m2.user,m3.user
        product:
          actual-data-nodes: m1.product,m2.product,m3.product
        business:
          actual-data-nodes: m1.business,m2.business,m3.business
        orders:
          actual-data-nodes: m$->{1..3}.orders_$->{0..31}
          database-strategy:
            complex:
              sharding-columns: order_id,user_id
              algorithm-class-name: cn.axj.sharding.ShardingAlgorithmConfig.OrdersDatabaseComplexAlgorithm
          table-strategy:
            complex:
              sharding-columns: order_id,user_id
              algorithm-class-name: cn.axj.sharding.ShardingAlgorithmConfig.OrdersTableComplexAlgorithm
        order_items:
          actual-data-nodes: m$->{1..3}.order_items_$->{0..31}
          key-generator:
            column: item_id
            type: SNOWFLAKE
            props:
              worker:
                id: 1
          database-strategy:
            complex:
              sharding-columns: order_id,user_id
              algorithm-class-name: cn.axj.sharding.ShardingAlgorithmConfig.OrdersDatabaseComplexAlgorithm
          table-strategy:
            complex:
              sharding-columns: order_id,user_id
              algorithm-class-name: cn.axj.sharding.ShardingAlgorithmConfig.OrdersTableComplexAlgorithm
      broadcast-tables:
        - user
        - product
        - business
      binding-tables:
        - orders,order_items
    props:
      sql:
        # enable SQL logging
        show: true
```
The sharding strategy itself is discussed below.
Choosing a sharding key based on the business
Order id
If orders are looked up by order id, the order id can serve directly as the sharding key, and a query by order id routes straight to the table holding that order.
But there is another scenario: a user wants to list all of their own orders, and no order id is available. With order id as the only sharding key, that query inevitably scans every database and every table.
User id
On the consumer side, users listing their own orders need speed; scanning a sea of shard tables is not an option.
If the user id is the sharding key, all orders of a given user land in one table of one database, which solves the user-side listing.
But then what about a lookup by order id? If sharding is driven by user id alone, a query by order id is forced into a full scan.
Sharding strategy
By embedding the user id inside the order id, both query paths — by user id and by order id — can be served.
Concrete approach:
- The actual sharding value is always the user id.
- A query by user id routes directly to the right table.
- A query by order id first extracts the user id from the order id, and then routes to the same table.
- ShardingSphere's complex sharding mode is used, with both user_id and order_id as sharding columns.
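A minimal sketch of this routing idea (not the ShardingSphere algorithm classes themselves, which appear later). It assumes the order-id layout described in the next section — user id in 32 bits above a 12-bit sequence — and uses a small user id for readability:

```java
public class ShardRouting {

    static final int DB_COUNT = 3;      // data sources m1..m3
    static final int TABLE_COUNT = 32;  // orders_0..orders_31 per database

    // Assumed bit layout: low 12 bits sequence, next 32 bits user id
    static final int USER_ID_SHIFT = 12;
    static final long MAX_USER_ID = (1L << 32) - 1;

    static long userIdFromOrderId(long orderId) {
        return (orderId >> USER_ID_SHIFT) & MAX_USER_ID;
    }

    // Same user id -> same database and table, whether we start
    // from user_id or from order_id.
    static String route(long userId) {
        return "m" + (userId % DB_COUNT + 1) + ".orders_" + (userId % TABLE_COUNT);
    }

    public static void main(String[] args) {
        long userId = 12345L;                         // assumes the id fits in 32 bits
        long orderId = (userId << USER_ID_SHIFT) | 7; // fabricated: sequence 7, no timestamp bits
        System.out.println(route(userId));                     // m1.orders_25
        System.out.println(route(userIdFromOrderId(orderId))); // m1.orders_25
    }
}
```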
Design and implementation of the order id
**Design goal**:
Design a 64-bit unique ID that contains a timestamp, the user ID, and a sequence number, while ensuring the generated ID is never negative.
Design approach:

- Timestamp part (22 bits): a millisecond timestamp, long enough to cover the expected lifetime of the system.
- User ID part (32 bits): the user ID gets the most bits to guarantee enough uniqueness; 32 bits hold over 4 billion users, which is plenty. The split between the timestamp and user-id widths can be tuned to the system's expectations.
- Sequence part (12 bits): resolves uniqueness when several IDs are generated within the same millisecond; 12 bits allow 4,096 distinct order ids per millisecond. The sequence must not be tied to a user id — it is shared by all users.
- Ensure the ID is non-negative: use unsigned handling or an appropriate mask/shift so the generated ID never goes negative.
- Parse the user id back out: the user id must be recoverable from the order id.

(Strictly speaking 22 + 32 + 12 = 66 bits, which does not fit into a signed 64-bit long; in the implementation below the highest timestamp bits are cut off by the final non-negative mask, so the effective timestamp range is smaller than 22 bits would suggest.)
The order id generator:

```java
package cn.axj.sharding.util;

/**
 * @author aoxiaojun
 * @date 2024/7/5 10:43
 **/
public class OrderGenerator {

    // Custom epoch, adjust as needed
    private static final long EPOCH = 1625097600000L; // 2021-07-01 00:00:00 UTC

    // Bits per part
    private static final int TIMESTAMP_BITS = 22;
    private static final int USER_ID_BITS = 32;
    private static final int SEQUENCE_BITS = 12;

    // Maximum values
    private static final long MAX_USER_ID = (1L << USER_ID_BITS) - 1;
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;

    // Bit offsets
    private static final int USER_ID_SHIFT = SEQUENCE_BITS;
    private static final int TIMESTAMP_SHIFT = USER_ID_BITS + SEQUENCE_BITS;

    // Timestamp of the last generated ID
    private static long lastTimestamp = -1L;
    // Current sequence number
    private static long sequence = 0L;

    // Generate a unique ID
    public static synchronized long generateUniqueId(long userId) {
        long currentTimestamp = System.currentTimeMillis();
        if (currentTimestamp < lastTimestamp) {
            throw new RuntimeException("Clock moved backwards. Refusing to generate ID.");
        }
        if (lastTimestamp == currentTimestamp) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // Sequence overflow, wait for the next millisecond
                currentTimestamp = waitNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L; // reset the sequence
        }
        lastTimestamp = currentTimestamp;
        long timestamp = currentTimestamp - EPOCH;
        // Combine the parts with shifts
        long uniqueId = (timestamp << TIMESTAMP_SHIFT) | (userId << USER_ID_SHIFT) | sequence;
        return uniqueId & Long.MAX_VALUE; // ensure the ID is non-negative
    }

    // Parse the user ID back out
    public static long getUserIdFromUniqueId(long uniqueId) {
        return (uniqueId >> USER_ID_SHIFT) & MAX_USER_ID;
    }

    // Spin until the next millisecond
    private static long waitNextMillis(long lastTimestamp) {
        long currentTimestamp = System.currentTimeMillis();
        while (currentTimestamp <= lastTimestamp) {
            currentTimestamp = System.currentTimeMillis();
        }
        return currentTimestamp;
    }

    public static void main(String[] args) {
        long userId = 1234567890L;
        long uniqueId = generateUniqueId(userId);
        System.out.println("Generated Unique ID: " + uniqueId);
        long parsedUserId = getUserIdFromUniqueId(uniqueId);
        System.out.println("Parsed User ID: " + parsedUserId);
    }
}
```
The user id generator:

```java
import java.security.SecureRandom;

public class DistributedUserIDGenerator {

    private static final long EPOCH = 1625097600000L; // 2021-07-01 00:00:00 UTC
    private static final int RANDOM_BITS = 32 - 13;   // reserve 13 bits for the timestamp
    private static final SecureRandom random = new SecureRandom();

    public static long generateUserId() {
        // Current timestamp in milliseconds
        long currentTimestamp = System.currentTimeMillis();
        // Convert to seconds since the custom epoch
        long secondsSinceEpoch = (currentTimestamp - EPOCH) / 1000;
        // Generate the random bits
        int randomBits = random.nextInt(1 << RANDOM_BITS);
        // Combine timestamp and random bits
        long userId = (secondsSinceEpoch << RANDOM_BITS) | randomBits;
        return userId & Long.MAX_VALUE; // ensure a non-negative user ID
    }

    public static void main(String[] args) {
        long userId = generateUserId();
        System.out.println("Generated Distributed User ID: " + userId);
    }
}
```

(One caveat: as written, secondsSinceEpoch is not masked to 13 bits, so generated ids quickly exceed 32 bits — the sample user ids later in this article are around 46 bits. Such ids no longer round-trip through OrderGenerator's 32-bit user-id field, so in a real system the two bit layouts would need to be reconciled.)
Implementing the sharding algorithms
- Database sharding for orders and order_items: take the user id directly, or parse it out of the order id, then compute user id mod (number of data sources) + 1 to pick the target data source.
```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import com.google.common.collect.Range;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
import org.springframework.util.CollectionUtils;

public class OrdersDatabaseComplexAlgorithm implements ComplexKeysShardingAlgorithm<Long> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
                                         ComplexKeysShardingValue<Long> complexKeysShardingValue) {
        Long orderId = null;
        Collection<Long> orderIdColl = complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("order_id");
        if (!CollectionUtils.isEmpty(orderIdColl)) {
            orderId = orderIdColl.iterator().next();
        }
        Long userId = null;
        Collection<Long> userIdColl = complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("user_id");
        if (!CollectionUtils.isEmpty(userIdColl)) {
            userId = userIdColl.iterator().next();
        }
        // Handles e.g.: select * from orders where user_id = ?
        if (Objects.nonNull(userId)) {
            String dataSourceName = "m" + (userId % availableTargetNames.size() + 1);
            for (String targetName : availableTargetNames) {
                if (targetName.endsWith(dataSourceName)) {
                    return Collections.singleton(targetName); // return the matching database
                }
            }
        }
        if (orderId != null) {
            // Extract the userId embedded in the orderId
            userId = OrderGenerator.getUserIdFromUniqueId(orderId);
            String dataSourceName = "m" + (userId % availableTargetNames.size() + 1);
            for (String targetName : availableTargetNames) {
                if (targetName.endsWith(dataSourceName)) {
                    return Collections.singleton(targetName); // return the matching database
                }
            }
        }
        // Range condition on order_id: fall back to all data sources
        Map<String, Range<Long>> columnNameAndRangeValuesMap = complexKeysShardingValue.getColumnNameAndRangeValuesMap();
        Range<Long> range = columnNameAndRangeValuesMap.get("order_id");
        if (Objects.nonNull(range)) {
            return availableTargetNames;
        }
        throw new IllegalArgumentException("No precise sharding available for " + complexKeysShardingValue);
    }
}
```
- Table sharding for orders and order_items: take the user id directly, or parse it out of the order id, then compute user id mod (number of shard tables) to pick the target table.
```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import com.google.common.collect.Range;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
import org.springframework.util.CollectionUtils;

public class OrdersTableComplexAlgorithm implements ComplexKeysShardingAlgorithm<Long> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
                                         ComplexKeysShardingValue<Long> complexKeysShardingValue) {
        Long orderId = null;
        Collection<Long> orderIdColl = complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("order_id");
        if (!CollectionUtils.isEmpty(orderIdColl)) {
            orderId = orderIdColl.iterator().next();
        }
        Long userId = null;
        Collection<Long> userIdColl = complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("user_id");
        if (!CollectionUtils.isEmpty(userIdColl)) {
            userId = userIdColl.iterator().next();
        }
        if (userId != null) {
            String tableName = complexKeysShardingValue.getLogicTableName() + "_" + (userId % availableTargetNames.size());
            if (availableTargetNames.contains(tableName)) {
                return Collections.singleton(tableName);
            }
        }
        if (orderId != null) {
            // Extract the userId embedded in the orderId
            userId = OrderGenerator.getUserIdFromUniqueId(orderId);
            String tableName = complexKeysShardingValue.getLogicTableName() + "_" + (userId % availableTargetNames.size());
            if (availableTargetNames.contains(tableName)) {
                return Collections.singleton(tableName);
            }
        }
        // Range condition on order_id: fall back to all tables
        Map<String, Range<Long>> columnNameAndRangeValuesMap = complexKeysShardingValue.getColumnNameAndRangeValuesMap();
        Range<Long> range = columnNameAndRangeValuesMap.get("order_id");
        if (Objects.nonNull(range)) {
            return availableTargetNames;
        }
        throw new IllegalArgumentException("No precise sharding available for " + complexKeysShardingValue);
    }
}
```
With that, the sharding setup is complete.
Inserting test data
Merchants
Simulate 200 merchants and insert them directly:
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class BusienssTest {

    @Resource
    private BusinessMapper businessMapper;

    @Test
    public void addBusiness() {
        for (int i = 0; i < 200; i++) {
            Business business = new Business();
            business.setBusinessId(i + 2L);
            business.setBusinessName("Merchant " + (i + 2)); // note: (i + 2), not "…" + i + 2
            businessMapper.insert(business);
        }
    }
}
```
Because it is a broadcast table, the rows are inserted into every database.
Products
Simulate 20 products and insert them:
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProductTest {

    @Resource
    private ProductMapper productMapper;

    @Test
    public void addProduct() {
        for (int i = 0; i < 20; i++) {
            Product product = new Product();
            product.setProductId(i + 2L);
            product.setProductName("Test product " + (i + 2)); // note: (i + 2), not "…" + i + 2
            productMapper.insert(product);
        }
    }
}
```
Because it is a broadcast table, the rows are inserted into every database.
Users
Simulate inserting 10,000 users:
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class UserTest {

    @Resource
    private UserMapper userMapper;

    @Test
    public void addUser() {
        for (int i = 0; i < 10000; i++) {
            User user = new User();
            user.setUserId(DistributedUserIDGenerator.generateUserId());
            user.setUsername("User " + (5000 + i)); // note: (5000 + i), not "…" + 5000 + i
            userMapper.insert(user);
        }
    }

    @Test
    public void queryUser() {
        QueryWrapper<User> userQueryWrapper = new QueryWrapper<>();
        List<User> users = userMapper.selectList(userQueryWrapper);
        System.out.println(users.size());
    }
}
```
Because it is a broadcast table, the rows are inserted into every database.
Orders and order items
First, insert 1,000 rows one at a time with mapper.insert:
```java
@Test
public void forInsertOrders() {
    QueryWrapper<User> userQueryWrapper = new QueryWrapper<>();
    List<User> users = userMapper.selectList(userQueryWrapper);
    QueryWrapper<Product> productQueryWrapper = new QueryWrapper<>();
    List<Product> products = productMapper.selectList(productQueryWrapper);
    QueryWrapper<Business> businessQueryWrapper = new QueryWrapper<>();
    List<Business> businesses = businessMapper.selectList(businessQueryWrapper);

    long l = System.currentTimeMillis();
    int total = 1000;
    for (int i = 0; i < total; i++) {
        Orders orders = new Orders();
        int iusers = ThreadLocalRandom.current().nextInt(users.size());
        orders.setUserId(users.get(iusers).getUserId());
        orders.setOrderId(OrderGenerator.generateUniqueId(users.get(iusers).getUserId()));
        int lbusiness = ThreadLocalRandom.current().nextInt(0, businesses.size());
        orders.setBusinessId(businesses.get(lbusiness).getBusinessId());
        orders.setOrderDate(new Date());

        OrderItems orderItems = new OrderItems();
        orderItems.setOrderId(orders.getOrderId());
        orderItems.setUserId(users.get(iusers).getUserId());
        int lproduct = ThreadLocalRandom.current().nextInt(0, products.size());
        orderItems.setProductId(products.get(lproduct).getProductId());

        ordersMapper.insert(orders);
        orderItemsMapper.insert(orderItems);
    }
    System.out.println("Inserted " + total + " rows via MyBatis insert, elapsed: " + (System.currentTimeMillis() - l));
}
```
Inserted 1000 rows via MyBatis insert, elapsed: 80649

Row-by-row mapper.insert takes over 80 seconds for a mere 1,000 rows — torture for any real bulk load. Each call repeatedly acquires and releases a database connection and pays a full round trip, and that overhead dominates.
How to insert data in bulk
MyBatis batch insert
Open a batch-mode session via MyBatis' sqlSessionFactory: all statements are buffered, sent over a single connection in one go, and committed at the end, which largely eliminates the per-statement connection acquire/release cost.
```java
@Test
public void batchInsertBySqlSession() {
    QueryWrapper<User> userQueryWrapper = new QueryWrapper<>();
    List<User> users = userMapper.selectList(userQueryWrapper);
    QueryWrapper<Product> productQueryWrapper = new QueryWrapper<>();
    List<Product> products = productMapper.selectList(productQueryWrapper);
    QueryWrapper<Business> businessQueryWrapper = new QueryWrapper<>();
    List<Business> businesses = businessMapper.selectList(businessQueryWrapper);

    long l = System.currentTimeMillis();
    int total = 1000;
    List<Orders> ordersList = new ArrayList<>();
    List<OrderItems> orderItemsList = new ArrayList<>();
    for (int i = 0; i < total; i++) {
        Orders orders = new Orders();
        int iusers = ThreadLocalRandom.current().nextInt(users.size());
        orders.setUserId(users.get(iusers).getUserId());
        orders.setOrderId(OrderGenerator.generateUniqueId(users.get(iusers).getUserId()));
        int lbusiness = ThreadLocalRandom.current().nextInt(0, businesses.size());
        orders.setBusinessId(businesses.get(lbusiness).getBusinessId());
        orders.setOrderDate(new Date());

        OrderItems orderItems = new OrderItems();
        orderItems.setOrderId(orders.getOrderId());
        orderItems.setUserId(users.get(iusers).getUserId());
        int lproduct = ThreadLocalRandom.current().nextInt(0, products.size());
        orderItems.setProductId(products.get(lproduct).getProductId());

        ordersList.add(orders);
        orderItemsList.add(orderItems);
    }
    try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
        OrdersMapper ordersMapper1 = sqlSession.getMapper(OrdersMapper.class);
        OrderItemsMapper orderItemsMapper1 = sqlSession.getMapper(OrderItemsMapper.class);
        for (Orders orders : ordersList) {
            ordersMapper1.insert(orders);
        }
        for (OrderItems orderItems : orderItemsList) {
            orderItemsMapper1.insert(orderItems);
        }
        sqlSession.commit();
    }
    System.out.println("Inserted " + total + " rows via MyBatis batch, elapsed: " + (System.currentTimeMillis() - l));
}
```
Inserted 1000 rows via MyBatis batch, elapsed: 11085

Batching 1,000 inserts takes about 11 seconds — a big improvement over the previous 80.
Multi-threaded insertion with a thread pool
Using a thread pool, a new task is submitted whenever the buffered orders reach a threshold, so a given batch of orders is split into several batch-insert tasks that run in parallel.
- Build the thread pool
```java
@Component
public class ThreadPool {

    private ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 48, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));

    public void execute(Runnable command) {
        threadPoolExecutor.execute(command);
    }

    public void shutdown() {
        threadPoolExecutor.shutdown();
    }

    /**
     * Wait for all submitted tasks to finish
     */
    public void awaitTermination() {
        try {
            threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```
- Build the batch-insert task

```java
private static class BatchInsertTask implements Runnable {

    private final List<Orders> ordersList;
    private final List<OrderItems> orderItemsList;

    public BatchInsertTask(List<Orders> ordersList, List<OrderItems> orderItemsList) {
        this.ordersList = ordersList;
        this.orderItemsList = orderItemsList;
    }

    @Override
    public void run() {
        // Batch-insert both lists in one session
        // (assumes the enclosing test class exposes a static sqlSessionFactory)
        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            OrdersMapper ordersMapper1 = sqlSession.getMapper(OrdersMapper.class);
            OrderItemsMapper orderItemsMapper1 = sqlSession.getMapper(OrderItemsMapper.class);
            for (Orders orders : ordersList) {
                ordersMapper1.insert(orders);
            }
            for (OrderItems orderItems : orderItemsList) {
                orderItemsMapper1.insert(orderItems);
            }
            sqlSession.commit();
        }
    }
}
```
- Run the insert. Tasks must not share the mutable buffers, so a serialization-based deep-copy helper is used:

```java
public class CopyUtils {

    // Deep-copy a list by serializing and deserializing it
    public static <T extends Serializable> List<T> deepCopyList(List<T> originalList) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(originalList);
        oos.flush();
        oos.close();

        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
        ObjectInputStream ois = new ObjectInputStream(bis);
        @SuppressWarnings("unchecked")
        List<T> copiedList = (List<T>) ois.readObject();
        ois.close();
        return copiedList;
    }
}
```
A task is submitted every 100 rows:

```java
@Test
public void batchInsertOrders() throws IOException, ClassNotFoundException {
    QueryWrapper<User> userQueryWrapper = new QueryWrapper<>();
    List<User> users = userMapper.selectList(userQueryWrapper);
    QueryWrapper<Product> productQueryWrapper = new QueryWrapper<>();
    List<Product> products = productMapper.selectList(productQueryWrapper);
    QueryWrapper<Business> businessQueryWrapper = new QueryWrapper<>();
    List<Business> businesses = businessMapper.selectList(businessQueryWrapper);

    List<Orders> ordersList = new ArrayList<>();
    List<OrderItems> orderItemsList = new ArrayList<>();
    int total = 1000;
    long start = System.currentTimeMillis();
    for (int i = 1; i <= total; i++) {
        Orders orders = new Orders();
        int iusers = ThreadLocalRandom.current().nextInt(users.size());
        orders.setUserId(users.get(iusers).getUserId());
        orders.setOrderId(OrderGenerator.generateUniqueId(users.get(iusers).getUserId()));
        int lbusiness = ThreadLocalRandom.current().nextInt(0, businesses.size());
        orders.setBusinessId(businesses.get(lbusiness).getBusinessId());
        orders.setOrderDate(new Date());
        ordersList.add(orders);

        OrderItems orderItems = new OrderItems();
        orderItems.setOrderId(orders.getOrderId());
        orderItems.setUserId(users.get(iusers).getUserId());
        int l = ThreadLocalRandom.current().nextInt(0, products.size());
        orderItems.setProductId(products.get(l).getProductId());
        orderItemsList.add(orderItems);

        if (i % 100 == 0) {
            // Hand each task its own deep copies of the buffers
            List<Orders> ordersNewList = CopyUtils.deepCopyList(ordersList);
            List<OrderItems> orderItemsNewList = CopyUtils.deepCopyList(orderItemsList);
            threadPool.execute(new BatchInsertTask(ordersNewList, orderItemsNewList));
            ordersList = new ArrayList<>();
            orderItemsList = new ArrayList<>();
        }
    }
    // Wait for the thread pool to drain
    threadPool.shutdown();
    threadPool.awaitTermination();
    System.out.println("Inserted " + total + " rows via thread pool + MyBatis batch, elapsed: " + (System.currentTimeMillis() - start));
}
```
Inserted 1000 rows via thread pool + MyBatis batch, elapsed: 4399

About 4.4 seconds — another solid improvement. (The deep-copy helper used here is the CopyUtils class shown above.)

Inserting 10,000 rows as ten tasks takes about 25 seconds:

Inserted 10000 rows via thread pool + MyBatis batch, elapsed: 24984
Queries
About 7,000,000 order rows were prepared, roughly 70,000 per table.
Query by user id
```java
@Test
public void queryByUser() {
    long start = System.currentTimeMillis();
    QueryWrapper<Orders> ordersQueryWrapper = new QueryWrapper<>();
    ordersQueryWrapper.eq("user_id", 49839282348229L);
    List<Orders> orders = ordersMapper.selectList(ordersQueryWrapper);
    System.out.println("Elapsed: \t" + (System.currentTimeMillis() - start));
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
```
2024-07-06 02:07:27.131 INFO 14108 --- [ main] ShardingSphere-SQL : Logic SQL: SELECT order_id,user_id,business_id,order_date FROM orders WHERE (user_id = ?)
2024-07-06 02:07:27.131 INFO 14108 --- [ main] ShardingSphere-SQL : Actual SQL: m2 ::: SELECT order_id,user_id,business_id,order_date FROM orders_5 WHERE (user_id = ?) ::: [49839282348229]
Elapsed: 728
```

The ShardingSphere log shows that a query by user id routes precisely to table orders_5 in data source m2 and takes 728 ms.
Query by order id
```java
@Test
public void queryByOrderId() {
    long l = System.currentTimeMillis();
    QueryWrapper<Orders> ordersQueryWrapper = new QueryWrapper<>();
    ordersQueryWrapper.eq("order_id", 2700119196034437376L);
    List<Orders> orders = ordersMapper.selectList(ordersQueryWrapper);
    System.out.println("Elapsed: \t" + (System.currentTimeMillis() - l));
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
```
2024-07-06 02:09:31.991 INFO 5560 --- [ main] ShardingSphere-SQL : Logic SQL: SELECT order_id,user_id,business_id,order_date FROM orders WHERE (order_id = ?)
2024-07-06 02:09:31.992 INFO 5560 --- [ main] ShardingSphere-SQL : Actual SQL: m3 ::: SELECT order_id,user_id,business_id,order_date FROM orders_9 WHERE (order_id = ?) ::: [2700119196034437376]
Elapsed: 661
```

The ShardingSphere log shows that a query by order id routes precisely to table orders_9 in data source m3 and takes 661 ms.
Simulating paginated queries
Simulate pagination sorted by order_id.
With a sharding key
```java
public interface OrdersMapper extends BaseMapper<Orders> {

    @Select("select * from orders where user_id = #{userId} order by order_id limit #{limit} offset #{offset}")
    List<Orders> queryByUserIdAndPage(@Param("limit") int limit, @Param("offset") int offset, @Param("userId") Long userId);

    @Select("select * from orders order by order_id limit #{limit} offset #{offset}")
    List<Orders> query(@Param("limit") int limit, @Param("offset") int offset);
}
```
Paging by user id:

```java
@Test
public void queryByUserIdAndPage() {
    long start = System.currentTimeMillis();
    List<Orders> orders = ordersMapper.queryByUserIdAndPage(10, 0, 49839282348229L);
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
```
2024-07-06 02:15:37.537 INFO 14288 --- [ main] ShardingSphere-SQL : Actual SQL: m2 ::: select * from orders_5 where user_id = ? order by order_id limit ? offset ? ::: [49839282348229, 10, 0]
2024-07-06 02:13:45.792 INFO 12584 --- [ main] ShardingSphere-SQL : Actual SQL: m2 ::: select * from orders_5 where user_id = ? order by order_id limit ? offset ? ::: [49839282348229, 10, 10]
```

With a sharding key present, pagination degenerates to a normal single-table paged query.
Without a sharding key
Global query method
Query every shard under the rewritten condition, aggregate the results, and filter out the requested rows.
```java
@Test
public void query() {
    long l = System.currentTimeMillis();
    List<Orders> orders = ordersMapper.query(10, 0);
    System.out.println("Elapsed: \t" + (System.currentTimeMillis() - l));
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
- With limit 10, offset 0:
```
2024-07-06 02:17:47.746 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_0 order by order_id limit ? offset ? ::: [10, 0]
2024-07-06 02:17:47.746 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_1 order by order_id limit ? offset ? ::: [10, 0]
2024-07-06 02:17:47.746 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_2 order by order_id limit ? offset ? ::: [10, 0]
...
2024-07-06 02:17:47.747 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_31 order by order_id limit ? offset ? ::: [10, 0]
2024-07-06 02:17:47.747 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m2 ::: select * from orders_0 order by order_id limit ? offset ? ::: [10, 0]
...
2024-07-06 02:17:47.747 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m2 ::: select * from orders_29 order by order_id limit ? offset ? ::: [10, 0]
...
2024-07-06 02:17:47.749 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m3 ::: select * from orders_30 order by order_id limit ? offset ? ::: [10, 0]
2024-07-06 02:17:47.749 INFO 32340 --- [ main] ShardingSphere-SQL : Actual SQL: m3 ::: select * from orders_31 order by order_id limit ? offset ? ::: [10, 0]
```
Elapsed: 888

Every orders table is queried with offset 0 and limit 10; the rows from all tables are then sorted and the smallest ten are returned. 888 ms — acceptable.
- With limit 10, offset 10, each orders table is queried with offset 0 and limit 20; the rows from all tables are then sorted, and the ten rows at global offset 10 are returned.

Why take the first 20 rows from every table? Because of sharding, the requested ten rows are not guaranteed to sit in a single table, and any one of them could also fall within positions 0–10 of some table — so every row before offset + limit must be fetched from every shard. Elapsed time here was 814 ms, still tolerable.

But once the page number gets large, this approach pulls every row before the offset out of every shard and sorts them all, and the cost climbs quickly.
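The merge behaviour described above can be sketched independently of ShardingSphere. Assuming each shard returns its rows already sorted, serving "offset O, limit L" means fetching O + L rows per shard, merge-sorting them, then applying the offset once globally (the shard contents below are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GlobalPageMerge {

    // Each shard must contribute its first offset + limit rows: any row of the
    // global page could hide in any shard's first `offset` rows, so nothing
    // before offset + limit can be skipped safely at the shard level.
    static List<Long> page(List<List<Long>> shards, int offset, int limit) {
        return shards.stream()
                .flatMap(shard -> shard.stream().limit(offset + limit)) // per-shard rewritten LIMIT
                .sorted()                                               // global merge sort
                .skip(offset)                                           // apply the offset once, globally
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Long>> shards = Arrays.asList(
                Arrays.asList(1L, 4L, 7L, 10L),
                Arrays.asList(2L, 5L, 8L, 11L),
                Arrays.asList(3L, 6L, 9L, 12L));
        System.out.println(page(shards, 2, 3)); // [3, 4, 5]
    }
}
```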
- With an offset of 50,000:

```
2024-07-06 02:27:24.316 INFO 17864 --- [ main] ShardingSphere-SQL : Actual SQL: m3 ::: select * from orders_31 order by order_id limit ? offset ? ::: [50010, 0]
Elapsed: 12899
```

The cost balloons to a staggering 12.9 seconds.
Drawbacks of the global query method:
- As the page number grows, query efficiency drops sharply.
- Large page numbers pull back huge intermediate result sets that must be re-sorted, which is heavy on both memory and CPU.
Forbidding page jumps
This addresses the global query method's pain point at large page numbers. Once the sort order is fixed, the maximum (or minimum) sort-key value of the previous page is used as a filter condition for the next page, so each shard only ever returns one limit's worth of rows.
For example:

```java
@Select("select * from orders where order_id > #{orderMin} order by order_id limit #{limit} offset #{offset}")
List<Orders> queryProhibit(@Param("limit") int limit, @Param("offset") int offset, @Param("orderMin") Long orderMin);
```
Start from the first page:

```java
@Test
public void queryProhibitPage() {
    long l = System.currentTimeMillis();
    List<Orders> orders = ordersMapper.queryProhibit(10, 0, 0L);
    System.out.println("Elapsed: \t" + (System.currentTimeMillis() - l));
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
Result:

```
Orders(orderId=1289014780911571066, userId=30533477, businessId=63, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289014788336590870, userId=32346226, businessId=159, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015043814797343, userId=94718835, businessId=184, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015212125704194, userId=135810365, businessId=146, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015307747590238, userId=159155552, businessId=110, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015356776222844, userId=171125433, businessId=104, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015389011095558, userId=178995275, businessId=75, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015446032207967, userId=192916445, businessId=85, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015457140670521, userId=195628472, businessId=167, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015523201130581, userId=211756514, businessId=195, orderDate=Fri Jul 05 00:00:00 CST 2024)
```
For the second page, pass the largest order_id of the first page (here 1289015523201130581) as the condition:

```java
@Test
public void queryProhibitPage() {
    long l = System.currentTimeMillis();
    List<Orders> orders = ordersMapper.queryProhibit(10, 0, 1289015523201130581L);
    System.out.println("Elapsed: \t" + (System.currentTimeMillis() - l));
    for (Orders order : orders) {
        System.out.println(order);
    }
}
```
```
Elapsed: 955
Orders(orderId=1289015650867736645, userId=242925119, businessId=36, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015705371156506, userId=256231618, businessId=190, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015767379583072, userId=271370394, businessId=109, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015796777467972, userId=278547612, businessId=74, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289015893385383950, userId=302133529, businessId=156, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289016129480106035, userId=359773842, businessId=162, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289016200532709421, userId=377120669, businessId=67, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289016244286947432, userId=387802856, businessId=123, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289016295920742499, userId=400408763, businessId=125, orderDate=Fri Jul 05 00:00:00 CST 2024)
Orders(orderId=1289016348148285461, userId=413159628, businessId=50, orderDate=Fri Jul 05 00:00:00 CST 2024)
```
Looking at the actual SQL:

```
2024-07-06 15:05:23.003 INFO 32504 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_28 where order_id > ? order by order_id limit ? offset ? ::: [1289015523201130581, 10, 0]
2024-07-06 15:05:23.004 INFO 32504 --- [ main] ShardingSphere-SQL : Actual SQL: m1 ::: select * from orders_29 where order_id > ? order by order_id limit ? offset ? ::: [1289015523201130581, 10, 0]
```

Each data node now returns at most ten rows, which are then merged and sorted.
Drawback of forbidding page jumps:
- Pages can only be fetched one after another. Jumping from page one straight to page five is impossible, because the page-four maximum is not yet known.
Merchant-side queries
How does a merchant query all of its own orders?
Since business_id is not a sharding key, such a query fans out to every database and every table, and once the data volume grows, performance drops sharply.
Index-table method (redundancy)
- Create a redundant table of merchant id and order id in each database, sharded by merchant id.
- A query by merchant id then routes quickly to a single data node.
- The order ids found there are used to fetch the full orders from the orders tables.
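A sketch of what such an index table might look like; the table name and columns are hypothetical, not taken from the original setup:

```sql
-- Hypothetical mapping table, sharded by business_id instead of user_id;
-- written once per order, holding only the ids (plus any columns the
-- merchant list page needs).
CREATE TABLE business_order_index_0 (
    business_id BIGINT NOT NULL,
    order_id    BIGINT NOT NULL,
    order_date  DATE,
    PRIMARY KEY (business_id, order_id)
);

-- Query flow:
-- 1) route by business_id to one index table and page over order_id there;
-- 2) fetch the full rows from orders by order_id (the order_id embeds the
--    user_id, so each lookup hits exactly one orders shard).
```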
ES heterogeneous index

- When an order is created, write a copy of the order data to Elasticsearch.
- Elasticsearch handles queries over this volume of data very efficiently.
- Merchant-side queries can then be served directly from ES.