Kafka代碼模板

Kafka 服務器(Broker) 的配置

server.properties

# broker.id: 每個 Kafka Broker 的唯一標識符。broker.id 必須在整個 Kafka 集群中唯一。
broker.id=0# 配置 Kafka Broker 監聽客戶端請求的地址和端口。這個配置決定了 Kafka 服務將接受來自生產者、消費者以及其他客戶端的連接。
listeners=PLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存儲目錄
log.dir=/usr/local/data/kafka‐logs#  Kafka 連接到 Zookeeper 的地址
zookeeper.connect=192.168.65.60:2181

每個 Kafka 集群中的節點(Broker)都需要有一個 server.properties 配置文件,并且每個節點的配置可以有所不同。

生產者

生產者配置

Properties props = new Properties();// Kafka服務器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發送的key和value從字符串序列化為字節數組
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 發出消息持久化機制參數* acks=0: 表示producer不需要等待任何broker確認收到消息的回復,就可以繼續發送下一條消息。性能最高,但是最容易丟消息* acks=1: 至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入,就可以繼續發送下一條消息*          如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失* acks=‐1或all: 需要等待 min.insync.replicas(默認為1,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志。這是最強的數據保證。一般金融級別才會使用這種配置*/
props.put(ProducerConfig.ACKS_CONFIG, "1");// 2. 重試相關
//2.1 發送失敗重試次數,重試能保證消息發送的可靠性,但是也可能造成消息重復發送,需要接收者做好消息接收的冪等性處理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重試間隔設置,默認重試間隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地緩沖區和延遲發送相關* 在設置本地緩沖區/延遲發送后,消息會先發送到本地緩沖區,當達到批量發送消息的大 * 小時,本地線程會從緩沖區取數據(一個batch),批量發送到broker。同時,需要設置 * batch最大的延遲發送時間,如果一條消息在本地緩沖區中等待的時間達到設置的時間后 * batch沒滿,那么也必須把消息發送出去* /// 3.1 設置本地緩沖區大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 設置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延遲發送時間* 默認值是0:意思就是消息必須立即被發送,但這樣會影響性能* 一般設置10毫秒左右,就是說這個消息發送完后會進入本地的一個batch,如果10毫秒內,這個batch滿了16kb就會隨batch一起被發送出去* 如果10毫秒內,batch沒滿,那么也必須把消息發送出去,不能讓消息的發送延遲時間太長* *  消息 -> 本地緩沖區(32M)-> batch(16k)-> 發送(10ms batch不滿也發送)*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

生產者發送消息

// 創建 Kafka 生產者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 發送消息
String topic = "test-topic";  // 主題名稱
String key = "order1";  // 消息的 key
String value = "Order details: 123";  // 消息的內容// 創建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 發送消息,這里的lambda函數就是onCompletion()方法producer.send(record, (metadata, exception) -> {if (exception != null) {System.out.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully to topic " + metadata.topic() +" partition " + metadata.partition() + " with offset " + metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 關閉生產者producer.close();
}
// 指定發送分區
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);// 也可以指定發送分區
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);// 等待消息發送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();// 異步回調方式發送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 處理異常}
});
// 關閉
producer.close();

此外,為了保證生產者的消息發送成功,可以通過添加回調函數的方式,在send成功后打印日志。

詳細內容參考:Kafka如何保證消息不丟失

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產者成功發送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產者發送消失敗,原因:{}", ex.getMessage()));

消費者

消費者配置

Properties properties = new Properties();// Kafka服務器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發送的key和value從字符串序列化為字節數組
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消費分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自動提交offset,默認就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自動提交offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/* * 當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那么應該如何消費* latest(默認) :只消費自己啟動之后發送到主題的消息* earliest:第一次從頭開始消費,以后按照消費offset記錄繼續消費,這個需要區別于 consumer.seekToBeginning(每次都從頭開始消費)*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// consumer給broker發送心跳的間隔時間,broker接收到心跳如果此時有rebalance發生會通過心跳響應將rebalance方案下發給consumer,這個時間可以稍微短一點
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服務端broker多久感知不到一個consumer心跳就認為他故障了,會將其踢出消費組,對應的Partition也會被重新分配給其他consumer,默認是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的條數,如果消費者處理速度很快,可以設置大點,如果處理速度一般,可以設置小點
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,會將其踢出消費組,將分區分配給別的consumer消費
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

消費者消費消息

// 創建 Kafka 消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 訂閱主題
consumer.subscribe(Collections.singletonList("test-topic"));// 消費指定分區,這段代碼指定了消費者從TOPIC_NAME的第一個分區(分區0)開始消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消費(從頭消費 - seekToBeginning)* seekToBeginning()方法使消費者回溯到該分區的最初位置,意味著從頭開始消費該分    區的所有消息。* 這對于重新消費主題中的消息或重新同步時非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消費,即消費者將跳過之前的消息,從該offset開始消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 從指定時間點開始消費 - 1小時前
* partitionsFor()方法獲取指定主題(TOPIC_NAME)的所有分區信息。
* fetchDataTime 是一個時間戳,表示1小時前的時間,new Date().getTime() - 1000 * 60 * 60 用來計算這個時間戳。
* map 用于存儲每個分區與其對應的時間戳(fetchDataTime)。這個時間戳將用于從Kafka中拉取時間戳較早的消息。
*/ 
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() ‐ 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
// 消費消息
try {while (true) {consumer.poll(1000).forEach(record -> {// 可以修改為具體業務邏輯System.out.println("Consumed record with key: " + record.key() +", value: " + record.value() + ", from partition: " + record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 關閉消費者consumer.close();
}

消費者提交offset

手動提交offset的意義

  1. 控制消費進度

手動提交offset能夠讓消費者在每個消息或消息批次消費后,明確地告訴Kafka“我已經消費到這個offset了”。這對于控制消息消費的精確性非常重要,尤其在需要精確控制消費位置的場景中。

  1. 避免消息丟失或重復消費

如果自動提交offset,可能會發生消費者在處理中出現異常(如程序崩潰),導致已消費的消息的offset提交失敗,導致消息丟失或重復消費。手動提交則可以在處理完消息并確保成功時再提交offset,避免這種問題。比如在金融交易、日志收集系統等場景中,需要確保消息的處理不會丟失,并且不會重復處理。

  1. 靈活的錯誤處理與恢復

通過手動提交offset,消費者可以在消費過程中靈活地處理錯誤。如果在消費某條消息時發生異常,消費者可以選擇不提交offset,這樣在消費者重啟或恢復時會重新消費該消息。它使得消費者在出錯時能更好地控制重試策略。

代碼實現

同步提交
consumer.commitSync();

當調用該方法時,消費者會將當前消費的偏移量提交到Kafka集群,并且當前線程會阻塞,直到該提交操作完成。

優勢:

  1. 阻塞:會等待offset提交成功,不會繼續執行后續代碼,直到提交完成。
  2. 可靠性:如果提交失敗,commitSync()會拋出異常,可以捕獲并進行處理,確保提交正確。

缺點:會導致性能問題,因為它會阻塞當前線程,直到提交完成。

異步提交
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {// 處理異常}
});

回調函數:異步提交會接受一個OffsetCommitCallback回調接口作為參數,該接口的onComplete()方法會在提交操作完成時被調用。這個方法會接收到兩個參數:
offsets:包含提交的偏移量信息(TopicPartition和OffsetAndMetadata)。
ex:如果提交發生錯誤,該參數會包含異常信息。

優勢:

  1. 非阻塞:不會等待提交完成,允許程序繼續執行其他操作。
  2. 提高吞吐量:減少等待時間,尤其是在批量消費和提交的情況下,可以提高整體的吞吐量和性能。

缺點:可能會出現提交失敗的情況,回調函數中的異常處理需要做好,以確保異常得到及時處理。

Spring boot集成

1. 添加依賴

在pom.xml 中添加 Kafka 的相關依賴

<dependencies><!-- Spring Boot Starter for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional if you need a web app) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter for Actuator (optional for monitoring) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Spring Boot Starter Test (optional for testing) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml

spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate

注意:
ack‐mode
RECORD:當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
BATCH:當每一批poll()的數據被消費者監聽器處理之后提交
TIME:當每一批poll()的數據被消費者監聽器處理之后,距離上次提交時間大于TIME時提交
COUNT:當每一批poll()的數據被消費者監聽器處理之后,被處理record數量大于等于COUNT時提交
TIME | COUNT:有一個條件滿足時提交
MANUAL:當每一批poll()的數據被消費者監聽器處理之后, 手動調用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手動調用Acknowledgment.acknowledge()后立即提交,一般使用這種(一次提交一條消息)

3. 啟動類

package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {@Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 發送消息kafkaProducer.sendMessage("test-topic", "Hello, Kafka!");}
}

4. 生產者類

package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent: " + message);}
}

5. 消費者類

package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group")public void consume(String message) {System.out.println("Consumed message: " + message);}@KafkaListener(topics = "test-topic",groupId = "test-group")public void consume1(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();ack.acknowledge();  //手動提交offset}// 配置多個topic,concurrency就是同組下的消費者個數,就是并發消費數,必須小于等于分區總數@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 從topic1的分區0和1讀取消息@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) // 從topic2的分區0讀取消息,并設置分區1的初始偏移量為100}, concurrency = "6")public void listenToMultipleTopics(String message) {// 消費消息的邏輯System.out.println("Group: testGroup, Message: " + message);}
}

Kafka事務

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());// 初始化事務
producer.initTransactions();
try {// 開啟事務producer.beginTransaction();// 發到不同的主題的不同分區producer.send(/*...*/);// 提交事務producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滾事務producer.abortTransaction();
}
// 關閉
producer.close();

spring框架下Kafka事務

可以通過**@Transactional**實現

配置

可以通過在application.yml文件或KafkaConfig配置類中添加配置的方式,提供事務支持。

1. application.yml
spring:kafka:bootstrap-servers: localhost:9092  # Kafka 集群地址producer:acks: all  # 確保消息被所有副本確認transactional-id-prefix: tx-  # 事務前綴,Kafka 事務需要一個事務 ID 前綴consumer:group-id: test-group  # 消費者組 IDenable-auto-commit: false  # 手動提交 offsetlistener:ack-mode: manual  # 設置為手動提交確認
2. 配置類
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {// 設置 Kafka 生產者的事務管理器KafkaTransactionManager<String, String> transactionManager =new KafkaTransactionManager<>(producerFactory());KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}@Beanpublic DefaultKafkaProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");  // 事務 IDreturn new DefaultKafkaProducerFactory<>(configProps);}
}

生產者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;@Service
public class KafkaTransactionProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Transactionalpublic void sendTransactionalMessages() {try {// 發送事務消息kafkaTemplate.send("topic1", "key1", "message1");kafkaTemplate.send("topic2", "key2", "message2");// 你可以在此處加入其他業務邏輯,如果出現異常,會回滾事務if (someConditionFails()) {throw new RuntimeException("Simulating failure to trigger rollback");}// 如果沒有異常,事務提交,消息將被正常發送} catch (Exception e) {// 事務回滾System.out.println("Transaction failed, rolling back...");throw e;}}private boolean someConditionFails() {// 模擬某些條件下事務失敗return true;}
}

消費者

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaTransactionConsumer {@KafkaListener(topics = "topic1", groupId = "test-group")public void listenTopic1(String message) {System.out.println("Received message from topic1: " + message);}@KafkaListener(topics = "topic2", groupId = "test-group")public void listenTopic2(String message) {System.out.println("Received message from topic2: " + message);}
}

在生產者的配置中啟用事務,配置 transactional.id,并設置事務管理器 KafkaTransactionManager,它會自動管理 Kafka 事務的開始、提交和回滾。

事務管理:@Transactional 注解用于標識在發送消息的過程是一個事務操作。如果其中任何消息發送失敗,Spring Kafka 會自動回滾事務。

回滾機制:在 sendTransactionalMessages() 中模擬了一個失敗的條件,確保事務在遇到異常時會被回滾。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909797.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909797.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909797.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

最大子數組和C++

給你一個整數數組 nums &#xff0c;請你找出一個具有最大和的連續子數組&#xff08;子數組最少包含一個元素&#xff09;&#xff0c;返回其最大和。 子數組是數組中的一個連續部分。 示例 1&#xff1a; 輸入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 輸出&#xff1a;…

centos 7單機安裝ceph并創建rbd塊設備

1. 安裝依賴包 新增阿里云源ceph下載地址 vim /etc/yum.repos.d/ceph.repo [ceph] nameceph baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/x86_64/ gpgcheck0 [ceph-noarch] namecephnoarch baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/noarch/ gpgcheck…

Jenkins搭建、權限管理、參數化、流水線等詳細教程!

部署Jenkins 一、jenkins 安裝 官網&#xff1a; https://jenkins.io yum 安裝 jenkins *jenkins 依賴 java 環境 #注意2.346之后的版本不再支持jdk8 卸載舊jenkins #查詢以前是否安裝jenkins rpm -qa |grep jenkins #卸載 jenkins yum -y remove jenkins rpm -e jenkins…

百度飛槳(PaddlePaddle)案例分享:基于 PaddleOCR 的圖像文字提取系統

一、案例背景 在實際教學、辦公及政務系統中&#xff0c;紙質材料&#xff08;如手寫作文、表格、試卷等&#xff09;仍廣泛存在。為提升信息處理效率&#xff0c;采用 OCR&#xff08;Optical Character Recognition&#xff09;技術將圖像中的文字提取為可編輯文本已成為剛需…

python操控鼠標

在已知屏幕坐標的情況下&#xff0c;可以通過 Python 的 pyautogui 或 pynput 等庫實現網頁上的鼠標點擊操作。以下是具體步驟和代碼示例&#xff1a; 1. 使用 PyAutoGUI&#xff08;推薦&#xff09; pyautogui 是一個簡單易用的庫&#xff0c;可以直接通過屏幕坐標控制鼠標點…

UV 與 Bun 深度解析

UV 與 Bun 深度解析&#xff1a;現代開發工具的安裝與使用指南 什么是 UV&#xff1f; UV&#xff08;Ultra-Velocity&#xff09;是由 Astral 公司&#xff08;Ruff 的創建者&#xff09;開發的超高速 Python 包管理工具&#xff1a; 用 Rust 編寫&#xff0c;速度極快&…

【算力網絡】多樣化算力感知

一、算力網絡 ? 算力網絡&#xff08;Computing Power Network&#xff09;是我國率先提出的原創性技術理念&#xff0c;其核心是通過高速網絡整合分散的算力資源&#xff08;如云端、邊緣、終端等&#xff09;&#xff0c;實現算力的動態感知、智能調度和一體化服務&#x…

Greenplum/PostgreSQL pg_hba.conf 認證方法詳解

Greenplum/PostgreSQL pg_hba.conf 認證方法詳解 pg_hba.conf 文件中的 METHOD 字段指定了客戶端認證方式&#xff0c;以下是各種認證方法的詳細說明和配置示例。 常用認證方法 1. trust - 無條件允許連接 說明&#xff1a;不需要密碼&#xff0c;完全信任連接 適用場景&am…

分布式數據庫中間件-Sharding-JDBC

前言 學習視頻&#xff1a;深入Sharding-JDBC分庫分表從入門到精通【黑馬程序員】本內容僅用于個人學習筆記&#xff0c;如有侵擾&#xff0c;聯系刪除 1、概述 1.1、分庫分表是什么 小明是一家初創電商平臺的開發人員&#xff0c;他負責賣家模塊的功能開發&#xff0c;其中…

pycharm2020.2版本給項目選擇了虛擬環境解釋器,項目文件都運行正常,為什么terminal文件路徑的前面沒有虛擬解釋器的名稱

解決問題&#xff1a; 1.打開 Anaconda Prompt輸入 conda init cmd.exe 或者 pycharm終端直接 conda init cmd.exe 重啟動 CMD和pycharm&#xff0c;使配置生效。

2025商旅平臺排行:國內主流商旅平臺解析

在數字化轉型加速2025年&#xff0c;企業商旅管理正從“成本中心”向“智能管控樞紐”升級。如何通過技術賦能實現商旅成本精準優化與管理效率躍升&#xff1f;本文聚焦國內五大主流商旅平臺&#xff0c;以“綜合型頭部平臺創新型平臺”雙維度解析&#xff0c;結合數據實證與場…

CNS無線電信號覆蓋分析系統v0.1

#系統終端有的版本號了# 開發一套類EMACS的專業軟件任重道遠&#xff0c;經過慢吞吞的開發&#xff0c;我們已經將目標定位大幅下調了&#xff0c;不再對標EMACS系統了&#xff0c;改為瞄行業老二WRAP軟件了。當然WRAP軟件在電磁信號仿真分析領域也是神一樣的存在&#xff0c;其…

單視頻二維碼生成與列表二維碼生成(完整版)

視頻二維碼有有兩種情況&#xff1a;一種是單個視頻的生成一個二維碼&#xff1b;另一種是一組視頻&#xff08;多個視頻&#xff09;生成一個列表二維碼。用戶按自己的實際需求生成&#xff0c;即可&#xff0c;很方便。 STEP1 注冊帳號 使用視頻二維碼&#xff0c;您需要注…

關于linux:1. Linux 基礎運維

一、Linux 安裝與發行版選擇 關于操作系統種類&#xff1a; 1&#xff09;基于 Linux 內核的操作系統 Ubuntu、Debian、Kali、CentOS、RHEL、Arch、Android、Alpine、OpenWRT 等 特點&#xff1a;開源、穩定、安全、廣泛使用于服務器與開發領域 2&#xff09;基于 Windows…

(LeetCode 每日一題) 2016. 增量元素之間的最大差值 (數組)

題目&#xff1a;2016. 增量元素之間的最大差值 思路&#xff1a;維護已遍歷過的最小值&#xff0c;時間復雜度0(n)。 C版本&#xff1a; class Solution { public:int maximumDifference(vector<int>& nums) {int mnnums[0];int ans0;for(int i1;i<nums.size()…

MySQL基礎與常用數據類型淺析

一.MySQL數據類型分類 二.數值類型 2.1int類型 我們使用TINYINT作為例子進行實驗驗證: 越界插入會直接報錯,跟我們當時學習語言的時候不太一樣,語言會進行隱式類型轉換或截斷.一般不會直接報錯.其他的int類型也是同理. 說明: 在MySQL中&#xff0c;整型可以指定是有符號的…

Ubuntu 20.04離線安裝Nvidia-docker

服務器因系統故障重裝&#xff0c;安裝docker容器時發現幾年前的在線安裝步驟不好使了&#xff0c;只好嘗試離線安裝。為了下次不卡殼&#xff0c;記錄一下安裝步驟。 先確定自己的操作系統&#xff0c;并確保已經安裝了nvidia driver。我的操作系統是Ubuntu 20.04。 1. 下載…

6,TCP客戶端

1,創建一個新的項目 2,界面設計

【dify更新問題】如何更新dify且低成本解決git pull 443問題

我的dify部署在mini server上&#xff0c;掛不了TZ&#xff0c;所以采用了如下辦法 更新origin (.git/config) 地址為&#xff1a;https://gitee.com/dify_ai/dify.git 順序執行 &#xff08;https://docs.dify.ai/en/getting-started/install-self-hosted/docker-compose#upg…

即時通訊消息推送技術深度解析:從底層原理到行業實踐-優雅草卓伊凡|片翼|擱淺

即時通訊消息推送技術深度解析&#xff1a;從底層原理到行業實踐-優雅草卓伊凡|片翼|擱淺 引言&#xff1a;重新啟程的即時通訊項目 優雅草科技的卓伊凡最近重啟了即時通訊項目的二次開發工作&#xff0c;在這個萬物互聯的時代&#xff0c;消息推送通知作為IM系統的核心功能之…