Spring Boot整合Kafka

文章目錄

      • 1. 介紹
      • 2. Kafka基礎
        • 2.1. 安裝KafKa
        • kafka集群搭建_kafka交流群-CSDN博客
      • 3. Spring Boot整合Kafka
        • 3.1. 引入Kafka依賴
        • 3.2.編寫配置文件
      • 4. 生產者(produced)
        • 4.1. 生產者基礎案例(基礎測試)
      • 5. 消費者
        • 5.1.消費者基本案例(基礎測試)
      • 6.Kafka常用配置
        • kafka配置文件解釋
          • producer.properties解釋
          • consumer.properties解釋
          • server.properties解釋
      • 7.ACK
      • 8.查看指定分區的數據

1. 介紹

Kafka作為一款分布式流處理平臺,具有高吞吐量、持久性、容錯性等特點,適用于構建大規模的實時數據管道。

Spring Boot作為快速開發框架,提供了簡化開發和部署的能力,使得與Kafka的整合變得更加容易。

通過將Spring Boot與Kafka進行整合,具有以下優勢

  1. 異步消息處理:Kafka可以作為消息隊列,實現異步消息的生產和消費,提高系統的吞吐量和響應速度。
  2. 解耦和擴展性:消息隊列的引入可以解耦不同系統或模塊之間的依賴關系,提高系統的靈活性和可維護性。
  3. 數據流處理:Kafka支持流處理,可以用于實時數據分析、日志收集、事件驅動等場景,為業務提供更多的數據洞察。
  4. 可靠性和容錯性:Kafka具有高度可靠性和容錯性,能夠保證消息不丟失,并且支持分布式部署,保證系統的穩定性和可靠性。

Spring Boot與Kafka整合為開發者提供了一種強大的消息傳遞解決方案,能夠滿足現代分布式系統對于消息傳遞的需求,提高系統的性能、可維護性和可擴展性。

2. Kafka基礎

2.1. 安裝KafKa

可參考本篇博客

kafka集群搭建_kafka交流群-CSDN博客

Kafka的一些特點

  1. 發布訂閱模型:
    • 生產者將消息發布到主題,消費者訂閱這些主題以接收消息。
    • 每個主題可以有多個訂閱者,消息會廣播給所有訂閱者。
  2. 消息日志:
    • Kafka將消息存儲在持久化的日志中,每個消息都有一個唯一的偏移量。
    • 日志被分割成多個分區,每個分區中的消息有順序的索引。
    • 分區允許Kafka在集群中并行處理和存儲消息,提高了吞吐量和擴展性。
  3. 分布式架構:
    • Kafka是一個分布式系統,集群由多個Broker組成。
    • 每個分區有副本分布在不同的Broker上,確保消息的可靠性和容錯性。
    • 分布式架構支持水平擴展,能夠處理大規模的數據和高并發的請求。
  4. 高吞吐量:
    • Kafka被設計為高吞吐量的消息系統,能夠處理每秒數百萬條消息。
    • 高效的批量處理和零拷貝機制使得Kafka能夠提供低延遲的消息傳遞服務。
  5. 持久性:
    • Kafka的消息是持久化存儲的,可以通過配置持久化策略來保留消息的時間和大小。
    • 消息一旦被寫入到Kafka中就不會丟失,即使消費者尚未處理。
  6. 水平擴展:
    • Kafka集群可以水平擴展,通過增加Broker和分區來提高系統的容量和吞吐量。
    • 新的Broker和分區可以動態地加入到集群中,而不會中斷服務。
  7. 可靠性和容錯性:
    • Kafka通過副本機制和ISR(In-Sync Replicas)機制實現高可靠性和容錯性。
    • ISR機制確保了即使部分Broker失效,也能繼續保持數據的一致性和可用性。
  8. 流處理:
    • Kafka Streams API提供了流處理的能力,允許開發者在Kafka中進行實時數據處理和分析。
    • 流處理功能使得Kafka能夠更靈活地處理實時數據流和生成實時結果。

3. Spring Boot整合Kafka

3.1. 引入Kafka依賴

image-20240229213324522

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version>
</dependency>gradle:
implementation 'org.springframework.kafka:spring-kafka:2.9.13'
3.2.編寫配置文件
# 配置環境的 指定topic 如果有多個 可以使用 ,進行連接
dc:topics:info: ENTRY_USER_INFO# 指定 組idgroup-id: hrfan-consumer-group
spring:kafka:consumer:bootstrap-servers: 192.168.112.128:9092group-id: hrfan-consumer-groupauto-offset-reset: earliest# 錯誤處理key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializervalue-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializerproperties:spring.json.trusted.packages: '*'# 序列化、反序列化一致spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer# 注意 這里需要使用 org.apache.kafka.common.serialization.StringDeserializer# 不能使用 org.springframework.kafka.support.serializer.JsonSerializer 不然會報錯 需要保持一致spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializerproducer:bootstrap-servers: 192.168.112.128:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializer

4. 生產者(produced)

4.1. 生產者基礎案例(基礎測試)
/*** 測試向Kafka發送消息** @author 13723* @version 1.0* 2024/3/1 10:35*/
@SpringBootTest
public class KafkaProducedTest {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());@Value("${dc.topics.dutyform}")private String topics;/*** kafka模板 String消息*/@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka模板 Message消息*/@Resourceprivate KafkaTemplate<String, Message<String>> kafkaTemplateMessage;@Test@DisplayName("測試向KfaKa發送消息")public void testSend() {// 發送普通消息kafkaTemplate.send(topics, "測試發送普通消息-無Key");// 發送Key-Value消息kafkaTemplate.send(topics, "hrfan-key-1", "測試發送key-Value消息");// 發送Partition(分區)-Offset(偏移量)消息// 在Apache Kafka中,消息被組織在稱為"主題(Topics)"的邏輯類別中。// 每個主題可以被劃分為一個或多個"分區(Partitions)"。分區是消息的物理存儲單元,它們分布在不同的Kafka服務器上。// 每個分區中的消息都有一個唯一的編號,稱為"偏移量(Offset)"。這個偏移量標識了消息在該分區中的位置。偏移量是一個遞增的整數,新消息的偏移量比舊消息的偏移量大。kafkaTemplate.send(topics, 0, "hrfan-key-2", "測試發送Partition-Offset消息");// 可以理解為拼裝JSON類型數據// 發送Message消息// 通過sendDefault()方法發送消息,消息將會被發送到默認的主題中。String event = "測試發送Message消息";Map<String, Object> map = new HashMap<>();map.put("token", UUID.randomUUID().toString());MessageHeaders headers = new MessageHeaders(map);Message<String> message = MessageBuilder.createMessage(event, headers);// 設置默認topickafkaTemplate.setDefaultTopic(topics);// 將消息發送到默認的topic// 注意此時修改 泛型為// @Resource// private KafkaTemplate<String,Message<String>> kafkaTemplateMessage;kafkaTemplateMessage.sendDefault("hrfan-key-3", message);logger.info("消息發送成功");}}

image-20240301133509648

image-20240301133913211

5. 消費者

5.1.消費者基本案例(基礎測試)
/*** 模擬Kafka消費者* @author 13723* @version 1.0* 2024/2/29 17:01*/
@Component
public class KafkaCustomerDemo {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());@KafkaListener(topics = "#{'${dc.topics.dutyform}'.split(',')}", groupId = "${dc.group-id}", autoStartup = "true")public void onMessageSync(ConsumerRecord<String, String> record,  @Headers Map<String,Object> headers) {logger.error("獲取到的信息為:{},",record);}}

image-20240301133749367

6.Kafka常用配置

kafka配置文件解釋
producer.properties解釋
producer.properties:生產端的配置文件
#指定kafka節點列表,用于獲取metadata,不必全部指定
#需要kafka的服務器地址,來獲取每一個topic的分片數等元數據信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092#生產者生產的消息被發送到哪個block,需要一個分組策略。
#指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區
#partitioner.class=kafka.producer.DefaultPartitioner#生產者生產的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發送到broker集群,
#而broker集群是不會進行解壓縮的,broker集群只會把消息發送到消費者集群,然后由消費者來解壓縮。
#是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。
#壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
#文本數據會以1比10或者更高的壓縮比進行壓縮。
compression.codec=none#指定序列化處理類,消息在網絡上傳輸就需要序列化,它有String、數組等許多種實現。
serializer.class=kafka.serializer.DefaultEncoder#如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。
#如果上面啟用了壓縮,那么這里就需要設置
#compressed.topics= 
#這是消息的確認機制,默認值是0。在面試中常被問到。
#producer有個ack參數,有三個值,分別代表:
#(1)不在乎是否寫入成功;
#(2)寫入leader成功;
#(3)寫入leader和所有副本都成功;
#要求非常可靠的話可以犧牲性能設置成最后一種。
#為了保證消息不丟失,至少要設置為1,也就
#是說至少保證leader將消息保存成功。
#設置發送數據是否需要服務端的反饋,有三個值0,1,-1,分別代表3種狀態:
#0: producer不會等待broker發送ack。生產者只要把消息發送給broker之后,就認為發送成功了,這是第1種情況;
#1: 當leader接收到消息之后發送ack。生產者把消息發送到broker之后,并且消息被寫入到本地文件,才認為發送成功,這是第二種情況;#-1: 當所有的follower都同步消息成功后發送ack。不僅是主的分區將消息保存成功了,
#而且其所有的分區的副本數也都同步好了,才會被認為發動成功,這是第3種情況。
request.required.acks=0#broker必須在該時間范圍之內給出反饋,否則失敗。
#在向producer發送ack之前,broker允許等待的最大時間 ,如果超時,
#broker將會向producer發送一個error ACK.意味著上一次消息因為某種原因
#未能成功(比如follower未能同步成功)
request.timeout.ms=10000#生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條;
#還有一種是異步,表示生產者積累到一批的消息,裝到一個池子里面緩存起來,再發送給broker,
#這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供我們來設置。
#一般我們會選擇異步。
#同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,
#也意味著消息將會在本地buffer中,并適時批量發送,但是也可能導致丟失未發送過去的消息
producer.type=sync#在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,
#默認為5000ms
#此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000#異步情況下,緩存中允許存放消息數量的大小。
#在async模式下,producer端允許buffer的最大消息量
#無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積
#此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000條消息。
queue.buffering.max.messages=20000#如果是異步,指定每次批量發送數據量,默認為200
batch.num.messages=500#在生產端的緩沖池中,消息發送出去之后,在沒有收到確認之前,該緩沖池中的消息是不能被刪除的,
#但是生產者一直在生產消息,這個時候緩沖池可能會被撐爆,所以這就需要有一個處理的策略。
#有兩種處理方式,一種是讓生產者先別生產那么快,阻塞一下,等會再生產;另一種是將緩沖池中的消息清空。
#當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后阻塞一定時間后,
#隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
#此時producer可以繼續阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時間
#-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄
#0: 立即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1#當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數
#因為broker并沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失)
#有可能導致broker接收到重復的消息,默認值為3.
message.send.max.retries=3#producer刷新topic metada的時間間隔,producer需要知道partition leader
#的位置,以及當前topic的情況
#因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,
#將會立即刷新
#(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置
#額外的刷新機制,默認值600000
topic.metadata.refresh.interval.ms=60000
consumer.properties解釋
#消費者集群通過連接Zookeeper來找到broker。
#zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000#這是一個時間閾值。
#指定多久消費者更新offset到zookeeper中。
#注意offset更新時基于time而不是每次獲得的消息。
#一旦在更新zookeeper發生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000#指定消費
group.id=xxxxx#這是一個數量閾值,經測試是500條。
#當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息并不是每消費一次消息就向zk提交
#一次,而是現在本地保存(內存),并定期提交,默認為true
auto.commit.enable=true# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000# 當前consumer的標識,可以設定,也可以有系統生成,
#主要用來跟蹤消息消費情況,便于觀察
conusmer.id=xxx# 消費者客戶端編號,用于區分不同客戶端,默認客戶端程序自動產生
client.id=xxxx# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50# 當有新的consumer加入到group時,將會reblance,此后將會
#有partitions的消費端遷移到新  的consumer上,如果一個
#consumer獲得了某個partition的消費權限,那么它將會向zk
#注冊 "Partition Owner registry"節點信息,但是有可能
#此時舊的consumer尚沒有釋放此節點, 此值用于控制,
#注冊節點的重試次數.
rebalance.max.retries=5#每拉取一批消息的最大字節數
#獲取消息的最大尺寸,broker不會像consumer輸出大于
#此值的消息chunk 每次feth將得到多條消息,此值為總大小,
#提升此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600#當消息的尺寸不足時,server阻塞的時間,如果超時,
#消息將立即發送給consumer
#數據一批一批到達,如果每一批是10條消息,如果某一批還
#不到10條,但是超時了,也會立即發送給consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360# 如果zookeeper沒有offset值或offset值超出范圍。
#那么就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder
server.properties解釋
server.properties:服務端的配置文件
#broker的全局唯一編號,不能重復
broker.id=0#用來監聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092#處理網絡請求的線程數量,也就是接收消息的線程數。
#接收線程會將接收到的消息放到內存中,然后再從內存中寫入磁盤。
num.network.threads=3#消息從內存中寫入磁盤是時候使用的線程數量。
#用來處理磁盤IO的線程數量
num.io.threads=8#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400#請求套接字的緩沖區大小
socket.request.max.bytes=104857600#kafka運行日志存放的路徑
log.dirs=/export/servers/logs/kafka#topic在當前broker上的分片個數
num.partitions=2#我們知道segment文件默認會被保留7天的時間,超時的話就
#會被清理,那么清理這件事情就需要有一些線程來做。這里就是
#用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1#segment文件保留的最長時間,默認保留7天(168小時),
#超時將被刪除,也就是說7天之前的數據將被清理掉。
log.retention.hours=168#滾動生成新的segment文件的最大時間
log.roll.hours=168#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824#上面的參數設置了每一個segment文件的大小是1G,那么
#就需要有一個東西去定期檢查segment文件有沒有達到1G,
#多長時間去檢查一次,就需要設置一個周期性檢查文件大小
#的時間(單位是毫秒)。
log.retention.check.interval.ms=300000#日志清理是否打開
log.cleaner.enable=true#broker需要使用zookeeper保存meta數據
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000#上面我們說過接收線程會將接收到的消息放到內存中,然后再從內存
#寫到磁盤上,那么什么時候將消息從內存中寫入磁盤,就有一個
#時間限制(時間閾值)和一個數量限制(數量閾值),這里設置的是
#數量閾值,下一個參數設置的則是時間閾值。
#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤。
log.flush.interval.messages=10000#消息buffer的時間,達到閾值,將觸發將消息從內存flush到磁盤,
#單位是毫秒。
log.flush.interval.ms=3000#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:
#Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01advertised.host.name=192.168.239.128

7.ACK

什么是ACK?

? 在Apache Kafka中,“Ack” 是 “Acknowledgement” 的縮寫,用于表示生產者(producer)發送消息給 Kafka 服務器后,服務器返回的確認信息。

Kafka提供了三種ACK級別

  • acks=0:生產者發送消息后,不等待任何確認,直接發送下一條消息。
  • acks=1:生產者發送消息后,等待leader節點成功寫入消息后返回確認,然后發送下一條消息。
  • acks=all:生產者發送消息后,等待所有的follower節點和leader節點都成功寫入消息后返回確認,然后發送下一條消息。

acks=all 是最安全的設置,但是也會導致最慢的性能,因為要等待多個副本的確認。

image-20240301183001141

  • 生產者(produced)發送消息到 leaderleader收到消息會發送ACK
    • leader負責處理讀寫操作
      • 如果leader出現故障,會從follwer中重新選取leader
    • follower負責副本數據之間的同步
      • follower可以理解為自動備份,會不斷從對應分區拉取leader的數據,對數據進行存儲
  • leaderfollower 之間同步數據也會發送ACK

在Spring-Kafka中,提供了集中AckMode模式

org.springframework.kafka.listener.ContainerProperties.AckMode
public static enum AckMode {RECORD,BATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE;private AckMode() {}
}
  1. RECORD:每處理一條消息后立即發送確認。這意味著每處理一條消息,消費者都會向 Kafka 代理發送一個確認消息。
  2. BATCH:批量確認模式。消費者將處理一批消息后才發送一次確認。這種模式可以減少確認消息的數量,提高性能。
  3. TIME:定時確認模式。消費者將在一定時間間隔內處理的所有消息后發送一次確認。這可以控制確認消息的發送頻率。
  4. COUNT:計數確認模式。消費者將處理一定數量的消息后發送一次確認。這種模式也有助于控制確認消息的發送頻率。
  5. COUNT_TIME:結合了計數和定時的確認模式。消費者將在達到一定數量的消息或一定時間間隔內發送一次確認,以提高靈活性和性能。
  6. MANUAL:手動確認模式。消費者需要在處理消息后顯式地調用確認操作,以告知 Kafka 代理消息已被處理。
  7. MANUAL_IMMEDIATE:立即手動確認模式。與上述手動確認模式類似,但在調用確認操作后立即發送確認,而不是等待一定的時間或數量。

7.1手動提交ACK

Kafka中ACK默認是自動提交的,在開發中,有時候我們需要進行手動提交ACK,那么在配置中我們可以做如下修改

  • 禁止自動提交enable-auto-commit=false
  • 設置ack-modemanual_immediat(立即手動確認模式)
	@KafkaListener(topics = "#{'${dc.topics.user}'.split(',')}", groupId = "${dc.group-id}", autoStartup = "true")public void onMessageSync(ConsumerRecord<String, String> record,  @Headers Map<String,Object> headers,Acknowledgment ack) {logger.error("獲取到的信息為:{},",record);//手動提交offset// ack.acknowledge();}

image-20240301201320109

image-20240301201438112

image-20240301201524566

8.查看指定分區的數據

有時候需要在服務器上查詢一些分區的數據,可以使用 kafka提供的工具 kafka-console-consumer

kafka-console-consumer --bootstrap-server 192.168.112.129:9092 --topic ENTRY_LIST_SEND_INFO --from-beginning

image-20240301171337018

image-20240301171647186

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/712963.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/712963.shtml
英文地址,請注明出處:http://en.pswp.cn/news/712963.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【LLM RAG】GritLM:統一嵌入和生成的大語言模型淺談

前言 目前&#xff0c;所有基于文本的語言問題都可以歸結為生成問題&#xff0c;并通過單一的LLM來處理。然而&#xff0c;使用嵌入的任務&#xff08;如聚類或檢索&#xff09;在這種視角下往往被忽視了。文本嵌入在許多關鍵的實際應用中扮演著重要角色。如RAG&#xff0c;在…

AIGC下一步:如何用AI再度重構或優化媒體處理?

讓媒資中“沉默的大多數”再次煥發光彩。 鄒娟&#xff5c;演講者 編者按 AIGC時代下&#xff0c;媒體內容生產領域隨著AI的出現也涌現出更多的變化與挑戰。面對AI的巨大沖擊&#xff0c;如何優化或重構媒體內容生產技術架構&#xff1f;在多樣的應用場景中媒體內容生產技術又…

Oracle case when end和decode的區別

Oracle中的CASE WHEN和DECODE都是條件表達式&#xff0c;但它們在某些方面有所不同。 CASE WHEN&#xff1a; CASE WHEN是一個條件表達式&#xff0c;允許您基于條件返回不同的值。它具有以下結構&#xff1a; sql CASE WHEN condition1 THEN result1 WHEN condition2 THE…

JavaScript 基本數據類型的詳解

JavaScript的基本數據類型 以下都是JS內置的幾種類型 數據類型描述number數字&#xff0c;不區分整數和小數string字符串類型booleantrue 真, false 假undefined表示未定義的值null只有唯一的值 null&#xff0c;表示空值 number 數字類型 JavaScript 中不區分整數和浮點數&…

itertools, 一個超好用的Python庫

前言 Python用來處理迭代器的工具你想到了啥&#xff1f;itertools 就是一個特別有用的庫&#xff0c;它提供了一系列用于創建和操作迭代器的工具&#xff0c;以下是10個常用的操作&#xff0c;可用在實際工作中&#xff0c;熟練掌握這些操作&#xff0c;將極大提升你在 Pytho…

棧(順序棧)實現Language C

###王道考研的學習領悟&#xff0c;個人喜好講解清晰 何為棧&#xff1f; 定義:棧&#xff08;stack&#xff09;是只允許在一端進行插入或刪除的線性表。 其重要術語&#xff1a;棧頂&#xff0c;棧底&#xff0c;空棧。 我們只需要把這個圖看明白了&#xff0c;理解起來就…

學校機房Dev c++解決中文亂碼問題

工具->編譯選項->勾選 編譯時加入以下命令 -fexec-charsetGBK -finput-charsetUTF-8 顯示中文&#xff1a;工具->編輯器選項->去掉第一個的勾勾。

vue+node對接支付寶沙箱

目錄 一、準備工作 二、后端node 1、添加支付寶配置文件 2、開始寫支付提交接口 3、前端部分&#xff08;點擊提交訂單&#xff09;3.1、寫axios請求 3.2、點擊提交訂單按鈕后發送網絡請求 3.3、付款成功跳轉的頁面的路徑是下面這樣的&#xff1a; 4、后端向支付寶發請求…

Github上最值得學習的10個Android開源項目,安卓面試題

1.Java語言進階與Android相關技術核 Android應用是由Java語言進行開發的&#xff0c;SDK也是由Java語言編寫&#xff0c;對于Android來說&#xff0c;只要SDK沒有用Kotlin重寫&#xff0c;那么Java語言是都需要學習的。而且Android APK的后臺服務器程序大概率是Java語言構建&a…

【計算機網絡】應用層自定義協議

自定義協議 一、為什么需要自定義協議&#xff1f;二、網絡版計算器1. 基本要求2. 序列化和反序列化3. 代碼實現&#xff08;1&#xff09;封裝 socket&#xff08;2&#xff09;定制協議和序列化反序列化&#xff08;3&#xff09;客戶端&#xff08;4&#xff09;計算器服務端…

Javaweb之SpringBootWeb案例之自動配置以及常見方案的詳細解析

3.2 自動配置 我們講解了SpringBoot當中起步依賴的原理&#xff0c;就是Maven的依賴傳遞。接下來我們解析下自動配置的原理&#xff0c;我們要分析自動配置的原理&#xff0c;首先要知道什么是自動配置。 3.2.1 概述 SpringBoot的自動配置就是當Spring容器啟動后&#xff0c…

OLLAMA 本地模型調用

Ollama 網址下載 再cmd&#xff0c;用 library 里面的庫 英文對話&#xff1a; Gemma is available in both 2b and 7b parameter sizes: ollama run gemma:2bollama run gemma:7b (default) 中文對話 ollama run qwen:0.5bollama run qwen:1.8b 用vscode而不是cmd調用 …

【論文筆記】An Effective Adversarial Attack on Person Re-Identification ...

原文標題&#xff08;文章標題處有字數限制&#xff09;&#xff1a; 《An Effective Adversarial Attack on Person Re-Identification in Video Surveillance via Dispersion Reduction》 Abstract 通過減少神經網絡內部特征圖的分散性攻擊reid模型。 erbloo/Dispersion_r…

強化學習嵌入Transformer(代碼實踐)

這里寫目錄標題 ChatGPT的答案GPT4.0 ChatGPT的答案 # 定義Transformer模塊 class Transformer(nn.Module):def __init__(self, input_dim, hidden_dim, num_heads, num_layers):super(Transformer, self).__init__()self.encoder_layer nn.TransformerEncoderLayer(d_modeli…

Vue3中組件通訊的方式

Vue3中組件通訊的方式 1 &#x1f916;GPT&#x1f916;: (答案有點問題混淆了vue2的內容) 父組件向子組件傳遞數據 props 子組件通過 props 屬性從父組件接收數據。emit事件子組件通過emit 事件 子組件通過 emit事件子組件通過emit 發射事件向父組件發送消息。provide / in…

Java SpringCloud gateway面試題

Java SpringCloud gateway面試題 前言1、什么是網關Zuul&#xff08;gateway&#xff09;&#xff1f;2、服務網關的作用&#xff1f;3、Zuul網關(Gateway)如何搭建集群&#xff1f;4、ZuulFilter常用有那些方法&#xff1f;5、如何實現動態zuul網關路由轉發&#xff1f;6、在Z…

kubeadm安裝部署

目錄 1.要求 2.環境準備 3.所有節點安裝docker 4.所有節點安裝kubeadm&#xff0c;kubelet和kubectl 5.部署K8S集群 6.測試 7.擴展3個副本 8.部署Dashboard master&#xff08;2C/4G&#xff0c;cpu核心數要求大于2&#xff09;192.168.27.10docker、kubeadm、kubelet、…

LightDB - ecpg 支持dml 中使用 return into 【24.1】

在之前的版本中ecpg 中只能使用returning into 來給c 變量賦值&#xff0c;如下&#xff1a; exec sql update t1 set c aa where id 2 returning c into :c_val;為了兼容oracle pro*c 中return into 的用法&#xff0c;從24.1 開始&#xff0c; LightDB 也支持通過return in…

Chrome插件 | WEB 網頁數據采集和爬蟲程序

無邊無形的互聯網遍地是數據&#xff0c;品類豐富、格式繁多&#xff0c;包羅萬象。數據采集&#xff0c;或說抓取&#xff0c;就是把分散各處的內容&#xff0c;通過各種方式匯聚一堂&#xff0c;是個有講究要思考的體力活。君子愛數&#xff0c;取之有道&#xff0c;得注意遵…

mobile app 安全掃描工具MobSF了解下

可以干啥&#xff1a; static 靜態分析 dynamic 動態分析 可以用來滲透了 如何docker安裝 docker image 下載地址https://hub.docker.com/r/opensecurity/mobile-security-framework-mobsf/ setup 兩行即可 1 docker pull opensecurity/mobile-security-framework-mobsf…