概念
hadoop是一個大數據的分布式存儲,調度,計算框架。也可以說是一個生態圈,包含很多技術:Hive、Hbase、Flume、Kafka...
Hadoop的優點
- Hadoop具有存儲和處理數據能力的高可靠性。
- Hadoop通過可用的計算機集群分配數據,完成存儲和計算任務,這些集群可以方便地擴展到數以
- 千計的節點中,具有高擴展性。
- Hadoop能夠在節點之間進行動態地移動數據,并保證各個節點的動態平衡,處理速度非常快,具
- 有高效性。
- Hadoop能夠自動保存數據的多個副本,并且能夠自動將失敗的任務重新分配,具有高容錯性。
Hadoop的缺點
- Hadoop不適用于低延遲數據訪問。
- Hadoop不能高效存儲大量小文件。
- Hadoop不支持多用戶寫入并任意修改文件。
集群搭建
下載:https://archive.apache.org/dist/hadoop/common/hadoop-2.9.2/
集群規劃
框架 | linux121 | linux122 | linux123 |
HDFS | NameNode,DataNode | DataNode | SecondaryNameNode,DataNode |
YARN | NodeManager | NodeManager | NodeManager,ResourceManager |
解壓到安裝目錄:tar -zxvf hadoop-2.9.2.tar.gz -C /opt/lxq/servers
編輯環境變量:vim /etc/profile
# HADOOP_HOMEexport HADOOP_HOME=/opt/lxq/servers/hadoop-2.9.2export PATH=$PATH:$HADOOP_HOME/binexport PATH=$PATH:$HADOOP_HOME/sbin
使環境變量生效:source /etc/profile
驗證hadoop:hadoop version

集群配置
vim hadoop-env.shexport JAVA_HOME=/opt/lxq/servers/jdk1.8.0_231vim core-site.xml<!-- 指定HDFS中NameNode的地址 --><property><name>fs.defaultFS</name><value>hdfs://linux121:9000</value></property><!-- 指定Hadoop運行時產生文件的存儲目錄 --><property><name>hadoop.tmp.dir</name><value>/opt/lxq/servers/hadoop-2.9.2/data/tmp</value></property>vim hdfs-site.xml<!-- 指定Hadoop輔助名稱節點主機配置 --><property><name>dfs.namenode.secondary.http-address</name><value>linux123:50090</value></property><!--副本數量 --><property><name>dfs.replication</name><value>3</value></property>vim slaves 這里要注意不能有空格,不能有空行linux121linux122linux123vim mapred-env.shexport JAVA_HOME=/opt/lxq/servers/jdk1.8.0_231mv mapred-site.xml.template mapred-site.xmlvim mapred-site.xml<!-- 指定MR運行在Yarn上 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><!-- 歷史服務器端地址 --><property><name>mapreduce.jobhistory.address</name><value>linux121:10020</value></property><!-- 歷史服務器web端地址 --><property><name>mapreduce.jobhistory.webapp.address</name><value>linux121:19888</value></property><!-- 文件壓縮 --><property><name>mapreduce.output.fileoutputformat.compress</name><value>true</value></property><property><name>mapreduce.output.fileoutputformat.compress.type</name><value>RECORD</value></property><property><name>mapreduce.output.fileoutputformat.compress.codec</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>vim yarn-site.xml<!-- 指定YARN的ResourceManager的地址 --><property><name>yarn.resourcemanager.hostname</name><value>linux123</value></property><!-- Reducer獲取數據的方式 --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 日志聚集功能?--><property><name>yarn.log-aggregation-enable</name><value>true</value></property><!-- 日志保留時間設置7天 --><property><name>yarn.log-aggregation.retain-seconds</name><value>604800</value></property><property><name>yarn.log.server.url</name><value>http://linux121:19888/jobhistory/logs</value></property><!-- 指定我們的任務調度使用fairScheduler的調度方式 --><property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value><description>In case you do not want to use the default scheduler</description></property>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<allocations>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queue name="root" >
<queue name="default">
<aclAdministerApps>*</aclAdministerApps>
<aclSubmitApps>*</aclSubmitApps>
<maxResources>9216 mb,4 vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<minResources>1024 mb,1vcores</minResources>
<minSharePreemptionTimeout>1000</minSharePreemptionTimeout>
<schedulingPolicy>fair</schedulingPolicy>
<weight>7</weight>
</queue>
<queue name="queue1">
<aclAdministerApps>*</aclAdministerApps>
<aclSubmitApps>*</aclSubmitApps>
<maxResources>4096 mb,4vcores</maxResources>
<maxRunningApps>5</maxRunningApps>
<minResources>1024 mb, 1vcores</minResources>
<minSharePreemptionTimeout>1000</minSharePreemptionTimeout>
<schedulingPolicy>fair</schedulingPolicy>
<weight>3</weight>
</queue>
</queue>
<queuePlacementPolicy>
<rule create="false" name="specified"/>
<rule create="true" name="default"/>
</queuePlacementPolicy>
</allocations>
賦予權限:chown -R root:root /opt/lxq/servers/hadoop-2.9.2
安裝分發工具:yum install -y rsync
用法:rsync -rvl /opt/lxq/software/?root@linux122:/opt/lxq/software
編寫分發腳本 vim /usr/local/bin/rsync-script
#!/bin/bash
#1 獲取命令輸入參數的個數,如果個數為0,直接退出命令
paramnum=$#
if((paramnum==0)); then
echo no params;
exit;
fi
#2 根據傳入參數獲取文件名稱
p1=$1
file_name=`basename $p1`
echo fname=$file_name
#3 獲取輸入參數的絕對路徑
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 獲取用戶名稱
user=`whoami`
#5 循環執行rsync
for((host=121; host<124; host++)); do
echo ------------------- linux$host --------------
rsync -rvl $pdir/$file_name $user@linux$host:$pdir
done
賦予腳本權限:chmod 777?/usr/local/bin/rsync-script
拓展一些
chmod命令
用來變更文件或目錄的權限。在UNIX系統家族里,文件或目錄權限的控制分別以讀取、寫入、執行3種一般權限來區分,另有3種特殊權限可供運用。用戶可以使用chmod指令去變更文件與目錄的權限,設置方式采用文字或數字代號皆可。符號連接的權限無法變更,如果用戶對符號連接修改權限,其改變會作用在被連接的原始文件。
權限范圍的表示法如下:
u User,即文件或目錄的擁有者;
g Group,即文件或目錄的所屬群組;
o Other,除了文件或目錄擁有者或所屬群組之外,其他用戶皆屬于這個范圍;
a All,即全部的用戶,包含擁有者,所屬群組以及其他用戶;
r 讀取權限,數字代號為“4”; w 寫入權限,數字代號為“2”;
x 執行或切換權限,數字代號為“1”;
- 不具任何權限,數字代號為“0”;
s 特殊功能說明:變更文件或目錄的權限。
語法 chmod(選項)(參數)
選項
-c或——changes:效果類似“-v”參數,但僅回報更改的部分;
-f或--quiet或——silent:不顯示錯誤信息;
-R或——recursive:遞歸處理,將指令目錄下的所有文件及子目錄一并處理;
-v或——verbose:顯示指令執行過程;
--reference=<參考文件或目錄>:把指定文件或目錄的所屬群組全部設成和參考文件或目錄的所屬群組相同;
<權限范圍>+<權限設置>:開啟權限范圍的文件或目錄的該選項權限設置;
<權限范圍>-<權限設置>:關閉權限范圍的文件或目錄的該選項權限設置;
<權限范圍>=<權限設置>:指定權限范圍的文件或目錄的該選項權限設置;
參數
權限模式:指定文件的權限模式;
文件:要改變權限的文件。
例:
rwx rw- r-- r=讀取屬性 //值=4
w=寫入屬性 //值=2
x=執行屬性 //值=1
chmod u+x,g+w f01 //為文件f01設置自己可以執行,組員可以寫入的權限
chmod u=rwx,g=rw,o=r f01
chmod 764 f01
chmod a+x f01 //對文件f01的u,g,o都設置可執行屬性 文件的屬主和屬組屬性設置
chown user:market f01 //把文件f01給uesr,添加到market組
ll -d f1 查看目錄f1的屬性
chown命令
改變某個文件或目錄的所有者和所屬的組,該命令可以向某個用戶授權,使該用戶變成指定文件的所有者或者改變文件所屬的組。用戶可以是用戶或者是用戶D,用戶組可以是組名或組id。文件名可以使由空格分開的文件列表,在文件名中可以包含通配符。 只有文件主和超級用戶才可以便用該命令。
語法 chown(選項)(參數)
選項
-c或——changes:效果類似“-v”參數,但僅回報更改的部分;
-f或--quite或——silent:不顯示錯誤信息;
-h或--no-dereference:只對符號連接的文件作修改,而不更改其他任何相關文件;
-R或——recursive:遞歸處理,將指定目錄下的所有文件及子目錄一并處理;
-v或——version:顯示指令執行過程;
--dereference:效果和“-h”參數相同;
--help:在線幫助;
--reference=<參考文件或目錄>:把指定文件或目錄的擁有者與所屬群組全部設成和參考文件或目錄的擁有者與所屬群組相同;
--version:顯示版本信息。
參數
用戶:組:指定所有者和所屬工作組。當省略“:組”,僅改變文件所有者;
文件:指定要改變所有者和工作組的文件列表。支持多個文件和目標,支持shell通配符。
實例 將目錄/usr/meng及其下面的所有文件、子目錄的文件主改成 liu:
chown -R liu /usr/meng
分發hadoop到集群其它節點:rsync-script /opt/lxq/servers/hadoop-2.9.2
第一次啟動格式化(不是第一次不用這句命令):hadoop namenode -format
群起yarn:start-yarn.sh [stop-yarn.sh]
群起hdfs:start-dfs.sh [stop-dfs.sh]
歷史服務器起關命令
$HODOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver?
$HODOOP_HOME/sbin/mr-jobhistory-daemon.sh stop?historyserver?
HDFS WEB界面:http://linux121:50070/dfshealth.html#tab-overview
歷史服務器web頁面:http://linux121:19888/jobhistory
查看啟動的服務命令:jps
HDFS命令
hdfs dfs -help rm
hdfs dfs -ls /
hdfs dfs -mkdir -p /a/b/c
hdfs dfs -removeFromLocal /opt/lxq/a.txt /a/b/c/
hdfs dfs -appendToFile /xx /xx/xx.csv
hdfs dfs -cat /a/b/c/a.txt
hdfs dfs -chmod 666 /a/b/c/a.txt
hdfs dfs -chown root:root /a/b/c/a.txt
hdfs dfs -copyFromLocal /opt/lxq/b.txt /a/b/c/
hdfs dfs -cp /a/b/c/a.txt /a/b/a.txt
hdfs dfs -mv /a/b/a.txt /a/b/c/d/
hdfs dfs -get /a/b/c/a.txt
hdfs dfs -copyToLocal /a/b/c/a.txt /opt/lxq/data/
hdfs dfs -put xxx xxx
hdfs dfs -tail /xx/xx/xx.log
hdfs dfs -rm -r /a/b/c/d
hdfs dfs -du -s -h /a
hdfs dfs -du -h /a
hdfs dfs -setrep 10 /a/b/c/a.txt
Java整合Hadoop的依賴
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.9.2</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.9.2</version>
</dependency>
Java HDFSUtils類
package ;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;@Slf4j
public class HDFSUtil {private static final Configuration configuration = new Configuration();private static volatile FileSystem fileSystem = null;private HDFSUtil() {}private static FileSystem getFS() {if (null == fileSystem) {synchronized (HDFSUtil.class) {if (null == fileSystem) {try {fileSystem = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");} catch (IOException | InterruptedException | URISyntaxException e) {throw new RuntimeException(e);}}}}return fileSystem;}/*** 獲取 HDFS 集群節點信息** @param hdfsUri 集群路徑* @return List<String>* @author lxq* @since 2025-07-31*/public static DatanodeInfo[] getHDFSNodes(String hdfsUri) {if (StringUtils.isBlank(hdfsUri)) {return null;}DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];try (FileSystem fs = getFS()) {// 獲取分布式文件系統DistributedFileSystem hdfs = (DistributedFileSystem) fs;dataNodeStats = hdfs.getDataNodeStats();} catch (IOException e) {log.error("Get DataNode Info exception:", e);}return dataNodeStats;}/*** 獲取目標路徑下的所有文件或者文件夾的全路徑列表** @param target 目標路徑* @return List<String>* @author lxq* @since 2025-07-31*/public static List<String> listFile(String target) {if (StringUtils.isBlank(target)) {return null;}try (FileSystem fs = getFS()) {FileStatus[] status = fs.listStatus(new Path(target));/*for (FileStatus s : status) {s.isFile();s.isDirectory();}*/// 獲取目錄下的所有文件路徑return Arrays.stream(FileUtil.stat2Paths(status)).map(Path::toString).collect(Collectors.toList());} catch (IllegalArgumentException | IOException e) {log.error("list file exception:", e);}return null;}/*** @param target* @return*/public static List<LocatedFileStatus> getFileLocatedStatus(String target) {List<LocatedFileStatus> locatedFileStatusList = new ArrayList<>();if (StringUtils.isNotBlank(target)) {try (FileSystem fs = getFS()) {RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(target), true);while (listFiles.hasNext()) {LocatedFileStatus status = listFiles.next();// 輸出詳情// 文件名稱System.out.println(status.getPath().getName());// 長度System.out.println(status.getLen());// 權限System.out.println(status.getPermission());// 分組System.out.println(status.getGroup());// 獲取存儲的塊信息BlockLocation[] blockLocations = status.getBlockLocations();for (BlockLocation blockLocation : blockLocations) {// 獲取塊存儲的主機節點String[] hosts = blockLocation.getHosts();for (String host : hosts) {System.out.println(host);}}}} catch (Exception e) {log.error("list file located status exception:", e);}}return locatedFileStatusList;}/*** 查找某個文件在 HDFS集群的位置*/public static BlockLocation[] getFileBlockLocations(String target) {if (StringUtils.isBlank(target)) {return null;}// 文件塊位置列表BlockLocation[] blkLocations = new BlockLocation[0];try (FileSystem fs = getFS()) {// 獲取文件目錄FileStatus filestatus = fs.getFileStatus(new Path(target));// 獲取文件塊位置列表blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());} catch (IOException e) {log.error("Block Location exception:", e);}return blkLocations;}/*** 創建文件夾 是不能創建文件的*/public static void mkdir(String target) {if (StringUtils.isBlank(target)) {return;}try (FileSystem fs = getFS()) {fs.mkdirs(new Path(target));log.info("Dir:{} Create Success.", target);} catch (Exception e) {log.error("make dir exception!", e);}}/*** 上傳文件** @param sourcePath 源路徑* @param targetPath 目標路徑* @author lxq* @since 2025-07-31*/public static void uploadFile(String sourcePath, String targetPath) {if (StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)) {return;}try (FileSystem fs = getFS()) {File file = new File(sourcePath);if (!file.exists()) {return;}if (file.isDirectory()) {// ... 需要完善文件夾的處理return;}String filename = file.getName();fs.copyFromLocalFile(new Path(sourcePath), new Path(targetPath + "/" + filename));log.info("Had Upload File:{} To Hdfs:{}", sourcePath, targetPath);} catch (Exception e) {log.error("upload file exception!", e);}}/*** 上傳文件** @param sourcePath 源路徑* @param targetPath 目標路徑* @author lxq* @since 2025-07-31*/public static void downFile(String sourcePath, String targetPath) {if (StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)) {return;}try (FileSystem fs = getFS()) {// boolean delSrc 指是否將原文件刪除// Path src 指要下載的文件路徑// Path dst 指將文件下載到的路徑// boolean useRawLocalFileSystem 是否開啟文件校驗fs.copyToLocalFile(false, new Path(sourcePath), new Path(targetPath), true);log.info("Had Download File:{} To {}", sourcePath, targetPath);} catch (Exception e) {log.error("download file exception!", e);}}/*** 刪除文件 / 文件夾** @param target 目標文件或者文件夾* @author lxq* @since 2025-07-31*/public static void delFileOrDir(String target) {try (FileSystem fs = getFS()) {if (StringUtils.isNotBlank(target)) {// 刪除文件或者文件目錄 delete(Path f) 此方法已經棄用fs.delete(new Path(target), true);log.info("Had Deleted File Or Dir Under the {} From Hdfs", target);}} catch (Exception e) {log.error("delete file or dir exception!", e);}}/*** 判斷目錄是否存在** @param target 目標路徑* @param create 不存在是否創建* @return 是否存在路徑*/public static boolean existDir(String target, boolean create) {if (StringUtils.isBlank(target)) {return false;}try (FileSystem fs = getFS()) {Path path = new Path(target);if (create) {if (!fs.exists(path)) {fs.mkdirs(path);}}if (fs.isDirectory(path)) {return true;}} catch (Exception e) {log.error("exist Dir exception:", e);}return false;}/************************** 流相關API *****************************//*** 流方式 文件上傳*/public static void uploadWithStream(String sourcePath, File file, String targetPath) {if ((StringUtils.isNotBlank(sourcePath) || null != file) && StringUtils.isNotBlank(targetPath)) {try (FileSystem fs = getFS()) {if (null == file) {file = new File(sourcePath);}if (!file.exists()) {return;}if (file.isDirectory()) {return;}String filename = file.getName();FileInputStream fis = new FileInputStream(file);FSDataOutputStream fos = fs.create(new Path(targetPath + "/" + filename));IOUtils.copyBytes(fis, fos, configuration);IOUtils.closeStream(fos);IOUtils.closeStream(fis);log.info("Had Upload File:{} To Hdfs:{} With Stream.", sourcePath, targetPath);} catch (Exception e) {log.error("upload file with stream exception!", e);}}}/*** 流方式 文件下載*/public static void downloadWithStream(String sourcePath, String targetPath) {if (StringUtils.isNotBlank(sourcePath) && StringUtils.isNotBlank(targetPath)) {try (FileSystem fs = getFS()) {Path path = new Path(sourcePath);/*if (!fs.exists(path)) {}if (fs.isDirectory(path)) {}*/FSDataInputStream fis = fs.open(path);FileOutputStream fos = new FileOutputStream(new File(targetPath));IOUtils.copyBytes(fis, fos, configuration);IOUtils.closeStream(fos);IOUtils.closeStream(fis);log.info("Had Download File:{} From Hdfs:{} With Stream.", sourcePath, targetPath);} catch (Exception e) {log.error("download file with stream exception!", e);}}}
}
感謝閱讀!!!