本文以Kafka為例,介紹在E-MapReduce中如何使用Tranquility從Kafka集群采集數據,并實時推送至Druid集群。
Tranquility是一個以push方式向Druid實時發送數據的應用。它替用戶解決了分區、多副本、服務發現、防止數據丟失等多個問題,簡化了用戶使用Druid的難度。它支持多種數據來源,包括Samza、Spark、Storm、Kafka、Flink等等。
與Kafka集群交互
- 確保集群間能夠通信(兩個集群在一個安全組下,或兩個集群在不同安全組,但兩個安全組之間配置了訪問規則)。
- 將 Kafka 集群的 hosts 寫入到 Druid 集群每一個節點的 hosts 列表中,注意 Kafka 集群的 hostname 應采用長名形式,如 emr-header-1.cluster-xxxxxxxx。
- 確保集群間能夠通信(兩個集群在一個安全組下,或兩個集群在不同安全組,但兩個安全組之間配置了訪問規則)。
- 將 Kafka 集群的 hosts 寫入到 Druid 集群每一個節點的 hosts 列表中,注意 Kafka 集群的 hostname 應采用長名形式,如 emr-header-1.cluster-xxxxxxxx。
- 設置兩個集群間的 Kerberos 跨域互信(詳情參考跨域互信),且最好做雙向互信。
- 準備一個客戶端安全配置文件:
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=truekeyTab="/etc/ecm/druid-conf/druid.keytab"principal="druid@EMR.1234.COM";};
? ? ? ? ? ? 之后將該配置文件同步到 Druid 集群的所有節點上,放置于某一個目錄下面(例如/tmp/kafka/kafka_client_jaas.conf)。
5、在 Druid 配置頁面的 overlord.jvm 里新增如下選項:
Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
6、在 Druid 配置頁面的 middleManager.runtime 里配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf
和其他JVM啟動參數。
Druid使用Tranquility Kafka
由于Tranquility是一個服務,它對于Kafka來說是消費者,對于Druid來說是客戶端。您可以使用中立的機器來運行Tranquility,只要這臺機器能夠同時連通 Kafka 集群和 Druid 集群即可。
1、Kafka端創建一個名為 pageViews 的 topic。
-- 如果開啟了kafka 高安全:export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"--./bin/kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181/kafka-1.0.1 --partitions 1 --replication-factor 1 --topic pageViews
2、下載 Tranquility 安裝包,并解壓至某一路徑下。
3、配置 datasource。
這里假設您的 topic name 為 pageViews,并且每條 topic 都是如下形式的 json 文件:
{"time": "2018-05-23T11:59:43Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-23T11:59:44Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-23T11:59:45Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
對應的 dataSrouce 的配置如下:
{"dataSources" : {"pageViews-kafka" : {"spec" : {"dataSchema" : {"dataSource" : "pageViews-kafka","parser" : {"type" : "string","parseSpec" : {"timestampSpec" : {"column" : "time","format" : "auto"},"dimensionsSpec" : {"dimensions" : ["url", "user"],"dimensionExclusions" : ["timestamp","value"]},"format" : "json"}},"granularitySpec" : {"type" : "uniform","segmentGranularity" : "hour","queryGranularity" : "none"},"metricsSpec" : [{"name": "views", "type": "count"},{"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}]},"ioConfig" : {"type" : "realtime"},"tuningConfig" : {"type" : "realtime","maxRowsInMemory" : "100000","intermediatePersistPeriod" : "PT10M","windowPeriod" : "PT10M"}},"properties" : {"task.partitions" : "1","task.replicants" : "1","topicPattern" : "pageViews"}}},"properties" : {"zookeeper.connect" : "localhost","druid.discovery.curator.path" : "/druid/discovery","druid.selectors.indexing.serviceName" : "druid/overlord","commit.periodMillis" : "15000","consumer.numThreads" : "2","kafka.zookeeper.connect" : "emr-header-1.cluster-500148518:2181,emr-header-2.cluster-500148518:2181, emr-header-3.cluster-500148518:2181/kafka-1.0.1","kafka.group.id" : "tranquility-kafka",}}
4、運行如下命令啟動 Tranquility。
./bin/tranquility kafka -configFile
5、在 Kafka 端啟動 producer 并發送一些數據。
./bin/kafka-console-producer.sh --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic pageViews
輸入:
{"time": "2018-05-24T09:26:12Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-24T09:26:13Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-24T09:26:14Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
在Tranquility日志中查看相應的消息,在Druid端則可以看到啟動了相應的實時索引 task。
?
?
?
?
?