一、kafka原理
1、kafka是一個高性能的消息隊列系統,能夠處理大規模的數據流,并提供低延遲的數據傳輸,它能夠以每秒數十萬條消息的速度進行讀寫操作。
二、kafka優點
1、服務解耦
(1)提高系統的可維護性?
? ?通過服務解耦,可以將系統分解為獨立的部分,當需要更新或修復某個服務時,可以獨立地進行操作,而不會影響到其他服務的正常運作。這大大減少了維護工作的難度和所需時間。
?(2)增強系統的可擴展性?
? ? ? 解耦后的系統更容易擴展。添加新功能或服務通常不會影響現有系統的其他部分,從而快速響應市場和用戶的需求變化。
2、高吞吐量、低延遲
? ? kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
3、可擴展性
? ? 集群支持熱擴展(kafka-reassign-partitions.sh)分區重分配、遷移
4、持久性、可靠性
消息被持久化到本地磁盤,并且支持數據備份防止數據丟失
5、容錯性
允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
6、高并發
支持數千個客戶端同時讀寫
三、主要概念
1. 主題 (Topic)
Kafka 中的消息以主題 (Topic) 為單位進行組織。每個主題代表一個消息流,消息生產者向主題發送消息,消息消費者從主題消費消息。
2. 分區 (Partition)
每個主題可以分為多個分區 (Partition),每個分區是一個有序、不可變的消息序列。分區的存在使得 Kafka 能夠水平擴展,可以處理大量數據并提供高吞吐量。
3. 副本 (Replica)
為了保證數據的高可用性,Kafka 允許每個分區有多個副本 (Replica),這些副本存儲在不同的服務器上。這樣,即使某個服務器故障,數據仍然可用。
4. 生產者 (Producer)
生產者是向 Kafka 主題發送消息的客戶端。生產者可以選擇將消息發送到特定的分區,也可以讓 Kafka 根據某種策略(如輪詢)決定將消息發送到哪個分區。
5. 消費者 (Consumer)
消費者是從 Kafka 主題消費消息的客戶端。消費者通常屬于某個消費者組 (Consumer Group),一個消費者組中的多個消費者可以并行消費同一個主題的不同分區,提高消費速度和效率。
6. 經紀人 (Broker)
Kafka 集群由多個經紀人 (Broker) 組成,每個經紀人是一個 Kafka 實例。經紀人負責存儲消息并處理消息的讀寫請求。
7. ZooKeeper
ZooKeeper 是一個分布式協調服務,Kafka 使用 ZooKeeper 來管理集群元數據,如主題、分區、經紀人等信息。
四、kafka安裝教程
? ?1、?點此鏈接進入官網下載地址
? ?2、點擊 圖1紅色方框DOWNLOAD KAFKA?
? ? ? ? ? ? ? ? ? ? ? ? ? ? 圖1
3、點擊圖2選中的鏈接下載即可
圖2
3、將kafka解壓到服務器后修改配置項kafka_2.13-4.0.0\config\zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=true
admin.enable=true
audit.enable=true
# admin.serverPort=8080
4、修改配置項?kafka_2.13-4.0.0\config\server.properties? ? ?
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
############################## Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=1############################# Socket Server Settings ############################## The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.11.22.122:9092# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/app/test/kafka/kafka_2.13-3.2.3/kafka-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=10.11.22.121:2181,10.11.22.122:2181,10.11.22.124:2181
zookeeper.connect=10.11.22.122:2181# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=180000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
5、下載zookeeper
按照圖3所示點擊網站鏈接下載zookeeper
(1點擊鏈接到zookeeper下載網址
? ? ? ? ? ?圖3
解壓到服務器進入到apache-zookeeper-3.9.3-bin\conf目錄下新建文件zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/app/install-test/zk/zookeeper-3.4.6/zkdata
dataLogDir=/app/install-test/zk/zookeeper-3.4.6/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
audit.enable=true
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=trueserver.1=10.11.22.122:2888:2889
#server.2=10.11.22.123:2888:2889
#server.3=10.11.22.124:2888:2889
? 6、修改各配置項后先啟動zookeeper服務,進入到kafka_2.13-4.0.0文件夾啟動命令如下
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
啟動成功后查看進程
ps -ef| grep zookeeper
啟動成功后如圖4所示
圖4
7、切換到目錄apache-zookeeper-3.9.3-bin\bin目錄下輸入命令啟動zookeeper客戶端
./zkCli.sh -daemon
啟動成功后如圖5所示
圖5
8、切換到kafka_2.13-4.0.0文件夾輸入kafka啟動命令
./bin/kafka-server-start.sh -daemon ./config/server.properties
啟動成功后查看進程如圖6所示
ps -ef| grep kafka
? ? ?圖6?