[轉]kafka介紹

轉自 https://www.cnblogs.com/hei12138/p/7805475.html

  1.   kafka介紹

1.1. 主要功能

根據官網的介紹,ApacheKafka?是一個分布式流媒體平臺,它主要有3種功能:

  1:It lets you publish and subscribe to streams of records.發布和訂閱消息流,這個功能類似于消息隊列,這也是kafka歸類為消息隊列框架的原因

  2:It lets you store streams of records in a fault-tolerant way.以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流

  3:It lets you process streams of records as they occur.可以再消息發布的時候進行處理

1.2. 使用場景

1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系統或應用程序之間構建可靠的用于傳輸實時數據的管道,消息隊列功能

2:Building real-time streaming applications that transform or react to the streams of data。構建實時的流數據處理程序來變換或處理數據流,數據處理功能

1.3. 詳細介紹

Kafka目前主要作為一個分布式的發布訂閱式的消息系統使用,下面簡單介紹一下kafka的基本機制

  1.3.1 消息傳輸流程

    Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。

    Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息

    Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。

    從上圖中就可以看出同一個Topic下的消費者和生產者的數量并不是對應的。

  1.3.2 kafka服務器消息存儲策略

    談到kafka的存儲,就不得不提到分區,即partitions,創建一個topic時,同時可以指定分區數目,分區數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者發送的消息之后,會根據均衡策略將消息存儲到不同的分區中。

  在每個分區中,消息以順序存儲,最晚接收的的消息會最后被消費。

  1.3.3 與生產者的交互

    生產者在向kafka集群發送消息的時候,可以通過指定分區來發送到指定的分區中

    也可以通過指定均衡策略來將消息發送到不同的分區中

    如果不指定,就會采用默認的隨機均衡策略,將消息隨機的存儲到不同的分區中

  1.3.4 與消費者的交互

    在消費者消費消息時,kafka使用offset來記錄當前消費的位置

    在kafka的設計中,可以有多個不同的group來同時消費同一個topic下的消息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不項目,不互相干擾。

    對于一個group而言,消費者的數量不應該多余分區的數量,因為在一個group中,每個分區至多只能綁定到一個消費者上,即一個消費者可以消費多個分區,一個分區只能給一個消費者消費

    因此,若一個group中的消費者數量大于分區數量的話,多余的消費者將不會收到任何消息。

  1.   Kafka安裝與使用

2.1. 下載

  你可以在kafka官網 http://kafka.apache.org/downloads下載到最新的kafka安裝包,選擇下載二進制版本的tgz文件,根據網絡狀態可能需要fq,這里我們選擇的版本是0.11.0.1,目前的最新版

2.2. 安裝

  Kafka是使用scala編寫的運行與jvm虛擬機上的程序,雖然也可以在windows上使用,但是kafka基本上是運行在linux服務器上,因此我們這里也使用linux來開始今天的實戰。

  首先確保你的機器上安裝了jdk,kafka需要java運行環境,以前的kafka還需要zookeeper,新版的kafka已經內置了一個zookeeper環境,所以我們可以直接使用

  說是安裝,如果只需要進行最簡單的嘗試的話我們只需要解壓到任意目錄即可,這里我們將kafka壓縮包解壓到/home目錄

2.3. 配置

  在kafka解壓目錄下下有一個config的文件夾,里面放置的是我們的配置文件

  consumer.properites 消費者配置,這個配置文件用于配置于2.5節中開啟的消費者,此處我們使用默認的即可

  producer.properties 生產者配置,這個配置文件用于配置于2.5節中開啟的生產者,此處我們使用默認的即可

  server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,目前僅介紹幾個最基礎的配置

    broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,并且集群中的每一個kafka服務器的id都應是唯一的,我們這里采用默認配置即可listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,例如:

          listeners=PLAINTEXT:// 192.168.180.128:9092。并確保服務器的9092端口能夠訪問

      3.zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由于本次使用的是kafka高版本中自帶zookeeper,使用默認配置即可

          zookeeper.connect=localhost:2181
2.4. 運行

啟動zookeeper

cd進入kafka解壓目錄,輸入

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper成功后會看到如下的輸出

    2.啟動kafka

cd進入kafka解壓目錄,輸入

bin/kafka-server-start.sh config/server.properties

啟動kafka成功后會看到如下的輸出

2.5. 第一個消息

   2.5.1 創建一個topic

    Kafka通過topic對同一類的數據進行管理,同一類的數據使用同一個topic可以在處理數據時更加的便捷

    在kafka解壓目錄打開終端,輸入

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    創建一個名為test的topic

     在創建topic后可以通過輸入bin/kafka-topics.sh --list --zookeeper localhost:2181

來查看已經創建的topic

  2.4.2 創建一個消息消費者

   在kafka解壓目錄打開終端,輸入

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

   可以創建一個用于消費topic為test的消費者

     消費者創建完成之后,因為還沒有發送任何數據,因此這里在執行后沒有打印出任何數據不過別著急,不要關閉這個終端,打開一個新的終端,接下來我們創建第一個消息生產者

  2.4.3 創建一個消息生產者

    在kafka解壓目錄打開一個新的終端,輸入

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    在執行完畢后會進入的編輯器頁面

在發送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經打印出了我們剛才發送的消息

  1.   使用java程序

    跟上節中一樣,我們現在在java程序中嘗試使用kafka

    3.1 創建Topic

public static void main(String[] args) {
//創建topic
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.180.128:9092");
AdminClient adminClient = AdminClient.create(props);
ArrayList topics = new ArrayList();
NewTopic newTopic = new NewTopic("topic-test", 1, (short) 1);
topics.add(newTopic);
CreateTopicsResult result = adminClient.createTopics(topics);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

  使用AdminClient API可以來控制對kafka服務器進行配置,我們這里使用NewTopic(String name, int numPartitions, short   replicationFactor)的構造方法來創建了一個名為“topic-test”,分區數為1,復制因子為1的Topic.

3.2 Producer生產者發送消息

public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.180.128:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("topic-test", Integer.toString(i), Integer.toString(i)));producer.close();

}

使用producer發送完消息可以通過2.5中提到的服務器端消費者監聽到消息。也可以使用接下來介紹的java消費者程序來消費消息

3.3 Consumer消費者消費消息

public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.12.65:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("topic-test"),new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection collection) {
}
public void onPartitionsAssigned(Collection collection) {
//將偏移設置到最開始
consumer.seekToBeginning(collection);
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}

這里我們使用Consume API 來創建了一個普通的java消費者程序來監聽名為“topic-test”的Topic,每當有生產者向kafka服務器發送消息,我們的消費者就能收到發送的消息。

  1.   使用spring-kafka

Spring-kafka是正處于孵化階段的一個spring子項目,能夠使用spring的特性來讓我們更方便的使用kafka

4.1 基本配置信息

與其他spring的項目一樣,總是離不開配置,這里我們使用java配置來配置我們的kafka消費者和生產者。

引入pom文件


org.apache.kafka
kafka-clients
0.11.0.1


org.apache.kafka
kafka-streams
0.11.0.1


org.springframework.kafka
spring-kafka
1.3.0.RELEASE

創建配置類

我們在主目錄下新建名為KafkaConfig的類

@Configuration
@EnableKafka
public class KafkaConfig {

}

配置Topic

在kafkaConfig類中添加配置

//topic config Topic的配置開始
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.180.128:9092");
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {return new NewTopic("foo", 10, (short) 2);
}

//topic的配置結束

配置生產者Factort及Template

//producer config start
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<Integer,String>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<String,Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
//producer config end

5.配置ConsumerFactory

//consumer config start
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public ConsumerFactory<Integer,String> consumerFactory(){return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
}@Bean
public Map<String,Object> consumerConfigs(){HashMap<String, Object> props = new HashMap<String, Object>();props.put("bootstrap.servers", "192.168.180.128:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;
}

//consumer config end

4.2 創建消息生產者

//使用spring-kafka的template發送一條消息 發送多條消息只需要循環多次即可
public static void main(String[] args) throws ExecutionException, InterruptedException {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConfig.class);
KafkaTemplate<Integer, String> kafkaTemplate = (KafkaTemplate<Integer, String>) ctx.getBean("kafkaTemplate");
String data="this is a test message";
ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send("topic-test", 1, data);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
public void onFailure(Throwable throwable) {

        }public void onSuccess(SendResult<Integer, String> integerStringSendResult) {}});

}

4.3 創建消息消費者

我們首先創建一個一個用于消息監聽的類,當名為”topic-test”的topic接收到消息之后,我們的這個listen方法就會調用。

public class SimpleConsumerListener {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);
private final CountDownLatch latch1 = new CountDownLatch(1);

@KafkaListener(id = "foo", topics = "topic-test")
public void listen(byte[] records) {//do something herethis.latch1.countDown();
}

}

     我們同時也需要將這個類作為一個Bean配置到KafkaConfig中

@Bean
public SimpleConsumerListener simpleConsumerListener(){
return new SimpleConsumerListener();
}

默認spring-kafka會為每一個監聽方法創建一個線程來向kafka服務器拉取消息

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/390598.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/390598.shtml
英文地址,請注明出處:http://en.pswp.cn/news/390598.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何開始android開發_如何開始進行Android開發

如何開始android開發Android開發簡介 (An intro to Android Development) Android apps can be a great, fun way to get into the world of programming. Officially programmers can use Java, Kotlin, or C to develop for Android. Though there may be API restrictions, …

httpd2.2的配置文件常見設置

目錄 1、啟動報錯&#xff1a;提示沒有名字fqdn2、顯示服務器版本信息3、修改監聽的IP和Port3、持久連接4 、MPM&#xff08; Multi-Processing Module &#xff09;多路處理模塊5 、DSO&#xff1a;Dynamic Shared Object6 、定義Main server &#xff08;主站點&#xff09; …

leetcode 149. 直線上最多的點數

題目 給你一個數組 points &#xff0c;其中 points[i] [xi, yi] 表示 X-Y 平面上的一個點。求最多有多少個點在同一條直線上。 示例 1&#xff1a; 輸入&#xff1a;points [[1,1],[2,2],[3,3]] 輸出&#xff1a;3 示例 2&#xff1a; 輸入&#xff1a;points [[1,1],[3,…

solidity開發以太坊代幣智能合約

智能合約開發是以太坊編程的核心之一&#xff0c;而代幣是區塊鏈應用的關鍵環節&#xff0c;下面我們來用solidity語言開發一個代幣合約的實例&#xff0c;希望對大家有幫助。 以太坊的應用被稱為去中心化應用&#xff08;DApp&#xff09;&#xff0c;DApp的開發主要包括兩大部…

2019大數據課程_根據數據,2019年最佳免費在線課程

2019大數據課程As we do each year, Class Central has tallied the best courses of the previous year, based on thousands of learner reviews. (Here are the rankings from 2015, 2016, 2017, and 2018.) 與我們每年一樣&#xff0c;根據數千名學習者的評論&#xff0c; …

2017-12-07 socket 讀取問題

1.用socke阻塞方式讀取服務端發送的數據時會出現讀取一直阻塞的情況&#xff0c;如果設置了超時時間會在超時時間后讀取到數據: 原因&#xff1a;在不確定服務器會不會發送 socket發送的數據不會返回null 或者-1 所以用常規的判斷方法是不行的。 解決辦法有兩個&#xff1a;1 …

靜態代理設計與動態代理設計

靜態代理設計模式 代理設計模式最本質的特質&#xff1a;一個真實業務主題只完成核心操作&#xff0c;而所有與之輔助的功能都由代理類來完成。 例如&#xff0c;在進行數據庫更新的過程之中&#xff0c;事務處理必須起作用&#xff0c;所以此時就可以編寫代理設計模式來完成。…

svm機器學習算法_SVM機器學習算法介紹

svm機器學習算法According to OpenCVs "Introduction to Support Vector Machines", a Support Vector Machine (SVM):根據OpenCV“支持向量機簡介”&#xff0c;支持向量機(SVM)&#xff1a; ...is a discriminative classifier formally defined by a separating …

6.3 遍歷字典

遍歷所有的鍵—值對 遍歷字典時&#xff0c;鍵—值對的返回順序也與存儲順序不同。 6.3.2 遍歷字典中的所有鍵 在不需要使用字典中的值時&#xff0c;方法keys() 很有用。 6.3.3 按順序遍歷字典中的所有鍵 要以特定的順序返回元素&#xff0c;一種辦法是在for 循環中對返回的鍵…

Google Guava新手教程

以下資料整理自網絡 一、Google Guava入門介紹 引言 Guavaproject包括了若干被Google的 Java項目廣泛依賴 的核心庫&#xff0c;比如&#xff1a;集合 [collections] 、緩存 [caching] 、原生類型支持 [primitives support] 、并發庫 [concurrency libraries] 、通用注解 [comm…

HTML DOM方法

querySelector() (querySelector()) The Document method querySelector() returns the first element within the document that matches the specified selector, or group of selectors. If no matches are found, null is returned.Document方法querySelector()返回文檔中與…

leetcode 773. 滑動謎題

題目 在一個 2 x 3 的板上&#xff08;board&#xff09;有 5 塊磚瓦&#xff0c;用數字 1~5 來表示, 以及一塊空缺用 0 來表示. 一次移動定義為選擇 0 與一個相鄰的數字&#xff08;上下左右&#xff09;進行交換. 最終當板 board 的結果是 [[1,2,3],[4,5,0]] 謎板被解開。…

數據科學領域有哪些技術_領域知識在數據科學中到底有多重要?

數據科學領域有哪些技術Jeremie Harris: “In a way, it’s almost like a data scientist or a data analyst has to be like a private investigator more than just a technical person.”杰里米哈里斯(Jeremie Harris) &#xff1a;“ 從某種意義上說&#xff0c;這就像是數…

python 算術運算

1. 算術運算符與優先級 # -*- coding:utf-8 -*-# 運算符含有,-,*,/,**,//,% # ** 表示^ , 也就是次方 a 2 ** 4 print 2 ** 4 , aa 16 / 5 print 16 / 5 , aa 16.0 / 5 print 16.0 / 5 , a# 結果再進行一次floor a 16.0 // 5.0 print 16.0 // 5.0 , aa 16 // 5 print …

c語言編程時碰到取整去不了_碰到編程墻時如何解開

c語言編程時碰到取整去不了Getting stuck is part of being a programmer, no matter the level. The so-called “easy” problem is actually pretty hard. You’re not exactly sure how to move forward. What you thought would work doesn’t.無論身在何處&#xff0c;陷…

初創公司怎么做銷售數據分析_為什么您的初創企業需要數據科學來解決這一危機...

初創公司怎么做銷售數據分析The spread of coronavirus is delivering a massive blow to the global economy. The lockdown and work from home restrictions have forced thousands of startups to halt expansion plans, cancel services, and announce layoffs.冠狀病毒的…

leetcode 909. 蛇梯棋

題目 N x N 的棋盤 board 上&#xff0c;按從 1 到 N*N 的數字給方格編號&#xff0c;編號 從左下角開始&#xff0c;每一行交替方向。 例如&#xff0c;一塊 6 x 6 大小的棋盤&#xff0c;編號如下&#xff1a; r 行 c 列的棋盤&#xff0c;按前述方法編號&#xff0c;棋盤格…

Python基礎之window常見操作

一、window的常見操作&#xff1a; cd c:\ #進入C盤d: #從C盤切換到D盤 cd python #進入目錄cd .. #往上走一層目錄dir #查看目錄文件列表cd ../.. #往上上走一層目錄 二、常見的文件后綴名&#xff1a; .txt 記事本文本文件.doc word文件.xls excel文件.ppt PPT文件.exe 可執行…

WPF效果(GIS三維篇)

二維的GIS已經被我玩爛了&#xff0c;緊接著就是三維了&#xff0c;哈哈&#xff01;先來看看最簡單的效果&#xff1a; 轉載于:https://www.cnblogs.com/OhMonkey/p/8954626.html

css注釋_CSS注釋示例–如何注釋CSS

css注釋Comments are used in CSS to explain a block of code or to make temporary changes during development. The commented code doesn’t execute.CSS中使用注釋來解釋代碼塊或在開發過程中進行臨時更改。 注釋的代碼不執行。 Both single and multi-line comments in…