Kafka Beginner Tutorial

I. How Kafka Works

1. Kafka is a high-performance message queue system. It handles large-scale data streams with low-latency delivery and can read and write on the order of hundreds of thousands of messages per second.

II. Advantages of Kafka

1. Service decoupling

(1) Improved maintainability

By decoupling services, the system can be broken into independent parts. When a service needs to be updated or fixed, it can be changed on its own without affecting the normal operation of the other services, which greatly reduces the difficulty and time of maintenance work.

(2) Better extensibility

A decoupled system is easier to extend. Adding a new feature or service usually does not affect the rest of the existing system, so you can respond quickly to changing market and user requirements.

2. High throughput, low latency

Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds. Each topic can be split into multiple partitions, and a consumer group consumes those partitions in parallel.
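
For example, a topic created with several partitions lets the members of a consumer group split the work between them. A minimal sketch using the bundled CLI tools, assuming a broker reachable at 10.11.22.122:9092 (the listener address configured later in this tutorial) and a hypothetical topic named test-topic:

# test-topic is a placeholder name; adjust --bootstrap-server to your broker address
./bin/kafka-topics.sh --create --topic test-topic --partitions 3 --replication-factor 1 --bootstrap-server 10.11.22.122:9092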

3. Scalability

The cluster supports online scaling: partitions can be reassigned and migrated with kafka-reassign-partitions.sh.
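
As a rough sketch of how a reassignment works (the topic name test-topic and the target broker id 2 are illustrative, not taken from this setup), you describe the desired replica placement in a JSON file, then execute and verify it:

# move partition 0 of the hypothetical topic test-topic onto broker 2
cat > reassign.json <<'EOF'
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[2]}]}
EOF
./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 --reassignment-json-file reassign.json --execute
./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 --reassignment-json-file reassign.json --verify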

4. Durability and reliability


Messages are persisted to local disk, and data replication is supported to guard against data loss.

5. Fault tolerance


Node failures within the cluster are tolerated (with a replication factor of n, up to n-1 nodes can fail).

6. High concurrency


Thousands of clients can read and write simultaneously.

III. Core Concepts


1. Topic


Messages in Kafka are organized by topic. Each topic represents a message stream: producers send messages to a topic and consumers read messages from it.

2. Partition


Each topic can be divided into multiple partitions, and each partition is an ordered, immutable sequence of messages. Partitions are what let Kafka scale horizontally, handle large volumes of data, and deliver high throughput.

3. Replica


To keep data highly available, Kafka allows each partition to have multiple replicas stored on different servers, so the data remains available even if one server fails.

4. Producer


A producer is a client that sends messages to Kafka topics. It can send a message to a specific partition or let Kafka choose the partition according to a strategy such as round-robin.
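
Kafka ships a console producer that is handy for illustrating this. A minimal sketch, assuming the broker address used elsewhere in this tutorial and the hypothetical topic test-topic; each line typed on stdin is sent as one message:

# every line you type is published as a separate message to test-topic (placeholder name)
./bin/kafka-console-producer.sh --bootstrap-server 10.11.22.122:9092 --topic test-topic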

5. Consumer


A consumer is a client that reads messages from Kafka topics. Consumers usually belong to a consumer group; the consumers in one group read different partitions of the same topic in parallel, which increases consumption speed and efficiency.
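
The matching consumer side can be sketched with the bundled console consumer, again using the hypothetical topic test-topic. Starting the same command in several terminals with the same --group value makes Kafka split the topic's partitions among those instances, and kafka-consumer-groups.sh shows the resulting assignment and lag:

# test-group is a placeholder consumer group id
./bin/kafka-console-consumer.sh --bootstrap-server 10.11.22.122:9092 --topic test-topic --group test-group --from-beginning
./bin/kafka-consumer-groups.sh --bootstrap-server 10.11.22.122:9092 --describe --group test-group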

6. Broker


A Kafka cluster consists of multiple brokers, each of which is a Kafka instance. Brokers store messages and handle read and write requests.

7. ZooKeeper


ZooKeeper is a distributed coordination service. Kafka uses it to manage cluster metadata such as topics, partitions, and brokers.
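
For instance, once the cluster set up in the installation section below is running, the broker ids registered in ZooKeeper can be inspected with the zookeeper-shell tool bundled with Kafka; a small sketch, assuming ZooKeeper is listening on 10.11.22.122:2181 as configured later:

# lists the ids of the brokers currently registered in ZooKeeper
./bin/zookeeper-shell.sh 10.11.22.122:2181 ls /brokers/ids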

IV. Installing Kafka

1. Follow the link to the official Kafka download page.

2. Click DOWNLOAD KAFKA, highlighted by the red box in Figure 1.

Figure 1

3. Click the link selected in Figure 2 to download the archive.

Figure 2

4. Unpack Kafka on the server, then edit kafka_2.13-4.0.0/config/zookeeper.properties:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=true
admin.enable=true
audit.enable=true
# admin.serverPort=8080

5. Edit kafka_2.13-4.0.0/config/server.properties:

# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.11.22.122:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/app/test/kafka/kafka_2.13-3.2.3/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=10.11.22.121:2181,10.11.22.122:2181,10.11.22.124:2181
zookeeper.connect=10.11.22.122:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=180000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

6. Download ZooKeeper

Click the website link shown in Figure 3 to open the ZooKeeper download page and download the archive.

Figure 3

Unpack it on the server, go to the apache-zookeeper-3.9.3-bin/conf directory, and create a new file named zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/app/install-test/zk/zookeeper-3.4.6/zkdata
dataLogDir=/app/install-test/zk/zookeeper-3.4.6/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
audit.enable=true
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=10.11.22.122:2888:2889
#server.2=10.11.22.123:2888:2889
#server.3=10.11.22.124:2888:2889
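
When ZooKeeper runs in replicated mode with several server.N entries, each node additionally expects a myid file in its dataDir containing that node's id. A minimal sketch for the layout above (path taken from the dataDir setting, id matching the uncommented server.1 entry):

# create the data directory and write this node's id into myid
mkdir -p /app/install-test/zk/zookeeper-3.4.6/zkdata
echo 1 > /app/install-test/zk/zookeeper-3.4.6/zkdata/myid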

7. Once the configuration changes are in place, start the ZooKeeper service first. From the kafka_2.13-4.0.0 directory, run:

./bin/zookeeper-server-start.sh  -daemon ./config/zookeeper.properties 

After it has started, check the process:

ps -ef| grep zookeeper

A successful start looks like Figure 4.

Figure 4

8. Switch to the apache-zookeeper-3.9.3-bin/bin directory and start the ZooKeeper client:

./zkCli.sh

A successful connection looks like Figure 5.

Figure 5

9. Switch back to the kafka_2.13-4.0.0 directory and start Kafka:

./bin/kafka-server-start.sh  -daemon ./config/server.properties 

After it has started, check the process, as shown in Figure 6:

ps -ef| grep kafka

Figure 6
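
As a quick sanity check that the broker is reachable, you can list topics against the listener configured in server.properties; a minimal sketch, assuming that address is reachable from where you run the command:

# an empty result is fine on a fresh install; an error usually means the listener address is wrong
./bin/kafka-topics.sh --bootstrap-server 10.11.22.122:9092 --list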
