Kafka:安裝和配置

?producer:發布消息的對象,稱為消息產生者 (Kafka topic producer)

topic:Kafka將消息分門別類,每一個消息稱為一個主題(topic)

consumer:訂閱消息并處理發布消息的對象稱為消費者(consumer)

broker:已發布的消息保存在一組服務器中,稱為kafka集群,集群中的每一個服務器都是一個代理(broker),消費者(consumer)可以訂閱一個或者多個主題(topic),并從broker中拉取數據,從而消費這些已發布的信息。

1、Kafka對zookeeper是一個強依賴,保存Kafka相關的節點數據,所以安裝kafka之前要先安裝zookeeper

下載鏡像

docker pull zookeeper:3.4.14

創建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下載鏡像

docker pull wurstmeister/kafka:2.12-2.3.1

創建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入門案例

①創建kafka-demo工程并引入依賴

        <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

②創建ProducerQuickStart生產者類并實現

package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生產者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生產對象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封裝發送消息的對象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、發送消息*/producer.send(record);/*4、關閉通道,負責消息發送不成功*/producer.close();}
}

③創建ConsumerQuickStart消費者類并實現

package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消費者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消費者對象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、訂閱主題*/consumer.subscribe(Collections.singletonList("itheima-topic"));//當前線程處于一直監聽狀態while (true){//4、獲取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}

④運行測試

? ? ? ? 成功接收到消息

總結

  • 生產者發送消息,多個消費者訂閱同一個主題,只能有一個消費者收到消息(一對一)

  • 生產者發送消息,多個消費者訂閱同一個主題,所有消費者都能收到消息(一對多)

下一篇:?springboot集成kafka收發消息

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/36091.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/36091.shtml
英文地址,請注明出處:http://en.pswp.cn/news/36091.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

模擬 枚舉

分享牛客算法基礎精選題單題目打卡!!! 目錄 字符串的展開 多項式輸出 機器翻譯 : 鋪地毯 : [NOIP2016]回文日期 字符串的展開 原題鏈接 : 字符串的展開 思路 : 模擬 代碼 : #include<iostream> #include<cstring> #include<algorithm> using na…

Java課題筆記~ ServletContext

單個Servlet的配置對象 web.xml <servlet><servlet-name>FirstServlet</servlet-name><servlet-class>com.ambow.test.FirstServlet</servlet-class><init-param><param-name>charset</param-name><param-value>utf-8&…

centos自動同步北京時間

1、安裝ntpdate服務 yum -y install ntpdate 2、加入自動任務計劃 查找ntpdate的路徑&#xff1a; which ntpdate 復制這個路徑。 編輯自動任務計劃并加入ntpdate&#xff1a; crontab -e # 每小時第30分鐘同步AD域控時間 30 * * * * /usr/sbin/ntpdate -u 192.168.2.8 > …

DP——動態規劃

DP——動態規劃 動態規劃算法動態規劃的一般步驟特殊DP——背包0-1背包問題完全背包問題 總結 動態規劃算法 當涉及到解決具有重疊子問題的優化問題時&#xff0c;動態規劃是一種常用的算法技術。它通過將問題分解為一系列重疊子問題&#xff0c;并使用遞歸或迭代的方式來解決…

Spring Cloud Gateway系例—GatewayFilter 工廠

目錄 6.1.AddRequestHeader 6.2.AddRequestHeadersIfNotPresent 6.3.AddRequestParameter 6.4.AddResponseHeader 6.5.CircuitBreaker 6.5.1. 熔斷指定的狀態碼 6.6.CacheRequestBody 6.7.DedupeResponseHeader 6.8.FallbackHeaders 6.9.JsonToGrpc 6.10.LocalRespo…

TypeScript 非空斷言

TypeScript 非空斷言 發布于 2020-04-08 15:20:15 17.5K0 舉報 一、非空斷言有啥用 介紹非空斷言前&#xff0c;先來看個示例&#xff1a; function sayHello(name: string | undefined) {let sname: string name; // Error } 對于以上代碼&#xff0c;TypeScript 編譯器…

用戶端Web自動化測試-L1

目錄&#xff1a; Web自動化測試價值與體系環境安裝與使用自動化用例錄制自動化測試用例結構分析web瀏覽器控制常見控件定位方法強制等待與隱式等待常見控件交互方法自動化測試定位策略搜索功能自動化測試用戶端Web自動化測試 1.Web自動化測試價值與體系 功能測試場景: UI 自…

IntelliJ Idea 編譯時控制臺上中文輸出亂碼

猜測原因是IDEA啟動時未指定編碼信息&#xff0c;故與系統編碼保持一致&#xff08;windows中文系統默認為GBK編碼&#xff09;,當以UTF-8編碼進行編譯在控制臺會以GBK編碼輸出,從而導致亂碼 解決方案 指定Idea啟動時JVM的默認編碼為UTF-8 Help -> Edit Custom Options P…

本地圖片的image加密解密-Python 3.10-win10

本地圖片的image加密解密- Python 3.10 pyt3int22 -根據1zip下圖片批量生成加密的-物體識別.py import ioimport os import base64 import json # 指定圖片文件夾 image_dir = "./1zip/" base64code_dir = "./base64code/" base64_to_dir = "./bas…

AUTOSAR規范與ECU軟件開發(基礎篇)2.5 AUTOSAR方法論

前言 AUTOSAR方法論(AUTOSAR Methodology) 中車用控制器軟件的開發涉及系統級、 ECU級和軟件組件級。 系統級主要考慮系統功能需求、 硬件資源、 系統約束, 然后建立系統架構; ECU級根據抽象后的信息對ECU進行配置; 系統級和ECU級設計的同時, 伴隨著軟件組件級的開發。 上…

Sql server還原失敗(數據庫正在使用,無法獲得對數據庫的獨占訪問權)

一.Sql server還原失敗(數據庫正在使用,無法獲得對數據庫的獨占訪問權) 本次測試使用數據庫實例SqlServer2008r2版 錯誤詳細&#xff1a; 標題: Microsoft SQL Server Management Studio ------------------------------ 還原數據庫“Mvc_HNHZ”時失敗。 (Microsoft.SqlServer.…

《甲午》觀后感——GPT-3.5所寫

《甲午》是一部令人深思的紀錄片&#xff0c;通過生動的畫面和真實的故事&#xff0c;向觀眾展示了中國歷史上的一段重要時期。觀看這部紀錄片&#xff0c;我深受觸動&#xff0c;對歷史的認識也得到了深化。 首先&#xff0c;這部紀錄片通過精心搜集的歷史資料和珍貴的影像資料…

低成本搭建NAS,利用HFS進行內網穿透,實現公網訪問

通過HFS低成本搭建NAS&#xff0c;并內網穿透實現公網訪問 文章目錄 通過HFS低成本搭建NAS&#xff0c;并內網穿透實現公網訪問前言1.下載安裝cpolar1.1 設置HFS訪客1.2 虛擬文件系統 2. 使用cpolar建立一條內網穿透數據隧道2.1 保留隧道2.2 隧道名稱2.3 成功使用cpolar創建二級…

JMS 消息隊列接口基本使用指南

概述 介紹 JMS&#xff08;Java Message Service&#xff09;即 Java 消息服務應用程序接口&#xff0c;是一個 Java 平臺中關于面向消息中間件&#xff08;MOM&#xff09;的 API&#xff0c;用于在兩個應用程序之間&#xff0c;或分布式系統中發送消息&#xff0c;進行異步…

[保研/考研機試] KY103 2的冪次方 上海交通大學復試上機題 C++實現

題目鏈接&#xff1a; KY103 2的冪次方 https://www.nowcoder.com/share/jump/437195121691999575955 描述 Every positive number can be presented by the exponential form.For example, 137 2^7 2^3 2^0。 Lets present a^b by the form a(b).Then 137 is present…

k8s containerd 配置 http訪問harbor image【最新--官方文檔】

不看官方文檔的代價&#xff1a;在搜索了很多中文資料發現配置了都不起作用&#xff0c;浪費了很多時間。 https://github.com/containerd/containerd/blob/main/docs/cri/config.md#registry-configuration The old CRI config pattern for specifying registry.mirrors and…

MySQL8安裝和刪除教程 保姆級(Windows)

下載 官網: mysql官網點擊Downloads->MySQL Community(GPL) Downloads->MySQL Community Server(或者點擊MySQL installer for Windows) Windows下有兩種安裝方式 在線安裝 一般帶有 web字樣 這個需要聯網離線安裝 一般沒有web字樣 安裝 下載好之后,版本號可以不一樣&…

Postman中,既想傳遞文件,還想傳遞多個參數(后端)

需求:既想傳文件又想傳多個參數可以用以下方式實現

Django rest_framework Serializer中的create、Views中的create/perform_create的區別

Django rest_framework Serializer中的create、Views中的create/perform_create的區別 對于后端來說&#xff0c;前后端分離的方式能讓前后端的開發都爽。和所有的爽一樣&#xff0c;每爽一次都要付出一定的代價。而前后端分離的代價&#xff0c;就是后端要面對巨量的模塊化的功…

C語言實現插入排序

什么是插入排序&#xff1f; 插入排序&#xff08;Insertion Sort&#xff09; 是一種簡單且逐步構建有序序列的排序算法。它的思想是將數組分為兩部分&#xff1a;已排序的部分和未排序的部分。初始時&#xff0c;已排序部分只包含數組的第一個元素&#xff0c;然后逐步將未排…