kafka 服務端消費者和生產者的配置

在kafka的安裝目錄下,config目錄下有個名字叫做producer.properties的配置文件

#指定kafka節點列表,用于獲取metadata,不必全部指定
#需要kafka的服務器地址,來獲取每一個topic的分片數等元數據信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092#生產者生產的消息被發送到哪個block,需要一個分組策略。
#指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區
#partitioner.class=kafka.producer.DefaultPartitioner#生產者生產的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發送到broker集群,
#而broker集群是不會進行解壓縮的,broker集群只會把消息發送到消費者集群,然后由消費者來解壓縮。
#是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。
#壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
#文本數據會以1比10或者更高的壓縮比進行壓縮。
compression.codec=none#指定序列化處理類,消息在網絡上傳輸就需要序列化,它有String、數組等許多種實現。
serializer.class=kafka.serializer.DefaultEncoder#如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。
#如果上面啟用了壓縮,那么這里就需要設置
#compressed.topics= 
#這是消息的確認機制,默認值是0。在面試中常被問到。
#producer有個ack參數,有三個值,分別代表:
#(1)不在乎是否寫入成功;
#(2)寫入leader成功;
#(3)寫入leader和所有副本都成功;
#要求非常可靠的話可以犧牲性能設置成最后一種。
#為了保證消息不丟失,至少要設置為1,也就
#是說至少保證leader將消息保存成功。
#設置發送數據是否需要服務端的反饋,有三個值0,1,-1,分別代表3種狀態:
#0: producer不會等待broker發送ack。生產者只要把消息發送給broker之后,就認為發送成功了,這是第1種情況;
#1: 當leader接收到消息之后發送ack。生產者把消息發送到broker之后,并且消息被寫入到本地文件,才認為發送成功,這是第二種情況;#-1: 當所有的follower都同步消息成功后發送ack。不僅是主的分區將消息保存成功了,
#而且其所有的分區的副本數也都同步好了,才會被認為發動成功,這是第3種情況。
request.required.acks=0#broker必須在該時間范圍之內給出反饋,否則失敗。
#在向producer發送ack之前,broker允許等待的最大時間 ,如果超時,
#broker將會向producer發送一個error ACK.意味著上一次消息因為某種原因
#未能成功(比如follower未能同步成功)
request.timeout.ms=10000#生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條;
#還有一種是異步,表示生產者積累到一批的消息,裝到一個池子里面緩存起來,再發送給broker,
#這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供我們來設置。
#一般我們會選擇異步。
#同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,
#也意味著消息將會在本地buffer中,并適時批量發送,但是也可能導致丟失未發送過去的消息
producer.type=sync#在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,
#默認為5000ms
#此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000#異步情況下,緩存中允許存放消息數量的大小。
#在async模式下,producer端允許buffer的最大消息量
#無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積
#此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000條消息。
queue.buffering.max.messages=20000#如果是異步,指定每次批量發送數據量,默認為200
batch.num.messages=500#在生產端的緩沖池中,消息發送出去之后,在沒有收到確認之前,該緩沖池中的消息是不能被刪除的,
#但是生產者一直在生產消息,這個時候緩沖池可能會被撐爆,所以這就需要有一個處理的策略。
#有兩種處理方式,一種是讓生產者先別生產那么快,阻塞一下,等會再生產;另一種是將緩沖池中的消息清空。
#當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后阻塞一定時間后,
#隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
#此時producer可以繼續阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時間
#-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄
#0: 立即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1#當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數
#因為broker并沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失)
#有可能導致broker接收到重復的消息,默認值為3.
message.send.max.retries=3#producer刷新topic metada的時間間隔,producer需要知道partition leader
#的位置,以及當前topic的情況
#因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,
#將會立即刷新
#(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置
#額外的刷新機制,默認值600000
topic.metadata.refresh.interval.ms=60000

kafka的消費者配置(路徑和生產者配置文件路徑相同),名字叫做.consumer.properties

#消費者集群通過連接Zookeeper來找到broker。
#zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000#這是一個時間閾值。
#指定多久消費者更新offset到zookeeper中。
#注意offset更新時基于time而不是每次獲得的消息。
#一旦在更新zookeeper發生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000#指定消費
group.id=xxxxx#這是一個數量閾值,經測試是500條。
#當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息并不是每消費一次消息就向zk提交
#一次,而是現在本地保存(內存),并定期提交,默認為true
auto.commit.enable=true# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000# 當前consumer的標識,可以設定,也可以有系統生成,
#主要用來跟蹤消息消費情況,便于觀察
conusmer.id=xxx# 消費者客戶端編號,用于區分不同客戶端,默認客戶端程序自動產生
client.id=xxxx# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50# 當有新的consumer加入到group時,將會reblance,此后將會
#有partitions的消費端遷移到新  的consumer上,如果一個
#consumer獲得了某個partition的消費權限,那么它將會向zk
#注冊 "Partition Owner registry"節點信息,但是有可能
#此時舊的consumer尚沒有釋放此節點, 此值用于控制,
#注冊節點的重試次數.
rebalance.max.retries=5#每拉取一批消息的最大字節數
#獲取消息的最大尺寸,broker不會像consumer輸出大于
#此值的消息chunk 每次feth將得到多條消息,此值為總大小,
#提升此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600#當消息的尺寸不足時,server阻塞的時間,如果超時,
#消息將立即發送給consumer
#數據一批一批到達,如果每一批是10條消息,如果某一批還
#不到10條,但是超時了,也會立即發送給consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360# 如果zookeeper沒有offset值或offset值超出范圍。
#那么就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

以上內容轉載自:https://www.cnblogs.com/jun1019/p/6256371.html

?

硬件的選擇

1.磁盤吞吐量

  生產者客戶端的性能直接受到服務器端磁盤吞吐量的影響。生產者生成的消息必須被提交到服務器保存,大多數客戶端在發送消息后會一直等待,直到至少有一個服務器確認消息已經提交成功為止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。機械硬盤(HDD)和固態盤(SSD)。固態盤的查找和訪問速度都很快,提供了最好的性能。機械盤更便宜,單塊容量也更大。在同一個服務器上使用多個機械盤,可以設置多個數據目錄,或者把它們設置成磁盤陣列,這樣可以提升機械硬盤的性能。

2.磁盤容量

  磁盤容量要多大取決于需要保存的消息的數量。

3.內存

  服務器端的內存容量是影響客戶端性能的主要因素,磁盤性能影響生產者,而內存影響消費者。消費者一般從分區尾部讀取消息,如果有生產者存在,就緊跟在生產者后面。這種情況下,消費者讀的消息會直接存放在系統的頁面緩存里,這比從磁盤上重新讀取要快。不建議把kafka同其他重要的應用部署在一起,因為它們需要共享頁面緩存,最終會降低kafka消費者的性能。

4.網絡

  網絡吞吐量決定了kafka能夠處理的最大數據流量。它和磁盤性能是制約kafka擴展規模的主要因素。kafka支持多個消費者,造成流入和流出的網絡流量不平衡,從而讓情況變得更加復雜。對于給定的主題,一個生產者可能每秒中寫入1MB數據,但可能同時有多個消費者瓜分網絡流量。其它操作也會占用網絡流量。

5.CPU

  kafka對計算處理能力的要求較低,不過他在一定程度上還是會影響整體性能。客戶端為了優化網絡和磁盤空間,會對消息進行壓縮。服務器需要對消息進行批量解壓,設置偏移量,然后重新進行批量壓縮,再保存到磁盤上。這就是kafka對計算能力有所要求的地方。

  使用kafka集群的最大好處就是可以跨服務器進行負載均衡,再則就是可以使用復制功能來避免因單點故障造成的數據丟失。在維護kafka或底層系統時,使用集群可以確保為客戶端提供高可用性。

?

轉載于:https://www.cnblogs.com/ToBeExpert/p/9823339.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/279129.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/279129.shtml
英文地址,請注明出處:http://en.pswp.cn/news/279129.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何在Windows 10上使用觸摸板手勢

If you’ve used a touchpad in Windows 10, you’re no doubt aware of the basic single-finger tapping and two-finger scrolling gestures. Windows 10 also packs in some additional gestures you might not have tried. 如果您在Windows 10中使用了觸摸板,那…

java全棧開發工程師_談談我對Java(J2EE)全棧工程師的理解

很多剛從事Java開發的同學都有一個疑問,到底是向全棧式程序員方向發展還是做精通某種技術的專才?對于這個問題也是見仁見智。 在給出我的觀點之前,我們先來分析一下全棧工程師的種類和專才的種類 ,之后關于這個問題的答案就很清楚…

多網卡命名規則

使用iptables做nat路由,需要幾張網卡,以下命令很有用 1.首先你要先確認你系統加載的網卡,lspci|grep -i eth,如果出現unknow情況或者未識別,最好換網卡,或者是驅動沒有加載,需要到/lib/modules的子目錄driv…

相機模擬光圈_我的相機應該使用什么光圈?

相機模擬光圈Aperture, along with shutter speed and ISO, is one of the three most important settings you control when you take a photo. It affects both the amount of light that hits your camera sensor and the depth of field of your images. Let’s look at ho…

2018-2019-1 20165234 《信息安全系統設計基礎》第四周學習總結

一、學習目標 了解ISA抽象的作用 掌握ISA,并能舉一反三學習其他體系結構 了解流水線和實現方式二、學習內容 Y86-64指令 movq指令 irmovq rrmovq mrmovq rmmovq四個整數操指令 addq,subq,andq,xorq只對寄存器數據進行操作7個跳轉指令 cmovle cmovl cmove cmovne cmo…

python數據庫實例_Python3.6簡單的操作Mysql數據庫的三個實例

安裝pymysql參考:https://github.com/PyMySQL/PyMySQL/pip install pymsql實例一import pymysql# 創建連接# 參數依次對應服務器地址,用戶名,密碼,數據庫conn pymysql.connect(host127.0.0.1, userroot, passwd123456, dbdemo)# …

Python之釘釘機器人推送天氣預報

通過Python腳本結合釘釘機器人,定時向釘釘群推送天氣預報 #!/usr/bin/python # -*- coding: utf-8 -*- # Author: aikergdedu.ml # My blog http://m51cto.51cto.blog.com import requests import re import urllib2 import json import sys import osheaders {Co…

google +按鈕_如何禁用或改善Google的Google+集成

google 按鈕If you’ve used Google lately, you’ve probably seen Google taking over Google’s search results. You don’t have to put up with it — you can disable the integration, show better social-networking pages or hide those pesky Google notifications.…

P2680 運輸計劃

傳送門 十分顯然完成工作的時間和航耗時最長的運輸計劃有關 所以題目意思就是要求最大值最小 所以可以想到二分 把所有大于mid時間的航線打上標記,顯然刪邊只能在所有這些航線的公共路徑上 要如何快速打標記是個問題 二分已經有一個log,所以只能承受O(n)…

java 集合讀寫同步_JAVA多線程學習十六 - 同步集合類的應用

1.引言在多線程的環境中,如果想要使用容器類,就需要注意所使用的容器類是否是線程安全的。在最早開始,人們一般都在使用同步容器(Vector,HashTable),其基本的原理,就是針對容器的每一個操作,都添加synchronized來進行同…

Linux下的parted工具的使用 GPT分區安裝系統

安裝系統是安裝前時候ctrlatlF2 fdisk -l parted select /dev/sdb mklabel msdos # 將GPT磁盤格式化為MBR磁盤 對大硬盤進行分區 xfs 和 ntfs Linux下的parted工具的使用也很簡單,具體操作如下: rootme:/mnt# parted /dev/sda Using /dev/sda Welcome to…

ubuntu自定義菜單_如何自定義Ubuntu的每日消息

ubuntu自定義菜單Ubuntu displays an informative message, known as the message of the day, when a user logs in at the terminal. The MOTD is fully customizable — you can add your own text and other dynamic data. 當用戶在終端上登錄時,Ubuntu將顯示信…

java避免使用orderby_java – @OrderBy在JPA中無法正常工作

OrderBy如何運作?它在以下代碼中不起作用:Employee.javapackage com.semanticbits.pojo;import java.util.List;import javax.persistence.CascadeType;import javax.persistence.Embedded;import javax.persistence.Entity;import javax.persistence.Ge…

BigDecimal四舍五入與保留位

1.引言 借用《Effactive Java》這本書中的話,float和double類型的主要設計目標是為了科學計算和工程計算。他們執行二進制浮點運算,這是為了在廣域數值范圍上提供較為精確的快速近似計算而精心設計的。然而,它們沒有提供完全精確的結果&#…

火狐web開發清楚緩存_如何使用Firefox的Web開發工具

火狐web開發清楚緩存Firefox’s Web Developer menu contains tools for inspecting pages, executing arbitrary JavaScript code, and viewing HTTP requests and other messages. Firefox 10 added an all-new Inspector tool and updated Scratchpad. Firefox的Web Develop…

Leetcode400Nth Digit第N個數字

在無限的整數序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...中找到第 n 個數字。 注意: n 是正數且在32為整形范圍內 ( n < 231)。 示例 1: 輸入: 3 輸出: 3 示例 2: 輸入: 11 輸出: 0 說明: 第11個數字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ... 里是0&#xff0c;它是…

Java基類共同屬性設置_多選擇基類的訪問屬性-Java初學筆記

多選擇基類的訪問屬性你現在知道在定義類的訪間屬性時可用的選擇項&#xff0c;你希望使用這些類定義子類。你知道在類繼承上這些屬性所具有的效果&#xff0c;但是你如何決定到底應該使用哪一個呢?這里沒有死板和現成的規則&#xff0c;你選擇的訪問屬性取決于在將來你想用類…

IT:如何在Windows Server 2008 R2上安裝Hyper-V虛擬化

Windows Server 2008 R2 and later releases of the product ship with a virtualization platform called Hyper-V, which works quite well since it’s built into Windows. Today we’re going to show you how to install it. Windows Server 2008 R2和更高版本的產品附帶…

FineReport單行與數據庫交互的方法

1. 問題描述 我們在做一張報表填報的時候經常會遇到需要在一行進行添加動作&#xff0c;將該行數據直接與數據庫交互&#xff0c;執行存儲過程過程。我們可以通過每一行增加帆軟“插入”按鈕實現插入動作&#xff0c;并且在控件事件中增加和數據庫的交互&#xff0c;但當事件…

java cas volatile_每日一個知識點:Volatile 和 CAS 的弊端之總線風暴

每日一個知識點系列的目的是針對某一個知識點進行概括性總結&#xff0c;可在一分鐘內完成知識點的閱讀理解&#xff0c;此處不涉及詳細的原理性解讀。一、什么是總線風暴總線風暴&#xff0c;聽著真是一個帥氣的詞語&#xff0c;但如果發生在你的系統上那就不是很美麗了&#…