kafka集成篇

kafka的Java客戶端

生產者

1.引入依賴

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.3</version></dependency>

2.生產者發送消息的基本實現

/*** 消息的發送?*/
public class MyProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 把發送的key從字符串序列化為字節數組props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 把發送消息value從字符串序列化為字節數組props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());RecordMetadata metadata = null;try (Producer<String, String> producer = new KafkaProducer<>(props)) {Order order = new Order(1L, 99.9D);// 未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));// 等待消息發送成功的同步阻塞?法metadata = producer.send(producerRecord).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);} finally {if (metadata != null) {// =====阻塞=======System.out.println("同步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());}}}
}

3.發送消息到指定分區

image-20230814170847906

4.發送消息未指定分區

發送消息未指定分區,會通過業務key的hash運算,算出消息往哪個分區上發

// 未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));

5.同步發送消息

image-20230814172356139

如果生產者發送消息沒有收到ack,生產者會阻塞,阻塞到3s的時間,如果還沒有收到消息,會進行重試。重試的次數3次。

    RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());

6.異步發送消息

image-20230814173250894

異步發送,生產者發送完消息后就可以執行之后的業務,broker在收到消息后異步調用生產者提供的callback回調方法。

            // 異步發送消息 Callback回調接口producer.send(producerRecord, new Callback() {// 異步回調方法@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {System.err.println("發送消息失敗:" +e.getMessage());}if (metadata != null) {System.out.println("異步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}}});System.out.println("處理之后的邏輯~");

輸出結果:

image-20230814173709486

7.生產者中的ack的配置

在同步發消息的場景下:生產者發送消息到broker上后,ack會有3種不同的選擇

  • ack = 0 :kafka-cluster不需要任何的broker收到消息,就立即返回ack給生產者就可以繼續發送下一條消息,效率是最高的但最容易丟消息
  • ack=1(默認):多副本之間的leader已經收到消息,并把消息寫?到本地的log中,才會返回ack給生產者,性能和安全性是最均衡的(這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失)
  • ack=-1/all:需要等待 min.insync.replicas(默認為1,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志才會返回ack給生產者,這種策略會保證只要有?個備份存活就不會丟失數據。這種方式最安全但性能最差。(?般除非是金融級別,或跟錢打交道的場景才會使用這種配置)

image-20230814175916962

code:

props.put(ProducerConfig.ACKS_CONFIG, "1");

關于ack和重試(如果沒有收到ack,就開啟重試)的配置

  • 發送會默認會重試3次,每次間隔100ms
props.put(ProducerConfig.ACKS_CONFIG, "1");/*發送失敗會重試,默認重試間隔100ms,【重試能保證消息發送的可靠性,但是也可能造成消息重復發送】,?如?絡抖動,所以【需要在接收者那邊做好消息接收的冪等性處理】*/props.put(ProducerConfig.RETRIES_CONFIG, 3);// 重試間隔設置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

8.關于消息發送的緩沖區

發送的消息會先進入到本地緩沖區(32mb),kakfa會跑?個線程,該線程去緩沖區中取16k的數據,發送到kafka,如果到10毫秒數據沒取滿16k,也會發送?次。

image-20230814180715793

  • kafka默認會創建一個消息緩沖區,用來存放要發送的消息,緩沖區是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • kafka本地線程會去緩沖區中?次拉16k的數據,發送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • 如果線程拉不到16k的數據,間隔10ms也會將已拉到的數據發到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

消費者

1.消費者消費消息的基本實現

public class MyConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 消費分組名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 1.創建?個消費者的客戶端try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 2.消費者訂閱主題列表consumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {/** 3.poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 4.操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}} catch (Exception e) {throw new RuntimeException(e);}}
}

2.消費者自動提交和手動提交offset

1)提交的內容

消費者無論是自動提交還是手動提交,都需要把所屬的消費組+消費的某個主題+消費的某個分區及消費的偏移量,這樣的信息提交到集群的_consumer_offsets主題里面。

2)自動提交

消費者poll消息下來以后就會自動提交offset

// 是否自動提交offset,默認就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意自動提交會丟消息。因為消費者在消費前提交offset,有可能提交完后還沒消費時消費者掛了。于是下?個消費者會從已提交的offset的下一個位置開始消費消息。之前未被消費的消息就丟失掉了。

3)手動提交

需要把自動提交的配置改成false

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手動提交又分成了兩種

  • 手動同步提交

在消費完消息后調用同步提交的方法,當集群返回ack前?直阻塞,返回ack后表示提交成功,執行之后的邏輯

            while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消費完if (records.count() > 0) {// 有消息// ?動同步提交offset,當前線程會阻塞直到offset提交成功// 【?般使?同步提交】,因為提交之后?般也沒有什么邏輯代碼了consumer.commitSync();// =======阻塞=== 提交成功}}
  • 手動異步提交

在消息消費完后提交,不需要等到集群ack,直接執行之后的邏輯,可以設置?個回調方法,供集群調用

            while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消費完if (records.count() > 0) {// 有消息// ?動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理后?的程序邏輯consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getMessage());}}});}}

3.長輪詢poll消息(消費者拉取消息)

  • 消費者建立了與broker之間的長連接,開始poll消息

  • 默認情況下,消費者一次會poll500條消息

// ?次poll最?拉取消息的條數,可以根據消費速度的快慢來設置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代碼中設置了長輪詢的時間是1000毫秒
            while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}
  • 意味著:
    • 如果?次poll到500條,就直接執行for循環
    • 如果這?次沒有poll到500條。且時間在1秒內,那么長輪詢繼續poll,要么到500條,要么到1s,執行后續for循環
    • 如果多次poll都沒達到500條,且1秒時間到了,那么直接執行for循環
    • 如果兩次poll的間隔超過30s(poll時間短但是消費時間長,消費者消費可能會達到30s左右),集群會認為該消費者的消費能力過 弱,該消費者被踢出消費組,觸發rebalance機制,rebalance機制會造成性能開銷

可以通過設置參數, 讓?次poll的消息條數少?點,避免觸發rebalance損耗性能

 // ?次poll最?拉取消息的條數,可以根據消費速度的快慢來設置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果兩次poll的時間如果超出了30s的時間間隔,kafka會認為其消費能?過弱,將其踢出消費組。將分區分配給其他消費者。-rebalanceprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

4.消費者的健康狀態檢查

消費者每隔1s向kafka集群發送心跳,集群發現如果有超過10s沒有續約的消費者,將被踢出消費組,觸發該消費組的rebalance機制,將該分區交給消費組里的其他消費者進行消費。

// consumer給broker發送心跳的間隔時間  1s一次
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果超過10秒沒有收到消費者的心跳,則會把消費者踢出消費組,進?rebalance,把分區分配給其他消費者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

5.指定分區和偏移量、時間消費

  • 指定分區消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 從頭消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 指定offset消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
  • 指定時間消費

根據時間,去所有的partition中確定該時間對應的offset,然后去所有的partition中找到該offset之后的消息開始消費。

// topic對應所有分區
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 從1小時前開始消費
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;long offset = value.offset();System.out.println("partition-" + key.partition() +"|offset-" + offset);System.out.println();//根據消費?的timestamp確定offsetconsumer.assign(Arrays.asList(key));consumer.seek(key, offset);
}

6.新消費組的消費offset規則

新消費組中的消費者在啟動以后,默認會從當前分區的最后?條消息的offset+1開始消費(消費新消息)。可以通過以下的設置,讓新的消費者第?次從頭開始消費。之后開始消費新消息(最后消費的位置的偏移量+1)

  • Latest:默認的,消費新消息

  • earliest:第?次從頭開始消費。之后開始消費新消息(最后消費的位置的偏移量+1),這個需要區別于consumer.seekToBeginning(每次都從頭開始消費)

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SpringBoot集成kafka

1.引入依賴

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.配置文件

server:port: 8080
spring:kafka:bootstrap-servers: 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094producer: # 生產者retries: 3 # 設置大于0的值,則客戶端會將發送失敗的記錄重新發送batch-size: 16384 # 每次拉取多少數據發送broker buffer-memory: 33554432 # 本地緩沖區大小acks: 1# 指定消息key和消息體的編解碼?式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 當每?條記錄被消費者監聽器(ListenerConsumer)處理之后提交# RECORD# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交# BATCH# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交# TIME# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交# COUNT# TIME | COUNT 有?個條件滿足時提交# COUNT_TIME# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, 手動調用Acknowledgment.acknowledge()后提交# MANUAL# 【手動調用Acknowledgment.acknowledge()后立即提交,?般使用這種】# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

3.消息生產者

發送消息到指定topic

image-20230815110653415

4.消息消費者

設置消費組,消費指定topic

@Component
public class MyConsumer {@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(record);System.out.println(value);//?動提交offsetack.acknowledge();}
}

5.消費者中配置消費主題、分區和偏移量

設置消費組、多topic、指定分區、指定偏移量消費及設置消費者個數

    @KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "3")// concurrency:同消費組中消費者個數,就是并發消費數,建議小于等于分區總數public void listenGroupPro(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);//?動提交offsetack.acknowledge();}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/43041.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/43041.shtml
英文地址,請注明出處:http://en.pswp.cn/news/43041.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SAP 預留(Reservation)詳解

相信使用SAP的話&#xff0c;對SAP的SAP預留(Reservation)已經有所了解&#xff0c;而且經常使用作業的&#xff0c;并且在很多方案中都使用得到&#xff0c;下面我就簡單總結下我所了解的SAP預留(Reservation)&#xff0c;希望對一些需要幫助的人&#xff0c;有所指點。 首先…

分類預測 | MATLAB實現DRN深度殘差網絡多輸入分類預測

分類預測 | MATLAB實現DRN深度殘差網絡多輸入分類預測 目錄 分類預測 | MATLAB實現DRN深度殘差網絡多輸入分類預測預測效果基本介紹程序設計參考資料 預測效果 基本介紹 1.分類預測 | MATLAB實現DRN深度殘差網絡多輸入分類預測 2.代碼說明&#xff1a;MATLAB實現DRN深度殘差網絡…

LVS集群和nginx負載均衡

目錄 1、基于 CentOS 7 構建 LVS-DR 群集。 2、配置nginx負載均衡。 1、基于 CentOS 7 構建 LVS-DR 群集。 1.部署LVS負載調度器 1>安裝配置工具 [rootnode6 ~]# yum install -y ipvsadm 2>配置LVS虛擬IP&#xff08;VIP地址&#xff09; [rootnode6 ~]# ifconfig ens…

32.Netty源碼之服務端如何處理客戶端新建連接

highlight: arduino-light 服務端如何處理客戶端新建連接 Netty 服務端完全啟動后&#xff0c;就可以對外工作了。接下來 Netty 服務端是如何處理客戶端新建連接的呢&#xff1f; 主要分為四步&#xff1a; md Boss NioEventLoop 線程輪詢客戶端新連接 OP_ACCEPT 事件&#xff…

VB+SQL自動點歌系統設計與實現

摘 要 隨著社會的發展,人類的進步,21世紀人們的生活的水平有所提高,為了滿足人們對生活的需要,豐富業余生活,就需要有一些娛樂的設施來彌補這些空缺,所以開發了自動點歌系統。 論文詳細論述了系統總體設計思想、數據庫設計以及功能模塊設計等,給出了自動點歌系統一般流程…

算法與數據結構(七)--堆

一.堆 1.堆的定義 堆是計算機科學中一類特殊的數據結構的通常&#xff0c;堆通常可以被看做是一顆完全二叉樹的數組對象。 堆的特性 1.它是完全二叉樹&#xff0c;除了樹的最后一層結點不需要是滿的&#xff0c;其他的每一層從左到右都是滿的&#xff0c;如果最后一層結點不…

管理類聯考——邏輯——真題篇——按知識分類——匯總篇——二、論證邏輯——支持加強——第三節——分類3——類比題干支持

文章目錄 第三節 支持加強-分類3-類比題干支持真題(2017-28)-支持加強-正面支持-表達“確實如此”真題(2017-36)-支持加強-正面支持-表達“確實如此”真題(2017-39)-支持加強-正面支持-方法有效或方法可行,但多半不選擇方法無惡果真題(2017-50)-支持加強真題(2018-2…

linux 文件權限識別及其修改

一、文件權限認識 在 Linux 系統中&#xff0c;一切皆文件&#xff0c;目錄也是一種文件形式叫目錄文件&#xff0c;它們的屬性主要包含&#xff1a;索引節點(inode)&#xff0c;類型、權限屬性、鏈接數、所歸屬的用戶和用戶組、最近修改時間等內容。 如下為根目錄下目錄&…

華創云鼎面試:java后端開發

華創云鼎面試: 1、項目:項目業務介紹、項目人員組成 2、分布式鎖用過哪些 基于數據庫的鎖&#xff1a;可以使用關系型數據庫的事務和行級鎖來實現分布式鎖。通過在數據庫中創建一個標志位或特定的鎖表來表示資源的鎖定狀態&#xff0c;其他進程在訪問該資源之前需要先獲取該鎖…

尋路算法(Java 實例代碼源碼包下載)

目錄 尋路算法 Java 實例代碼 src/runoob/graph/Path.java 文件代碼&#xff1a; 尋路算法 圖的尋路算法也可以通過深度優先遍歷 dfs 實現&#xff0c;尋找圖 graph 從起始 s 點到其他點的路徑&#xff0c;在上一小節的實現類中添加全局變量 from數組記錄路徑&#xff0c;fr…

8月18日,每日信息差

1、中國空間站收獲階段性應用成果。?截至目前&#xff0c;空間站已安排在軌實施了110個空間科學研究與應用項目&#xff0c;涉及空間生命科學與人體研究、微重力物理和空間新技術領域&#xff0c;獲得原始實驗數據近100TB&#xff0c;下行了近300個實驗樣品&#xff0c;部分項…

改進YOLO系列:3.添加SOCA注意力機制

添加SOCA注意力機制 1. SOCA注意力機制論文2. SOCA注意力機制原理3. SOCA注意力機制的配置3.1common.py配置3.2yolo.py配置3.3yaml文件配置1. SOCA注意力機制論文 暫未找到 2. SOCA注意力機制原理 3. SOCA注意力機制的配置 3.1common.py配置 ./models/common.p…

Linux 網絡發包流程

哈嘍大家好&#xff0c;我是咸魚 之前咸魚在《Linux 網絡收包流程》一文中介紹了 Linux 是如何實現網絡接收數據包的 簡單回顧一下&#xff1a; 數據到達網卡之后&#xff0c;網卡通過 DMA 將數據放到內存分配好的一塊 ring buffer 中&#xff0c;然后觸發硬中斷CPU 收到硬中…

Lnton羚通關于Optimization在【PyTorch】中的基礎知識

OPTIMIZING MODEL PARAMETERS &#xff08;模型參數優化&#xff09; 現在我們有了模型和數據&#xff0c;是時候通過優化數據上的參數來訓練了&#xff0c;驗證和測試我們的模型。訓練一個模型是一個迭代的過程&#xff0c;在每次迭代中&#xff0c;模型會對輸出進行猜測&…

python3 0基礎學習----數據結構(基礎+練習)

python 0基礎學習筆記之數據結構 &#x1f4da; 幾種常見數據結構列表 &#xff08;List&#xff09;1. 定義2. 實例&#xff1a;3. 列表中常用方法.append(要添加內容) 向列表末尾添加數據.extend(列表) 將可迭代對象逐個添加到列表中.insert(索引&#xff0c;插入內容) 向指定…

8.17校招 內推 面經

綠泡泡&#xff1a; neituijunsir 交流裙&#xff0c;內推/實習/校招匯總表格 1、校招 | 騰訊2024校園招聘全面啟動(內推) 校招 | 騰訊2024校園招聘全面啟動(內推) 2、校招 | 大華股份2024屆全球校園招聘正式啟動(內推) 校招 | 大華股份2024屆全球校園招聘正式啟動(內推) …

國家一帶一路和萬眾創業創新的方針政策指引下,Live Market探索跨境產業的創新發展

現代社會&#xff0c;全球經濟互聯互通&#xff0c;跨境產業也因此而崛起。為了推動跨境產業的創新發展&#xff0c;中國政府提出了“一帶一路”和“萬眾創業、萬眾創新”的方針政策&#xff0c;旨在促進全球經濟的互聯互通和創新發展。在這個大環境下&#xff0c;Live Market積…

Mariadb高可用MHA

本節主要學習了Mariadb高可用MHA的概述&#xff0c;案例如何構建MHA 提示&#xff1a;以下是本篇文章正文內容&#xff0c;下面案例可供參考 一、概述 1、概念 MHA&#xff08;MasterHigh Availability&#xff09;是一套優秀的MySQL高可用環境下故障切換和主從復制的軟件。…

合宙Air724UG LuatOS-Air LVGL API--簡介

為何是 LVGL LVGL 是一個開源的圖形庫&#xff0c;它提供了創建嵌入式 GUI 所需的一切&#xff0c;具有易于使用的圖形元素、漂亮的視覺效果和低內存占用的特點。 LVGL特點&#xff1a; 強大的 控件 &#xff1a;按鈕、圖表、列表、滑動條、圖像等 高級圖形引擎&#xff1a;動…

BIO、NIO和AIO

一.引言 何為IO 涉及計算機核心(CPU和內存)與其他設備間數據遷移的過程&#xff0c;就是I/O。數據輸入到計算機內存的過程即輸入&#xff0c;反之輸出到外部存儲&#xff08;比如數據庫&#xff0c;文件&#xff0c;遠程主機&#xff09;的過程即輸出。 I/O 描述了計算機系統…