Key goals:
- Zero loss: end-to-end exactly-once (source-side transactions + Streams exactly_once_v2 + sink DLQ).
- Low latency: producer-side batching and compression + Streams caching + sensible poll/commit intervals.
- Recoverability: rebootstrap, backoff, standbys, replicas, and snapshots in Connect/Streams.
2. Hand both "data sources" and "data destinations" over to Connect
2.1 How do you configure the Worker (the Connect cluster)?
- Cluster identity and metadata storage:
  - group.id: workers sharing the same value form one Connect cluster (e.g. connect-realtime).
  - config.storage.topic / offset.storage.topic / status.storage.topic: the topics that store connector configs, offsets, and status. For production, RF=3 is recommended; the default partition counts are fine.
- Connecting to Kafka:
  - bootstrap.servers: list at least 2~3 brokers to improve reachability.
  - metadata.recovery.strategy=rebootstrap: automatically re-bootstraps after losing the cluster, paired with reconnect.backoff.*.
- End-to-end consistency:
  - exactly.once.source.support: set enabled directly on a new cluster; on an existing cluster, set preparing first, then switch to enabled after the rolling upgrade.
- REST & plugins:
  - listeners=http://:8083 (optionally add admin.listeners for isolation).
  - plugin.path: the plugin directories for Debezium / your target sinks.
  - plugin.discovery=hybrid_warn: surfaces problems at startup without failing outright (move gradually to service_load in production).
These are the settings you must change; the rest (CORS, metrics, SSL/SASL, backoff, ciphers, etc.) can stay at their defaults. The full checklist at the end of this article is there for cross-checking.
A minimal working Worker example:
group.id=connect-realtime
bootstrap.servers=broker-a:9092,broker-b:9092,broker-c:9092
config.storage.topic=_connect_configs
offset.storage.topic=_connect_offsets
status.storage.topic=_connect_status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
listeners=http://:8083
plugin.path=/opt/connectors,/usr/local/share/kafka/plugins
exactly.once.source.support=enabled
metadata.recovery.strategy=rebootstrap
reconnect.backoff.ms=100
reconnect.backoff.max.ms=1000
request.timeout.ms=40000
2.2 Pushing order/inventory CDC into Kafka
The three required fields:
- name: debezium-orders-src
- connector.class: io.debezium.connector.mysql.MySqlConnector
- tasks.max: size it by database/table sharding, database load, and MySQL binlog rate (e.g. 2~4)
Transforms / fault tolerance:
- transforms / predicates: the usual SMTs (dropping fields, flattening, filling in business timestamps).
- errors.*: for production, prefer errors.tolerance=all plus a DLQ (no DLQ on the source side? Then stick with fail-fast, or wrap compensation logic into an SMT).
- EOS check: exactly.once.support=required (if the connector does not declare support, requested is acceptable, but read its documentation carefully).
Topic creation:
- topic.creation.groups: when broker auto-creation is disabled, have Connect's AdminClient create orders.cdc / inventory.cdc etc., specifying RF, partitions, and cleanup policy.
The rest, such as config.action.reload (auto-restart on external secret rotation), header.converter, and offsets.storage.topic (a dedicated offsets topic), depend on your own conventions. Every field is listed in the full Source table at the end.
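Putting this together, a hedged source sketch. The hostnames, credentials, server id, table list, and the routing regex below are illustrative placeholders, not a verified production config; check each field against the Debezium docs for your version:

name=debezium-orders-src
connector.class=io.debezium.connector.mysql.MySqlConnector
# Debezium's MySQL connector reads a single binlog stream, so one task per
# connector is typical; scale by splitting databases across connectors.
tasks.max=1
database.hostname=mysql-orders.internal
database.port=3306
database.user=cdc_reader
# Assumes a FileConfigProvider is configured on the worker.
database.password=${file:/opt/secrets/mysql.properties:password}
database.server.id=5400
topic.prefix=orders
table.include.list=shop.orders,shop.inventory
schema.history.internal.kafka.bootstrap.servers=broker-a:9092,broker-b:9092
schema.history.internal.kafka.topic=_schema_history.orders
# SMTs: flatten the Debezium envelope, then route orders.shop.* to *.cdc names.
transforms=unwrap,route
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=orders\\.shop\\.(.*)
transforms.route.replacement=$1.cdc
exactly.once.support=required
# Let Connect create the CDC topics when broker auto-creation is disabled.
topic.creation.groups=cdc
topic.creation.default.replication.factor=3
topic.creation.default.partitions=6
topic.creation.cdc.include=orders.cdc,inventory.cdc
topic.creation.cdc.replication.factor=3
topic.creation.cdc.partitions=12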
2.3 Shipping results to ES/ClickHouse/cache, with a DLQ as the safety net
Required:
- name / connector.class / tasks.max
- topics or topics.regex (pick exactly one). We usually use topics.regex to subscribe to a whole family of result topics (e.g. ^orders\.result\..+).
Strongly recommended: enable the DLQ:
- errors.deadletterqueue.topic.name=_dlq.orders.sink
- errors.deadletterqueue.topic.replication.factor=3
- errors.deadletterqueue.context.headers.enable=true (carry context headers for easier troubleshooting)
Retries and tolerance:
- errors.retry.timeout=-1 (retry indefinitely, or whatever window the business accepts)
- errors.retry.delay.max.ms=60000
- errors.tolerance=all (don't let one bad record take down the task)
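Assembled, a hedged sink sketch. Confluent's Elasticsearch sink is shown here only as an example target; the connector class, connection.url, and converter choices are assumptions to adapt to whatever sink you actually run:

name=es-orders-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=4
topics.regex=^orders\.result\..+
connection.url=http://es-coordinator.internal:9200
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Framework-level error handling: tolerate bad records, retry, and dead-letter.
errors.tolerance=all
errors.retry.timeout=-1
errors.retry.delay.max.ms=60000
errors.deadletterqueue.topic.name=_dlq.orders.sink
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true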
3. Streaming "orders + behavior + inventory" together
Target job:
- Inputs: orders.cdc, inventory.cdc, and user.behavior.
- Aggregate sales over tumbling 5-minute windows, join out an "oversell risk" signal, and write orders.result.picklist and orders.result.alert in real time.
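To make the job concrete, a minimal Java sketch of the topology, assuming String serdes and a toy risk rule; the class name, payload handling, and join condition are illustrative only, and the user.behavior enrichment is elided:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class OrdersEtlTopology {
    // Values are treated as opaque strings; a real job would deserialize the
    // CDC payloads and also join the user.behavior stream.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        KStream<String, String> orders = builder.stream("orders.cdc", consumed);
        KTable<String, String> inventory = builder.table("inventory.cdc", consumed);

        // Tumbling 5-minute sales counts per key, emitted to the picklist topic.
        orders.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
              .count()
              .toStream((windowedKey, count) -> windowedKey.key())
              .mapValues(count -> Long.toString(count))
              .to("orders.result.picklist", Produced.with(Serdes.String(), Serdes.String()));

        // Left-join each order against current stock; missing stock flags oversell risk.
        orders.leftJoin(inventory,
                        (order, stock) -> (stock == null || stock.isEmpty())
                                ? "OVERSELL_RISK:" + order : null,
                        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
              .filter((k, v) -> v != null)
              .to("orders.result.alert", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}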
3.1 Reliability and concurrency
- processing.guarantee=exactly_once_v2: end-to-end transactions (requires brokers ≥ 2.5; three or more nodes recommended in production).
- replication.factor=3: RF for the internal changelog / repartition topics.
- num.stream.threads=2~4: set by partition count and host core count.
- num.standby.replicas=1 + max.warmup.replicas=2 + probing.rebalance.interval.ms=600000: hot standbys plus smooth task migration, shortening failover pauses.
3.2 Performance and out-of-order data
- Caching: cache.max.bytes.buffering (e.g. 128 MB) + statestore.cache.max.bytes (10 MB by default; can be raised).
- Out-of-order control: max.task.idle.ms=0 (by default, don't wait for producers). If a multi-stream join must emit strictly in time order, allow some idle time (>0) so other partitions can catch up; for extreme low latency, set -1 to never wait.
- Commit cadence: commit.interval.ms=100 (under exactly_once_v2 the default is already 100 ms).
3.3 State stores and observability
- state.dir=/var/lib/kafka-streams/orders-etl (unique per instance).
- default.dsl.store=rocksDB (the default) + a custom rocksdb.config.setter for compression/concurrency tuning; a sketch follows the example below.
- Metrics: enable.metrics.push=true + metric.reporters (hook into your monitoring stack).
- Operations: metadata.recovery.strategy=rebootstrap, paired with reconnect.backoff.*; log.summary.interval.ms=120000.
A Streams example:
application.id=orders-etl
bootstrap.servers=broker-a:9092,broker-b:9092,broker-c:9092
processing.guarantee=exactly_once_v2
replication.factor=3
num.stream.threads=3
num.standby.replicas=1
max.warmup.replicas=2
probing.rebalance.interval.ms=600000
cache.max.bytes.buffering=134217728
statestore.cache.max.bytes=33554432
state.dir=/var/lib/kafka-streams/orders-etl
commit.interval.ms=100
topology.optimization=all
max.task.idle.ms=0
metadata.recovery.strategy=rebootstrap
reconnect.backoff.ms=50
reconnect.backoff.max.ms=1000
request.timeout.ms=40000
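For the rocksdb.config.setter mentioned in 3.3, a minimal Java sketch. It uses the rocksdbjni classes that Streams bundles; the class name is hypothetical and the tuning values are illustrative, not recommendations:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;

public class OrdersEtlRocksDbSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Compression and background concurrency, per the tuning notes above.
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        options.setIncreaseParallelism(4);

        // Adjust the block-based table format in place.
        BlockBasedTableConfig table = (BlockBasedTableConfig) options.tableFormatConfig();
        table.setBlockSize(16 * 1024L);
        options.setTableFormatConfig(table);
    }

    @Override
    public void close(String storeName, Options options) {
        // Nothing allocated here; any cache/filter objects created above
        // would need to be closed in this hook.
    }
}

Register it by setting rocksdb.config.setter to the class's fully-qualified name.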
4. Turning the configuration into deliverable work orders
- SLA: end-to-end P95 ≤ 2 s; availability 99.9%; recovery time ≤ 1 minute.
- Topic plan: orders.cdc (compact+delete, 7 days), inventory.cdc (same), user.behavior (delete, 3 days). Repartition/changelog topics are managed by Streams, RF=3. A topic-config snippet follows this list.
- Connect work order:
  - Worker: the example above suffices; plugin paths set, EOS enabled.
  - Source (Debezium): read-only database account, binlog parameter checks, database/table allowlists.
  - Sink: target endpoints, batching/concurrency, DLQ topic and its retention policy.
- Streams work order:
  - Replicas and standbys, EXACTLY_ONCE_V2, state directory and cleanup, monitoring dashboards and alerts (commit latency / processing rate / record-lag-max).
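For reference, the retention plan above expressed as topic-level configs (retention values in milliseconds; min.insync.replicas=2 is an extra assumption, not part of the plan):

# orders.cdc / inventory.cdc: keyed CDC streams, 7 days
cleanup.policy=compact,delete
retention.ms=604800000
min.insync.replicas=2

# user.behavior: append-only events, 3 days
cleanup.policy=delete
retention.ms=259200000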
5. Complete parameter quick reference
Below is a condensed list of every parameter name plus its meaning/type/default/allowed values, matching the scenario choices above one to one; you can use it directly for code review or CMDB entry.
To keep it compact, each entry retains only its most important semantics, but no field is omitted:
5.1 Kafka Connect — Worker level (3.5)
High:
config.storage.topic (topic storing connector configs) | group.id (Connect cluster ID) | key.converter (key format conversion) | offset.storage.topic (source offset storage) | status.storage.topic (status storage) | value.converter (value format conversion) | bootstrap.servers (bootstrap brokers) | exactly.once.source.support (source-side EOS: disabled/preparing/enabled) | heartbeat.interval.ms | rebalance.timeout.ms | session.timeout.ms | the ssl.* family (key/trust/keystore/password/location/type/provider/…)
Medium:
client.dns.lookup | connections.max.idle.ms | connector.client.config.override.policy (All/None/Principal) | receive.buffer.bytes | request.timeout.ms | sasl.client.callback.handler.class | sasl.jaas.config | sasl.kerberos.service.name | sasl.login.callback.handler.class | sasl.login.class | sasl.mechanism | sasl.oauthbearer.jwks.endpoint.url | sasl.oauthbearer.token.endpoint.url | security.protocol | send.buffer.bytes | ssl.enabled.protocols | ssl.keystore.type (JKS/PKCS12/PEM) | ssl.protocol (TLSv1.3) | ssl.provider | ssl.truststore.type | worker.sync.timeout.ms | worker.unsync.backoff.ms
Low:
access.control.allow.methods | access.control.allow.origin | admin.listeners | client.id | config.providers | config.storage.replication.factor | connect.protocol (eager/compatible/sessioned) | header.converter (SimpleHeaderConverter) | inter.worker.key.generation.algorithm (HmacSHA256) | inter.worker.key.size | inter.worker.key.ttl.ms | inter.worker.signature.algorithm (HmacSHA256) | inter.worker.verification.algorithms | listeners | metadata.max.age.ms | metadata.recovery.rebootstrap.trigger.ms | metadata.recovery.strategy (rebootstrap/none) | metric.reporters (JmxReporter) | metrics.num.samples | metrics.recording.level (INFO/DEBUG) | metrics.sample.window.ms | offset.flush.interval.ms | offset.flush.timeout.ms | offset.storage.partitions (25) | offset.storage.replication.factor (3) | plugin.discovery (only_scan/hybrid_warn/hybrid_fail/service_load) | plugin.path | reconnect.backoff.max.ms | reconnect.backoff.ms | response.http.headers.config | rest.advertised.host.name | rest.advertised.listener | rest.advertised.port | rest.extension.classes | retry.backoff.max.ms | retry.backoff.ms | sasl.kerberos.kinit.cmd | sasl.kerberos.min.time.before.relogin | sasl.kerberos.ticket.renew.jitter | sasl.kerberos.ticket.renew.window.factor | sasl.login.connect.timeout.ms | sasl.login.read.timeout.ms | sasl.login.refresh.buffer.seconds | sasl.login.refresh.min.period.seconds | sasl.login.refresh.window.factor | sasl.login.refresh.window.jitter | sasl.login.retry.backoff.max.ms | sasl.login.retry.backoff.ms | sasl.oauthbearer.clock.skew.seconds | sasl.oauthbearer.expected.audience | sasl.oauthbearer.expected.issuer | sasl.oauthbearer.header.urlencode | sasl.oauthbearer.jwks.endpoint.refresh.ms | sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms | sasl.oauthbearer.jwks.endpoint.retry.backoff.ms | sasl.oauthbearer.scope.claim.name | sasl.oauthbearer.sub.claim.name | scheduled.rebalance.max.delay.ms | socket.connection.setup.timeout.max.ms | socket.connection.setup.timeout.ms | ssl.cipher.suites | ssl.client.auth (required/requested/none) | ssl.endpoint.identification.algorithm (https) | ssl.engine.factory.class | ssl.keymanager.algorithm | ssl.secure.random.implementation | ssl.trustmanager.algorithm | status.storage.partitions (5) | status.storage.replication.factor (3) | task.shutdown.graceful.timeout.ms | topic.creation.enable | topic.tracking.allow.reset | topic.tracking.enable
5.2 Kafka Connect — Source Connector
High:
name | connector.class | tasks.max
Low/Medium (all listed):
tasks.max.enforce (deprecated) | key.converter | value.converter | header.converter | config.action.reload (none/restart) | transforms | predicates | errors.retry.timeout | errors.retry.delay.max.ms | errors.tolerance (none/all) | errors.log.enable | errors.log.include.messages | topic.creation.groups | exactly.once.support (requested/required) | transaction.boundary (poll/interval/connector) | transaction.boundary.interval.ms | offsets.storage.topic
5.3 Kafka Connect — Sink Connector
High:
name | connector.class | tasks.max | topics | topics.regex
Medium/Low (all listed):
tasks.max.enforce (deprecated) | key.converter | value.converter | header.converter | config.action.reload (none/restart) | transforms | predicates | errors.retry.timeout | errors.retry.delay.max.ms | errors.tolerance | errors.log.enable | errors.log.include.messages | errors.deadletterqueue.topic.name | errors.deadletterqueue.topic.replication.factor | errors.deadletterqueue.context.headers.enable
5.4 Kafka Streams
High:
application.id | bootstrap.servers | num.standby.replicas | state.dir
Medium (all listed):
acceptable.recovery.lag | cache.max.bytes.buffering | client.id | default.deserialization.exception.handler | default.key.serde | default.list.key.serde.inner | default.list.key.serde.type | default.list.value.serde.inner | default.list.value.serde.type | default.production.exception.handler | default.timestamp.extractor | default.value.serde | deserialization.exception.handler | max.task.idle.ms | max.warmup.replicas | num.stream.threads | processing.exception.handler | processing.guarantee (at_least_once/exactly_once_v2) | production.exception.handler | replication.factor | security.protocol | statestore.cache.max.bytes | task.assignor.class | task.timeout.ms | topology.optimization (all/none/reuse.ktable.source.topics/merge.repartition.topics/single.store.self.join)
Low (all listed):
application.server | buffered.records.per.partition | built.in.metrics.version (latest) | commit.interval.ms | connections.max.idle.ms | default.client.supplier | default.dsl.store (rocksDB/in_memory) | dsl.store.suppliers.class | enable.metrics.push | log.summary.interval.ms | metadata.max.age.ms | metadata.recovery.rebootstrap.trigger.ms | metadata.recovery.strategy (rebootstrap/none) | metric.reporters | metrics.num.samples | metrics.recording.level (INFO/DEBUG/TRACE) | metrics.sample.window.ms | poll.ms | probing.rebalance.interval.ms | processor.wrapper.class | rack.aware.assignment.non_overlap_cost | rack.aware.assignment.strategy (none/min_traffic/balance_subtopology) | rack.aware.assignment.tags | rack.aware.assignment.traffic_cost | receive.buffer.bytes | reconnect.backoff.max.ms | reconnect.backoff.ms | repartition.purge.interval.ms | request.timeout.ms | retry.backoff.ms | rocksdb.config.setter | send.buffer.bytes | state.cleanup.delay.ms | upgrade.from (any permitted older version, or null) | window.size.ms | windowed.inner.class.serde (for plain consumers only) | windowstore.changelog.additional.retention.ms
6. Rollout advice (bake "readable configs" into templates)
- Turn the "example + quick-reference" pairs above for Worker/Source/Sink/Streams into standard templates in your repo (*.properties plus comments).
- CMDB modeling: record every key/value and strictly validate RF/partition/security parameters along with the "pick-one / required / default" rules.
- One load test: treat Streams' cache.max.bytes.buffering, num.stream.threads, max.task.idle.ms, and commit.interval.ms as four knobs and run a 2×2×2×2 combination test to find the latency/throughput inflection points.
- Observability: stand up a fixed dashboard set for Connect & Streams (rebalance count, lag, commit latency, DLQ TPS, task error rate).