【Project】基于spark-App端口懂車帝數據采集與可視化

文章目錄

      • hadoop完全分布式部署
        • hdfs-site.xml
        • core-site.xml
        • marpred-site.xml
        • yarn-site.xml
      • spark集群部署
        • spark-env.sh
      • mongodb分片模式部署
        • config 服務器
          • 初始化config 副本集
        • shard 服務器
          • 初始化shard 副本集
        • mongos服務器
          • 添加shard
          • 設置chunk大小
        • 啟動分片
          • 為集合 user 創建索引并進行分片
      • fillder抓包+數據采集
      • spark清洗數據
      • Flask+echarts進行可視化
      • 效果圖

hadoop完全分布式部署

hdfs-site.xml
<configuration><property><name>dfs.namenode.name.dir</name><value>file:/usr/local/src/hadoop/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/local/src/hadoop/dfs/data</value></property><property><name>dfs.replication</name><value>3</value></property></property><!-- Secondary NameNode 所在主機的ip和端口  --><property><name>dfs.namenode.secondary.http-address</name><value>master02:9890</value></property>
</configuration>
core-site.xml
<configuration><!--   --><!-- 設置hadoop的文件系統,由URI指定  --><property><!-- 指定namenode地址節點所在機器  --><name>fs.defaultFS</name><value>hdfs://master01:9000</value></property><!-- 配置hadoop臨時目錄,默認/tmp/hadoop-${user.name}  --><property><name>hadoop.tmp.dir</name><value>/usr/local/src/hadoop/tmp/</value></property></configuration>
marpred-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>
yarn-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>

spark集群部署

spark-env.sh
export JAVA_HOME=/usr/local/src/jdk1.8.0_152
export HADOOP_CONF_DIR=/usr/local/src/hadoop-3.2.2/etc/hadoop/
export SPARK_MASTER_IP=master01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_CORES=1
export SPARK_EXECUTOR_MEMORY=512m
export SPARK_EXECUTOR_CORES=1
export SPARK_WORKER_INSTANCES=1

mongodb分片模式部署

服務器名稱IP地址Shard1 (端口/角色)Shard2 (端口/角色)Shard3 (端口/角色)mongos (端口)Config Server (端口/角色)
master01192.168.121.13427018 (主節點)27020 (仲裁節點)27019 (副節點)2702127022 (主節點)
node01192.168.121.13527019 (副節點)27018 (主節點)27020 (仲裁節點)2702127022 (副節點)
node02192.168.121.13627020 (仲裁節點)27019 (副節點)27018 (主節點)-27022 (副節點)
config 服務器
dbpath=/usr/local/src/mongodb_demo/shardcluster/configServer/data
logpath=/usr/local/src/mongodb_demo/shardcluster/configServer/logs/config_server.log
port=27022
bind_ip=master01
logappend=true
fork=trues
maxConns=5000
replSet=configs
configsvr=true
初始化config 副本集
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/configServer/configFile/mongodb_config.conf$ ./mongo --host master01 --port 27022
> rs.initiate()
configs:SECONDARY> rs.add('node01:27022')
configs:PRIMARY> rs.add('node02:27022')
shard 服務器
dbpath=/usr/local/src/mongodb_demo/shardcluster/shard/shard1_data
logpath=/usr/local/src/mongodb_demo/shardcluster/shard/logs/shard1.log
port=27018
bind_ip=master01
logappend=true
fork=true
maxConns=5000
replSet=shard1
shardsvr=true

類似地,編輯 shard2.conf 和 shard3.conf,分別修改 dbpath, logpath, port, bind_ip, 和 replSet 參數。

初始化shard 副本集
# 所有節點都啟動
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard1.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard2.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard3.conf# master01節點
$ ./mongo --host master01 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node01:27019')
configs:PRIMARY> rs.add('node02:27020')# node01節點
$ ./mongo --host node01--port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node02:27019')
configs:PRIMARY> rs.add('master01 :27020')# node02節點
$ ./mongo --host node02 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('master01 :27019')
configs:PRIMARY> rs.add('node01:27020')
mongos服務器
logpath=/usr/local/src/mongodb_demo/shardcluster/mongos/logs/mongos.log
logappend=true
port=27021
bind_ip=master01
fork=true
configdb=configs/master01:27022,node01:27022,node02:27022
maxConns=20000

類似地,在node01節點配置

添加shard

$ ./mongo --host master01--port 27021mongos> use gateway
mongos> sh.addShard("shard1/master01:27018,node01:27019,node02:27020")
mongos> sh.addShard("shard2/master01:27020,node01:27018,node02:27019")
mongos> sh.addShard("shard3/master01:27019,node01:27020,node02:27018")
設置chunk大小
use config;
db.settings.insertOne({ _id: "chunksize", value: 64 });
啟動分片
mongos> use gateway
mongos> sh.enableSharding("xxxdatabase")
為集合 user 創建索引并進行分片
mongos> use xxxdatabase
mongos> db.user.createIndex({"id":1})
mongos> sh.shardCollection("xxxdatabase.xxxcollection",{"id":1})

fillder抓包+數據采集

在這里插入圖片描述

    def data_SalesVolume(self,allcity):'''獲取全國和各城市車輛銷售量排名數據結果存儲到本地和hdfs:param allcity: 城市名稱-->list:return:'''data_list = []for city in allcity:city_quote = quote(city)newurl = self.url+f'method=jsb.app.fetch&rank_data_type=116&month=202412&energy_type&price=0%2C-1&manufacturer&rank_city_name={city_quote}&market_time=0&offset=0&count=500&scm_version=1.0.0.2209&iid=3991681621300419&device_id=3991681621296323&ac=wifi&channel=home&aid=36&app_name=automobile&version_code=748&version_name=7.4.8&device_platform=android&os=android&ab_client=a1%2Cc2%2Ce1%2Cf2%2Cg2%2Cf7&ab_group=3167590%2C3577236&ssmix=a&device_type=PCLM10&device_brand=OPPO&language=zh&os_api=28&os_version=9&manifest_version_code=748&resolution=720*1280&dpi=320&update_version_code=7483&_rticket=1738335686380&cdid=637e94d4-1e98-49a9-9b9e-7f567951c149&rom_version=coloros__pq3b.190801.10161630+release-keys&longi_lati_type=0&longi_lati_time=0&content_sort_mode=0&total_memory=3.85&cpu_name=Qualcomm+Technologies%2C+Inc+MSM8998&overall_score=8.6995&cpu_score=7.9848&host_abi=armeabi-v7a'print(f'準備獲取=={city}==的數據---{newurl}---')citynames = self.session.get(url=newurl, headers=self.headers).json()city_sales = {city:citynames}data_list.append(city_sales)# print(citynames)print('----------獲取銷量數據完成-----------')data_json = {'SalesValue':data_list}with open(f'data/data_salesvolume.json','w',encoding='utf-8') as f:json.dump(data_json,f,ensure_ascii=False,indent=4)hdfs_path = "/data_doncar/data_carkm.json"  # HDFS 目標路徑self.hdfs_client.create(hdfs_path, data_json, overwrite=True)  # 上傳到 HDFS

spark清洗數據

創建對象

package org.exampleimport org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._object DonCar_spark {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getLogger("akka").setLevel(Level.ERROR)val spark = SparkSession.builder().appName("car_analysis").master("local[*]").config("spark.mongodb.read.connection.uri", "mongodb://root:123456@192.168.121.134:27017").getOrCreate()

連接mongodb數據庫

    def readmongocollection(db:String,collection:String) = {spark.read.format("mongodb").option("database",db).option("collection",collection).load()}

創建臨時表

val car_info = readmongocollection("doncar_top","car_info")val citys_sv = readmongocollection("doncar_top","citys_sv")val nationwide_sv = readmongocollection("doncar_top","nationwide_sv")citys_sv.createOrReplaceTempView("citys_table")car_info.createOrReplaceTempView("car_table")nationwide_sv.createOrReplaceTempView("nationwide_table")

數據分析 并且把數據存到HDFS


//    各城市汽車銷量val citys_sales = spark.sql("select city,sales from citys_table").groupBy("city").sum("sales")//    電車續航排名val brand_max_km = spark.sql("select brand,range_km from car_table").groupBy("brand").max()brand_max_km.limit(7).show()//    油電銷量val car_type_sales =spark.sql("select sales,car_type from nationwide_table ").withColumn("category",when(col("car_type") === 0,0).otherwise(1)).groupBy("category").sum("sales")car_type_sales.show()// 能源占比
//    0 純油
//    1 純電
//    2 混動
//    3-4 增程val car_type = spark.sql("select car_type,sales from nationwide_table").groupBy("car_type").sum("sales")//    car_type.printSchema()val car_type2 = car_type.withColumn("sum(sales)",when(col("car_type") === 3,col("sum(sales)")+car_type.filter(col("car_type") === 4).select("sum(sales)").first().getLong(0)).otherwise(col("sum(sales)")))car_type2.show()//銷量——價格關系val price_sales = spark.sql("SELECT min_price,max_price,sales from nationwide_table").withColumn("average_price",(col("min_price") + col("max_price")) / 2).drop("min_price","max_price")price_sales.show()//   品牌銷量val brand_sales = spark.sql("SELECT brand_name,model,sales FROM nationwide_table").groupBy("brand_name").sum("sales").sort(col("sum(sales)").desc)brand_sales.show()//    城市銷量排名val city_top = citys_sales.sort(col("sum(sales)").desc)city_top.show()//    車輛銷售排名val car_top = spark.sql("SELECT model,sales FROM nationwide_table").sort(col("sales").desc)car_top.show()}
}

在這里插入圖片描述

Flask+echarts進行可視化

from flask import Flask, render_template, redirect, url_for, request, flash, session,jsonify
from SparkAnalysis import analysis
from SparkAnalysis import Get_HdfsData
import pymysqlapp = Flask(__name__)#重定向路由
@app.route('/')
def default():return redirect(url_for('login'))@app.route('/login', methods=['GET', 'POST'])
def login():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',  # 數據庫主機地址user='root',  # 數據庫用戶名password='123456',  # 數據庫密碼database='users'  # 數據庫名稱)with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s AND password = %s", (username, password))account = cursor.fetchone()  # 獲取第一行記錄conn.close()if account:return redirect(url_for('index'))else:return redirect(url_for('login'))return render_template('login.html')@app.route('/register', methods=['GET', 'POST'])
def register():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',user='root',password='123456',database='users')with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s", (username,))if cursor.fetchone():return redirect(url_for('register'))# 如果用戶名不存在,插入新用戶cursor.execute("INSERT INTO account (username, password) VALUES (%s, %s)", (username, password))conn.commit()conn.close()return redirect(url_for('login'))return render_template('register.html')@app.route('/index')
def index():return render_template('index.html',map_data=Get_HdfsData.get_map_data(),bar1_x = analysis.get_bar1_data()['brand'].values.tolist(),# bar1_y=Get_HdfsData.get_bar1_data()['range_km'].values.tolist(),bar1_y=[float(num) for num in analysis.get_bar1_data()['range_km'].values.tolist()],type_0 = Get_HdfsData.get_num_data()[0][2:],type_n0 = Get_HdfsData.get_num_data()[1][2:],pie_data = Get_HdfsData.get_pie_data(),scatter_data = Get_HdfsData.get_scatter_data(),bar2_x=analysis.get_bar2_data()['brand_name'].values.tolist()[:5],bar2_y=analysis.get_bar2_data()['sales'].values.tolist()[:5],line1_x_top=Get_HdfsData.get_line1_data()['city'].values.tolist()[:12],line1_y_top=Get_HdfsData.get_line1_data()['value'].values.tolist()[:12],line1_x_last=analysis.get_line1_data()['city'].values.tolist()[-12:],line1_y_last=analysis.get_line1_data()['value'].values.tolist()[-12:],line2_x_top=Get_HdfsData.get_line2_data()['model'].values.tolist()[:10],line2_y_top=Get_HdfsData.get_line2_data()['sales'].values.tolist()[:10],)if __name__ == '__main__':app.run(host='127.0.0.1', port=5001)

echarts部分代碼


(function() {var myChart = echarts.init(document.querySelector(".map .chart"));echarts.registerMap('china', chinaData);option = {tooltip: {trigger: 'item',formatter: '{b}<br/>{c} (輛)'},visualMap: {min: 1,max: 40000,text: ['High', 'Low'],realtime: false,calculable: true,inRange: {color: ['lightskyblue', 'yellow', 'orangered']},textStyle: {color: 'red' // 設置字體顏色為紅色}},series: [{name: 'city_map',type: 'map',map: 'china',data: map_data,roam: true // 在這里添加 roam 屬性}]};myChart.setOption(option);// 監聽瀏覽器縮放,圖表對象調用縮放resize函數window.addEventListener("resize", function() {myChart.resize();});}
)();

效果圖

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/80617.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/80617.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/80617.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

brew 安裝openjdk查看其版本

使用brew&#xff08;如果你使用Homebrew安裝&#xff09; 如果你通過Homebrew安裝了OpenJDK&#xff0c;可以使用以下命令來查看安裝的版本,&#xff1a; brew list --versions openjdk8 這將會列出所有通過Homebrew安裝的OpenJDK版本及其版本號。 3. 查看/usr/libexec/ja…

【Linux網絡】構建與優化HTTP請求處理 - HttpRequest從理解到實現

&#x1f4e2;博客主頁&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客倉庫&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;歡迎點贊 &#x1f44d; 收藏 ?留言 &#x1f4dd; 如有錯誤敬請指正&#xff01; &…

Day12(回溯法)——LeetCode51.N皇后39.組合總和

1 前言 今天刷了三道回溯法和一道每日推薦&#xff0c;三道回溯法也迷迷糊糊的&#xff0c;每日推薦把自己繞進去了&#xff0c;雖然是一道之前做過的題的變種。刷的腦子疼。。。今天挑兩道回溯題寫一下吧&#xff0c;其中有一道是之前做過的N皇后&#xff0c;今天在詳細寫一寫…

初階數據結構:二叉搜索樹

目錄 概念 性能 效率分析 二分缺陷 功能 插入 查找 刪除 實現 應用 概念 二叉搜索樹&#xff08;又稱&#xff1a;二叉排序樹&#xff09;&#xff0c;是由一些具有特別性質的二叉樹衍變而來。 只要一棵二叉樹具備以下性質&#xff0c;即可稱作二叉搜索樹。 【1】若…

詳解springcloud gateway工作原理、斷言、filter、uri、id、全局跨域、globalfilter等以及關鍵源碼實現

1.gateway概念 網關就是當前微服務項目的"統一入口"程序中的網關就是當前微服務項目對外界開放的統一入口所有外界的請求都需要先經過網關才能訪問到我們的程序提供了統一入口之后,方便對所有請求進行統一的檢查和管理 2. 網關的主要功能 將所有請求統一經過網關網…

C#中的弱引用使用

弱引用&#xff08;Weak Reference&#xff09;是一種特殊的引用類型&#xff0c;它允許你引用一個對象&#xff0c;但不會阻止該對象被垃圾回收器&#xff08;GC&#xff09;回收。弱引用通常用于需要緩存或跟蹤對象&#xff0c;但又不希望因保留引用而導致內存泄漏的場景。弱…

spring響應式編程系列:異步生產數據

目錄 示例 大致流程 create new MonoCreate subscribe new LambdaMonoSubscriber monoCreate.subscribe accept success onNext 時序圖 類圖 數據發布者 MonoCreate 數據訂閱者 LambdaMonoSubscriber 訂閱的消息體 DefaultMonoSink 本篇文章我們來研究如何將…

MCP Python SDK構建的**SQLite瀏覽器**的完整操作指南

以下是使用MCP Python SDK構建的SQLite瀏覽器的完整操作指南&#xff1a; 一、環境準備 安裝依賴 # 安裝MCP SDK及SQLite支持 pip install mcp sqlite3創建測試數據庫 sqlite3 test.db <<EOF CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT); IN…

【Python爬蟲基礎篇】--3.cookie和session

目錄 1.cookie 1.1.定義 1.2.參數 1.3.分類 2.session 3.使用cookie登錄微博 4.使用session登錄 1.cookie 由于http是一個無狀態的協議&#xff0c;請求與請求之間無法相互傳遞或者記錄一些信息&#xff0c;cookie和session正是為了解決這個問題而產生。 例子&#xff1…

風車郵箱系統詳細使用指南:Windows與Ubuntu雙平臺解析

風車郵箱系統V1.2使用手冊 風車郵箱系統詳細使用指南&#xff1a;Windows與Ubuntu雙平臺解析 前言 在日常網絡活動中&#xff0c;我們經常需要一個臨時郵箱來注冊各類網站或接收驗證碼&#xff0c;但不想使用自己的真實郵箱。「風車無線郵箱系統」作為一款優秀的臨時郵箱工具…

同樣的接口用postman/apifox能跑通,用jmeter跑就報錯500

之前沒用過jmeter,第一次用調試壓測腳本遇到了問題 一樣的接口用postman能跑通&#xff0c;用jmeter跑就報錯500&#xff0c;百度很多文章都說是該接口需要加一個‘內容編碼’改成utf-8,我加了還是不行 后來我就想到apifox好像有隱藏的header&#xff0c;然后開始比較apifox的…

1656打印路徑-Floyd回溯/圖論-鏈表/數據結構

藍橋賬戶中心 1.稅收&#xff1a; “城市的稅收”&#xff1a;所以是中介點的稅收&#xff0c;經過該點后加上 2.路徑&#xff1a; 用數組存儲前驅節點從而串成鏈表 pre[ i ][ j ]代表的是從 i 到 j 的最短路徑上 j 的前驅節點是什么 那么便可以pre[ i ][ j ]k 把k加入pa…

Eigen矩陣操作類 (Map, Block, 視圖類)

1. Map 類&#xff1a;內存映射&#xff08;零拷貝操作&#xff09; 核心功能 將現有的 C/C 數組或緩沖區映射為 Eigen 矩陣/向量&#xff0c;不復制數據&#xff0c;直接操作原內存。 模板參數 cpp Map<Matrix<Scalar, Rows, Cols, Options, MaxRows, MaxCols>&…

多系統安裝經驗,移動硬盤,ubuntu grub修改/etc/fstab 移動硬盤需要改成nfts格式才能放steam游戲

總結&#xff1a;我硬盤會自動掛載&#xff0c;直接格式化nfts&#xff0c;steam就能裝里面了 機械硬盤裝系統真的不行&#xff0c;超級慢游戲還跑不了 --------------------------------------------------------------------底下都不用看 筆記本一個系統&#xff0c;移動硬盤…

JFLAP SOFTWARE 編譯原理用(自動機繪圖)

csdn全是蛆蟲&#xff0c;2mb的軟件&#xff0c;都在那里搞收費&#xff0c;我就看不慣&#xff0c;我就放出來&#xff0c;那咋了&#xff01;&#xff01;&#xff01; https://pan.baidu.com/s/1IuEfHScynjCCUF5ScF26KA 通過網盤分享的文件&#xff1a;JFLAP7.1.jar 鏈接: h…

[Windows] Disk Sorter文件分類管理軟件 v16.7.18

[Windows] Disk Sorter文件分類管理 鏈接&#xff1a;https://pan.xunlei.com/s/VOOl0sDntAdHvlMkc7N0ZOD-A1?pwd966n# Disk Sorter是一個功能強大的文件分類管理軟件&#xff0c;允許對本地磁盤、網絡共享、NAS設備和企業存儲系統中的文件進行分類&#xff0c;并且支持生成…

STM32提高篇: 藍牙通訊

STM32提高篇: 藍牙通訊 一.藍牙通訊介紹1.藍牙技術類型 二.藍牙協議棧1.藍牙芯片架構2.BLE低功耗藍牙協議棧框架 三.ESP32-C3中的藍牙功能1.廣播2.掃描3.通訊 四.發送和接收 一.藍牙通訊介紹 藍牙&#xff0c;是一種利用低功率無線電&#xff0c;支持設備短距離通信的無線電技…

6.1.多級緩存架構

目錄 一、多級緩存基礎與核心概念 緩存的定義與價值 ? 緩存的應用場景&#xff08;高并發、低延遲、減輕數據庫壓力&#xff09; ? 多級緩存 vs 單級緩存的優劣對比 多級緩存核心組件 ? 本地緩存&#xff08;Caffeine、Guava Cache&#xff09; ? 分布式緩存&#xff08;…

MySQL的MVCC【學習筆記】

MVCC 事務的隔離級別分為四種&#xff0c;其中Read Committed和Repeatable Read隔離級別&#xff0c;部分實現就是通過MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并發控制&#xff09; 版本鏈 版本鏈是通過undo日志實現的&#xff0c; 事務每次修改…

基于OpenMV+STM32+OLED與YOLOv11+PaddleOCR的嵌入式車牌識別系統開發筆記

基于OpenMV、STM32與OLED的嵌入式車牌識別系統開發筆記 基于OpenMV、STM32與OLED的嵌入式車牌識別系統開發筆記系統架構全景 一、實物演示二、OpenMV端設計要點1. 硬件配置優化2. 智能幀率控制算法3. 數據傳輸協議設計 三、PyTorch后端核心實現&#xff1a;YOLOv11與PaddleOCR的…