列式存儲數據庫:hbase clickhouse
簡介
ClickHouse入門
ClickHouse是俄羅斯的Yandex于2016年開源的列式存儲數據庫(DBMS),使用C++語言編寫,主要用于在線分析處理查詢(OLAP),能夠使用SQL查詢實時生成分析數據報告。
ClickHouse的特點
行式存儲如圖-1所示:
圖-1 行式存儲
列式存儲如圖-2所示:
圖-2 列式存儲
列式儲存的好處
1)對于列的聚合,計數,求和等統計操作優于行式存儲。
2)由于某一列的數據類型都是相同的,針對于數據存儲更容易進行數據壓縮,每一列選擇更優的數據壓縮算法,大大提高了數據的壓縮比重。
3)由于數據壓縮比更好,一方面節省了磁盤空間,另一方面對于 cache 也有了更大的發揮空間。
Clickhouse的特點
1)DBMS的功能:幾乎覆蓋了標準SQL的大部分語法,包括DDL和DML,以及配套的各種函數,用戶管理及權限管理,數據的備份與恢復。
2)多樣化引擎:ClickHouse和MySQL類似,把表級的存儲引擎插件化,根據表的不同需求可以設定不同的存儲引擎。目前包括合并樹、日志、接口和其他四大類20多種引擎。
3)高吞吐寫入能力:ClickHouse采用類LSM Tree的結構,數據寫入后定期在后臺Compaction。通過類LSM tree的結構,ClickHouse在數據導入時全部是順序append寫,寫入后數據段不可更改,在后臺compaction時也是多個段merge sort后順序寫回磁盤。順序寫的特性,充分利用了磁盤的吞吐能力,即便在HDD上也有著優異的寫入性能。
官方公開benchmark測試顯示能夠達到50MB/s~200MB/s的寫入吞吐能力,按照每行100Byte估算,大約相當于50W-200W條/s的寫入速度。
4)數據分區與線程級并行:ClickHouse將數據劃分為多個partition,每個partition再進一步劃分為多個index granularity(索引粒度),然后通過多個CPU核心分別處理其中的一部分來實現并行數據處理。在這種設計下,單條Query就能利用整機所有CPU。極致的并行處理能力,極大的降低了查詢延時,但是多個表聯表查詢時效率就慢了。
對于大量數據的查詢,ClickHouse能夠將查詢任務分割成多個小任務,實現并行處理,從而提高查詢效率。這種能力使得ClickHouse在處理大規模數據時表現出色。然而,ClickHouse在處理高并發的低延遲查詢方面可能存在一些挑戰。
一項弊端是,如果單個查詢需要利用多個CPU核心,這可能會降低ClickHouse同時處理多個查詢的能力。由于ClickHouse會優先為單個查詢分配多個CPU核心,這可能導致在高并發情況下效率下降,因為無法同時處理多個查詢。
因此,對于高QPS(Query Per Second)的查詢業務,ClickHouse可能并非最佳選擇。在這種情況下,可能需要考慮其他針對高并發低延遲查詢優化的解決方案。性能對比
某網站精華帖,中對幾款數據庫做了性能對比。
1)單表查詢如圖-3所示:
圖-3 單表查詢
2)關聯查詢如圖-4所示:
圖-4 關聯查詢
結論:ClickHouse像很多OLAP數?據庫一樣,單表查詢速度優于關聯查詢,而且ClickHouse的兩者差距更為明顯。
ClickHouse 的安裝
準備工作
1)確定防火墻處于關閉狀態。
2)CentOS 取消打開文件數限制。
在hadoop101的/etc/security/limits.conf文件的末尾加入以下內容:
*也要寫上
vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
* soft nproc 131072
* hard nproc 131072
在hadoop101的/etc/security/limits.d/20-nproc.conf文件的末尾加入以下內容:
vim /etc/security/limits.d/20-nproc.conf
* soft nofile 65536
* hard nofile 65536
* soft nproc 131072
* hard nproc 131072
3)執行同步操作。
xsync /etc/security/limits.conf
Xsync /etc/security/limits.d/20-nproc.conf
4)安裝依賴。
clickhouse依賴于c語言包
yum install -y libtool
用于c語言連接數據庫
yum install -y *unixODBC*
在hadoop102、hadoop103上執行以上操作。
5)CentOS取消SELINUX。
修改/etc/selinux/config中的SELINUX=disabled:
vim /etc/selinux/config
SELINUX=disabled
6)執行同步操作。
xsync /etc/selinux/config
7)重啟三臺服務器。
單機安裝
官網:Fast Open-Source OLAP DBMS - ClickHouse
下載地址:Index of /clickhouse/rpm/stable/x86_64/
1)在hadoop101的/opt/software下創建clickhouse目錄。
mkdir clickhouse
2)將資料/ClickHouse下4個文件上傳到hadoop101的software/clickhouse目錄下,如圖-5所示:
圖-5 資料目錄
3)將安裝文件同步到hadoop102、hadoop103。
xsync clickhouse
4)分別在三臺機子上安裝這4個rpm文件。
rpm -ivh *.rpm
5)查看安裝情況。
rpm -qa|grep clickhouse
6)修改配置文件。
vim /etc/clickhouse-server/config.xml
把<listen_host>::</listen_host> 的注釋打開,這樣的話才能讓 ClickHouse 被除本機以外的服務器訪問,如圖-6所示:
圖-6 單機模式
7)分發配置文件。
xsync /etc/clickhouse-server/config.xml
在這個文件中,有ClickHouse?的一些默認路徑配置,比較重要的數據文件路徑:<path>/var/lib/clickhouse/</path>。
日志文件路徑:<log>/var/log/clickhouse-server/clickhouse-server.log</log>。
8)啟動Server。
systemctl start clickhouse-server
9)三臺機器上關閉開機自啟。
systemctl disable clickhouse-server
10)使用client連接server,如圖-7所示。
clickhouse-client -m
圖-7 連接客戶端
-m:可以在命令窗口輸入多行命令。
數據類型
整型
固定長度的整型,包括有符號整型或無符號整型。
1)整型范圍:
Int8 - [-128 : 127]
Int16 - [-32768 : 32767]
Int32 - [-2147483648 : 2147483647]
Int64 - [-9223372036854775808 : 9223372036854775807]
2)無符號整型范圍:
UInt8 - [0 : 255]
UInt16 - [0 : 65535]
UInt32 - [0 : 4294967295]
UInt64 - [0 : 18446744073709551615]
使用場景:個數、數量、也可以存儲型id。
浮點型
Float32 - float Float64 – double
建議盡可能以整數形式存儲數據。例如,將固定精度的數字轉換為整數值,如時間用毫秒為單位表示,因為浮點型進行計算時可能引起四舍五入的誤差。
使用場景:一般數據值比較小,不涉及大量的統計計算,精度要求不高的時候。比如保存商品的重量。
布爾型
沒有單獨的類型來存儲布爾值。可以使用UInt8類型,取值限制為0或1。
Decimal型
有符號的浮點數,可在加、減和乘法運算過程中保持精度。對于除法,最低有效數字會被丟棄(不舍入)。
有三種聲明:
1)Decimal32(s),相當于 Decimal(9-s,s),有效位數為1~9。
2)Decimal64(s),相當于 Decimal(18-s,s),有效位數為1~18。
3)Decimal128(s),相當于 Decimal(38-s,s),有效位數為1~38。
使用場景:一般金額字段、匯率、利率等字段為了保證小數點精度,都使用Decimal進行存儲。
字符串
1)String:字符串可以任意長度的。它可以包含任意的字節集,包含空字節。
2)FixedString(N):固定長度N的字符串,N必須是嚴格的正自然數。當服務端讀取長度小于N的字符串時候,通過在字符串末尾添加空字節來達到N字節長度。當服務端讀取長度大于N的字符串時候,將返回錯誤消息。與String相比,極少會使用FixedString,因為使用起來不是很方便。
使用場景:名稱、文字描述、字符型編碼。固定長度的可以保存一些定長的內容,比如一些編碼,性別等但是考慮到一定的變化風險,帶來收益不夠明顯,所以定長字符串使用意義有限。
時間類型
目前ClickHouse有三種時間類型:
1)Date接受年-月-日的字符串比如‘2019-12-16’。
2)Datetime接受年-月-日 時:分:秒的字符串比如‘2019-12-16 20:50:10’。
3)Datetime64接受年-月-日 時:分:秒.亞秒的字符串比如‘2019-12-16 20:50:10.66’日期類型,用兩個字節存儲,表示從 1970-01-01 (無符號) 到當前的日期值。
還有很多數據結構,可以參考官方文檔。
數組
Array(T):由T類型元素組成的數組。
T可以是任意類型,包含數組類型。但不推薦使用多維數組,ClickHouse對多維數組的支持有限。例如,不能在MergeTree表中存儲多維數組。
1)創建數組方式1,使用array函數。
array(T)
SELECT array(1, 2) AS x, toTypeName(x);
圖-8 執行結果
2)創建數組方式2:使用方括號。
[]
SELECT [1, 2] AS x, toTypeName(x);
圖-9 執行結果
表引擎
表引擎的使用
表引擎是ClickHouse的一大特色。可以說,表引擎決定了如何存儲表的數據。包括:
1)數據的存儲方式和位置,寫到哪里以及從哪里讀取數據。
2)支持哪些查詢以及如何支持。
3)并發數據訪問。
4)索引的使用。
5)是否可以執行多線程請求。
6)數據復制參數。
表引擎的使用方式就是必須顯式在創建表時定義該表使用的引擎,以及引擎使用的相關參數。
特別注意:引擎的名稱大小寫敏感。
TinyLog
以列文件的形式保存在磁盤上,不支持索引,沒有并發控制。一般保存少量數據的小表,生產環境上作用有限。可以用于平時練習測試用。
create table t_tinylog ( id String, name String) engine=TinyLog;
Memory
內存引擎,數據以未壓縮的原始形式直接保存在內存當中,服務器重啟數據就會消失。讀寫操作不會相互阻塞,不支持索引。簡單查詢下有非常非常高的性能表現(超過10G/s)。
一般用到它的地方不多,除了用來測試,就是在需要非常高的性能,同時數據量又不太大(上限大概1億行)的場景。
MergeTree
ClickHouse 中最強大的表引擎當屬MergeTree(合并樹)引擎及該系列(*MergeTree)中的其他引擎,支持索引和分區,地位可以相當于innodb之于Mysql。而且基于MergeTree,還衍生出了很多家族成員,也是非常有特色的引擎。
1)建表語句。
create table t_order_mt( id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =MergeTree
partition by toYYYYMMDD(create_time)?primary key (id)
order by (id,sku_id);
2)插入數據。
insert into t_order_mt values (101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
- 指定分區進行查詢
SELECT?* FROM t_order_mt WHERE create_time = toDateTime('2020-06-02?12:00:00');
MergeTree其實還有很多參數(絕大多數用默認值即可),但是其中三個參數是非常重要的,也涉及了關于MergeTree的很多概念。
1)partition by 分區(可選)。
作用:學過hive的應該都不陌生,分區的目的主要是降低掃描的范圍,優化查詢速度。
如果不填:只會使用一個分區。
分區目錄:MergeTree是以列文件+索引文件+表定義文件組成的,但是如果設定了分區那么這些文件就會保存到不同的分區目錄中。
并行:分區后,面對涉及跨分區的查詢統計,ClickHouse 會以分區為單位并行處理。
數據寫入與分區合并:任何一個批次的數據寫入都會產生一個臨時分區,不會納入任何一個已有的分區。寫入后的某個時刻(大概 10-15 分鐘后),ClickHouse會自動執行合并操作(等不及也可以手動通過 optimize 執行),把臨時分區的數據,合并到已有分區中。
optimize table xxxx final;
例如:再次執行上面的插入操作。
insert into t_order_mt values (101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
查看數據并沒有納入任何分區,如圖-10所示:
圖-10 執行結果
手動optimize之后:
optimize table t_order_mt final;
再次查詢,如圖-11所示:
圖-11 執行結果
MergeTree中建表是必須有order by 。
主鍵的字段必須是order by 中的字段。 主鍵可以省略,主鍵可以重復。
分區可以自選是否分區。
ClickHouse中的索引默認是稀疏索引,默認8192行創建一個索引,查找使用二分查找
TTL 數據保存時長,超過則清空數據(原本數據1000,清空為0), 一行一字段單獨。
2)primary key主鍵(可選)。
ClickHouse中的主鍵,和其他數據庫不太一樣,它只提供了數據的一級索引,但是卻不是唯一約束。這就意味著是可以存在相同primary key的數據的。
主鍵的設定主要依據是查詢語句中的where條件。
根據條件通過對主鍵進行某種形式的二分查找,能夠定位到對應的index granularity,避免了全表掃描。
index granularity: 直接翻譯的話就是索引粒度,指在稀疏索引中兩個相鄰索引對應數據的間隔。ClickHouse 中的 MergeTree 默認是 8192。官方不建議修改這個值,除非該列存在大量重復值,比如在一個分區中幾萬行才有一個不同數據。
稀疏索引如圖-12所示:
圖-12 稀疏索引
稀疏索引的好處就是可以用很少的索引數據,定位更多的數據,代價就是只能定位到索引粒度的第一行,然后再進行進行一點掃描。
3)order by(必選)。
order by設定了分區內的數據按照哪些字段順序進行有序保存。
order by是MergeTree中唯一一個必填項,甚至比primary key還重要,因為當用戶不設置主鍵的情況,很多處理會依照order by的字段進行處理(比如后面會講的去重和匯總)。
要求:主鍵必須是order by字段的前綴字段。
比如order by字段是(id,sku_id),那么主鍵必須是id或者(id,sku_id)。
設置TTL的列必須是時間類型的列,且不能TTL的字段不能在order by之中
列級別TTL
創建測試表。
create table t_order_mt3( id UInt32,
sku_id String,
total_amount Decimal(16,2) TTL create_time+interval 10 SECOND,
create_time Datetime
) engine =MergeTree
partition by toYYYYMMDD(create_time) primary key (id)
order by (id, sku_id);
插入數據(注意:根據實際時間改變)。
insert into t_order_mt3 values (106,'sku_001',1000.00,'2023-09-20?15:15:30'),
(107,'sku_002',2000.00,'2023-09-20?16:23:30'),
(110,'sku_003',600.00,'2023-09-21?16:00:00');
手動合并,查看效果。到期后,指定的字段數據歸0,如圖-13所示:
圖-13 執行結果
表級TTL
下面的這條語句是數據會在create_time之后10秒丟失。
alter table t_order_mt3 MODIFY TTL create_time + INTERVAL 10 SECOND;
涉及判斷的字段必須是Date或者Datetime類型,推薦使用分區的日期字段。能夠使用的時間周期:
SECOND
MINUTE
HOUR
DAY
WEEK
MONTH
QUARTER
YEAR
ReplacingMergeTree
ReplacingMergeTree是MergeTree的一個變種,它存儲特性完全繼承 MergeTree,只是多了一個去重的功能。盡管MergeTree可以設置主鍵,但是 primary key 其實沒有唯一約束的功能。如果你想處理掉重復的數據,可以借助這個 ReplacingMergeTree。
1)去重時機:數據的去重只會在合并的過程中出現。合并會在未知的時間在后臺進行,所以你無法預先作出計劃。有一些數據可能仍未被處理。
2)去重范圍:如果表經過了分區,去重只會在分區內部進行去重,不能執行跨分區的去重。所以 ReplacingMergeTree 能力有限, ReplacingMergeTree 適用于在后臺清除重復的數據以節省空間,只會在各個分區內部進行去重,不能保證整個表中的數據都是唯一的。
去重是根據Order by 中的字段去重
案例演示
1)創建表。
create table t_order_rmt( id UInt32,
sku_id String,
total_amount Decimal(16,2) ,
create_time Datetime
) engine =ReplacingMergeTree(create_time) partition by toYYYYMMDD(create_time) primary key (id)
order by (id, sku_id);
ReplacingMergeTree()填入的參數為版本字段,重復數據保留版本字段值最大的。如果不填版本字段,默認按照插入順序保留最后一條。
2)向表中插入數據。
insert into t_order_rmt values (101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
3)執行第一次查詢,結果如圖-14所示。
select * from t_order_rmt;
圖-14 執行結果
4)手動合并。
OPTIMIZE TABLE t_order_rmt FINAL;
5)再執行一次查詢,結果如圖-15所示。
select * from t_order_rmt;
圖-15 執行結果
通過測試得到結論
1)去重不能跨分區。
2)只有同一批插入(新版本)或合并分區時才會進行去重。
3)認定重復的數據保留版本字段值最大的。
4)如果版本字段相同則按插入順序保留最后一筆。
SummingMergeTree
對于不查詢明細,只關心以維度進行匯總聚合結果的場景。如果只使用普通的MergeTree的話,無論是存儲空間的開銷,還是查詢時臨時聚合的開銷都比較大。
ClickHouse為了這種場景,提供了一種能夠“預聚合”的引擎SummingMergeTree。
案例演示
1)創建表。
create table t_order_smt( id UInt32,
sku_id String,
total_amount Decimal(16,2) ,?
create_time Datetime
) engine =SummingMergeTree(total_amount) partition by toYYYYMMDD(create_time) primary key (id)
order by (id,sku_id );
2)插入數據。
insert into t_order_smt values (101,'sku_001',1000.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
3)執行第一次查詢,結果如圖-16所示。
select * from t_order_smt;
圖-16 執行結果
4)手動合并。
OPTIMIZE TABLE t_order_smt FINAL;
5)再執行一次查詢,結果如圖-17所示。
select * from t_order_smt;
圖-17 執行結果
通過結果得到結論
1)以SummingMergeTree()中指定的列作為匯總數據列。
2)可以填寫多列必須數字列,如果不填,以所有非維度列且為數字列的字段為匯總數據列。
3)以order by的列為準,作為維度列。
4)其他的列按插入順序保留第一行。
5)不在一個分區的數據不會被聚合。
6)只有在同一批次插入(新版本)或分片合并時才會進行聚合。
SQL 操作
基本上來說傳統關系型數據庫(以MySQL為例)的SQL語句,ClickHouse基本都支持,這里不會從頭講解SQL語法,只介紹ClickHouse與標準SQL(MySQL)不一致的地方。
Insert
基本與標準SQL(MySQL)一致。
1)標準。
insert into [table_name] values(…),(….)
2)從表到表的插入。
insert into [table_name] select a,b,c from [table_name_2]
Update 和 Delete
ClickHouse提供了Delete和Update的能力,這類操作被稱為Mutation查詢,它可以看做Alter的一種。
雖然可以實現修改和刪除,但是和一般的OLTP數據庫不一樣,Mutation語句是一種很“重”的操作,而且不支持事務。
“重”的原因主要是每次修改或者刪除都會導致放棄目標數據的原有分區,重建新分區。所以盡量做批量的變更,不要進行頻繁小數據的操作。
1)刪除操作。
alter table t_order_smt delete where sku_id ='sku_001';
2)修改操作。
alter table t_order_smt update total_amount=toDecimal32(2000.00,2) where id
=102;
由于操作比較“重”,所以Mutation語句分兩步執行,同步執行的部分其實只是進行新增數據新增分區和并把舊分區打上邏輯上的失效標記。直到觸發分區合并的時候,才會刪除舊數據釋放磁盤空間,一般不會開放這樣的功能給用戶,由管理員完成。
查詢操作
ClickHouse基本上與標準SQL差別不大。
1)支持子查詢。
2)支持CTE(Common Table Expression公用表表達式with子句)。
3)支持各種JOIN,但是JOIN操作無法使用緩存,所以即使是兩次相同的JOIN語句,ClickHouse也會視為兩條新SQL。
4)窗口函數(官方正在測試中...)。
5)不支持自定義函數。
alter 操作
同MySQL的修改字段基本一致。
- 新增字段。
- first ?第一個字段
alter table tableName add column newcolname String after col1;
2)修改字段類型。
alter table tableName modify column colname NewType;
3)刪除字段。
alter table tableName drop column newcolname;
導出數據
方式一:交互式
select * from tableName into outfile 'path/file'
方式二:非交互式
clickhouse-client ?--database bdName -u default --password password --query='select * from tableName' >?abc
更多支持格式參照:
Formats for Input and Output Data | ClickHouse Docs
第三方工具
簡介
方法提供的第三方工具蠻多的,它們提供了一些ClickHouse的接口。它可以是可視化界面、命令行界面或API,比如:Client libraries、Integrations、GUI、Proxies等等。
默認用戶: ?密碼無
需要設定端口號嗎?
DataGrip的使用
這里簡要說一下GUI工具,也很多,這里提一個DataGrip,和Idea都是同一個母公司——JetBrains開發出來了,大家可以下載嘗試一下,比較簡單。
下載地址:DataGrip: The Cross-Platform IDE for Databases & SQL by JetBrains
Clickhouse整合
操作原理也相對比較簡單,就是基于jdbc的方式寫入clickhouse即可。
Clickhouse整合spark
導入依賴:
<properties>
????<maven.compiler.source>1.8</maven.compiler.source>
????<maven.compiler.target>1.8</maven.compiler.target>
????<encoding>UTF-8</encoding>
????<scala.version>2.11.12</scala.version>
????<scala.compat.version>2.11</scala.compat.version>
????<flink.version>1.9.1</flink.version>
</properties>
<dependencies>
????<dependency>
????????<groupId>org.apache.flink</groupId>
????????<artifactId>flink-scala_2.11</artifactId>
????????<version>${flink.version}</version>
????</dependency>
????<dependency>
????????<groupId>org.apache.flink</groupId>
????????<artifactId>flink-streaming-scala_2.11</artifactId>
????????<version>${flink.version}</version>
????</dependency>
????<dependency>
????????<groupId>org.projectlombok</groupId>
????????<artifactId>lombok</artifactId>
????????<version>1.18.12</version>
????</dependency>
????<dependency>
????????<groupId>ru.yandex.clickhouse</groupId>
????????<artifactId>clickhouse-jdbc</artifactId>
????????<version>0.2.4</version>
????????<exclusions>
????????????<exclusion>
????????????????<groupId>com.fasterxml.jackson.core</groupId>
????????????????<artifactId>jackson-databind</artifactId>
????????????</exclusion>
????????</exclusions>
????</dependency>
????<dependency>
????????<groupId>org.apache.spark</groupId>
????????<artifactId>spark-sql_2.11</artifactId>
????????<version>2.4.7</version>
????</dependency>
</dependencies>
<build>
????????<sourceDirectory>src/main/scala</sourceDirectory>
????????<testSourceDirectory>src/test/scala</testSourceDirectory>
????????<plugins>
????????????<!-- 指定編譯scala的插件 -->
????????????<plugin>
????????????????<groupId>net.alchim31.maven</groupId>
????????????????<artifactId>scala-maven-plugin</artifactId>
????????????????<version>3.2.2</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<goals>
????????????????????????????<goal>compile</goal>
????????????????????????????<goal>testCompile</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<args>
????????????????????????????????<arg>-dependencyfile</arg>
????????????????????????????????<arg>${project.build.directory}/.scala_dependencies</arg>
????????????????????????????</args>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-surefire-plugin</artifactId>
????????????????<version>2.18.1</version>
????????????????<configuration>
????????????????????<useFile>false</useFile>
????????????????????<disableXmlReport>true</disableXmlReport>
????????????????????<includes>
????????????????????????<include>**/*Test.*</include>
????????????????????????<include>**/*Suite.*</include>
????????????????????</includes>
????????????????</configuration>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-shade-plugin</artifactId>
????????????????<version>2.3</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<phase>package</phase>
????????????????????????<goals>
????????????????????????????<goal>shade</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<filters>
????????????????????????????????<filter>
????????????????????????????????????<artifact>*:*</artifact>
????????????????????????????????????<excludes>
????????????????????????????????????????<exclude>META-INF/*.SF</exclude>
????????????????????????????????????????<exclude>META-INF/*.DSA</exclude>
????????????????????????????????????????<exclude>META-INF/*.RSA</exclude>
????????????????????????????????????</excludes>
????????????????????????????????</filter>
????????????????????????????</filters>
????????????????????????????<transformers>
????????????????????????????????<transformer
????????????????????????????????????????implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
????????????????????????????????????<mainClass>chapter7.SparkStreamingDriverHAOps</mainClass>
????????????????????????????????</transformer>
????????????????????????????</transformers>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????</plugins>
????</build>
sparkcore方式
創建Person表:
create table person(id int, name varchar(30), age int) engine = TinyLog;
核心代碼:
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
case class Person(id: Int, name: String, age: Int)
object SparkCore2ClickHouseApp {
????def main(args: Array[String]): Unit = {
????????val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkCore2ClickHouse")
????????val sc = new SparkContext(conf)
????????sc.setLogLevel("WARN")
????????val persons = sc.parallelize(List(
????????????????Person(1, "zhangsan", 13),
????????????????Person(2, "lisi", 14),
????????????????Person(3, "wangwu", 15),
????????????????Person(4, "zhaoliu", 16),
????????????????Person(5, "zhouqi", 17)))
????????persons.foreachPartition(persons => {
//連接clickhouse用9000端口(什么時候用?本地client連接用,少),但是9000和hdfs的重復,之后同時用到時需要更改。 datagrip連接用8123啊
????????????val url = "jdbc:clickhouse://hadoop101:8123/default"
????????????val user = "default"
????????????val password = null
????????????val connection = DriverManager.getConnection(url, user, password)
????????????val sql = "insert into person values(?, ?, ?)"
????????????val ps = connection.prepareStatement(sql)
????????????persons.foreach(person => {
????????????????ps.setInt(1, person.id)
????????????????ps.setString(2, person.name)
????????????????ps.setInt(3, person.age)
????????????????ps.addBatch()})
????????????ps.executeBatch()
????????????ps.close()
????????????connection.close()
?????????????})
????????sc.stop()
?????????}
}
sparksql方式:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkSql2ClickHouseApp {
????def main(args: Array[String]): Unit = {
????????val spark: SparkSession = SparkSession.builder()
.appName("SparkSql2ClickHouse")
.master("local[*]")
.getOrCreate()
????????val sc: SparkContext = spark.sparkContext
????????sc.setLogLevel("WARN")
????????import spark.implicits._
????????val persons = spark.createDataset(List(
????????Person(6, "lilei", 23),
????????Person(7, "hanmeimei", 24),
????????Person(8, "xiaohong", 25)
????????) )
//sql寫入模式的區別
????????persons.write.mode(SaveMode.Append).format("jdbc")
????????.option("url", "jdbc:clickhouse://hadoop101:8123/default")
????????.option("user", "default")
????????.option("dbtable", "person")
?????????//不加以下倆個配置,會報警告:WARN jdbc.JdbcUtils: Requested isolation level 1, but transactions are unsupported
????????// 因為clickhouse不支持事務,所以可以關閉事務
????????.option("isolationLevel", "NONE")
????????//設置并發數為1,避免亂序
????????.option("numPartitions", "1")
????????.save()
????????spark.stop()
????}
}
Clickhouse整合flink
自定義Sink:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}
case class Person(id: Int, name: String, age: Int)
class ClickHouseSink extends RichSinkFunction[Person] {
????private val url = "jdbc:clickhouse://hadoop101:8123/default"
????private val user = "default"
????private val password: String = null
????private var connection:Connection = null
????private var ps:PreparedStatement = null
????override def open(parameters: Configuration): Unit = {
????????connection = DriverManager.getConnection(url, user, password)
????????val sql = "insert into person values(?, ?, ?)"
????????ps = connection.prepareStatement(sql)
??????????}
????override def invoke(person: Person, context: SinkFunction.Context[_]): Unit = {
????????ps.setInt(1, person.id)
????????ps.setString(2, person.name)
????????ps.setInt(3, person.age)
????????ps.execute()
??????????}
????override def close(): Unit = {
????????ps.close()
????????connection.close()
????}
}
核心代碼:
import org.apache.flink.streaming.api.scala._
object Flink2ClickhouseApp {
????def main(args: Array[String]): Unit = {
????????val env = StreamExecutionEnvironment.getExecutionEnvironment
????????val inputs = env.fromCollection(List(
????????????Person(9, "jack", 17),
????????????Person(10, "tom", 18),
????????????Person(11, "lucy", 19)))
????????inputs.addSink(new ClickHouseSink())
????????env.execute("Flink2ClickhouseApp")
????}
}
副本
副本的目的主要是保障數據的高可用性,即使一臺ClickHouse節點宕機,那么也可以從其他服務器獲得相同的數據。
Data Replication | ClickHouse Docs
副本寫入流程
寫入流程如圖-18所示:
圖-18 寫入流程
配置步驟
1)啟動zookeeper集群。
2)在hadoop101的/etc/clickhouse-server/config.d目錄下創建一個名為metrika.xml的配置文件,內容如下:
注:也可以不創建外部文件,直接在config.xml中指定<zookeeper>。
<?xml version="1.0"?>
<yandex>
<zookeeper-servers>
<node index="1">
<host>hadoop101</host>
<port>2181</port>
</node>
<node index="2">
<host>hadoop102</host>
<port>2181</port>
</node>
<node index="3">
<host>hadoop103</host>
<port>2181</port>
</node>
</zookeeper-servers>
</yandex>
3)同步到hadoop102和hadoop103上。
xsync /etc/clickhouse-server/config.d/metrika.xml
4)在hadoop101的/etc/clickhouse-server/config.xml中增加。
<zookeeper incl="zookeeper-servers" optional="true" />
<include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>
圖-19 配置文件
5)同步到hadoop102和hadoop103上。
xsync /etc/clickhouse-server/config.xml
6)分別在hadoop102和hadoop103上啟動ClickHouse服務。
clickhouse restart
注意:因為修改了配置文件,如果以前啟動了服務需要重啟。
注意:我們演示副本操作只需要在hadoop101和hadoop102兩臺服務器即可,上面的操作,我們hadoop103可以你不用同步,我們這里為了保證集群中資源的一致性,做了同步。
7)在hadoop101和hadoop102上分別建表。
副本只能同步數據,不能同步表結構,所以我們需要在每臺機器上自己手動建表。
hadoop101:
create table t_order_re ( id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/table/01/t_order','rep_001') partition by toYYYYMMDD(create_time)
primary key (id) order by (id,sku_id);
hadoop102:
create table t_order_re ( id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/table/01/t_order','rep_002') partition by toYYYYMMDD(create_time)
primary key (id) order by (id,sku_id);
參數解釋
ReplicatedMergeTree中,第一個參數是分片的zk_path一般按照:/clickhouse/table/{shard}/{table_name}的格式寫,如果只有一個分片就寫 01 即可。
第二個參數是副本名稱,相同的分片副本名稱不能相同。
在hadoop101上執行insert語句,結果如圖-20所示:
insert into t_order_re values (101,'sku_001',1000.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 12:00:00'),
(103,'sku_004',2500.00,'2020-06-01 12:00:00'),
(104,'sku_002',2000.00,'2020-06-01 12:00:00'),
(105,'sku_003',600.00,'2020-06-02 12:00:00');
圖-20 插入數據
在hadoop102上執行select,可以查詢出結果,如圖-21所示,說明副本配置正確。
圖-21 執行結果
分片集群
副本雖然能夠提高數據的可用性,降低丟失風險,但是每臺服務器實際上必須容納全量數據,對數據的橫向擴容沒有解決。
要解決數據水平切分的問題,需要引入分片的概念。通過分片把一份完整的數據進行切分,不同的分片分布到不同的節點上,再通過Distributed表引擎把數據拼接起來一同使用。
Distributed表引擎本身不存儲數據,有點類似于 MyCat 之于 MySql,成為一種中間件, 通過分布式邏輯表來寫入、分發、路由來操作多臺節點不同分片的分布式數據。
注意:ClickHouse的集群是表級別的,實際企業中,大部分做了高可用,但是沒有用分片,避免降低查詢性能以及操作集群的復雜性。
集群寫入流程(3 分片 2 副本共 6 個節點)
寫入流程如圖-22所示:
圖-22 寫入流程
集群讀取流程(3 分片 2 副本共 6 個節點)
讀取流程如圖-23所示:
圖-23 讀取流程
集群搭建
數據副本需要是指定引擎,否則還是單機模式。
配置的位置可以在之前的/etc/clickhouse-server/config.d/metrika.xml,內容如下。
注:也可以不創建外部文件,直接在config.xml的<remote_servers>中指定。
hadoop101配置如下:
<yandex>
????...省略...
????<listen_host>::</listen_host>
????<remote_servers>
????????<perftest_3shards_1replicas>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop101</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop102</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop103</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????</perftest_3shards_1replicas>
????</remote_servers>
????<!-- zookeeper相關配置 -->
????<zookeeper-servers>
????????<node>
????????????<host>hadoop101</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop102</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop103</host>
????????????<port>2181</port>
????????</node>
????</zookeeper-servers>
????<!-- 復制標識的配置,也稱為宏配置,這里唯一標識一個副本名稱,每個實例都要配置并且都是唯一的 -->
????<macros>
<shard>01</shard>
<replica>01</replica>
????</macros> ?
????...省略... ???
</yandex>
hadoop102配置如下:
<yandex>
????...省略...
????<listen_host>::</listen_host>
????<remote_servers>
????????<perftest_3shards_1replicas>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop101</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop102</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop103</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????</perftest_3shards_1replicas>
????</remote_servers>
????<!-- zookeeper相關配置 -->
????<zookeeper-servers>
????????<node>
????????????<host>hadoop101</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop102</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop103</host>
????????????<port>2181</port>
????????</node>
????</zookeeper-servers>
????<!-- 復制標識的配置,也稱為宏配置,這里唯一標識一個副本名稱,每個實例都要配置并且都是唯一的 -->
????<macros>
<shard>02</shard>
<replica>01</replica>
????</macros> ?
????...省略... ???
</yandex>
hadoop103配置如下:
????<remote_servers>
????????<perftest_3shards_1replicas>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop101</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop102</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????????<shard>
????????????????<replica>
????????????????????<host>hadoop103</host>
????????????????????<port>9000</port>
????????????????</replica>
????????????</shard>
????????</perftest_3shards_1replicas>
????</remote_servers>
????<!-- zookeeper相關配置 -->
<zookeeper-servers>
????????<node>
????????????<host>hadoop101</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop102</host>
????????????<port>2181</port>
????????</node>
????????<node>
????????????<host>hadoop103</host>
????????????<port>2181</port>
????????</node>
????</zookeeper-servers>
????<!-- 復制標識的配置,也稱為宏配置,這里唯一標識一個副本名稱,每個實例都要配置并且都是唯一的 -->
????<macros>
<shard>03</shard>
<replica>01</replica>
????</macros> ?