Tranquility

本頁目錄
  • 與Kafka集群交互
  • Druid使用Tranquility Kafka

本文以Kafka為例,介紹在E-MapReduce中如何使用Tranquility從Kafka集群采集數據,并實時推送至Druid集群。

Tranquility是一個以push方式向Druid實時發送數據的應用。它替用戶解決了分區、多副本、服務發現、防止數據丟失等多個問題,簡化了用戶使用Druid的難度。它支持多種數據來源,包括Samza、Spark、Storm、Kafka、Flink等等。

與Kafka集群交互

首先是Druid集群與Kafka集群的交互。兩個集群交互的配置方式大體和Hadoop集群類似,均需要設置連通性、hosts等。對于非安全Kafka集群,請按照以下步驟操作:
  1. 確保集群間能夠通信(兩個集群在一個安全組下,或兩個集群在不同安全組,但兩個安全組之間配置了訪問規則)。
  2. 將 Kafka 集群的 hosts 寫入到 Druid 集群每一個節點的 hosts 列表中,注意 Kafka 集群的 hostname 應采用長名形式,如 emr-header-1.cluster-xxxxxxxx。
對于安全的 Kafka 集群,您需要執行下列操作(前兩步與非安全 Kafka 集群相同):
  1. 確保集群間能夠通信(兩個集群在一個安全組下,或兩個集群在不同安全組,但兩個安全組之間配置了訪問規則)。
  2. 將 Kafka 集群的 hosts 寫入到 Druid 集群每一個節點的 hosts 列表中,注意 Kafka 集群的 hostname 應采用長名形式,如 emr-header-1.cluster-xxxxxxxx。
  3. 設置兩個集群間的 Kerberos 跨域互信(詳情參考跨域互信),且最好做雙向互信。
  4. 準備一個客戶端安全配置文件: 
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=truekeyTab="/etc/ecm/druid-conf/druid.keytab"principal="druid@EMR.1234.COM";};

? ? ? ? ? ? 之后將該配置文件同步到 Druid 集群的所有節點上,放置于某一個目錄下面(例如/tmp/kafka/kafka_client_jaas.conf)。

  5、在 Druid 配置頁面的 overlord.jvm 里新增如下選項:

Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf

  6、在 Druid 配置頁面的 middleManager.runtime 里配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他JVM啟動參數。

7、重啟Druid服務。

Druid使用Tranquility Kafka

由于Tranquility是一個服務,它對于Kafka來說是消費者,對于Druid來說是客戶端。您可以使用中立的機器來運行Tranquility,只要這臺機器能夠同時連通 Kafka 集群和 Druid 集群即可。

  1、Kafka端創建一個名為 pageViews 的 topic。

-- 如果開啟了kafka 高安全:export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"--./bin/kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181/kafka-1.0.1 --partitions 1 --replication-factor 1 --topic pageViews

  2、下載 Tranquility 安裝包,并解壓至某一路徑下。

  3、配置 datasource。

  這里假設您的 topic name 為 pageViews,并且每條 topic 都是如下形式的 json 文件:

  

{"time": "2018-05-23T11:59:43Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-23T11:59:44Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-23T11:59:45Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

對應的 dataSrouce 的配置如下:

{"dataSources" : {"pageViews-kafka" : {"spec" : {"dataSchema" : {"dataSource" : "pageViews-kafka","parser" : {"type" : "string","parseSpec" : {"timestampSpec" : {"column" : "time","format" : "auto"},"dimensionsSpec" : {"dimensions" : ["url", "user"],"dimensionExclusions" : ["timestamp","value"]},"format" : "json"}},"granularitySpec" : {"type" : "uniform","segmentGranularity" : "hour","queryGranularity" : "none"},"metricsSpec" : [{"name": "views", "type": "count"},{"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}]},"ioConfig" : {"type" : "realtime"},"tuningConfig" : {"type" : "realtime","maxRowsInMemory" : "100000","intermediatePersistPeriod" : "PT10M","windowPeriod" : "PT10M"}},"properties" : {"task.partitions" : "1","task.replicants" : "1","topicPattern" : "pageViews"}}},"properties" : {"zookeeper.connect" : "localhost","druid.discovery.curator.path" : "/druid/discovery","druid.selectors.indexing.serviceName" : "druid/overlord","commit.periodMillis" : "15000","consumer.numThreads" : "2","kafka.zookeeper.connect" : "emr-header-1.cluster-500148518:2181,emr-header-2.cluster-500148518:2181,   emr-header-3.cluster-500148518:2181/kafka-1.0.1","kafka.group.id" : "tranquility-kafka",}}

  4、運行如下命令啟動 Tranquility。

./bin/tranquility kafka -configFile 

  5、在 Kafka 端啟動 producer 并發送一些數據。

./bin/kafka-console-producer.sh --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic pageViews

輸入:

{"time": "2018-05-24T09:26:12Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-24T09:26:13Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-24T09:26:14Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

在Tranquility日志中查看相應的消息,在Druid端則可以看到啟動了相應的實時索引 task。

——————————————————————————————
原文:https://help.aliyun.com/document_detail/72704.html

?

?

?

?

?

轉載于:https://www.cnblogs.com/wynjauu/articles/10369007.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/449951.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/449951.shtml
英文地址,請注明出處:http://en.pswp.cn/news/449951.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Iot相關雜燴

人工智能就像人的大腦,而 IoT 就像人的神經網絡 1)在天空中巨大的鳥群里,每一只鳥兒都實時判斷自己和四周同伴的距離。這時,它們各自都是一個物聯網節點。2)這些“節點”并不是簡單地收集數據,而是在實時計…

水滴石穿C語言之指針、數組和函數

基本解釋   1、指針的本質是一個與地址相關的復合類型,它的值是數據存放的位置(地址);數組的本質則是一系列的變量。   2、數組名對應著(而不是指向)一塊內存,其地址與容量在生命期內保持…

告訴你銀行在年底為存儲做的小動作

25年前,銀行的存款利率是10.98%,可謂巔峰時刻。15年前,銀行的存款利率開始下降,降到了8%的利率。 到了5年前,銀行的存款利率毫無回轉之勢,直線下降到了5%的利率。 而如今,我們無可奈何地接受了2…

爬蟲學習(五)——百度貼吧的爬取

import osimport timeimport urllib.requestimport urllib.parse# 輸入目標頁碼和吧名def header(): url "https://tieba.baidu.com/f?" baming input("請輸入要爬取的吧名") start_page int(input("請輸入起始頁")) end_page …

什么是嵌入式設備?/ 嵌入式設備的定義

什么是嵌入式設備?/ 嵌入式設備的定義 區別于通用計算機的其他設備都可以稱之為嵌入式設備 (個人電腦,服務器) 一段時期內,必備的硬件配置。 嵌入式開發包括哪些部分: 底層驅動開發: 關鍵字…

Linux mv命令、Linux cp命令、Linux scp命令

前些天發現了一個巨牛的人工智能學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到教程。 Linux mv命令用來為文件或目錄改名、或將文件或目錄移入其它位置。 語法 mv [options] source dest mv [options] source... director…

創業者談:畏懼失敗,但也要擁抱失敗

摘要:本文作者為Paydirt創始人Tristan Gamilis,他在文中分享了如何面對創業過程中的失敗。作為一個創業者,開始的時候并非全才,很多知識都是經歷了創業中的失敗,摸爬滾打之后才學會的。所以,我們在創業過程…

基于STM32F4移植W5500官方驅動庫ioLibrary_Driver(轉)

源: 基于STM32F4移植W5500官方驅動庫ioLibrary_Driver 參考: 基于STM32W5500 的Ethernet和Internet移植 Upgrade W5500 Throughput on Nucleo STM32F401RE Using SPI DMA

redis 資料

redis是什么: Redis is an open source, BSD licensed, advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets. redis是開源,BSD許可,高級的key-value存儲系統. 可以用來存儲字…

Android應用開發——onStop的調用時機

onStop的調用時機,網上搜索到的說法大概是:“ onStop的調用是“The activity is no longer visible”,也就是完全不可見的時候調用的,這個完全不可見真的就是指視覺上的完全看不到而已,無論是按home鍵返回桌面&#xf…

UnaryOperator函數式接口

2019獨角獸企業重金招聘Python工程師標準>>> 這是一個函數式接口&#xff0c;因此可以用作lambda表達式或方法引用的賦值目標。 可以看到UnaryOperator<T>繼承了Function<T,T>接口&#xff0c;這里可是兩個T,T,還增加了static修飾的identity()方法。 然…

從程序員到項目經理

推薦研發工程師必看的內容 從程序員到項目經理 從程序員到項目經理”&#xff0c;這個標題讓我想起了很久以前一本書的名字《從Javascript到Java》。然而&#xff0c;從Javascript到Java充其量只是工具的更新&#xff0c;而從程序員到項目經理&#xff0c;卻是一個脫胎換骨的過…

linux--命令rcp和scp

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 rcp代表“remote file copy”&#xff08;遠程文件拷貝&#xff09;。該命令用于在計算機之間拷貝文件。rcp命令有兩種格式。第一種格式…

Android Camera 2.0 Api

二次圖像處理 Camera2的API擴展了對YUV的支持&#xff0c;及圖像再處理支持。要知道是否據有這個能力&#xff0c;可以調getCameraCharacteristics()方法&#xff0c;檢查REPROCESS_MAX_CAPTURE_STALL這個鍵值 。如果設備支持再處理&#xff0c;則可以調用createReprocessableC…

scala-數組操作

package com.bigdataimport scala.collection.mutable.ArrayBufferobject ArrayO {def main(args: Array[String]): Unit {val arrayBuffer ArrayBuffer[Int]()//默認情況下都是在ArrayBuffer末尾增加元素arrayBuffer 1arrayBuffer (4,5,6,7,8,9,10)arrayBuffer Array(1,2…

spring cloud微服務分布式云架構 - Spring Cloud集成項目簡介

Spring Cloud集成項目有很多&#xff0c;下面我們列舉一下和Spring Cloud相關的優秀項目&#xff0c;我們的企業架構中用到了很多的優秀項目&#xff0c;說白了&#xff0c;也是站在巨人的肩膀上去整合的。在學習Spring Cloud之前大家必須了解一下相關項目&#xff0c;希望可以…

Nokia落寞身影下 三星成為全球最大手機廠商

摘要&#xff1a;在諾基亞統治全球最大手機廠商寶座長達14年后&#xff0c;三星今年首次取代諾基亞&#xff0c;成為全球最大手機廠商。據IHS iSuppli的數據顯示&#xff0c;三星預計今年手機出貨量將占全球29&#xff05;&#xff0c;而落寞的諾基亞市場份額將下降到24&#x…

Linux中cp和scp命令的使用方法

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Linux為我們提供了兩個用于文件copy的命令&#xff0c;一個是cp&#xff0c;一個是scp&#xff0c;但是他們略有不同。 cp --- 主要是用…

Django:學習筆記(2)——創建第一個應用

Django&#xff1a;學習筆記(2)——創建第一個應用 創建應用 在 Django 中&#xff0c;每一個應用都是一個 Python 包&#xff0c;并且遵循著相同的約定。Django 自帶一個工具&#xff0c;可以幫你生成應用的基礎目錄結構&#xff0c;這樣你就能專心寫代碼&#xff0c;而不是創…

dubbo源碼解析(十)遠程通信——Exchange層

遠程通訊——Exchange層 目標&#xff1a;介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解析。前言 上一篇文章我講的是dubbo框架設計中Transport層&#xff0c;這篇文章我要講的是它的上一層Exchange層&#xff0c;也就是信息交換層。官方文…