8:00 2019/3/141:什么是hadoop? hadoop是解決大數據問題的一整套技術方案2:hadoop的組成? 核心框架 分布式文件系統 分布式計算框架 分布式資源分配框架 hadoop對象存儲 機器計算3:hadoop 云計算 大數據 微服務 人工智能關系 參見word學習文檔 1. 現階段,云計算的兩大底層支撐技術為“虛擬化”和“大數據技術”2. 而HADOOP則是云計算的PaaS層的解決方案之一,并不等同于PaaS,更不等同于云計算本身。4:大數據項目的通常結構 采集數據 數據分析統計 數據展示5:大數據項目的通常技術架構 見畫圖Hadoop Common: 為其他Hadoop模塊提供基礎設施。Hadoop DFS:一個高可靠、高吞吐量的分布式文件系統Hadoop MapReduce:一個分布式的離線并行計算框架Hadoop YARN:一個新的MapReduce框架,任務調度與資源管理6:安裝一個偽分布式的hdfs a:準備安裝介質 hadoop-2.8.2.tar.gz b:把安裝介質上傳到linux c:在linux使用hostname命令確認主機名 d:編輯/etc/hosts文件 完成ip地址和主機名的映射 在分布式的每一臺機器中都需要把所有機器的ip地址和主機名的映射關系配置注意:關閉每臺機器的防火墻 systemctl stop firewalld.service 關閉 systemctl disable firewalld.service 禁止開機啟動 e:配置ssh免密碼登錄 f:從/export/software/下面把hadoop-2.8.2.tar.gz 解壓到/export/servers/下 常用目錄說明: bin sbin hadoop常用的命令目錄 配置到/etc/profile中 etc hadoop常用配置文件目錄 share hadoop核心jar包目錄 g:完成配置文件 hadoop-env.sh 配置java環境 core-site.xml hadoop核心配置 hdfs-site.xml hdfs核心配置 mapred-site.xml mr的核心配置 yarn-site.xml yarn的核心配置 h:格式化namenode環境 創建namenode保存數據的環境 hdfs namenode -format i:使用命令啟動hdfs start-dfs.sh start-yarm.sh 執行完成回到命令行 可以使用jps查看關鍵進程是否已經在運行 namenode SecondaryNameNode datanode還要從web頁面使用http連接管理頁面查看 http://192.168.21.134:50070 可以看到hdfs的管理頁面 證明hdfs安裝并啟動成功 http://192.168.21.134:8088 可以看到mr計算任務的管理頁面 證明mr yarn安裝并啟動成功 注意事項: a:如果某個進程沒有正確啟動 要學會看日志 eg: starting namenode, logging to /export/service/hadoop-2.8.2/logs/hadoop-root-namenode-text4.out 以上輸出信息是說明namenode啟動過程寫入了哪一個日志 假設最終namenode沒有啟動成功 需要打開日志查看原因b:常用命令中還有單獨啟動某個進程的命令 hdfs單獨啟動各個進程服務 hadoop-daemon.sh start namenode hadoop-daemon.sh start datanode hadoop-daemon.sh start secondarynamenode yarn單獨啟動各個進程服務 yarn-daemon.sh start resourcemanager yarn-daemon.sh start nodemanager7:安裝分布式 a:已有一臺虛擬機基礎環境配置完畢 text4 b:利用有的虛擬機復制出2臺 一共3臺虛擬機 復制以已及每臺配置網絡 參照文檔即可b1:配置每臺機器的主機名(提前規劃好)(text4 text5 text6)text4 —— namenode datanode secondarynamenode resourcemanager nodemanagertext5 —— datanode nodemanagertext6 —— datanode nodemanagerb2:修改網卡配置(每臺機器的ip地址是提前規劃好)(192.168.21.134 192.168.21.138 192.168.21.139)b3:修改/etc/hosts文件 配置每臺機器和ip地址的映射關系 c:配置3臺之間可以ssh免密登錄 ssh-keygen ssh-copy-id root@機器名稱c1:配置text4 text5 text6互相都可以ssh免密登錄c2:選取一臺虛擬機做namenode 配置它到text5 text6免密登錄可以在多個機器之間使用scp傳輸文件 如果不需要輸入密碼 則ssh免密配置正確 scp -r(目錄整體復制) 目錄/文件名稱 用戶名@機器名:目的機器的路徑 d:確保所有機器防火墻都是關閉 e:修改每臺機器的hadoop配置文件e1:把每個block(數據塊)修改為有2個備份e2:把namenode相關的ip地址都修改成了主機名 f:把已有的namenode datanode配置好的數據文件夾刪除 g:在namenode節點 執行namenode格式化 hdfs namenode -format h:在namenode執行start-dfs.sh 啟動hdfs 使用jps查看每臺機器的進程規劃8:namenode datanode 數據目錄結構講解 namenode:存放數據目錄位置 /data/hadoop/dfs/namedatanode:存放數據目錄位置 /data/hadoop/dfs/data實際上傳到datanote中的文件數據都是保存在finalized目錄下 eg:從linux 往hdfs上傳一個文件 a0001.data因為配置的block副本是2分 所以3臺datanode節點中只有text4 text5有數據 經驗證和原有的上傳文件內容一致 就以2個副本的方式保證了數據的完整性eg:從linux往hdfs上傳一個大于128m的文件hadoop-2.8.2.tar.gz(240m)因為它大于128m(128m是hdfs中默認的一個block的大小) 所以這個壓縮包被分成2個block上傳每個block還是2個備份文件hdfs dfs -put 待上傳的文件名稱 要上傳的hdfs的目的地路徑secondarynamenode:存放數據目錄位置 /data/hadoop/tmp/dfs/namesecondarystart-all.sh 啟動所有的Hadoop守護進程。包括NameNode、 Secondary NameNode、DataNode、JobTracker、 TaskTrack stop-all.sh 停止所有的Hadoop守護進程。包括NameNode、 Secondary NameNode、DataNode、JobTracker、 TaskTrack start-dfs.sh 啟動Hadoop HDFS守護進程NameNode、SecondaryNameNode和DataNode stop-dfs.sh 停止Hadoop HDFS守護進程NameNode、SecondaryNameNode和DataNode hadoop-daemons.sh start namenode 單獨啟動NameNode守護進程 hadoop-daemons.sh stop namenode 單獨停止NameNode守護進程 hadoop-daemons.sh start datanode 單獨啟動DataNode守護進程 hadoop-daemons.sh stop datanode 單獨停止DataNode守護進程 hadoop-daemons.sh start secondarynamenode 單獨啟動SecondaryNameNode守護進程 hadoop-daemons.sh stop secondarynamenode 單獨停止SecondaryNameNode守護進程 start-mapred.sh 啟動Hadoop MapReduce守護進程JobTracker和TaskTracker stop-mapred.sh 停止Hadoop MapReduce守護進程JobTracker和TaskTracker hadoop-daemons.sh start jobtracker 單獨啟動JobTracker守護進程 hadoop-daemons.sh stop jobtracker 單獨停止JobTracker守護進程 hadoop-daemons.sh start tasktracker 單獨啟動TaskTracker守護進程 hadoop-daemons.sh stop tasktracker 單獨啟動TaskTracker守護進程sc文件:hdfs dfs -put aa.data /logsio nio aio思想b:編寫hdfs java客戶端程序b1:建立開發工程建立maven工程在pom文件中引入開發hdfs客戶端需要的jar包<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.8.2</version></dependency>建立普通的java工程引入jar包 目前只是開發hdfs的java客戶端 只引入hadoop安裝包下share目錄中common和hdfs jar包即可b2:規劃包結構b3:編寫demo測試類創建一個配置信息封裝對象(客戶端處理hdfs 編寫mr 處理hive 處理hase都是需要配置信息)創建一個文件系統的客戶端對象 方便處理文件數據對文件的增刪改查的java api接口使用對某個指定目錄下所有文件或者目錄基本信息的查看hdfs復習點:1:hdfs就是一個文件系統 在操作系統之上。通常操作系統針對hdfs叫做本地2:hdfs的重要組成部分namenode:存儲元數據元數據中數據都有什么?a:某個文件的某個block存儲在哪個datanode上b:每個datanode資源使用情況c:每個文件的屬性 修改時間等等信息datanode:真正存儲文件數據所以一個hdfs沒有namenode或者namenode出問題則不能再使用但是如果是datanode出問題 只可能是數據部分丟失 而不是hdfs不能使用secondary namenode namenode datanode 關系https://www.cnblogs.com/chenyaling/p/5521464.html3:基本的理論知識13:49 2019/3/1413:49 2019/3/14https://blog.csdn.net/lvtula/article/details/82354989 a:hdfs中namenode datanode secondarynamenode 某些主要目錄或者文件的作用 https://blog.csdn.net/baiye_xing/article/details/76268495 namenode:edits fsimage versiondatanode:finalized version blc文件每個都有一個meta文件secondarynamenode:edits fsimageb:namenode中edits和fsimage的作用以及其和內存的相互關系c:namenode和secondarynamenode的工作機制d:hdfs寫流程hdfs讀流程4:hdfs shell5:java api操作hdfsHA hadoop集群搭建步驟: 1:對集群每一臺機器需要安裝什么服務進行規劃。 text4:zk nn dn zkfc jn nm rm text5: zk nn dn zkfc jn nm rm text6: zk dn jn nm2:準備每臺機器的基本環境a:3臺機器之間需要免密登錄b:每臺機器必須安裝好jdk 8.0以上c:集群每臺機器之間時間同步c1:設定每臺機器的正確時區timedatectl set-timezone Asia/Shanghaitimedatectl set-local-rtc 1datec2:選擇集群中一臺機器為主 master 讓其它機器和這臺機器完成時間同步使用rdate完成集群之間時間同步具體操作請參見保存的時間同步頁面說明3:修改hadoop集群的配置文件 按照樣例配置文件修改即可 4:啟動集群過程(安裝過程的啟動)a:啟動zkb: 啟動jn hadoop-daemon.sh start journalnodec:格式化nn (選取text4)d:格式化zkfchdfs zkfc -formatZKe:把text4的nn結構復制到text5先在text4啟動nn在text5執行 hdfs namenode bootstrapstandby如果出現此時在text5并沒有能成功復制text4的nn的目錄機構則直接可以從text4復制到text5 完成2臺nn之間的復制f:啟動集群zk啟動zkfc啟動start-dfs.shstart-yarn.sh(如果那個rm沒有啟動 直接可以使用守護進程)1:什么是hadoop? hadoop是解決大數據問題的一整套技術方案2:hadoop的組成? 核心框架 分布式文件系統 分布式計算框架 分布式資源分配框架 hadoop對象存儲 機器計算3:hadoop 云計算 大數據 微服務 人工智能關系 參見word學習文檔4:大數據項目的通常結構 采集數據 數據分析統計 數據展示hadoop-env.sh 配置java環境 core-site.xml hadoop核心配置 hdfs-site.xml hdfs核心配置 mapred-site.xml mr的核心配置 yarn-site.xml yarn的核心配置namenode:存放數據目錄位置 /data/hadoop/dfs/namenamenode的數據文件:namenode保存的是整個hdfs的元數據eg:上傳文件的所屬 大小 修改日期以及文件每一個block所在哪個datanode對應關系都是namenode元數據保存的edtis:保存的是最近的日志記錄(namenode接收到的命令以及解析)fsimage:保存的是namenode內存信息的鏡像seen_txid:集群狀態的恢復標識步驟解析1: 上傳 1、根namenode通信請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在 2、namenode返回是否可以上傳 3、client請求第一個 block該傳輸到哪些datanode服務器上 4、namenode返回3個datanode服務器ABC 5、client請求3臺dn中的一臺A上傳數據(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然后B調用C,將真個pipeline建立完成,逐級返回客戶端 6、client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答 7、當一個block傳輸完成之后,client再次請求namenode上傳第二個block的服務器。詳細步驟解析2: 下載 1、跟namenode通信查詢元數據,找到文件塊所在的datanode服務器 2、挑選一臺datanode(就近原則,然后隨機)服務器,請求建立socket流 3、datanode開始發送數據(從磁盤里面讀取數據放入流,以packet為單位來做校驗) 4、客戶端以packet為單位接收,現在本地緩存,然后寫入目標文件HA hadoop集群搭建步驟: 1:對集群每一臺機器需要安裝什么服務進行規劃。 text4:zk nn dn zkfc jn nm rm text5: zk nn dn zkfc jn nm rm text6: zk dn jn nm2:準備每臺機器的基本環境a:3臺機器之間需要免密登錄b:每臺機器必須安裝好jdk 8.0以上c:集群每臺機器之間時間同步c1:設定每臺機器的正確時區timedatectl set-timezone Asia/Shanghaitimedatectl set-local-rtc 1datec2:選擇集群中一臺機器為主 master 讓其它機器和這臺機器完成時間同步使用rdate完成集群之間時間同步具體操作請參見保存的時間同步頁面說明3:修改hadoop集群的配置文件 按照樣例配置文件修改即可 4:啟動集群過程(安裝過程的啟動)a:啟動zkb: 啟動jn hadoop-daemon.sh start journalnodec:格式化nn (選取text4)d:格式化zkfchdfs zkfc -formatZKe:把text4的nn結構復制到text5先在text4啟動nn在text5執行 hdfs namenode bootstrapstandby如果出現此時在text5并沒有能成功復制text4的nn的目錄機構則直接可以從text4復制到text5 完成2臺nn之間的復制f:啟動集群zk啟動zkfc啟動start-dfs.shstart-yarn.sh(如果那個rm沒有啟動 直接可以使用守護進程)Zookeeper是一個分布式協調服務;就是為用戶的分布式應用程序提供協調服務 A、zookeeper是為別的分布式程序服務的 B、Zookeeper本身就是一個分布式程序(只要有半數以上節點存活,zk就能正常服務) C、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分布式共享鎖、統一名稱服務…… D、雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能: 管理(存儲,讀取)用戶程序提交的數據; 并為用戶程序提供數據節點監聽服務;1、Znode有兩種類型: 短暫(ephemeral)(斷開連接自己刪除) 持久(persistent)(斷開連接不刪除) 2、Znode有四種形式的目錄節點(默認是persistent ) PERSISTENT PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) EPHEMERAL EPHEMERAL_SEQUENTIAL 3、創建znode時設置順序標識,znode名稱后會附加一個值,順序號是一個單調遞增的計數器,由父節點維護 4、在分布式系統中,順序號可以被用于為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序1、Zookeeper:一個leader,多個follower組成的集群 2、全局數據一致:每個server保存一份相同的數據副本,client無論連接到哪個server,數據都是一致的 3、分布式讀寫,更新請求轉發,由leader實施 4、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行 5、數據更新原子性,一次數據更新要么成功,要么失敗 6、實時性,在一定時間范圍內,client能讀到最新數據4.2. zookeeper數據結構 1、層次化的目錄結構,命名符合常規文件系統規范(見下圖) 2、每個節點在zookeeper中叫做znode,并且其有一個唯一的路徑標識 3、節點Znode可以包含數據和子節點(但是EPHEMERAL類型的節點不能有子節點,下一頁詳細講解) 4、客戶端應用可以在節點上設置監視器(后續詳細講解) yarn的流程:1:客戶端向yarn發送請求,開始運行job 2:resourcemanager反饋信息,把切片資源在hdfs的存放目錄發送給客戶端 3:客戶端調用fileinputformat的getsplits方法完成數據切片計算 4:客戶端把切片資源上傳到yarn指定的hdfs目錄下 5:通知yarn資源已上傳,請分配mrappmaster開始執行任務 6:yarn吧客戶端待執行的任務放入任務隊列 7:resourcemanager從某個namemodemanager選擇一臺機器準備運行環境(根據充足的空間判定) 8:制定后 mappermaster從指定位置獲取任務資源(job.split jar job.xml) 9:mappermaster從job.spilit中確認需要運行多少maptask,想yarn的rm申請運行資源 10resourcemanager給mrappmaster反饋可用的運行maptask的資源 11mrappmaster根據yarn提供的資源開始運行maptask 12:當maptask運行完畢,mrappmaster會知道 13:重復申請資源的步驟 開始運行reducetask 14當reducetask運行完畢 回收mappmaster的占用資源yarn只是一個分布式資源分配系統,核心是resourcemanager和nodemanager yarn只負責資源分配,不負責xuanfayunsuan sheff運行流程 map()輸出結果->內存(環形緩沖區,當內存大小達到指定數值,如80%,開始溢寫到本地磁盤) 溢寫之前,進行了分區partition操作,分區的目的在于數據的reduce指向,分區后進行二次排序,第一次是對partitions進行排序, 第二次對各個partition中的數據進行排序,之后如果設置了combine,就會執行類似reduce的合并操作,還可以再進行壓縮, 因為reduce在拷貝文件時消耗的資源與文件大小成正比 內存在達到一定比例時,開始溢寫到磁盤上 當文件數據達到一定大小時,本地磁盤上會有很多溢寫文件,需要再進行合并merge成一個文件 reduce拷貝copy這些文件,然后進行歸并排序(再次merge),合并為一個文件作為reduce的輸入數據wordcount流程: 計算在某個目錄下有n多份文件,每個文件中有n多個單詞,計算每個單詞出現的個數 1:yarn的resourcemanager會反饋給客戶端吧切片信息存放到那個目錄下,然后客戶端通過調用fileinputformat的getsplits進行切片機算,得到3樣數據( a:切片的描述信息jobsplit b:計算所有單詞個數的jar包 c:把這次計算任務job的配置信息放入文件中的job。xml ) 2:放置后會通知yarn分配mappermaster任務,會挑選一臺nodemanager機器作為mrappmaster 3:mrappmaster會根據job的描述信息 根據job。split確定需要幾個 maptaskxiang yarn申請計算資源, 4:yarn反饋給mrappmaster明確的可用的計算資源5:mrappmaster根據yarn提供的資源開始運行maptask 6:maptask從hdfs中依據job。spilt的描述的數據從hdfs 復制到本地的server,調用wordcount程序完成計算 (計算:maptask運行map方法,map方法接受的是文本中的每一行的偏移量, v是每次map方法執行時讀取的一行單詞) 7:map階段執行完后在執行reduce階段, reduce完成對map階段輸出的解過進行合并操作 reducetask的數量是程序員根據需求制定,如果不指定是1 8:reduce完成后會輸出問結果文件1yarn的resourcemanager會反饋給客戶端切片信息放置的目錄,然后客戶端通過調用fileinputformat的getsplits進行切片機算,得到三楊樹局,
(a切片的描述信息b機損所有單詞的jar包c將job的藐視信息放入job。xml) 2客戶端放置后通知yarn分配mrappmaster執行任務,yarn會挑選一臺namenodemanager作為mrappmaster 3mrappmaster會從指定位置下獲取資源,根據job。splits計算需要多少個maptask,并向yarn申請及資源 4yarn反饋資源,mrappmaster給句yarn提供的資源開始運行maptask 5maptask從hdfs中依據job。split的描述的數據從hdfs復制到本地的server,調用wordcount完成計算1.hadoop運行原理 MapReduce HDFS 分布式文件系統(HDFS客戶端的讀寫流程) 寫: 客戶端接收用戶數據,并緩存到本地 當緩存足夠一個HDFS大小的時候 客戶端同NameNode通訊注冊一個新的塊 注冊成功后 NameNode給客戶端返回一個DateNode的列表 客戶端向列表中的第一個DateNode寫入塊 收到所有的DateNode確認信息后,客戶端刪除本地緩存 客戶端繼續發送下一個塊 重復以上步驟 所有數據發送完成后,寫操作完成 讀: 客戶端與NameNode通訊獲取文件的塊位置信息,包括塊的所有冗余備份的位置信息:DateNode列表 客戶端獲取文件位置信息后直接同有文件塊的DateNode通訊,讀取文件 如果第一個DateNode無法連接,客戶端將自動聯系下一個DateNode 如果塊數據的校驗值出錯,則客戶端需要向NameNode報告,并自動聯系下一個DateNode客戶端的hadoop環境:與集群的hadoop包一樣 集群入口:core-site.xml、fs.default.name 緩存塊大小:fs.block.size 存多少份:fs.replication2.mapreduce的原理mapreduce的原理:一個MapReduce框架由一個單獨的master JobTracker和集群節點每一個slave TaskTracker共同組成。 master負責調度構成一個作業的所有任務,在這些slave上,master監控它們的執行,并且重新執行已經失敗的任務。 而slave僅負責執行由maste指派的任務。3.Mapreduce數據傾斜是什么意思?怎么處理? Mapreduce數據傾斜是指我們在分片的時候導不同分片上的數據不均,導致這些分片在并行處理的時候,有的分片執行事件過長, 有的執行時間過短,導致總的執行時間過長的一種現象,通常是由:1.map端的key值過多或者有空值; 2.業務本身的特性;3.某些sql就有數據傾斜;4.建表的時候考慮不周等原因造成的。 處理:* a:增加reducetask的數量* b:在不改變整體統計結果的前提下,可以修改key的設定方式* c:在做關聯時,盡量避免reducetask端的join 可以使用maptask端的join4.combiner的作用,使用時機? Combiner其實也是一種reduce操作,是map運算的后續操作,在map后續對于相同key值做一個簡單合并,減小后續的reduce的計算壓力。數據量小的時候,且輸入的結果不會影響到reduce輸入的結果,且不做平均值的時候,用基于map端之后shuffle端之前的reduce操作。 1 5.MapReduce–如何設置Reducer的個數 1.在代碼中通過:JobConf.setNumReduceTasks(Int numOfReduceTasks)方法設置reducer的個數; 2.在hive中:set mapred.reduce.tasks; 3.reducer的最優個數與集群中可用的reducer的任務槽數相關,一般設置比總槽數微少一些的reducer數量;Hadoop文檔中推薦了兩個公式: 0.95*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum 1.75*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum 備注:NUMBER_OF_NODES是集群中的計算節點個數; mapred.tasktracker.reduce.tasks.maximum:每個節點所分配的reducer任務槽的個數;6.MR的過程: input —–>spilt—–>map—–>combiner—–>shuffer—> partition—–>reduce—–>output spilt :對數據進行split分片處理,產生K1值和V1值,傳給map map: 數據整理,把數據整理成K2和V2, combiner:如果map輸出內容比較多,reduce計算比較慢,我們可以加個combiner map端的本地化reduce,減少map輸出; shuffer:相同的數據放到一個分區 partiton:如果reduce不是一個,shuffler做一個分區,將相同的K值,分到一個區; 排序方式:hash方式; reduce:shuffer分區結束后交給reduce進行計算,形成K3 V 3 output: 將reduce處理完的 K3和V3交給output輸出; a. 客戶端編寫好mapreduce程序,提交job到jobtracker; b. Jobtracker進行檢查操作,確定輸出目錄是否存在,存在拋出錯誤; c. Jobtracker根據輸入計算輸入分片input split、配置job的資源、初始化作業、分配任務; d. 每個input split創建一個map任務,tasktracker執行編寫好的map函數; e. Combiner階段是可選的,它是一個本地化的reduce操作,合并重復key的值; f. Shuffle一開始就是map做輸出操作,并對結果進行排序,內存使用達到閥值就會spill,把溢出文件寫磁盤,寫磁盤前有個排序操作,map輸出全部做完后,
會合并溢出文件,這個過程中還有個Partitioner操作,一個partitioner對應一個reduce作業,reduce開啟復制線程,復制對應的map輸出文件,復制時候reduce還會進行排序操作和合并文件操作 g. 傳輸完成,執行編寫好的reduce函數,結果保存到hdfs上。7.MR怎么處理小文件: 1.輸入過程合并處理:1.在linux 10000個文件上傳到HDFS時候,用腳本形成二進制文件流上傳,上傳的過程中就合并成了一個文件。 2.如果在hdfs中有大量小文件,首先進行清洗,把10000個小文件清洗成一個文件或者幾個文件,寫個map(1.前提10000小文件格式相同,
2.不會有太多的小文件 一千萬個小文件,首先會在操作系統上傳時就處理完了,但是要是問可以說,分批做,每一萬個存儲到一個目錄中,對一個目錄進行map清洗)),其次,進行reduce計算 清洗會產生數據傾斜: 很多小文件是數據傾斜(解決方法):2.1.基于map端的離散方法;2.2.combiner; //hdfs為什么怕很多小文件:因為很多小文件的話也會占用namenode目錄樹的空間,一般一個文件的元數據會占到150字節;
而NameNode是要接收集群中所有的DataNode的心跳信息,來確定元數據的信息變化的,當小文件一旦過多,namenode的元數據讀取就會變慢。
(在HDFS中,namenode將文件系統中的元數據存儲在內存中,因此,HDFS所能存儲的文件數量會受到namenode內存的限制)8.如何從編程的角度講解MR的過程 對數據進行底層默認分片把數據解析成k1/v1形式傳給map; Map對k1/v1進行截取、運算等操作生成k2/v2傳給reduce; Reduce對相同key的值進行計算,生成最終結果k3/v3輸出9.MR中有沒有只有Map的 有,只對數據進行分片,解析成key/value形式后,直接輸出結果不進行reduce端的去重和數組化的。 eg:比如說我把所有的經過split(map)形成的元素都放到context的key做標簽就不會用到reduce。10.Map輸出端的組成部份 Combiner、shuffle、partitioner11.如何用MR實現join 1) reduce side join(在reduce端做join操作) 在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽 (tag),
比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。 在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然后對于同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作。 2) map side join(在map端做join操作) 之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位于不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數據傳輸。 Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至于小表可以直接存放到內存中。這樣,我們可以將小表復制多份,
讓每個map task內存中存在一份(比如存放到hash table中),然后只掃描大表:對于大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,
如果有,則連接后輸出即可。為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下: (1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是 HDFS上的文件,可以這樣:
hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口 號)。JobTracker在作業啟動之前會獲取這個URI列表,并將相應的文件拷貝到各個TaskTracker的本地磁盤上。 (2)用戶使用 DistributedCache.getLocalCacheFiles()方法獲取文件目錄,并使用標準的文件讀寫API讀取相應的文件。DistributedCache方法:(DistributedCache 是一個提供給Map/Reduce框架的工具,用來緩存文件(text, archives, jars and so on)文件的默認訪問協議為(hdfs://). DistributedCache將拷貝緩存的文件到Slave節點在任何Job在節點上執行之前。 文件在每個Job中只會被拷貝一次,緩存的歸檔文件會被在Slave節點中解壓縮。) 符號鏈接 每個存儲在HDFS中的文件被放到緩存中后都可以通過一個符號鏈接使用。 URI hdfs://namenode/test/input/file1#myfile 你可以在程序中直接使用myfile來訪問 file1這個文件。 myfile是一個符號鏈接文件。12.MAP如何排序 在map端一共經歷兩次的排序: 當map函數產生輸出時,會首先寫入內存的環形緩沖區,當達到設定的閾值,在刷寫磁盤之前, 后臺線程會將緩沖區的數據劃分成相應的分區。在每個分區中,后臺線程按鍵進行內排序,在Map任務完成之前, 磁盤上存在多個已經分好區,并排好序的、大小和緩沖區一樣的溢寫文件, 這時溢寫文件將被合并成一個已分區且已排序的輸出文件。 由于溢寫文件已經經過第一次排序,所以合并分區文件時只需要再做一次排序就可使輸出文件整體有序。13.什么是inputsplit InputSplit是MapReduce對文件進行處理和運算的輸入單位,只是一個邏輯概念,每個InputSplit并沒有對文件實際的切割 ,只是記錄了要處理的數據的位置(包括文件的path和hosts)和長度(由start和length決定)。14.MR中使用了哪些接口?(或者是抽象類) FileinputFormat、Mapper、Reducer、FileoutputFormat、Combiner、Partitioner等 --------------------- 作者:QianShiK 來源:CSDN 原文:https://blog.csdn.net/QianShiK/article/details/81480854 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!這個業務中map輸出value包含日志數據中每一行的上行流量 下行流量 * 因為一次要輸出3個值 所以我們通常都是編寫一個自定義的類型* * 自定義map reducer的輸出值類型?* a:需要編寫一個java類封裝每一行日志數據中的上行流量和下行流量* b:還需要在a中封裝根據上行流量和下行流量計算總流量* c:因為a中的java類做的是map的輸出值類型 所以它需要符合hadoop io類型* LongWritable Text IntWritable都實現了WritableComparable接口* 反推就是想編寫hadoop io類型就需要實現WritableComparable接口* * WritableComparable接口又包括了* Writable接口(write,readFields兩個方法) 這個接口是負責這個對象的序列化和反序列化* Comparable接口(compareTo一個方法) 這個接口是負責對象比較大小/排序使用的* 所以兩個接口中Writable接口才是標識是否屬于hadoop io的類型* * 又以為map的輸出值不需要排序 只需要序列化 所以我們在這需求需要編寫的map輸出自定義類型只* 需要實現Writable接口即可切片怎么切的:切片的數量不是越大或者越小寫好,而是要根據每次計算的實際數據涼,自定義優化的切片大小來控制切片的數量 比如有一共300m,前128m以一個切片,如果129到300m的存儲空間小可以直接是一個切片NIO 1:什么是NIO? NIO是基于通道和緩存的非阻塞IO。2:IO 和 NIO的區別?a:通道在IO中只是一個便于理解的虛擬概念 而在NIO中通道是一個實際的概念b:在IO中最底層的傳輸數據是字節 而在NIO中最小的傳輸都是緩存c:在IO中 虛擬的通道直接會和數據接觸 在NIO中通道直接面對的不是數據 而是緩存d:在IO中某一個通道通常都是單向的 在NIO中通道是雙向的e:IO和NIO針對數據傳輸內存使用的方式不同IO是面向流的 NIO是面向緩存的3:NIO中的緩存 在NIO中傳輸數據 都是把數據先放入某個緩存中 再在某個通道中 按照緩存傳輸 在java中原有針對NIO的開發包 java.nio.*在java.nio包下直接都是可以使用的緩存類: ByteBuffer CharBuffer DoubleBuffer FloatBuffer IntBuffer LongBuffer MappedByteBuffer ShortBuffer 4:直接緩沖區和非直接緩沖區a:系統內存(系統內核內存) 和 JVM內存(程序內存的區別)b:IO是把數據先傳入JVM內存 再從JVM內存復制到系統內存 組后從系統內存寫入目的硬盤NIO是把數據可以先傳入硬盤內存區 再從硬盤內存區直接寫入目的硬盤buffer.allocate方法創建的緩沖區是在非直接緩沖區申請的內存buffer.allocateDirect一旦使用 就是在直接緩沖區申請的內存5:通道 通道是為了替代cpu完成io操作 從而提升cpu的利用率
?