在數據庫的日常使用中,會經常遇到以下場景:
- ?數據復制?:將一個或多個表中的數據復制到目標表中,可能是復制全部數據,也可能僅復制部分數據。
- 數據合并:將數據從一個表轉移到另一個表,或者將多個表的數據合并為一個表
- ?表備份?:常用于備份表數據,以防止在修改或刪除數據時出現問題,需要還原數據的情況。
- 數據加工:將當前數據按照特定規則進行處理,并將處理后的數據放入一個新的表中。
OceanBase?支持并行導入功能,其并行執行框架能夠并發執行DML語句(即Parallel DML),使得在多節點數據庫環境中能夠實現跨節點的并發寫入,同時,它還能確保大事務的一致性。現在,讓我們通過實驗來體驗OceanBase的并行導入能力。
一、準備環境
我準備了兩套環境一套單節點的,一套1-1-1的集群
單節點:8C16G
集群:4C10G*3 (吐槽一下太費資源了,試驗機快冒煙了/(ㄒoㄒ)/~~)
二、準備表結構和數據
找了一張經常使用的表修改為oceanbase腳本
表結構:
CREATE TABLE `test_bingxing` ( `fzssuuid` bigint AUTO_INCREMENT , `jcxxuuid` varchar(32) NOT NULL , `gtfwwrfzsslbdm` char(1) NULL DEFAULT NULL , `cshssmc` text NULL , `zxxzm_dm` char(16) NULL DEFAULT NULL , `gtfwly_dm` char(1) NULL DEFAULT NULL , `zhlycw` text NULL , `werjbqk` longtext NULL , `lrrq` datetime NOT NULL , `lrr_dm` char(11) NOT NULL , `xgrq` datetime NULL DEFAULT NULL , `sjgsdq` char(11) NOT NULL , `sjtb_sj` datetime NULL DEFAULT NULL ,`zhlyhcsdfg` decimal(18, 6) NULL DEFAULT NULL ,`zhlyzyfss_dm` varchar(45) NULL DEFAULT NULL , `ssbm` varchar(45) NULL DEFAULT NULL , `yxbz` char(1) NULL DEFAULT NULL , `sjblbz` decimal(2, 0) NULL DEFAULT NULL , PRIMARY KEY (fzssuuid)
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = COMPACT COMPRESSION = 'zstd_1.3.8' REPLICA_NUM = 1 BLOCK_SIZE = 16384;
準備造數腳本:
寫了一個簡單的python造數腳本僅供參考,構造500萬左右的數據
import random
import string
import pymysql
import time
import datetime
import multiprocessing
import threading # 數據生成部分------------公共------------------------------------------------------------------------------------------------------------------------------(始)
# 隨機時間 2000年-2023年
class CreateData(object):def randdatetime(self):mintime = datetime.datetime(2000, 1, 1, 0, 0, 0)maxtime = datetime.datetime(2023, 12, 31, 23, 23, 59)mintime_ts = int(time.mktime(mintime.timetuple()))maxtime_ts = int(time.mktime(maxtime.timetuple()))random_ts = random.randint(mintime_ts, maxtime_ts)randomtime = datetime.datetime.fromtimestamp(random_ts)# print(randomtime)return (randomtime)# # 獲取fundid隨機6位,乙級差不多相同會又1000左右def get_fundid(self):fundid = '25010'for ii in range(6):fundid += str(random.randint(0, 9))return fundid# 生成數據def get_one_data(self,xh):matchsno = 1000000+xh# 此處返回的數據順序最好與創建表時的結構順序一致,以便插入數據時一一對應aa=["dsdfsdf","Y",self.get_fundid(),"WRFG3452","Y",self.get_fundid(),"FTERGDFFSGHTRT324RDAFAGDA",self.randdatetime().strftime('%Y-%m-%d %H:%M:%S'),"12032032",self.randdatetime().strftime('%Y-%m-%d %H:%M:%S'),"中國",self.randdatetime().strftime('%Y-%m-%d %H:%M:%S'),"1232143.12","sadfkjjdfw231","dsfwer23Dcxf","n","1"]return aadef insert_data(tablename,*args):get_conn = pymysql.connect(host="192.168.150.117",user="banjin",password="oracle123",port=2881,database="test") # 連接數據庫get_cursor = get_conn.cursor() # 獲取游標str_sql = "insert into {0}(jcxxuuid,gtfwwrfzsslbdm,cshssmc,zxxzm_dm,gtfwly_dm,zhlycw,werjbqk,lrrq,lrr_dm,xgrq,sjgsdq,sjtb_sj,zhlyhcsdfg,zhlyzyfss_dm,ssbm,yxbz,sjblbz) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);".format(tablename) # 定義SQL語句get_cursor.executemany(str_sql,args) # 批量執行SQL語句get_conn.commit() # 提交保存數據
# # 定義一個關閉對象的方法get_conn.close()# print(str_sql)
# # 測試代碼
# #生成數據當前批量寫入,三個線程,需手工大致畫區間
if __name__ == "__main__":createdata = CreateData() # 實例化CreateData類方法list1 = []for i in range(1,5000000): # 循環1000000次,生成1000000條數據get_one = createdata.get_one_data(i) # 調用CreateData類的get_one_data方法list1.append(get_one) # 在list1列表中插入get_one列表(一行數據)start_time = time.time()print("執行程序開始時間:",start_time)p1 = multiprocessing.Process(target=insert_data,args=("`test_bingxing`",*list1[:1800000]))p2 = multiprocessing.Process(target=insert_data,args=("`test_bingxing`",*list1[1800001:3600000]))p3 = multiprocessing.Process(target=insert_data,args=("`test_bingxing`",*list1[3600001:]))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join()end_time = time.time()print("執行程序結束時間:", end_time)print("執行程序總耗費時間:",end_time - start_time) #時間統計為腳本數據庫執行時間,不包含腳本生成數據時間
三、并行導入驗證
測試腳本
set ob_query_timeout = 1000000000;
set ob_trx_timeout = 1000000000; insert into test_bingxing1 select * from test_bingxing;truncate table test_bingxing1;
insert /*+ parallel(8) enable_parallel_dml */ into test_bingxing1 select * from test_bingxing;
1、單機環境
先不開啟并行的方式插入,然后清空數據添加一個 Hint,開啟 PDML 的執行選項,執行結果如下:
非并行插入時間消耗兩分多,并插入時間相差不多
2、集群環境
先不開啟并行的方式插入,然后清空數據添加一個 Hint,開啟 PDML 的執行選項,執行結果如下:
非并行插入時間消耗六分多,并插入時間差不多節約一半,(集群環境因為電腦沒有空間了,外插了一塊老的移動硬盤,寫入速度有點慢)
3、執行計劃對比
未開啟并行導入的執行計劃
開啟并行導入的執行計劃,可以看到使用了并行算子
四、總結
并行插入可以實現更高效的數據插入,因為實驗環境是虛擬機,磁盤都是一塊,性能提升沒有官網文檔說明的那么大,有條件的伙伴可以用真實環境進行測試。