?
?進入正文前,感謝寶子們訂閱專題、點贊、評論、收藏!關注IT貧道,獲取高質量博客內容!
🏡個人主頁:含各種IT體系技術,IT貧道_Apache Doris,大數據OLAP體系技術棧,Kerberos安全認證-CSDN博客
📌訂閱:擁抱獨家專題,你的訂閱將點燃我的創作熱情!
👍點贊:贊同優秀創作,你的點贊是對我創作最大的認可!
?? 收藏:收藏原創博文,讓我們一起打造IT界的榮耀與輝煌!
??評論:留下心聲墨跡,你的評論將是我努力改進的方向!
目錄
1.?HDFS
1.1 語法
1.2 其他配置
1.3 示例
??????????????2.?MySQL
2.1 語法
2.2 示例
2.3 測試 replace_query??
2.4 測試 on_duplicate_clause
??????????????3.?Kafka
3.1 語法
3.2 示例
3.3 示例
ClickHouse提供了許多與外部系統集成的方法,包括一些表引擎。這些表引擎與其他類型的表引擎類似,可以用于將外部數據導入到ClickHouse中,或者在ClickHouse中直接操作外部數據源。
???????1.?HDFS
HDFS引擎支持ClickHouse 直接讀取HDFS中特定格式的數據文件,目前文件格式支持Json,Csv文件等,ClickHouse通過HDFS引擎建立的表,不會在ClickHouse中產生數據,讀取的是HDFS中的數據,將HDFS中的數據映射成ClickHouse中的一張表,這樣就可以使用SQL操作HDFS中的數據。
ClickHouse并不能夠刪除HDFS上的數據,當我們在ClickHouse客戶端中刪除了對應的表,只是刪除了表結構,HDFS上的文件并沒有被刪除,這一點跟Hive的外部表十分相似。
1.1 語法
ENGINE = HDFS(URI, format)
注意:URI是HDFS文件路徑,format指定文件格式。HDFS文件路徑中文件為多個時,可以指定成some_file_?,或者當數據映射的是HDFS多個文件夾下數據時,可以指定somepath/* 來指定URI
1.2 其他配置
由于HDFS配置了HA 模式,有集群名稱,所以URI使用mycluster HDFS集群名稱時,ClickHouse不識別,這時需要做以下配置:
- 將hadoop路徑下$HADOOP_HOME/etc/hadoop下的hdfs-site.xml文件復制到/etc/clickhouse-server目錄下。
- 修改/etc/init.d/clickhouse-server 文件,加入一行 “export LIBHDFS3_CONF=/etc/clickhouse-server/hdfs-site.xml”
- 重啟ClickHouse-server 服務
serveice clickhouse-server restart
當然,這里也可以不做以上配置,在寫HDFS URI時,直接寫成對應的節點+端口即可。
1.3 示例
#在HDFS路徑 hdfs://mycluster/ch/路徑下,創建多個csv文件,寫入一些數據c1.csv文件內容:1,張三,192,李四,20c2.csv文件內容:3,王五,214,馬六,22#創建表 t_hdfs,使用HDFS引擎node1 :) create table t_hdfs(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/ch/*.csv','CSV')#查詢表 t_hdfs中的數據node1 :) select * from t_hdfs;┌─id─┬─name─┬─age─┐│ ?3 ?│ 王五 ?│ ?21 ││ ?4 ?│ 馬六 ?│ ?22 │└────┴──────┴─────┘┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?19 ││ ?2 ?│ 李四 ?│ ?20 │└────┴──────┴─────┘注意:這里表t_hdfs不會在clickhouse對應的節點路徑下創建數據目錄,同時這種表映射的是HDFS路徑中的csv文件,不能插入數據,t_hdfs是只讀表。#創建表 t_hdfs2 文件 ,使用HDFS引擎node1 :)?create table t_hdfs2(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/chdata','CSV');#向表 t_hdfs2中寫入數據node1 :) insert into t_hdfs2 values(5,'田七',23),(6,'趙八',24);#查詢表t_hdfs2中的數據node1 :) select * from t_hdfs2;┌─id─┬─name─┬─age─┐│ ?5 ?│ 田七??│ ?23 ││ ?6 ?│??趙八 │ ?24 │└────┴──────┴─────┘注意:t_hdfs2表沒有直接映射已經存在的HDFS文件,這種表允許查詢和插入數據。
??????????????2.?MySQL
ClickHouse MySQL數據庫引擎可以將MySQL某個庫下的表映射到ClickHouse中,使用ClickHouse對數據進行操作。ClickHouse同樣支持MySQL表引擎,即映射一張MySQL中的表到ClickHouse中,使用ClickHouse進行數據操作,與MySQL數據庫引擎一樣,這里映射的表只能做查詢和插入操作,不支持刪除和更新操作。
2.1 語法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster](name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],...) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
- 以上語法的解釋如下:
- host:port - MySQL服務器名稱和端口
- database - MySQL 數據庫。
- table - 映射的MySQL中的表
- user - 登錄mysql的用戶名
- password - 登錄mysql的密碼
- replace_query??- 將INSERT INTO 查詢是否替換為 REPLACE INTO 的標志,默認為0,不替換。當設置為1時,所有的insert into 語句更改為 replace into 語句。當插入的數據有重復主鍵數據時,此值為0默認報錯,此值為1時,主鍵相同這條數據,默認替換成新插入的數據。
- on_duplicate_clause?- 默認不使用。當插入數據主鍵相同時,可以指定只更新某列的數據為新插入的數據,對應于on duplicate key 后面的語句,其他的值保持不變,需要replace_query 設置為0。
2.2 示例
#在mysql 中創建一張表 t_ch,指定id為主鍵CREATE TABLE t_ch (id INT,NAME VARCHAR (255),age INT,PRIMARY KEY (id))#向表中增加一些數據insert into ?t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)#在ClickHouse中創建MySQL引擎表 t_mysql_enginenode1 :) create table t_mysql_engine (:-] ?id UInt8,:-] ?name String,:-] ?age UInt8:-]?)engine = MySQL('node2:3306','test','t_ch','root','123456');#查詢ClickHouse表 t_mysql_engine 中的數據:node1 :) select * from t_mysql_engine;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│ ?19 ││ ?3 ?│ 王五 ?│ ?20 │└────┴──────┴─────┘#在ClickHouse中向表 t_mysql_engine中插入一條數據node1 :) insert into t_mysql_engine values (4,'馬六','21');┌─id─┬─name─┬─age─┐│ ?1 ??│ 張三 ???│ ?18 ??││ ?2 ??│ 李四 ???│ ?19 ??││ ?3 ??│ 王五 ???│ ??20 ?││ ?4 ??│ 馬六 ???│ ?21 ??│└───┴─────┴───┘#在ClickHouse中向表 t_mysql_engine中再插入一條數據,這里主鍵重復,報錯。node1 :) insert into t_mysql_engine values (4,'田七','22');Exception: mysqlxx::BadQuery: Duplicate entry '4' for key'PRIMARY' (node2:3306).注意:在clickhouse 中 t_mysql_engine表不會在ClickHouse服務器節點上創建數據目錄。
2.3 測試 replace_query??
#在mysql 中刪除表 t_ch,重新創建,指定id為主鍵CREATE TABLE t_ch (id INT,NAME VARCHAR (255),age INT,PRIMARY KEY (id))#向表中增加一些數據insert into ?t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)#在ClickHouse中刪除MySQL引擎表 t_mysql_engine,重建node1 :) create table t_mysql_engine (:-] ?id UInt8,:-] ?name String,:-] ?age UInt8:-]?)engine = MySQL('node2:3306','test','t_ch','root','123456',1);#查詢ClickHouse表 t_mysql_engine 中的數據:node1 :) select * from t_mysql_engine;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│ ?19 ││ ?3 ?│ 王五 ?│ ?20 │└────┴──────┴─────┘#在ClickHouse中向表 t_mysql_engine中插入一條數據,主鍵重復。這里由于指定了replace_query = 1 ,所以當前主鍵數據會被替換成新插入的數據。node1 :) insert into t_mysql_engine values (3,'馬六','21');#查詢ClichHouse t_mysql_engine表數據node1 :)?select * from t_mysql_engine;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│ ?19 ││ ?3 ?│ 馬六 ?│ ?21 │└────┴──────┴─────┘
2.4 測試 on_duplicate_clause
#在mysql 中刪除表 t_ch,重新創建,指定id為主鍵CREATE TABLE t_ch (id INT,NAME VARCHAR (255),age INT,PRIMARY KEY (id))#向表中增加一些數據insert into ?t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)#在ClickHouse中刪除MySQL引擎表 t_mysql_engine,重建node1 :) create table t_mysql_engine (:-] ?id UInt8,:-] ?name String,:-] ?age UInt8:-]?)engine = MySQL('node2:3306','test','t_ch','root','123456',0,'update age = values(age)');#查詢ClickHouse表 t_mysql_engine 中的數據:node1 :) select * from t_mysql_engine;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│ ?19 ││ ?3 ?│ 王五 ?│ ?20 │└────┴──────┴─────┘#在ClickHouse 中向表 t_mysql_engine中插入一條數據node1 :) insert into t_mysql_engine values (4,'馬六','21');┌─id─┬─name─┬─age─┐│ ?1 ??│ 張三 ???│ ?18 ??││ ?2 ??│ 李四 ???│ ??19 ?││ ?3 ??│ 王五 ???│ ?20 ??││ ?4 ??│ 馬六 ???│ ?21 ??│└──┴─────┴────┘#在ClickHouse中向表 t_mysql_engine中插入一條數據,主鍵重復。node1 :) insert into t_mysql_engine values (4,'田七','100');#查詢ClichHouse t_mysql_engine表數據node1 :)?select * from t_mysql_engine;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│ ?19 ││ ?3 ?│ 王五 ?│ ?20 ││ ?4 ?│ 馬六 ?│ 100 │└────┴──────┴─────┘
??????????????3.?Kafka
ClickHouse中還可以創建表指定為Kafka為表引擎,這樣創建出的表可以查詢到Kafka中的流數據。對應創建的表不會將數據存入ClickHouse中,這里這張kafka引擎表相當于一個消費者,消費Kafka中的數據,數據被查詢過后,就不會再次被查詢到。
3.1 語法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster](name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],...) ENGINE = Kafka()SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,]
- 對以上參數的解釋:
- kafka_broker_list: 以逗號分隔的Kafka Broker節點列表
- kafka_topic_list?: topic列表
- kafka_group_name?: kafka消費者組名稱
- kafka_format : Kafka中消息的格式,例如:JSONEachRow、CSV等等,具體參照https://clickhouse.tech/docs/en/interfaces/formats/。這里一般使用JSONEachRow格式數據,需要注意的是,json字段名稱需要與創建的Kafka引擎表中字段的名稱一樣,才能正確的映射數據。
3.2 示例
#創建表 t_kafka_consumer ,使用Kafka表引擎node1 :)?create table t_kafka_consumer (:-] id UInt8,:-] name String,:-] age UInt8:-] ) engine = Kafka():-] settings:-]?kafka_broker_list='node1:9092,node2:9092,node3:9092',:-] kafka_topic_list='ck-topic',:-]?kafka_group_name='group1',:-] kafka_format='JSONEachRow';#啟動kafka,在kafka中創建ck-topic topic,并向此topic中生產以下數據:創建topic:kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic ck-topic --partitions 3 --replication-factor 3生產數據:kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic生產數據如下:{"id":1,"name":"張三","age":18}{"id":2,"name":"李四","age":19}{"id":3,"name":"王五","age":20}{"id":4,"name":"馬六","age":21}{"id":5,"name":"田七","age":22}#在ClickHouse中查詢表 t_kafka_consumer數據,可以看到生產的數據node1 :) select * from t_kafka_consumer;┌─id─┬─name─┬─age─┐│ ?2 ?│ 李四 ?│ ?19 ││ ?5 ?│ 田七 ?│ ?22 ││ ?1 ?│ 張三??│ ?18 ││ ?4 ?│ 馬六??│ ?21 ││ ?3 ?│ 王五??│ ?20 │└────┴──────┴─────┘注意:再次查看表 t_kafka_consumer數據 ,我們發現讀取不到任何數據,這里對應的ClikcHouse中的Kafka引擎表,只是相當于是消費者,消費讀取Kafka中的數據,數據被消費完成之后,不能再次查詢到對應的數據。
以上在ClickHouse中創建的Kafka引擎表 t_kafka_consumer 只是一個數據管道,當查詢這張表時就是消費Kafka中的數據,數據被消費完成之后,不能再次被讀取到。如果想將Kafka中topic中的數據持久化到ClickHouse中,我們可以通過物化視圖方式訪問Kafka中的數據,可以通過以下三個步驟完成將Kafka中數據持久化到ClickHouse中:
- 創建Kafka 引擎表,消費kafka中的數據。
- 再創建一張ClickHouse中普通引擎表,這張表面向終端用戶查詢使用。這里生產環境中經常創建MergeTree家族引擎表。
- 創建物化視圖,將Kafka引擎表數據實時同步到終端用戶查詢表中。
3.3 示例
#在ClickHouse中創建 t_kafka_consumer2 表,使用Kafka引擎node1 :)?create table t_kafka_consumer2?(:-] id UInt8,:-] name String,:-] age UInt8:-] ) engine = Kafka():-] settings:-]?kafka_broker_list='node1:9092,node2:9092,node3:9092',:-] kafka_topic_list='ck-topic',:-]?kafka_group_name='group1',:-] kafka_format='JSONEachRow';#在ClickHouse中創建一張終端用戶查詢使用的表,使用MergeTree引擎node1 :) create table t_kafka_mt(:-] id UInt8,:-] name String,:-] age UInt8:-] ) engine = MergeTree():-] order by id;#創建物化視圖,同步表t_kafka_consumer2數據到t_kafka_mt中node1 :) create materialized view ?view_consumer to t_kafka_mt:-] as select id,name,age from t_kafka_consumer2;注意:物化視圖在ClickHouse中也是存儲數據的,create ?materialized view ?view_consumer to t_kafka_mt?語句是將物化視圖view_consumer中的數據存儲到到對應的t_kafka_mt?表中,這樣同步的目的是如果不想繼續同步kafka中的數據,可以直接刪除物化視圖即可。#向Kafka ck-topic中生產以下數據:生產數據:kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic生產數據如下:{"id":1,"name":"張三","age":18}{"id":2,"name":"李四","age":19}{"id":3,"name":"王五","age":20}{"id":4,"name":"馬六","age":21}{"id":5,"name":"田七","age":22}#查詢表 t_kafka_mt中的數據,數據同步完成。node1 :)?select * from t_kafka_mt;┌─id─┬─name─┬─age─┐│ ?1 ?│ 張三 ?│ ?18 ││ ?2 ?│ 李四 ?│??19 ││ ?3 ?│ 王五 ?│ ?20 ││ ?4 ?│ 馬六 ?│ ?21 ││ ?5??│ 田七 ?│ ?22 │└────┴──────┴─────┘
👨?💻如需博文中的資料請私信博主。