kafka快速入門與知識匯總

? kafka快速入門與知識匯總

一、前言

kafka是一款消息中間件,可以用于傳輸消息和日志收集、監控項目狀況。與其類似的技術棧有rocketmq、rabbitmq等,但這些技術棧大多應用在一些簡單的消息傳輸平臺,而kafka則因其對大量數據的高性能處理在大數據領域受到青睞。

二、為什么要有消息中間件?

1、應用解耦。想象這樣一個場景,有一個服務依賴于另外一個服務,即某個服務需要將數據傳輸給另外一個服務進行處理數據,那么這個服務就必須要等待另一服務處理完數據。如果使用消息中間件,我們可以將數據發送到kafka中,其他服務監聽kafka相關服務,接收消息進行處理,這樣,服務間就不需要互相等待。

2、異步提速。我們在進行上網購物時,需要進行一系列操作,如下訂單–>預減庫存–>支付–>短信發送–>訂單狀態。如果每一步都需要1s時間,那么這一操作就需要花費至少5s時間,這對于用戶來說是不能忍受的。使用消息中間件后,我們只需要下訂單后即可去做其他事情,之間操作異步處理,處理完成返回處理結果即可,響應就快多了

3、削峰填谷。當有大量請求發送到服務器時,如果直接將這些請求交由數據庫處理,會對數據庫造成很大的性能瓶頸。所以可以先將請求發送到kafka中,消費者根據自身處理請求的能力進行消費即可。提高了系統的可用性。

三、架構設計

名詞解釋:

事件:可以理解為發送的一條消息,這條消息包含一些元數據,指明了其發送的地點以及消息內容。

生產者:用于將消息發送出去

消費者:消費消息

主題:可以理解為消息的類型

Broker:kafka協調者,用于管理消息隊列等。

下面用一張圖帶你看看kafka的架構:

在這里插入圖片描述

生產者producer像broker集群發送消息,在消息到達之前,首先會對消息進行序列化(要在網絡間傳輸),進行分區選擇(確認該消息最終發送到哪個主題的哪個分區下),如果實現了攔截器,則會攔截消息處理后在發送到緩沖區,待消息數量到達一定數量或者時間到了將該批數據發送到 對應的partition分區(只會發送到leader分區,follower分區同步leader分區數據,用于leader分區所在broker宕機后不影響后續操作,這是kafka高可用的一個保證),其中broker集群信息由zookerpeer集群統一管理。每個消費者都屬于一個消費組,消費分區中的消息。

四、基本的命令行操作

將kafka壓縮文件解壓后,我們需要修改一些屬性,在kafka/config包下找到server.properties文件

按需修改以下內容

#broker的全局唯一編號,不能重復
broker.id=0
#刪除topic功能,當前版本此配置默認為true
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時7*24h將被刪除,單位小時
log.retention.hours=168
#每個segment文件的大小,默認最大 1G
log.segment.bytes=1073741824
# 檢查過期數據的時間,默認5分鐘檢查一次是否數據過期,單位ms
log.retention.check.interval.ms=300000
#配置連接zk集群地址
zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka

啟動kafka服務

kafka-server-start.sh -daemon /你的kafka路勁/config/server.properties

關閉服務

kafka-server-stop.sh

1、操作topic

–bootstrap-servernode3:9092連接的 Kafka Broker 主機名稱和端口號
–topic<String: topic> 比如:topicA操作的 topic 名稱
–list查看所有主題
–create創建主題
–delete刪除主題
–alter修改主題
–describe查看主題詳細描述
–partitions<Integer: # of partitions>設置分區數
–replication-factor<Integer: replication factor>設置分區副本
–config<String: name=value>更新系統默認的配置
–version查看當前系統kafka的版本
–help輸出幫助信息

示例:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topicA --partition 1 --replication-factor 2

2、發送消息

參數描述
–bootstrap-serverlocalhost:9092連接的 Kafka Broker 主機名稱和端口號
–topictopicA操作的 topic 名稱
–batch-size<Integer: size>生成多少條提交一次
–version查詢kafka版本
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicA

3、消費消息

參數描述
–bootstrap-serverlocalhost:9092連接的 Kafka Broker 主機名稱和端口號
–topictopicA操作的 topic 名稱
–from-beginning從頭開始消費數據
–group指定消費者的消費組id
–version查詢kafka版本
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicA

五、生產者

在學習生產者發送消息之前,我們先來了解一下其發送消息的原理。

一條消息通常需要指定其 消息主題(topic)、分區(可選)、key(可選)、value(消息正文)。

消息被包裝完成后,調用生產者的send()方法,此時會檢查是否實現了攔截器(多個則鏈式處理),此時可以攔截消息做一些處理,比如說添加唯一業務編號(防止重復消費),接著經過序列化器,將key和value進行序列化,再經過分區器,選擇發送的分區,經過這一系列操作,消息最終被發送到一個緩沖區中,此時消息不會立即發送到對應的分區,而是會等待消息數量達到設置的值或者時間到了,會將該批數據一起發送給partitioner(提高效率)。消息發送到對應的分區后,會觸發相應的確認應答機制:

如果ack:0 此時消息還沒被leader分區接收就回復發送成功信息,消息不可靠

如果ack:1 此時leader分區接收到消息后應答,但是follower分區還未同步數據

ack:-1或者all 可靠性最強,需要等待leader及其所有follower確認后應答

1、創建maven項目,導入依賴

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.1</version></dependency>
</dependencies>

2、發送同步消息 producer.send(record).get()


// 同步自定義生產者
public class SyncCustomerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 關閉生產者producer.close();}
}

3、發送異步消息 producer.send(record)


// 異步自定義生產者
public class ASyncCustomerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers","192.168.66.99:9092");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for(int i=0;i<5;i++) {producer.send(new ProducerRecord<String, String>("topicA","message_"+i));}producer.close();}}

4、異步發送后回調結果


// 異步回調自定義生產者
public class ASyncCallBackCustomerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers","192.168.66.99:9092");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for(int i=0;i<5;i++) {producer.send(new ProducerRecord<String, String>("topicA","message_"+i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception!=null) { exception.printStackTrace();}else {  //異常為空說明發送成功System.out.println("分區:"+metadata.partition()+" 偏移量:"+metadata.offset());}}});}producer.close();}}

5、攔截消息

5.1 自定義攔截器,需要實現ProducerInterceptor接口下的方法


// 消息攔截器
public class MyProducerInterceptor implements ProducerInterceptor {private Integer succectNum = 0;private Integer failNum = 0;//做一些初始化的工作。@Overridepublic void configure(Map<String, ?> map) {}/*它運行在用戶的main線程中,producer確保在消息被序列化以計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好不要修改消息所屬的topic和分區,否則會影響目標分區的計算。*/@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {return new ProducerRecord(producerRecord.topic(),producerRecord.partition(),producerRecord.timestamp(),producerRecord.key(),"攔截器處理后的消息:"+producerRecord.value());}/*該方法會在消息被應答之前或消息發送失敗時調用,并且通常都是在回調邏輯觸發之前。該方法運行在producer的I/O線程中,因此不要在該方法中放入很“重”的邏輯,否則會拖慢producer的消息發送效率。*/@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){succectNum++;}else{failNum++;}}//主要用于執行一些資源清理的工作。@Overridepublic void close() {System.out.println("succectNum:"+succectNum);System.out.println("failNum:"+failNum);}}

5.2 應用自定義攔截器,發送消息


// 實現攔截器
public class SyncCustomerProducerInterceptor {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化// 添加攔截器鏈List<String> interceptors = new ArrayList<>();interceptors.add(MyProducerInterceptor.class.getName());// 添加其他的攔截器構成攔截器鏈。。。props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 關閉生產者producer.close();}
}

6、消息序列化

kafka已經為我們提供了一些內置的序列化類,我們可以直接使用,如果不能滿足需要我們也可以自定義序列化。

6.1 創建消息體對象(即需要被序列化的主體


public class UserVo {private String name;private Integer age;private String address;@Overridepublic String toString() {return "UserVo{" +"name='" + name + '\'' +", age=" + age +", address='" + address + '\'' +'}';}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}

6.2自定義序列化器


// 自定義序列化器
public class UserSerializer implements Serializer {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper = new ObjectMapper();Serializer.super.configure(configs, isKey);}@Overridepublic byte[] serialize(String s, Object o) {byte[] bytes = null;try {bytes = objectMapper.writeValueAsBytes(o);} catch (Exception e) {e.printStackTrace();}return bytes;}@Overridepublic void close() {objectMapper = null;Serializer.super.close();}
}

6.3 使用自定義的序列化對value進行序列化


// 自定義序列化生產者
public class MySerializerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化// 使用自定義的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName()); // value序列化// 創建生產者KafkaProducer<String, UserVo> producer = new KafkaProducer<String, UserVo>(props);UserVo userVo = new UserVo();userVo.setName("張三");userVo.setAge(18);userVo.setAddress("北京");// 創建消息對象ProducerRecord<String, UserVo> record= new ProducerRecord<String, UserVo>("topicA",userVo);// 發送同步消息producer.send(record).get();// 關閉生產者producer.close();}}

7、分區

7.1 分區策略:查看源碼可以發現kafka再選擇分區時采用以下規則,如果指定了分區號,則一定選擇該分區,如果沒有分區號,則對key進行hash計算后對分區數取模,如果key也不存在的話,則采用輪詢的方式依次發送到各分區(也可以采用隨機法)。

7.2 發送數據到指定的分區(可以實現消息的有序性)


// 發送給指定的分區
public class PartitionProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 1、情況1:手動指定分區ProducerRecord<String, String> record= new ProducerRecord<>("topicA", 0, null, "Hello World");// 2、情況2:沒指定分區時,當key存在,會根據key的hash值計算分區
//        ProducerRecord<String, String> record
//                = new ProducerRecord<>("topicA", "key", "Hello World");// 3、情況3:沒指定分區時,當key不存在,會輪詢發送到每個分區
//        ProducerRecord<String, String> record
//                = new ProducerRecord<>("topicA",  "Hello World");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace();} else {System.out.println(recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());}}});producer.close();}
}

7.3自定義分區規則


// 自定義分區器
public class MyPartitioner implements Partitioner {// 初始化工作,用于分配和創建資源@Overridepublic void configure(Map<String, ?> map) {}/** 計算信息對應的分區* @param topic   主題* @param key    消息的key* @param keyBytes  消息的key序列化后的字節數組* @param value   消息的value* @param valueBytes 消息value序列化后的字節數組* @param cluster  集群元數據 可以獲取分區信息* @return  息對應的分區號*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 根據你的業務邏輯進行分區。。。// 這里使用key的長度作為分區號int len = key.toString().length();// 分區數量int partitionNum = cluster.partitionCountForTopic(topic);return len % partitionNum;}@Overridepublic void close() {}}

// 自定義分區規則
public class MyPartitionerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 指定分區規則(如果設置了分區,就不會使用自定義規則)prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itbaizhan.kafka.producer.partition.MyPartitioner");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);ProducerRecord<String, String> record1 = new ProducerRecord<>("topicA",1, "kkk", "我是長度為一的key");producer.send(record1, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("發送失敗");} else {System.out.println(metadata.topic() + " -----  " + metadata.partition());}}});producer.close();}}

8、消息去重(冪等性)

思考這樣一個場景,你再進行點外賣時,由于學校的網絡實在是一言難盡,導致你一連發送了幾個支付請求,如果不對該請求進行冪等性處理,你就要花冤枉錢了。。。

于是,我們就希望實現對于用戶發送過來的某些請求,只處理一次,重復的就不再進行處理了。

再kafka中設計了一個 <PID,Partition,SeqNumber>的主鍵,當主鍵相同時認為是同一條數據,不會進行處理,其中PID是每次kafka重啟后都會分配一個新的,Pratition為分區號,SeqNumber為自增的數。所以kafka只能保證單分區單會話消息不重復。

為了實現真正的去重,我們可以使用kafka事務特性


// 事務生產者
public class TransactionProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化//唯一事務idprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);// 1、初始話事務producer.initTransactions();// 2、開啟事務producer.beginTransaction();// 3、發送消息,成功則提交,失敗回滾(放棄try{for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 提交事務producer.commitTransaction();}catch (Exception e){// 回滾事務producer.abortTransaction();// 拋出異常throw e;}// 關閉生產者producer.close();}}

六、消費組

消費規則:一個分區的數據可以被不同的消費組中的消費者消費,同一個消費組的不能消費同一個分區的數據。

consumer采用pull(拉)模式從broker中讀取數據。pull模式則可以根據consumer的消費能力以適當的速率消費消息。

pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka的消費者在消費數據時會傳入一個時長參數timeout,如果當前沒有數據可供消費,consumer會等待一段時間之后再返回,這段時長即為timeout。

1、消費某主題下的數據


// 消費指定主題下的消息
public class TopicConsumer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); //設置消費組// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 訂閱主題List<String> topics = new ArrayList<>();topics.add("topicA");consumer.subscribe(topics);// 循環監聽主題while (true) {// 每一秒拉取一批數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record.partition()+" "+ record.value());}}}}

2、指定分區消費


// 消費指定主題下指定分區的消息
public class TopicPartitionConsumer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); //設置消費組// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 訂閱主題分區List<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("topicA", 0));consumer.assign(topicPartitions);// 循環監聽主題while (true) {// 每一秒拉取一批數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record.partition()+" "+ record.value());}}}}

3、在broker中有一個名為__consumer_offsets的主題,這里面記錄著每個消費者消費的偏移量,以確保消息被正確無遺漏的消費

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909187.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909187.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909187.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

設計模式——觀察者設計模式(行為型)

摘要 本文詳細介紹了觀察者設計模式&#xff0c;包括其定義、結構、實現方式、適用場景以及實戰示例。通過代碼示例展示了如何在Spring框架下實現觀察者模式&#xff0c;以及如何通過該模式實現狀態變化通知。同時&#xff0c;對比了觀察者模式與消息中間件在設計理念、耦合程…

uniapp 頁面棧一定深度后,回首頁導航到新頁面的解決方案

uniapp 頁面棧一定深度后&#xff0c;回首頁導航到新頁面的解決方案 uniapp 頁面導航解決方案 在 uniapp 中&#xff0c;要實現先彈出頁面棧回到首頁&#xff0c;然后再跳轉到指定頁面。 /*** description 后臺選擇鏈接專用跳轉*/ interface Link {path: string;name?: stri…

數據結構 散列表 學習 2025年6月12日15:30:48

數據結構 散列表 哈希表(Hash Table): 通過哈希函數將鍵&#xff08;key&#xff09;映射到存儲位置&#xff0c;從而實現快速的插入、刪除和查找操作。 哈希表是現代編程中最重要的數據結構之一&#xff0c;幾乎所有編程語言都提供了內置實現。 計數 #include <stdio.h&g…

數據結構之LinkedList

系列文章目錄 數據結構之ArrayList-CSDN博客 目錄 系列文章目錄 前言 一、模擬實現鏈表 1. 遍歷鏈表 2. 插入節點 3. 刪除節點 4. 清空鏈表 二、鏈表的常見操作 1. 反轉鏈表 2. 返回鏈表的中間節點 3. 鏈表倒數第 k 個節點 4. 合并兩個有序鏈表 5. 分割鏈表 6. 判…

DC3靶機滲透

1. 靶機介紹 主要的內容有 sql 注入漏洞、joomla 框架漏洞、ssh 攻擊、shell 反彈、提權 信息收集(ip、端口、目錄、指紋信息)--->利用漏洞--->反彈---->提權 2. 信息收集 2.1. 掃描存活 ip 192.168.220.134 2.2. 端口掃描 nmap -T4 -A -p- 192.168.220.134 …

C# 線程交互

一、為什么要進行線程交互 在C#中&#xff0c;線程交互通常涉及到多個線程之間的數據共享和同步。?. 一、全局變量 在C#中&#xff0c;全局變量是指在程序的任何地方都可以訪問的變量。通常&#xff0c;全局變量是在類的外部定義的&#xff0c;或者在所有方法之外定義的。全…

Cursor 編輯器中的 Notepad 功能使用指南

Cursor 編輯器中的 Notepad 功能使用指南 摘要 本指南全面介紹了 Cursor 編輯器中的 Notepad 功能&#xff0c;涵蓋其用途、多種訪問方式、適用場景以及與其它功能的整合技巧等內容&#xff0c;助力用戶高效利用該功能提升工作流程效率。 不同訪問方式介紹 功能概述 Curso…

用于評估大語言模型(LLMs)能力的重要基準任務(Benchmark)

基準任務涵蓋了 多領域&#xff08;如語言理解、數學、推理、編程、醫學等&#xff09;和 多能力維度&#xff08;如事實檢索、計算、代碼生成、鏈式推理、多語言處理&#xff09;。常用于模型發布時的對比評測&#xff0c;例如 GPT-4、Claude、Gemini、Mistral 等模型的論文或…

力扣HOT100之技巧:169. 多數元素

這道題如果不考慮空間復雜度和時間復雜度的限制的話很好做&#xff0c;一種思路是通過一次遍歷將所有元素的數量記錄在一個哈希表中&#xff0c;然后我們直接返回出現次數最多的鍵即可。另一種思路是直接對數組進行排序&#xff0c;數組中間的值一定是多數元素&#xff0c;因為…

wordpress首頁調用指定ID頁面內的相冊

要在WordPress首頁調用ID為2的頁面中的相冊&#xff0c;你可以使用以下幾種方法&#xff1a; 方法一&#xff1a;使用短代碼和自定義查詢 首先&#xff0c;在你的主題的functions.php文件中添加以下代碼&#xff1a; function display_page_gallery($atts) {$atts shortcod…

基于深度學習的異常檢測系統:原理、實現與應用

前言 在現代數據驅動的業務環境中&#xff0c;異常檢測&#xff08;Anomaly Detection&#xff09;是一個關鍵任務&#xff0c;它能夠幫助企業和組織及時發現數據中的異常行為或事件&#xff0c;從而采取相應的措施。異常檢測廣泛應用于金融欺詐檢測、網絡安全、工業設備故障監…

Java基于BS架構的OA流程可視化實戰:從工作流引擎到前端交互(附完整源代碼+論文框架)

一、引言&#xff1a;BS架構OA系統的流程可視化需求 在企業信息化建設中&#xff0c;基于瀏覽器/服務器&#xff08;BS&#xff09;架構的OA系統通過流程自動化提升辦公效率&#xff0c;而流程可視化是實現流程監控、優化的核心模塊。本文基于Java技術棧&#xff0c;結合Activ…

JavaWeb-數據庫連接池

目錄 1.springboot默認Hikari(追光者)連接池 2.切換為Druid(德魯伊)連接池 1.springboot默認Hikari(追光者)連接池 2.切換為Druid(德魯伊)連接池 一般幾乎用不到&#xff0c;不需要切換 <!--Druid連接池--> <dependency><groupId>com.alibaba</groupId&…

c# 完成恩尼格瑪加密擴展

c# 完成恩尼格瑪加密擴展 恩尼格瑪擴展為可見字符恩尼格瑪的設備原始字符順序轉子的設置反射器的設置連接板的設置 初始數據的設置第一版 C# 代碼第二版 C# 代碼 總結 恩尼格瑪 在之前&#xff0c;我們使用 python 實現了一版恩尼格瑪的加密算法&#xff0c;但是這一版&#x…

【Redisson】鎖可重入原理

目錄 一、基本原理 二、源碼解析&#xff1a; &#xff08;2&#xff09;獲取鎖 &#xff08;1&#xff09;釋放鎖&#xff1a; 之前給大家介紹過redisson的分布式鎖&#xff0c;用redisson來實現比自己手搓簡單的分布式鎖有很多好處&#xff0c;因為這些可重入、可重試的邏…

BERT 模型微調與傳統機器學習的對比

BERT 微調與傳統機器學習的區別和聯系&#xff1a; 傳統機器學習流程 傳統機器學習處理文本分類通常包含以下步驟&#xff1a; 特征工程&#xff1a;手動設計特征&#xff08;如 TF-IDF、詞袋模型&#xff09;模型訓練&#xff1a;使用分類器&#xff08;如 SVM、隨機森林、邏…

(12)-Fiddler抓包-Fiddler設置IOS手機抓包

1.簡介 Fiddler不但能截獲各種瀏覽器發出的 HTTP 請求&#xff0c;也可以截獲各種智能手機發出的HTTP/ HTTPS 請求。 Fiddler 能捕獲Android 和 Windows Phone 等設備發出的 HTTP/HTTPS 請求。同理也可以截獲iOS設備發出的請求&#xff0c;比如 iPhone、iPad 和 MacBook 等蘋…

芯科科技Tech Talks技術培訓重磅回歸:賦能物聯網創新,共筑智能互聯未來

聚焦于Matter、藍牙、Wi-Fi、LPWAN、AI/ML五大熱門無線協議與技術 為年度盛會Works With大會賦能先行 隨著物聯網&#xff08;IoT&#xff09;和人工智能&#xff08;AI&#xff09;技術的飛速發展&#xff0c;越來越多的企業和個人開發者都非常關注最新的無線連接技術和應用…

docker-compose容器單機編排

docker-compose容器單機編排 開篇前言 隨著網站架構的升級&#xff0c;容器的使用也越來越頻繁&#xff0c;應用服務和容器之間的關系也越發的復雜。 這個就要求研發人員能更好的方法去管理數量較多的服務器&#xff0c;而不能手動挨個管理。 例如一個LNMP 架構&#xff0c;就…

LeetCode--29.兩數相除

解題思路&#xff1a; 1.獲取信息&#xff1a; 給定兩個整數&#xff0c;一個除數&#xff0c;一個被除數&#xff0c;要求返回商&#xff08;商取整數&#xff09; 限制條件&#xff1a;&#xff08;1&#xff09;不能使用乘法&#xff0c;除法和取余運算 &#xff08;2&#…