文章目錄
- hadoop完全分布式部署
- hdfs-site.xml
- core-site.xml
- marpred-site.xml
- yarn-site.xml
- spark集群部署
- spark-env.sh
- mongodb分片模式部署
- config 服務器
- 初始化config 副本集
- shard 服務器
- 初始化shard 副本集
- mongos服務器
- 添加shard
- 設置chunk大小
- 啟動分片
- 為集合 user 創建索引并進行分片
- fillder抓包+數據采集
- spark清洗數據
- Flask+echarts進行可視化
- 效果圖
hadoop完全分布式部署
hdfs-site.xml
<configuration><property><name>dfs.namenode.name.dir</name><value>file:/usr/local/src/hadoop/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/local/src/hadoop/dfs/data</value></property><property><name>dfs.replication</name><value>3</value></property></property><!-- Secondary NameNode 所在主機的ip和端口 --><property><name>dfs.namenode.secondary.http-address</name><value>master02:9890</value></property>
</configuration>
core-site.xml
<configuration><!-- --><!-- 設置hadoop的文件系統,由URI指定 --><property><!-- 指定namenode地址節點所在機器 --><name>fs.defaultFS</name><value>hdfs://master01:9000</value></property><!-- 配置hadoop臨時目錄,默認/tmp/hadoop-${user.name} --><property><name>hadoop.tmp.dir</name><value>/usr/local/src/hadoop/tmp/</value></property></configuration>
marpred-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>
yarn-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>
spark集群部署
spark-env.sh
export JAVA_HOME=/usr/local/src/jdk1.8.0_152
export HADOOP_CONF_DIR=/usr/local/src/hadoop-3.2.2/etc/hadoop/
export SPARK_MASTER_IP=master01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_CORES=1
export SPARK_EXECUTOR_MEMORY=512m
export SPARK_EXECUTOR_CORES=1
export SPARK_WORKER_INSTANCES=1
mongodb分片模式部署
服務器名稱 | IP地址 | Shard1 (端口/角色) | Shard2 (端口/角色) | Shard3 (端口/角色) | mongos (端口) | Config Server (端口/角色) |
---|---|---|---|---|---|---|
master01 | 192.168.121.134 | 27018 (主節點) | 27020 (仲裁節點) | 27019 (副節點) | 27021 | 27022 (主節點) |
node01 | 192.168.121.135 | 27019 (副節點) | 27018 (主節點) | 27020 (仲裁節點) | 27021 | 27022 (副節點) |
node02 | 192.168.121.136 | 27020 (仲裁節點) | 27019 (副節點) | 27018 (主節點) | - | 27022 (副節點) |
config 服務器
dbpath=/usr/local/src/mongodb_demo/shardcluster/configServer/data
logpath=/usr/local/src/mongodb_demo/shardcluster/configServer/logs/config_server.log
port=27022
bind_ip=master01
logappend=true
fork=trues
maxConns=5000
replSet=configs
configsvr=true
初始化config 副本集
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/configServer/configFile/mongodb_config.conf$ ./mongo --host master01 --port 27022
> rs.initiate()
configs:SECONDARY> rs.add('node01:27022')
configs:PRIMARY> rs.add('node02:27022')
shard 服務器
dbpath=/usr/local/src/mongodb_demo/shardcluster/shard/shard1_data
logpath=/usr/local/src/mongodb_demo/shardcluster/shard/logs/shard1.log
port=27018
bind_ip=master01
logappend=true
fork=true
maxConns=5000
replSet=shard1
shardsvr=true
類似地,編輯 shard2.conf 和 shard3.conf,分別修改 dbpath, logpath, port, bind_ip, 和 replSet 參數。
初始化shard 副本集
# 所有節點都啟動
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard1.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard2.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard3.conf# master01節點
$ ./mongo --host master01 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node01:27019')
configs:PRIMARY> rs.add('node02:27020')# node01節點
$ ./mongo --host node01--port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node02:27019')
configs:PRIMARY> rs.add('master01 :27020')# node02節點
$ ./mongo --host node02 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('master01 :27019')
configs:PRIMARY> rs.add('node01:27020')
mongos服務器
logpath=/usr/local/src/mongodb_demo/shardcluster/mongos/logs/mongos.log
logappend=true
port=27021
bind_ip=master01
fork=true
configdb=configs/master01:27022,node01:27022,node02:27022
maxConns=20000
類似地,在node01節點配置
添加shard
$ ./mongo --host master01--port 27021mongos> use gateway
mongos> sh.addShard("shard1/master01:27018,node01:27019,node02:27020")
mongos> sh.addShard("shard2/master01:27020,node01:27018,node02:27019")
mongos> sh.addShard("shard3/master01:27019,node01:27020,node02:27018")
設置chunk大小
use config;
db.settings.insertOne({ _id: "chunksize", value: 64 });
啟動分片
mongos> use gateway
mongos> sh.enableSharding("xxxdatabase")
為集合 user 創建索引并進行分片
mongos> use xxxdatabase
mongos> db.user.createIndex({"id":1})
mongos> sh.shardCollection("xxxdatabase.xxxcollection",{"id":1})
fillder抓包+數據采集
def data_SalesVolume(self,allcity):'''獲取全國和各城市車輛銷售量排名數據結果存儲到本地和hdfs:param allcity: 城市名稱-->list:return:'''data_list = []for city in allcity:city_quote = quote(city)newurl = self.url+f'method=jsb.app.fetch&rank_data_type=116&month=202412&energy_type&price=0%2C-1&manufacturer&rank_city_name={city_quote}&market_time=0&offset=0&count=500&scm_version=1.0.0.2209&iid=3991681621300419&device_id=3991681621296323&ac=wifi&channel=home&aid=36&app_name=automobile&version_code=748&version_name=7.4.8&device_platform=android&os=android&ab_client=a1%2Cc2%2Ce1%2Cf2%2Cg2%2Cf7&ab_group=3167590%2C3577236&ssmix=a&device_type=PCLM10&device_brand=OPPO&language=zh&os_api=28&os_version=9&manifest_version_code=748&resolution=720*1280&dpi=320&update_version_code=7483&_rticket=1738335686380&cdid=637e94d4-1e98-49a9-9b9e-7f567951c149&rom_version=coloros__pq3b.190801.10161630+release-keys&longi_lati_type=0&longi_lati_time=0&content_sort_mode=0&total_memory=3.85&cpu_name=Qualcomm+Technologies%2C+Inc+MSM8998&overall_score=8.6995&cpu_score=7.9848&host_abi=armeabi-v7a'print(f'準備獲取=={city}==的數據---{newurl}---')citynames = self.session.get(url=newurl, headers=self.headers).json()city_sales = {city:citynames}data_list.append(city_sales)# print(citynames)print('----------獲取銷量數據完成-----------')data_json = {'SalesValue':data_list}with open(f'data/data_salesvolume.json','w',encoding='utf-8') as f:json.dump(data_json,f,ensure_ascii=False,indent=4)hdfs_path = "/data_doncar/data_carkm.json" # HDFS 目標路徑self.hdfs_client.create(hdfs_path, data_json, overwrite=True) # 上傳到 HDFS
spark清洗數據
創建對象
package org.exampleimport org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._object DonCar_spark {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getLogger("akka").setLevel(Level.ERROR)val spark = SparkSession.builder().appName("car_analysis").master("local[*]").config("spark.mongodb.read.connection.uri", "mongodb://root:123456@192.168.121.134:27017").getOrCreate()
連接mongodb數據庫
def readmongocollection(db:String,collection:String) = {spark.read.format("mongodb").option("database",db).option("collection",collection).load()}
創建臨時表
val car_info = readmongocollection("doncar_top","car_info")val citys_sv = readmongocollection("doncar_top","citys_sv")val nationwide_sv = readmongocollection("doncar_top","nationwide_sv")citys_sv.createOrReplaceTempView("citys_table")car_info.createOrReplaceTempView("car_table")nationwide_sv.createOrReplaceTempView("nationwide_table")
數據分析 并且把數據存到HDFS
// 各城市汽車銷量val citys_sales = spark.sql("select city,sales from citys_table").groupBy("city").sum("sales")// 電車續航排名val brand_max_km = spark.sql("select brand,range_km from car_table").groupBy("brand").max()brand_max_km.limit(7).show()// 油電銷量val car_type_sales =spark.sql("select sales,car_type from nationwide_table ").withColumn("category",when(col("car_type") === 0,0).otherwise(1)).groupBy("category").sum("sales")car_type_sales.show()// 能源占比
// 0 純油
// 1 純電
// 2 混動
// 3-4 增程val car_type = spark.sql("select car_type,sales from nationwide_table").groupBy("car_type").sum("sales")// car_type.printSchema()val car_type2 = car_type.withColumn("sum(sales)",when(col("car_type") === 3,col("sum(sales)")+car_type.filter(col("car_type") === 4).select("sum(sales)").first().getLong(0)).otherwise(col("sum(sales)")))car_type2.show()//銷量——價格關系val price_sales = spark.sql("SELECT min_price,max_price,sales from nationwide_table").withColumn("average_price",(col("min_price") + col("max_price")) / 2).drop("min_price","max_price")price_sales.show()// 品牌銷量val brand_sales = spark.sql("SELECT brand_name,model,sales FROM nationwide_table").groupBy("brand_name").sum("sales").sort(col("sum(sales)").desc)brand_sales.show()// 城市銷量排名val city_top = citys_sales.sort(col("sum(sales)").desc)city_top.show()// 車輛銷售排名val car_top = spark.sql("SELECT model,sales FROM nationwide_table").sort(col("sales").desc)car_top.show()}
}
Flask+echarts進行可視化
from flask import Flask, render_template, redirect, url_for, request, flash, session,jsonify
from SparkAnalysis import analysis
from SparkAnalysis import Get_HdfsData
import pymysqlapp = Flask(__name__)#重定向路由
@app.route('/')
def default():return redirect(url_for('login'))@app.route('/login', methods=['GET', 'POST'])
def login():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost', # 數據庫主機地址user='root', # 數據庫用戶名password='123456', # 數據庫密碼database='users' # 數據庫名稱)with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s AND password = %s", (username, password))account = cursor.fetchone() # 獲取第一行記錄conn.close()if account:return redirect(url_for('index'))else:return redirect(url_for('login'))return render_template('login.html')@app.route('/register', methods=['GET', 'POST'])
def register():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',user='root',password='123456',database='users')with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s", (username,))if cursor.fetchone():return redirect(url_for('register'))# 如果用戶名不存在,插入新用戶cursor.execute("INSERT INTO account (username, password) VALUES (%s, %s)", (username, password))conn.commit()conn.close()return redirect(url_for('login'))return render_template('register.html')@app.route('/index')
def index():return render_template('index.html',map_data=Get_HdfsData.get_map_data(),bar1_x = analysis.get_bar1_data()['brand'].values.tolist(),# bar1_y=Get_HdfsData.get_bar1_data()['range_km'].values.tolist(),bar1_y=[float(num) for num in analysis.get_bar1_data()['range_km'].values.tolist()],type_0 = Get_HdfsData.get_num_data()[0][2:],type_n0 = Get_HdfsData.get_num_data()[1][2:],pie_data = Get_HdfsData.get_pie_data(),scatter_data = Get_HdfsData.get_scatter_data(),bar2_x=analysis.get_bar2_data()['brand_name'].values.tolist()[:5],bar2_y=analysis.get_bar2_data()['sales'].values.tolist()[:5],line1_x_top=Get_HdfsData.get_line1_data()['city'].values.tolist()[:12],line1_y_top=Get_HdfsData.get_line1_data()['value'].values.tolist()[:12],line1_x_last=analysis.get_line1_data()['city'].values.tolist()[-12:],line1_y_last=analysis.get_line1_data()['value'].values.tolist()[-12:],line2_x_top=Get_HdfsData.get_line2_data()['model'].values.tolist()[:10],line2_y_top=Get_HdfsData.get_line2_data()['sales'].values.tolist()[:10],)if __name__ == '__main__':app.run(host='127.0.0.1', port=5001)
echarts部分代碼
(function() {var myChart = echarts.init(document.querySelector(".map .chart"));echarts.registerMap('china', chinaData);option = {tooltip: {trigger: 'item',formatter: '{b}<br/>{c} (輛)'},visualMap: {min: 1,max: 40000,text: ['High', 'Low'],realtime: false,calculable: true,inRange: {color: ['lightskyblue', 'yellow', 'orangered']},textStyle: {color: 'red' // 設置字體顏色為紅色}},series: [{name: 'city_map',type: 'map',map: 'china',data: map_data,roam: true // 在這里添加 roam 屬性}]};myChart.setOption(option);// 監聽瀏覽器縮放,圖表對象調用縮放resize函數window.addEventListener("resize", function() {myChart.resize();});}
)();