kafka 3.5.0 raft協議安裝

前言

最近做項目,需要使用kafka進行通信,且只能使用kafka,筆者沒有測試集群,就自己搭建了kafka集群,實際上筆者在很早之前就搭建了,因為當時還是zookeeper(簡稱ZK)注冊元數據,現在新版kafka(3.0.0開始)已經自帶了元數據能力(使用raft協議)減少了kafka對zk的依賴性。筆者在查詢資料發現,說jdk至少jdk11,實測jdk8也能運行,且并不需要網上說的3+4節點,3+3即可,當然理論上broker節點越多越好,但是元數據節點建議3、5個最合適,raft的過半一致性和容錯性的綜合取舍。

準備

準備kafka安裝包:Apache Kafka

筆者使用的kafka 3.5.0和scala 2.13,采用3臺虛擬機,當然容器也不是不行,注意持久化pv pvc和配置的管理(ip換成域名,dns的切換支持),中間件建議使用虛擬機,可以降低很多容錯性。

jdk使用open jdk,配置java_home和path,以Ubuntu為例

?sudo apt install openjdk-8-jdk-headless

以macOS為例,創建一個ubuntu-server 最小安裝的虛擬機(vmware,畢竟個人使用不要錢),然后安裝openssh 和 openjdk,然后shutdown now

網絡選擇橋接,相當于一臺“真實在”網絡上的一臺物理機

這樣就得到了

192.168.0.108

192.168.0.107

192.168.0.106

3臺虛擬機

步驟

先看kafka集群的架構圖,實際上安裝的過程就是架構圖的執行過程

?

從圖中可以看出已經沒有zk的存在了,從kafka節點自己管理元數據,通過raft協議選主的方式。

1. kafka的準備

上傳kafka安裝包,必須是二進制安裝包,不要源碼包,編譯比較麻煩,然后解壓

tar -zxvf ?kafka_2.13-3.5.0.gz

查看配置目錄會發現

明顯多了kraft的配置目錄,那么如果使用kafka raft元數據中心,則需要修改kraft目錄,啟動時指定kraft目錄的配置

2. 配置修改

raft協議實際上跟zk差不多,使用raft協議的中間件就太多了,但是本質上每個節點都需要一個唯一id,zk也是如此,所以kafka kraft就相當于集成的zk。

在kraft下的有3個文件文件,其中啟動相關的是server.properties中

執行配置修改


# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller# The node id associated with this instance's roles
node.id=1# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093############################# Socket Server Settings ############################## The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

每一行都有注釋,重點關注

筆者設定

192.168.0.106 nodeid 1?

192.168.0.107 nodeid 2

192.168.0.108 nodeid 3

至此配置基本上完成,同理一個節點可以同時是controller和broker,也可以僅僅是controller或者broker,因為controller的負載比較輕,所以一般是和broker一起。其中有個log.dir這個的路徑是下面元數據生成的路徑(選主)和數據事務日志,索引日志的存儲目錄

3. 啟動

1. 生成uuid

任意找一個節點執行:

./kafka-storage.sh random-uuid

每次執行uuid會不一樣,這個uuid標識是一個集群,所以所有節點公用一個uuid,不要每個節點重新生成,會識別不了?

?

然后執行format,如下標紅是我生成的,這個每次不是固定的

?./kafka-storage.sh format -t gZzkfRm4T1y8wSAY-ZNG5Q -c ../config/kraft/server.properties??

?格式化配置文件,同步其他節點

配置文件有什么變化?在日志配置的目錄下出現

關鍵還是meta的文件,有集群id和節點id,版本號,這個對啟動至關重要。

即在上面的log.dir的目錄生成,所以盡量不能使用臨時目錄

2. 啟動

啟動就很簡單了,使用剛剛配置的server.properties執行啟動即可

./kafka-server-start.sh -daemon ../config/kraft/server.properties

不過為了方便查看啟動日志,建議執行日志的console文件輸出

?先看事務日志和索引

驗證

驗證很簡單,查看bin同級目錄下的日志即可

日志帶有[2025-02-08 08:34:12,286] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)?

如果生成用途可以安裝kafka的控制臺,kafka-ui,不過我這里就不安裝了,因為docker安裝比較容易。

總結

kafka從3.0.0開始推出了raft模式的元數據中心,實際上類似zk,kafka自己命名kraft。使用這種方式搭建kafka集群將不再需要zk,同理,kafka的集群的每個節點可以同時是broker和controller(以前zk充當),也可以是單獨的broker,controller(負載不重,不建議單獨controller,跟zk沒區別),官方說明需要jdk11及以上,實測jdk8可以運行,但是生成建議嚴格按照官方標定的jdk執行,jdk是向下兼容的,但是不確定是否會涉及新api或新特性的使用。

另外實際使用中,可能會涉及使用iptables做nat限制kafka的連接方,比如在kafka節點通過iptables限制發送者或者消費端的ip

iptables -t nat -A PREROUTING -p tcp -m tcp --dport 9093 -j DNAT --to-destination kafkaxxx:9093

kafkaxxx --- 指定的是 Kafka 服務所在的機器地址

如果kafka是對接方提供,則在nat打通時,需要客戶端連接的服務器也執行iptables,否則可能出現連接kafka正常,但是不能消費。

iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 9093 -j SNAT --to-source natxxx

natxxx ---?指定的是配置 iptables 的本機的地址

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/895095.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/895095.shtml
英文地址,請注明出處:http://en.pswp.cn/news/895095.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Unity項目接入xLua的一種流程

1. 導入xlua 首先導入xlua,這個不用多說 2. 編寫C#和Lua交互腳本 基礎版本,即xlua自帶的版本 using System.Collections; using System.Collections.Generic; using UnityEngine; using XLua; using System; using System.IO;[Serializable] public…

四次揮手詳解

文章目錄 一、四次揮手各狀態FIN_WAIT_1CLOSE_WAITFIN_WAIT_2LAST_ACKTIME_WAITCLOSE 二、雙方同時調用close(),FIN_WAIT_1狀態后進入CLOSING狀態CLOSING狀態 三、TIME_WAIT狀態詳解(1) TIME_WAIT狀態下的2MSL是什么MSL (報文最大生存時間)為…

【嵌入式 Linux 音視頻+ AI 實戰項目】瑞芯微 Rockchip 系列 RK3588-基于深度學習的人臉門禁+ IPC 智能安防監控系統

前言 本文主要介紹我最近開發的一個個人實戰項目,“基于深度學習的人臉門禁 IPC 智能安防監控系統”,全程滿幀流暢運行。這個項目我目前全網搜了一圈,還沒發現有相關類型的開源項目。這個項目只要稍微改進下,就可以變成市面上目前…

java: framework from BLL、DAL、IDAL、MODEL、Factory using oracle

oracel 21c sql: -- 創建 School 表 CREATE TABLE School (SchoolId CHAR(5) NOT NULL,SchoolName NVARCHAR2(500) NOT NULL,SchoolTelNo VARCHAR2(8) NULL,PRIMARY KEY (SchoolId) );CREATE OR REPLACE PROCEDURE addschool(p_school_id IN CHAR,p_school_name IN NVARCHAR2,p…

解決錯誤:CondaHTTPError: HTTP 000 CONNECTION FAILED for url

解決錯誤:CondaHTTPError: HTTP 000 CONNECTION FAILED for url 查看channels:vim ~/.condarcshow_channel_urls: true channels:- http://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- http://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/…

Apache APISIX 快速入門

文章目錄 apisix 快速入門什么是apisix有了 NGINX 和 Kong,為什么還需要 Apache APISIX?軟件架構基于 Nginx 開源版本,而 Nginx 并不支持動態配置,為什么 Apache APISIX 聲稱自己可以實現動態配置? 安裝配置 APISIX配置…

2025嵌入式高頻面試題解析

一、概述 到了年初,是求職者最活躍的時間。本文梳理了嵌入式高頻面試題,幫助求職者更好地準備面試,同時也為技術愛好者提供深入學習嵌入式知識的參考。 二、C 語言基礎 2.1 指針與數組 問題 1:指針和數組的區別是什么&#xf…

1.攻防世界 baby_web

題目描述這里有提示,初始頁面 進入題目頁面如下 很簡潔的頁面只有一行HELLO WORLD ctrlu查看了源碼也沒有信息 用burp suite抓包,并發送到重放器 根據提示(初始頁面)修改訪問index.php文件 index.php index.php 是一種常見的…

什么是三層交換技術?與二層有什么區別?

什么是三層交換技術?讓你的網絡飛起來! 一. 什么是三層交換技術?二. 工作原理三. 優點四. 應用場景五. 總結 前言 點個免費的贊和關注,有錯誤的地方請指出,看個人主頁有驚喜。 作者:神的孩子都在歌唱 大家好…

【機器學習】數據預處理之數據歸一化

數據預處理之數據歸一化 一、摘要二、數據歸一化概念三、數據歸一化實現方法3.1 最值歸一化方法3.2 均值方差歸一化方法 一、摘要 本文主要講述了數據歸一化(Feature Scaling)的重要性及其方法。首先通過腫瘤大小和發現時間的例子,說明了不同…

【AIGC】語言模型的發展歷程:從統計方法到大規模預訓練模型的演化

博客主頁: [小????????] 本文專欄: AIGC | ChatGPT 文章目錄 💯前言💯語言模型的發展歷程:從統計方法到大規模預訓練模型的演化1 統計語言模型(Statistical Language Model, SLM):統…

高效知識管理與分類優化指南:從目錄設計到實踐應用

摘要 本文旨在幫助讀者在信息爆炸時代構建高效的知識管理體系,提供了知識收藏目錄、瀏覽器書簽和電腦文件夾的優化分類方案。知識收藏目錄方案包括工作與項目、記錄與日常、知識管理等八大類,具有邊界清晰、擴展靈活、貼合實際場景等優勢。瀏覽器書簽分類…

OpenAI 實戰進階教程 - 第十二節 : 多模態任務開發(文本、圖像、音頻)

適用讀者與目標 適用讀者:已經熟悉基礎的 OpenAI API 調用方式,對文本生成或數據處理有一定經驗的計算機從業人員。目標:在本節中,你將學會如何使用 OpenAI 提供的多模態接口(圖像生成、語音轉錄等)開發更…

Java面試題2025-JVM

JVM 1.為什么需要JVM,不要JVM可以嗎? 1.JVM可以幫助我們屏蔽底層的操作系統 一次編譯,到處運行 2.JVM可以運行Class文件 2.JDK,JRE以及JVM的關系 3.我們的編譯器到底干了什么事? 僅僅是將我們的 .java 文件轉換成了…

Deepseek的MLA技術原理介紹

DeepSeek的MLA(Multi-head Latent Attention)技術是一種創新的注意力機制,旨在優化Transformer模型的計算效率和內存使用,同時保持模型性能。以下是MLA技術的詳細原理和特點: 1. 核心思想 MLA技術通過低秩聯合壓縮技術,將多個注意力頭的鍵(Key)和值(Value)映射到一…

QML初識

目錄 一、關于QML 二、布局定位和錨點 1.布局定位 2.錨點詳解 三、數據綁定 1.基本概念 2.綁定方法 3.數據模型綁定 四、附加屬性及信號 1.附加屬性 2.信號 一、關于QML QML是Qt框架中的一種聲明式編程語言,用于描述用戶界面的外觀和行為;Qu…

java項目之美妝產品進銷存管理系統的設計與開發源碼(ssm+mysql)

項目簡介 美妝產品進銷存管理系統的設計與開發實現了以下功能: 美妝產品進銷存管理系統的設計與開發的主要使用者分為管理員登錄后修改個人的密碼。產品分類管理中,對公司內的所有產品分類進行錄入,也可以對產品分類進行修改和刪除。產品管…

Python(pymysql包)操作MySQL【增刪改查】

下載pymysql: pip install pymysql 在MySQL中創建數據庫:unicom create database unicom DEFAULT CHARSET utf8 COLLATE utf8_general_ci;use unicom; 在unicom中創建數據表:admin create table admin(id int not null primary key auto_i…

日志2025.2.9

日志2025.2.9 1.增加了敵人揮砍類型 2.增加了敵人的死亡狀態 在敵人身上添加Ragdoll,死后激活布偶模式 public class EnemyRagdoll : MonoBehaviour { private Rigidbody[] rigidbodies; private Collider[] colliders; private void Awake() { rigidbodi…

HTTP無狀態的概念以及對后端服務的設計會產生的影響

HTTP無狀態(Statelessness) 是指每個HTTP請求都是獨立的,服務器不會記住或依賴于前一個請求的任何信息。每次請求的處理都與其他請求沒有直接關系。也就是說,服務器在處理請求時,不會存儲關于客戶端狀態的信息。 一、HTTP無狀態的具體含義 ①每個請求獨立:每個請求包含了…