Kafka(四) Consumer消費者

一,基礎知識


1,消費者與消費組

  • 每個消費者都有對應的消費組,不同消費組之間互不影響。
  • Partition的消息只能被一個消費組中的一個消費者所消費, 但Partition也可能被再平衡分配給新的消費者
  • 一個Topic的不同Partition會根據分配策略(消費者客戶端參數partition.assignment strategy)分給不同消費者。

2,Kafka的消息模式

消息中間件一般有兩種消息投遞模式:點對點模式和發布/訂閱模式,Kafka 同時支持兩種。
  1. 如果所有的消費者都屬于同一消費組,那么所有的消息都會被均衡地投遞給每個消費者,即每條消息只會被一個消費者處理,這就相當于點對點模式;
  2. 如果所有的消費者都隸屬于不同的消費組,那么所有的消息都會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就相當于發布/訂閱模式;

二,Client開發


1,消費邏輯需要具備以下幾個步驟

  1. 配置消費者參數及創建消費者實例
  2. 訂閱主題
  3. 拉取消息并消費
  4. 提交消費位移
  5. 關閉消費者實例
public class Consumer {private static final String BROKER_LIST = "localhost:9092";private static final String TOPIC = "TOPIC-A";private static final String GROUP_ID = "GROUP-A";private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);public static Properties initConfig() {Properties properties = new Properties();// 以下3個必須properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 客戶端IDproperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "eris-kafka-consumer");// 消費組IDproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自動提交,默認為trueproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return properties;}public static void main(String[] args) {Properties properties = initConfig();KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));try {while (IS_RUNNING.get()) {// poll內部封裝了消費位移提交、消費者協調器、組協調器、消費者的選舉、分區分配與再均衡、心跳等// Duration用來控制在消費者的緩沖區里沒有可用數據時阻塞等待的時間,0表示不等待直接返回ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {print("topic:" + r.topic() + ", patition:" + r.partition() + ", offset:" + r.offset());print("key:" + r.key() + ", value:" + r.value());}}} catch (WakeupException e) {// wakeup方法是KafkaConsumer中唯一可以從其他線程里安全調用的方法,調用wakeup后可以退出poll的邏輯,并拋出WakeupException。我們也不需處理WakeupException,它只是一種跳出循環的方式。} catch (Exception e) {e.printStackTrace();} finally {// maybe commit offset.kafkaConsumer.close();}}
}
意:KafkaConsumer是非線程安全的,wakeup()方法是 KafkaConsume 中唯一可以從其他線程里安全調用的方法。

2,subscribe有4個重載方法

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)// 在之后如果又創建了新主題,并且與正表達式相匹配,那么這個消費者也可以消費到新添加的Topic
public void subscribe (Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe (Pattern pattern)

3,assign訂閱指定的分區

消費者不僅可以通過 subscribe方法訂閱,還可以直接訂閱指定分區 ,如下:
consumer.assign(Arrays.asList(new TopicPartition ("topic-demo", 0))) ;
通過subscribe訂閱主題具有消費者自動再均衡功能 ,可以根據分區分配策略來自動分配各個消費者與分區的關系;
通過assign來訂閱分區時,是不具備消費者自動均衡的功能的。

4,取消訂閱

以下三行代碼等效。
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

5,消息消費

消費者消費到的每條消息的類型為ConsumerRecord, 這個和生產者發送的ProducerRecord相對應。
注意與ConsumerRecords區別,ConsumerRecords實現了Iterable,是poll返回的對象。
public class ConsumerRecord<K, V> {private final String topic;private final int partition;pr vate final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;//省略若干方法
}
根據Partition或Topic來消費消息
① 根據分區對當前批次消息分類:public List<ConsumerRecord<K, V> records(TopicPart tion partition)for (TopicPartition tp : records.partitions()) {for (ConsumerRecord<String, String> record : records.records(tp)) {System.out.println(record.partition() + ":" + record.value());}
}② 根據主題對當前批次消息分類:public Iterable<ConsumerRecord<K, V> records(String topic)// ConsumerRecords類中并沒提供與partitions()類似的topics()方法來查看拉取的消息集中所含的主題列表。
for (String topic : Arrays.asList(TOPIC)) {for (ConsumerRecord<String, String> record : records.records(topic)) {System.out.println(record.topic() + ":" + record.value());}
}

6,反序列化

ConsumerRecord里的value,就是經過反序列化后的業務對象。
Kafka所提供的反序列器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer。
在實際應用中,在Kafka提供的序列化器和反序列化器滿足不了應用需求的前提下,推薦使用 Avro、JSON、Thrift、 ProtoBuf、Protostuff等通用的序列化工具來包裝,不建議使用自定義的序列化器或反序列化器。

三,位移提交


1,消費位移提交

在每次調用poll方法時,返回的是還沒有被消費過的消息集。
消費位移必須做持久化保存,否則消費者重啟之后就無法知曉之前的消費位移。再者,當有新的消費者加入時,那么必然會再均衡,某個分區可能在再均衡之后分配給新的消費者,如果不持久化保存消費位移,那么這個新消費者也無法知曉之前的消費位移。
在舊消費者客戶端中,消費位移存儲在ZooKeeper中,而在新消費者客戶端中則存儲在Kafka內部的主題 __consumer_offsets中。
這里把將消費位移持久化的動作稱為“提交”?,消費者在消費完消息之后需要執行消費位移的提交。

2,三個位移的關系

? ? ? ? ? ?
  • lastConsumedOffset:當前消費到的位置,即poll拉到的該分區最后一條消息的offset
  • committed offset:提交的消費位移
  • position:下次拉取的位置
position =?committed offset = lastConsumedOffset +?1(當然position和committed offset 并不會一直相同)
TopicPartition tp = new TopicPartition("topic", 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset = 0;while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 同步提交消費位移kafkaConsumer.commitSync();System.out.println("consumed off set is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = kafkaConsumer.position(tp);System.out.println("he offset of t he next record is " + posititon);
}輸出結果:
consumed?offset is 377
commited offset is 378
the offset of the next record is 378

3,消息丟失與重復消費

  • 如果poll后立馬提交位移,之后業務異常,再次拉取就從新位移開始,就丟失了數據。
  • 如果poll后先處理數據,處理到一半異常了,或者最后提交位移異常,重新拉取會從之前的位移拉,就重復消費了。

4,自動提交位移原理

Kafka中默認的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable.auto.commit配置,默認值為true。
不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數auto.commit.interval.ms配置,默認值為5秒,此參數生效的前提是enable.auto.commit參數為true。
在默認的方式下,消費者每隔5秒會將拉取到的每個分區中最大的消息位移進行提交。動位移提交的動作是在poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移。
自動提交讓編碼更簡潔,但隨之而來的是重復消費和消息丟失的問題。

5,手動提交位移

① ?同步提交
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//do some logical processing?kafkaConsumer.commitSync();
}1)commitSync會根據poll拉取的最新位移來進行提交(注意提交的值對應于圖3-6 position的位置〉。2)可以使用帶參方法,提交指定位移:commitSync(final Map<TopicPartition OffsetAndMetadata> offsets)3)沒必要每條消息提交一次,可以改為批量提交。
② 異步提交
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public?void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)1)異步提交在執行的時候消費者線程不會被阻塞,可能在提交消費位移的結果還未返回之前就開始了新一次的poll操作。2)提交也可能會異常,可引入重試機制。但重試可能出現問題,若第一次commitAsync失敗在重試,第二次成功了,然后第一次重試也成功了,就會覆蓋位移為之前。解決方案:可以提交時維護一個序號,如果發現過期的序號就不再重試。
總結:不管是自動提交還是手動提交(同步、異步),都可能出現漏消費和重復消費,一般情況提交位移這一步操作很少失敗,至于業務異常如何影響提交,需要結合具體情況分析。
可以引入重試機制,重試提交或者業務處理。但重試會增加代碼邏輯復雜度。
還有對于消費者異常退出,重復消費的問題就很難避免,因為這種情況下無法及時提交消費位移,需要消費者的冪等處理。
如果消費者正常退出或發生再均衡況,那么可以 在退出或再均衡執行之前使用同步提交的方式做最后的把關
try {while (IS_RUNNING.get()) {// poll records and do some logical processing .kafkaConsumer.commitAsync();}
} finally {try {kafkaConsumer.commitSync();} finally {kafkaConsumer.close();}
}

四,暫停或恢復消費


暫停某些分區在poll時返回數據給客戶端和恢復某些分區返回數據給客戶端。
public void pause(Collection<TopicPartition> partitions)
public roid resume(Collection<TopicPartition> partitions)

五,指定位移消費


有了消費位移的持久化,才使消費者在關閉、崩潰或者在遇到再均衡的時候,可以讓接替的消費者能夠根據存儲的消費位移繼續進行消費。
當新的消費組建立的時候,它根本沒有可以查找的消費位移。或者消費組內的新消費者訂閱了一個新的主題,它也沒有可以查找的消費位移。
auto.offset.reset 配置
① 作用:
  • 決定從何處消費;
② 何時生效:
  1. 找不到消費位移記錄時;
  2. 位移越界時(seek);
③ 配置值:
  1. (默認值)auto.offset.reset=latest,從分區末尾開始消費;
  2. auto.offset.reset=earliest,從分區起始開始消費;
seek方法
① 定義:指定消費的位置,可以向前跳過若干消息或回溯消息。
//?partition:分區,offset:從哪個位置消費?
public void seek(TopicPartition partition, long offset)
② seek方法只能重置消費者分配到的分區的消費位置,而分區的分配是在 poll方法過程中實現的,也就是說在執行seek之前需要先執行一次poll,等到分配到分區之后才可以重置消費位置。
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如采不為空,則說明已經成功分配到了分區kafkaConsumer.poll(Duration.ofMillis(1000));assignment = kafkaConsumer.assignment();
}for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp, 10);
}
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//consume the record
}
③?如果對當前消費者未分配到的分區執行 seek方法,那么會報IllegalStateException。
④ 如果消費者啟動能找到位移記錄,又想從頭或者尾消費,可以通過seek結合endOffsets、beginningOffsets或者直接seekToBeginning、seekToEnd實現。
注意:分區的起始位置是0,但并不代表每時每刻都為0,因為日志清理會清理舊的數據 ,所以分區的起始位置自然會增加。
⑤ 按時間戳指定消費位置
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)給定待查分區和時間戳,返回大于等于該時間戳的第一條消息對應的offset和timestamp,對應于OffsetAndTimestamp中的offset、timestamp字段。
⑥ 將分區消費位移存儲在數據庫、文件等外部介質,再通過seek指定消費,可以配合再均衡監聽器來實現新消費者的繼續消費。

六,消費再均衡


再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為。它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費組內的消費者或往消費組內添加消費者。
不過, 在再均衡發生期間的這一小段時間,消費組會變得不可用。而且再均衡容易導致重復消費等問題,一般情況應盡量避免不必要的再均衡。
再均衡監聽器 ?ConsumerRebalanceListener
注冊:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) 和 subscribe(Patten pattern, ConsumerRebalanceListener listener)ConsumerRebalanceListener是一個接口,有2個方法。(1) void onPartitionsRevoked(Collection<TopicPartition> partitions)
再均衡開始之前和消費者停止讀取消息之后被調用。(2) void onPartitionsAssigned(Collection<TopicPartition> partitions)
新分配分區之后和消費者開始拉取消費之前被調用 。
示例①:消費時把位移暫存在Map,再均衡之前同步提交,避免發送再均衡的同時,異步提交還沒提交上去(重復消費)。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {kafkaConsumer.commitSync(currentOffsets);currentOffsets.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//do nothing .}
});
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//process the recordcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitAsync(currentOffsets, null);
示例②:把位移存在db,再均衡后通過seek定位繼續消費
kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// store offset in DB}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition tp : partitions) {// 從DB中讀取消費位移kafkaConsumer.seek(tp, getOffsetFromDB(tp));}}
}

七,消費者攔截器


與生產者攔截器對應,消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.Consumerlnterceptor接口。
Consumerlnterceptor有3個方法:
//?poll方法返回之前調用,可以修改返回的消息內容、按照某種規則過濾消息等
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K , V> records);//?提交完消費位移之后調用,可以用來記錄跟蹤所提交的位移信息,比如當使用commitSync的無參方法時,我們不知道提交的消費位移,而onCommit方法卻可以做到這一點
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);public void close();
在消費者中也有攔截鏈的概念,和生產者的攔截鏈一樣, 也是按照interceptor.classes參數配置的攔截器的順序來一一執行的。如果在攔截鏈中某個攔截器執行失敗,那么下一個攔截器會接著從上一個執行成功的攔截器繼續執行。

八,消費者多線程模型


① 每個消費者單獨線程,只消費一個分區
模型簡單,自動、手動提交位移都很簡單。
優點:每個分區可以按順序消費
缺點:多個TCP連接消耗
② 一個消費者拉取,提交線程池處理
各分區的消費是散亂在各線程,提交位移(順序)復雜。
優點:拉取快,TCP連接少
缺點:分區消費順序不好保障。
上圖是自動提交位移。如果要手動提交,可考慮共享offsets方式,同時為了避免對同一個分區,后序批次提交了更大的位移,前序批次處理失敗造成的消息丟失,可以考慮滑動窗口機制。(參考《深入理解Kafka核心設計與實踐原理》)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/45526.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/45526.shtml
英文地址,請注明出處:http://en.pswp.cn/web/45526.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL集群、Redis集群、RabbitMQ集群

一、MySQL集群 1、集群原理 MySQL-MMM 是 Master-Master Replication Manager for MySQL&#xff08;mysql 主主復制管理器&#xff09;的簡稱。腳本&#xff09;。MMM 基于 MySQL Replication 做的擴展架構&#xff0c;主要用來監控 mysql 主主復制并做失敗轉移。其原理是將真…

環境變量在Gradle中的妙用:構建自動化的秘訣

環境變量在Gradle中的妙用&#xff1a;構建自動化的秘訣 在構建自動化的過程中&#xff0c;環境變量扮演著至關重要的角色。它們允許開發者根據不同的運行環境&#xff08;如開發、測試和生產環境&#xff09;來調整配置&#xff0c;而無需修改代碼。Gradle&#xff0c;作為一…

基于Faster R-CNN的安全帽目標檢測

基于Faster R-CNN的安全帽目標檢測項目通常旨在解決工作場所&#xff0c;特別是建筑工地的安全監管問題。這類項目使用計算機視覺技術&#xff0c;特別是深度學習中的Faster R-CNN算法&#xff0c;來自動檢測工人是否正確佩戴了安全帽&#xff0c;從而確保遵守安全規定并減少事…

實驗一:圖像信號的數字化

目錄 一、實驗目的 二、實驗原理 三、實驗內容 四、源程序及結果 源程序&#xff08;python&#xff09;&#xff1a; 結果&#xff1a; 五、結果分析 一、實驗目的 通過本實驗了解圖像的數字化過程&#xff0c;了解數字圖像的數據矩陣表示法。掌握取樣&#xff08;象素個…

用Python爬蟲能實現什么?得到什么?

Python爬蟲是一種強大的工具&#xff0c;可以用來自動化地從互聯網上抓取數據和信息。使用Python實現爬蟲可以達成多種目的&#xff0c;包括但不限于以下幾個方面&#xff1a; 數據收集&#xff1a; 網頁內容抓取&#xff1a;可以抓取網頁上的文本、圖片、視頻等內容。搜索引擎…

Linux 網絡配置與連接

一、網絡配置 1.1 ifconfig 網卡配置查詢 ifconfig #查看所有啟動的網絡接口信息 ifconfig 指定的網卡 #查看指定網絡接口信息 1.2 修改網絡配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 #ens33網絡配置文…

【電源拓撲】反激拓撲

目錄 工作模式 固定頻率 CCM連續電流模式 DCM不連續電流模式 可變頻率 CRM電流臨界模式 反激電源CRM工作模式為什么要跳頻 反激電源應用場景 為什么反激電源功率做不大 電感電流爬升 反激變壓器的限制條件 精通反激電源設計的關鍵-反激電源變壓器設計 反激電源變壓…

MySQL 事務與鎖

事務ACID特性 原子性&#xff1a;事務要么同時成功&#xff0c;要么同時失敗&#xff0c;事務的原子性通過undo log日志保證 一致性&#xff1a;業務代碼要拋出報錯&#xff0c;讓數據庫回滾 隔離性&#xff1a;事務并發執行時&#xff0c;他們內部操作不能互相干擾 持久性&…

Python 讀取esxi上所有主機的設備信息

&#xff08;主要是為了統計所有虛擬機的設備名稱和所屬主機&#xff09; 代碼&#xff1a; from pyVim import connect from pyVmomi import vim import ssldef get_vm_devices(vm):devices []try:if vm.config is not None and hasattr(vm.config, hardware) and hasattr(v…

SpringBoot解決Apache Tomcat輸入驗證錯誤漏洞

Apache Tomcat是美國阿帕奇&#xff08;Apache&#xff09;基金會的一款輕量級Web應用服務器。該程序實現了對Servlet和JavaServer Page&#xff08;JSP&#xff09;的支持。 Apache Tomcat存在輸入驗證錯誤漏洞&#xff0c;該漏洞源于HTTP/2請求的輸入驗證不正確&#xff0c;會…

postgresql簡單導出數據與手動本地恢復(小型數據庫)

問題 需要每天手動備份postgresql。 步驟 導出數據 /opt/homebrew/opt/postgresql16/bin/pg_dump --file/Users/zhangyalin/backup_sql/<IP地址>_pg-2024_07_15_17_30_15-dump.sql --dbname<數據庫名> --username<用戶名> --host<IP地址> --port54…

Day53:圖論 島嶼數量 島嶼的最大面積

99. 島嶼數量 時間限制&#xff1a;1.000S 空間限制&#xff1a;256MB 題目描述 給定一個由 1&#xff08;陸地&#xff09;和 0&#xff08;水&#xff09;組成的矩陣&#xff0c;你需要計算島嶼的數量。島嶼由水平方向或垂直方向上相鄰的陸地連接而成&#xff0c;并且四周…

低空經濟持續發熱,無人機培訓考證就業市場及前景剖析

隨著科技的不斷進步和社會需求的日益增長&#xff0c;低空經濟已成為全球及我國經濟增長的新引擎。作為低空經濟的重要組成部分&#xff0c;無人機技術因其廣泛的應用領域和顯著的經濟效益&#xff0c;受到了社會各界的廣泛關注。為滿足市場對無人機人才的需求&#xff0c;無人…

深入剖析 Android 開源庫 EventBus 的源碼詳解

文章目錄 前言一、EventBus 簡介EventBus 三要素EventBus 線程模型 二、EventBus 使用1.添加依賴2.EventBus 基本使用2.1 定義事件類2.2 注冊 EventBus2.3 EventBus 發起通知 三、EventBus 源碼詳解1.Subscribe 注解2.注冊事件訂閱方法2.1 EventBus 實例2.2 EventBus 注冊2.2.1…

夢想CAD在線預覽編輯功能

1.最近有個需求&#xff0c;在web系統里進行在線進行CAD預覽和編輯&#xff0c;這里用的是夢想CAD實現此功能&#xff0c;夢想CAD官網文檔 2.CAD預覽&#xff0c;需要需要對CAD文件格式進行轉化&#xff0c;將dwg文件格式轉化為mxweb格式&#xff0c;再進行調用夢想CAD里的打開…

ipynb轉換為pdf、Markdown(.md)

Jupyter Notebook 文件&#xff08;.ipynb&#xff09;可以轉換成多種數據格式&#xff0c;以適應不同的使用場景和需求。以下是幾種常見的轉換格式及其簡潔描述&#xff1a; HTML: Jupyter Notebook可以直接導出為靜態的網頁&#xff08;HTML&#xff09;格式&#xff0c;這樣…

記一次IP數據處理過程,文本(CSV文件)處理,IP解析

個人博客&#xff1a;無奈何楊&#xff08;wnhyang&#xff09; 個人語雀&#xff1a;wnhyang 共享語雀&#xff1a;在線知識共享 Github&#xff1a;wnhyang - Overview 起因 突然接收到XX給的任務&#xff0c;要將一批IP數據處理一下&#xff0c;將IP對應的省市區解析出來…

PHP基礎語法

PHP 腳本在服務器上執行&#xff0c;然后將純 HTML 結果發送回瀏覽器。 基本的 PHP 語法 PHP 腳本可以放在文檔中的任何位置。 PHP 腳本以 <?php 開始&#xff0c;以 ?> 結束&#xff1a; <?php // PHP 代碼 ?> PHP 文件的默認文件擴展名是 .php。 PHP 文…

PHP智云物業管理平臺微信小程序系統源碼

?&#x1f3e0;智云物業管理新紀元&#xff01;微信小程序&#xff0c;讓家園管理更智慧&#x1f4f1; &#x1f3e1;【開篇&#xff1a;智慧生活&#xff0c;從物業開始】&#x1f3e1; 在快節奏的現代生活中&#xff0c;我們追求的不僅僅是家的溫馨&#xff0c;更是生活的…

基于hive數據庫的泰坦尼克號幸存者數據分析

進入 ./beeline -u jdbc:hive2://node2:10000 -n root -p 查詢 SHOW TABLES; 刪除 DROP TABLE IF EXISTS tidanic; 上傳數據 hdfs dfs -put train.csv /user/hive/warehouse/mytrain.db/tidanic 《泰坦尼克號幸存者數據分析》 1、原始數據介紹 泰坦尼克號是當時世界上…