Kafka入門4.0.0版本(基于Java、SpringBoot操作)

Kafka入門4.0.0版本(基于Java、SpringBoot操作)

一、kafka概述

Kafka最初是由LinkedIn公司開發的,是一個高可靠、高吞吐量、低延遲的分布式發布訂閱消息系統,它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會,隨后成為Apache的頂級開源項目。主要特點有:

  1. 為發布和訂閱提供高吞吐量
  2. 消息持久化
  3. 分布式
  4. 消費消息采用Pull模式
  5. 支持在線和離線場景

本次采用最新的kafka版本4.0.0,Kafka 4.0 最引人矚目的變化之一,當屬其默認運行在 KRaft(Kafka Raft)模式下,徹底擺脫了對 Apache ZooKeeper 的依賴。在 Kafka 的發展歷程中,ZooKeeper 曾是其核心組件,負責協調分布式系統中的元數據管理、Broker 注冊、主題分區分配等關鍵任務。然而,隨著 Kafka 功能的不斷豐富與用戶規模的持續擴大,ZooKeeper 逐漸成為系統部署和運維中的一個復雜性來源,增加了運營成本與管理難度。

KRaft 模式的引入,標志著 Kafka 在架構上的自我進化達到了一個新高度。通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現了對 ZooKeeper 的無縫替代。這一轉變帶來了多方面的顯著優勢:

簡化部署與運維:運維人員無需再為維護 ZooKeeper 集群投入額外精力,降低了整體運營開銷。新架構減少了系統的復雜性,使得 Kafka 的安裝、配置和日常管理變得更加直觀和高效。

增強可擴展性:KRaft 模式下,Kafka 集群的擴展性得到了進一步提升。新增 Broker 節點的加入流程更加簡便,能夠更好地適應大規模數據處理場景下對系統資源動態調整的需求。

提升系統性能與穩定性:去除 ZooKeeper 這一外部依賴后,Kafka 在元數據操作的響應速度和一致性方面表現出色。尤其是在高并發寫入和讀取場景下,系統的穩定性和可靠性得到了增強,減少了因外部組件故障可能導致的單點問題。

  • 之前的架構

在這里插入圖片描述

  • 現在的架構

在這里插入圖片描述

kafka消費模型

不同消費者組可以消費全量的消息,相同消費者組內的消費者只能消費一部分。

在這里插入圖片描述

kafka基本概念

Producer(生產者)

消息的生產者,負責將消息發送到Kafka集群中。

Consumer(消費者)

消息的消費者,負責從Kafka集群中讀取并處理消息

Broker(服務代理節點)

Kafka集群中的一個或多個服務器,負責存儲和轉發消息。

Topic(主題)

Kafka中的消息以主題為單位進行歸類,生產者發送消息到特定主題,消費者訂閱并消費這些主題的消息。

Partition(分區)

每個主題可以細分為多個分區,分區是Kafka存儲消息的物理單位,每個分區可以看作是一個有序的、不可變的消息序列。

Replica(副本)

Kafka為每個分區引入了多副本機制,以提高數據的安全性和可靠性。副本分為leader和follower,其中leader負責處理讀寫請求,follower負責從leader同步數據。

Consumer Group(消費者組)

由多個消費者組成,消費者組內的消費者共同消費同一個主題的消息,但每個消費者只負責消費該主題的一個或多個分區,避免消息重復消費。

kraft

通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現了對 ZooKeeper 的無縫替代

kafka發送端采用push模式

kafka消費端采用pull模式訂閱并消費消息

在這里插入圖片描述

Kafka的工作原理

可以概括為以下幾個步驟:

  • 消息發布: 生產者將消息發送到Kafka集群的特定主題,并可以選擇發送到該主題的哪個分區。如果未指定分區,Kafka會根據負載均衡策略自動選擇分區。

  • 消息存儲: Kafka將接收到的消息存儲在磁盤上的分區中,每個分區都是一個有序的消息序列。Kafka使用順序寫入和零拷貝技術來提高寫入性能,并通過多副本機制確保數據的安全性和可靠性。

在這里插入圖片描述

  • 消息消費: 消費者組中的消費者從Kafka集群中訂閱并消費消息。每個消費者負責消費一個或多個分區中的消息,并確保消息至少被消費一次。消費者可以使用拉(Pull)模式或推(Push)模式從Kafka中拉取消息。

在這里插入圖片描述

  • 負載均衡: Kafka通過ZooKeeper維護集群的元數據信息,包括分區和消費者的對應關系。當消費者數量或分區數量發生變化時,Kafka會重新分配分區給消費者,以實現負載均衡。

  • 容錯機制: Kafka通過多副本機制實現容錯。當leader副本出現故障時,Kafka會從ISR(In-Sync Replicas)集合中選擇一個新的leader副本繼續對外提供服務。同時,Kafka還提供了多種可靠性級別供用戶選擇,以滿足不同的業務需求。

kafka特點

一、Kafka的持久化機制

Kafka的持久化機制主要涉及消息的存儲和復制。Kafka以日志的形式存儲消息,每個主題(Topic)被劃分為多個分區(Partition),每個分區中的消息按照順序進行存儲。Kafka使用多個副本(Replica)保證消息的持久性和可靠性,每個分區的消息會被復制到多個副本中,以防止數據丟失。此外,Kafka還允許根據配置的保留策略來保留已消費的消息一段時間,以便在需要時進行檢索和恢復。

Kafka的副本機制是其實現高可用性和數據持久性的重要基石。每個主題的每個分區都配置有多個副本,這些副本分散保存在不同的Broker上,從而能夠對抗部分Broker宕機帶來的數據不可用問題。Kafka的副本機制包括領導者副本(Leader Replica)和追隨者副本(Follower Replica):

領導者副本:負責處理所有的讀寫請求,包括生產者的消息寫入和消費者的消息讀取。

追隨者副本:從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現與領導者副本的同步。追隨者副本不對外提供服務,只作為數據的冗余備份。

Kafka還引入了ISR(In-Sync Replicas)機制,即與領導者副本保持同步的副本集合。只有處于ISR中的副本才能參與到消息的寫入和讀取過程中,以確保數據的一致性和可靠性。當某個副本與領導者副本的同步延遲超過一定的閾值時,它會被踢出ISR,直到同步恢復正常。

二、Kafka的數據一致性

Kafka通過多個機制來確保數據的一致性,包括副本同步、ISR機制、生產者事務和消費者事務等:

副本同步:確保主副本將數據同步到所有副本的過程,在副本同步完成之前,生產者才會認為消息已經被成功寫入。

ISR機制:通過動態調整ISR列表中的副本,確保只有可靠的副本參與到數據的讀寫操作,從而提高數據的一致性和可靠性。

生產者事務:Kafka的生產者事務機制可以確保消息的Exactly-Once語義,即消息不會被重復寫入或丟失。生產者事務將消息的發送和位移提交等操作放在同一個事務中,一旦事務提交成功,就意味著消息已經被成功寫入,并且對應的位移也已經提交。

消費者事務:雖然Kafka的消費者通常不直接支持事務但消費者可以通過提交位移(Offset)來確保消息的正確消費。消費者事務將消息的拉取和位移提交等操作放在同一個事務中,以確保消息不會被重復消費或丟失。

二、kafka應用

2.1 win11安裝kafka4.0.0

下載地址:https://kafka.apache.org/downloads 下載最后一個kafka-2.13-4.0.0.tgz

在這里插入圖片描述

下載好之后,把這個壓縮包解壓就行了,然后找到config下面的server.properties

找到log.dirs改成自己電腦上的目錄

log.dirs=E:\\runSoft\\kafka\\data

在這里插入圖片描述

  • 第一步 獲取uuid

先打開命令行,進入到bin下面的windows目錄下

命令

kafka-storage.bat random-uuid

在這里插入圖片描述

先獲取uuid,我的uuid為ANVnC_s-QYGJF1C7wu9Aww

  • 第二步 格式化日志

命令:

kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties

在這里插入圖片描述

  • 第三步 啟動

打開命令行,進入到bin下面的windows目錄下 啟動命令

kafka-server-start.bat ../../config/server.properties
創建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
啟動一個消費端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
啟動一個生產端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
問題
1、如果提示如下
命令行  輸入行太長。
命令語法不正確。

則需要把目錄變短,目錄太長,win11不讓輸入。

2,tgz需要解壓兩次
只解壓一次是不行的,tgz是打包之后壓縮的。

3、如果啟動失敗,需要重新配置

重新配置時。把log.dirs的路徑下面的東西清空
2.2 java開發kafka

第一步,引入依賴

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version>
</dependency>

第二步,建立生產者

public class Producer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();//  kafka 集群 節點props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic = "test";KafkaProducer<String,  String> producer = new KafkaProducer(props);producer.send(new ProducerRecord<String, String>(topic, "key", "value-1"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-2"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-3"));producer.close();}}

ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),**可選的分區號(Partition Number)**以及可選的鍵值對構成。

在這里插入圖片描述

第三步、建立消費者類

public class Consumer {public static void main(String[] args){Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {ConsumerRecords<String,String> records =  consumer.poll(Duration.ofDays(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("partition = %d ,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());}}}
}

運行效果

在這里插入圖片描述

2.3 spring boot整合kafka

第一步,引入依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_kafka_demo</artifactId><packaging>jar</packaging><name>spring_boot_kafka_demo Maven Webapp</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

第二步,編寫配置文件

編寫resources下的application.yml

spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest

第三步,編寫生產者

@Service
public class Producer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");}
}

第四步,編寫消費者

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

第五步,編寫啟動類

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}

運行效果

在這里插入圖片描述

2.4 記錄日志到kafka中

第一步,在2.3的基礎上,添加依賴

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.12</version> <!-- Spring Boot 3.x 推薦版本 -->
</dependency>

第二步,添加kafka的日志appender類

public class KafkaLogbackAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {private String topic = "application-logs";private String bootstrapServers = "localhost:9092";private KafkaProducer<String, String> producer;@Overridepublic void start() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer = new KafkaProducer<>(props);super.start();}@Overrideprotected void append(ILoggingEvent eventObject) {String msg = eventObject.getFormattedMessage();producer.send(new ProducerRecord<>(topic, msg));}@Overridepublic void stop() {if (producer != null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic = topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}
}

第三步,在resources下添加logback-spring.xml文件

<configuration debug="false" scan="true" scanPeriod="30 seconds"><!-- 定義日志格式 --><property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/><!-- 控制臺輸出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${PATTERN}</pattern></encoder></appender><!-- Kafka Appender --><appender name="KAFKA" class="com.demo.KafkaLogbackAppender"><bootstrapServers>localhost:9092</bootstrapServers><topic>application-logs</topic></appender><!-- 根日志輸出 --><root level="info"><appender-ref ref="STDOUT"/><appender-ref ref="KAFKA"/></root></configuration>

第四步,修改Producer類

@Service
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");}
}

第五步,修改Consumer類

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}@KafkaListener(id = "myId2", topics = "application-logs")public void listen2(String in) {System.out.println("resinfo:"+in);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/83661.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/83661.shtml
英文地址,請注明出處:http://en.pswp.cn/web/83661.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

react react-router-dom中獲取自定義參數v6.4版本之后

路由配置, AutnToken 組件作為權限、登錄管理 import { createBrowserRouter, Navigate } from react-router-dom; import Layout from /layout/index; import Login from /pages/login; import Page404 from /pages/404;import AutnToken from /components/authToken; import…

AI中的Prompt

1. System 作用&#xff1a;設定 AI 的“角色設定”和“行為準則”。 內容&#xff1a;通常是描述 LLM 的身份、語氣、行為范圍、約束規則。 類似&#xff1a;在大語言模型中是最優先被考慮的提示。 示例&#xff1a; 你是一個專業的商品評價分析助手&#xff0c;請根據用戶…

從人工到智能:IACheck如何重構檢測報告審核工作流?

從人工到智能&#xff1a;IACheck如何重構檢測報告審核工作流&#xff1f; 在當今AI技術迅猛發展的時代&#xff0c;各行各業正經歷從“人工驅動”到“智能驅動”的根本性變革。檢測認證&#xff08;TIC&#xff09;行業作為關乎質量與安全的重要支柱&#xff0c;也不例外。在…

React事件處理:如何給按鈕綁定onClick點擊事件?

系列回顧&#xff1a; 在前幾篇文章中&#xff0c;我們已經學會了如何使用 State 管理組件的內部數據&#xff0c;以及如何通過 Props 實現父子組件之間的通信。我們的組件現在已經有了“數據”和“外觀”。但是&#xff0c;它還像一個只能看的“模型”&#xff0c;無法與用戶進…

【機器學習|學習筆記】粒子群優化(Particle Swarm Optimization, PSO)詳解,附代碼。

【機器學習|學習筆記】粒子群優化&#xff08;Particle Swarm Optimization, PSO&#xff09;詳解&#xff0c;附代碼。 【機器學習|學習筆記】粒子群優化&#xff08;Particle Swarm Optimization, PSO&#xff09;詳解&#xff0c;附代碼。 文章目錄 【機器學習|學習筆記】粒…

深度剖析:AI 社媒矩陣營銷工具,如何高效獲客?

在社交媒體營銷領域&#xff0c;競爭日益激烈&#xff0c;傳統的社媒矩陣運營方式面臨諸多挑戰。而 AI 社媒矩陣營銷工具的出現&#xff0c;正以前所未有的方式重構社媒矩陣的底層架構&#xff0c;為營銷人員帶來了全新的機遇與變革。接下來&#xff0c;我們將從技術破局、實戰…

Spring XML 常用命名空間配置

Spring XML 常用命名空間配置 下面是一個綜合性的Spring XML配置樣例&#xff0c;展示了各種常用命名空間的使用方式&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans&quo…

UE5場景漫游——開始界面及關卡跳轉

UE中實現UMG游戲界面搭建及藍圖控制&#xff0c;點擊游戲界面中的按鈕實現關卡的跳轉效果。 一、游戲界面顯示。1.創建UMG&#xff0c;2.搭建UI。3.關卡藍圖控制顯示 二、點擊按鈕之后實現關卡跳轉

CSS 外邊距合并(Margin Collapsing)問題研究

在 CSS 中&#xff0c;margin-top 屬性會導致外部 DIV 移動的現象主要是由于 外邊距合并&#xff08;Margin Collapsing&#xff09; 造成的。這是 CSS 盒模型的一個特性&#xff0c;可能會與直覺相悖。 外邊距合并的原理 當一個元素&#xff08;如內部 DIV&#xff09;的 ma…

清理電腦C磁盤,方法N:使用【360軟件】中的【清理C盤空間】

1、先下載并打開【360安全衛士】&#xff0c;點擊如下位置&#xff1a; 之后&#xff0c;可以把這個東西&#xff0c;創建快捷方式到電腦桌面&#xff0c;方便以后使用&#xff1a;

微服務集成seata分布式事務 at模式快速驗證

微服務集成Seata分布式事務 本次demo代碼地址業務場景&#xff1a;一般用于以下場景&#xff1a;使用 AT 模式的優勢&#xff08;適用于快速驗證&#xff09;&#xff1a;快速驗證建議步驟&#xff1a;注意事項&#xff1a; 工具環境微服務版本選擇Nacos 環境搭建與啟動nacos 下…

LLM基礎5_從零開始實現 GPT 模型

基于GitHub項目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 設計 LLM 的架構 GPT 模型基于 Transformer 的 decoder-only 架構&#xff0c;其主要特點包括&#xff1a; 順序生成文本 參數數量龐大&#xff08;而非代碼量復雜&#xff09; 大量重復…

Android 中 linux 命令查詢設備信息

一、getprop 命令 在 Linux 系統中&#xff0c; getprop 命令通常用于獲取 Android 設備的系統屬性&#xff0c;這些屬性包括設備型號、Android 版本、電池狀態等。 1、獲取 Android 版本號 adb shell getprop ro.build.version.release2、獲取設備型號 adb shell getprop …

26考研 | 王道 | 計算機組成原理 | 六、總線

26考研 | 王道 | 計算機組成原理 | 六、總線 文章目錄 26考研 | 王道 | 計算機組成原理 | 六、總線6.1 總線概述1. 總線概述2. 總線的性能指標 6.2 總線仲裁&#xff08;考綱沒有&#xff0c;看了留個印象&#xff09;6.3 總線操作和定時6.4 總線標準&#xff08;考綱沒有&…

SE(Secure Element)加密芯片與MCU協同工作的典型流程

以下是SE&#xff08;Secure Element&#xff09;加密芯片與MCU協同工作的典型流程&#xff0c;綜合安全認證、數據保護及防篡改機制&#xff1a; 一、基礎認證流程&#xff08;參數保護方案&#xff09; 密鑰預置? SE芯片與MCU分別預置相同的3DES密鑰&#xff08;Key1、Key2…

數據庫——MongoDB

一、介紹 1. MongoDB 概述 MongoDB 是一款由 C 語言編寫的開源 NoSQL 數據庫&#xff0c;采用分布式文件存儲設計。作為介于關系型和非關系型數據庫之間的產品&#xff0c;它是 NoSQL 數據庫中最接近傳統關系數據庫的解決方案&#xff0c;同時保留了 NoSQL 的靈活性和擴展性。…

WebSocket 前端斷連原因與檢測方法

文章目錄 前言WebSocket 前端斷連原因與檢測方法常見 WebSocket 斷連原因及檢測方式聊天系統場景下的斷連問題與影響行情推送場景下的斷連問題與影響React 前端應對斷連的穩健策略自動重連機制的設計與節流控制心跳機制的實現與保持連接存活連接狀態管理與 React 集成錯誤提示與…

2025年真實面試問題匯總(三)

線上數據庫數據丟失如何恢復 線上數據庫數據丟失的恢復方法需要根據數據丟失原因、備份情況及數據庫類型&#xff08;如MySQL、SQL Server、PostgreSQL等&#xff09;綜合處理&#xff0c;以下是通用的分步指南&#xff1a; 一、緊急止損&#xff1a;暫停寫入&#xff0c;防止…

Android音視頻多媒體開源框架基礎大全

安卓多媒體開發框架中&#xff0c;從音頻采集&#xff0c;視頻采集&#xff0c;到音視頻處理&#xff0c;音視頻播放顯示分別有哪些常用的框架&#xff1f;分成六章&#xff0c;這里一次幫你總結完。 音視頻的主要流程是錄制、處理、編解碼和播放顯示。本文也遵循這個流程展開…

安卓上架華為應用市場、應用寶、iosAppStore上架流程,保姆級記錄(1)

上架前請準備好apk、備案、軟著、企業開發者賬號&#xff01;&#xff01;&#xff01;其余準備好app相關的截圖、介紹、測試賬號&#xff0c;沒講解明白的評論區留言~ 華為應用市場 1、登錄賬號 打開 華為開發者平臺 https://developer.huawei.com/consumer/cn/ 2.登錄企…