1. Requirements Analysis
An e-commerce platform needs a real-time ad click analysis system. The core requirement is to compute, in real time, the Top 10 for each of the following:
- clicks per ad
- ad clicks per province
- ad clicks per city
With a real-time view of ad performance, the company can adjust its placement strategy and decide where to scale up spend, maximizing its economic return.
2. Data Pipeline Design
The data flows as follows:
- Flume collects the ad click logs produced by the servers in real time
- Flume writes the collected data into a Kafka message queue
- Spark Streaming consumes the data from Kafka and computes the statistics in real time
- The results are written to MySQL and also fed to Davinci for BI analysis and visual dashboards
3. Development Steps
3.1 Data Preparation
- The dataset file is ad.log, containing the platform's ad click logs. Each record holds: timestamp, province ID, city ID, user ID, ad ID.
- Sample records:
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
3.2 Creating the Business Tables
- Create the advertise database on the MySQL node:
create database advertise;
- Create the counting tables:
- adversisecount (stores ad click counts)
CREATE TABLE adversisecount (adname VARCHAR(20) NOT NULL, count INT(11) NOT NULL);
- provincecount (stores province click counts)
create table provincecount (province varchar(20) not null, count int(11) not null);
- citycount (stores city click counts)
CREATE TABLE citycount (city VARCHAR(20) NOT NULL, count INT(11) NOT NULL);
- Run the advertiseinfo.sql and distinctcode.sql scripts (a command sketch follows the listings):
- advertiseinfo.sql (maps ad IDs to ad names)
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `advertise` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `advertise`;

/* Table structure for table `advertiseinfo` */
DROP TABLE IF EXISTS `advertiseinfo`;
CREATE TABLE `advertiseinfo` (
  `aid` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`aid`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;

/* Data for the table `advertiseinfo` */
insert into `advertiseinfo`(`aid`,`name`) values
(1,'論道云原生,且看大數據江湖'),(2,'首屆「奇想獎」元宇宙征文大賽'),(3,'你真的懂Web滲透測試碼?'),(4,'運維工程師,如何從每月3k漲到每月3w?'),
(5,'Python人工智能全程套餐課'),(6,'Java入門到進階一卡通'),(7,'王者技術體系課立即搶購'),(8,'報考C認證得超值學習大禮包'),
(9,'開魔盒贏豪禮'),(10,'超級實習生等你來拿'),(11,'Python機器學習'),(12,'2022年,為什么一定要學網絡安全'),
(13,'月薪2萬,為啥找不到運維人才'),(14,'k8s從蒙圈到熟練:搞懂技術就靠他了!'),(15,'重要通知:網工想漲工資,可以考個證'),(16,'Java不懂這些核心技能,還想去大廠'),
(17,'你真的懂網絡安全碼?'),(18,'數據分析師掌握這4點,大廠搶著要'),(19,'做運維,為什么Linux必須精通'),(20,'云計算正在\"殺死\"網工運維');
- distinctcode.sql (maps province and city names to codes)
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `advertise` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `advertise`;

/* Table structure for table `distinctcode` */
DROP TABLE IF EXISTS `distinctcode`;
CREATE TABLE `distinctcode` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `province` varchar(50) CHARACTER SET utf8 DEFAULT NULL,
  `provinceCode` varchar(20) CHARACTER SET utf8 NOT NULL,
  `city` varchar(50) CHARACTER SET utf8 NOT NULL,
  `cityCode` varchar(20) CHARACTER SET utf8 NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=latin1;

/* Data for the table `distinctcode` */
insert into `distinctcode`(`id`,`province`,`provinceCode`,`city`,`cityCode`) values
(1,'北京','BJ','朝陽區','BJ-CY'),(2,'北京','BJ','海淀區','BJ-HD'),(3,'北京','BJ','通州區','BJ-TZ'),(4,'北京','BJ','豐臺區','BJ-FS'),(5,'北京','BJ','昌平區','BJ-FT'),
(6,'廣東省','GD','東莞市','GD-DG'),(7,'廣東省','GD','廣州市','GD-GZ'),(8,'廣東省','GD','中山市','GD-ZS'),(9,'廣東省','GD','深圳市','GD-SZ'),(10,'廣東省','GD','惠州市','GD-HZ'),
(11,'山東省','SD','濟南市','SD-JN'),(12,'山東省','SD','青島市','SD-QD'),(13,'山東省','SD','臨沂市','SD-LY'),(14,'山東省','SD','濟寧市','SD-JN'),(15,'山東省','SD','菏澤市','SD-HZ'),
(16,'江蘇省','JS','蘇州市','JS-SZ'),(17,'江蘇省','JS','徐州市','JS-XZ'),(18,'江蘇省','JS','鹽城市','JS-YC'),(19,'江蘇省','JS','無錫市','JS-WX'),(20,'江蘇省','JS','南京市','JS-NJ'),
(21,'河南省','HN','鄭州市','HN-ZZ'),(22,'河南省','HN','南陽市','HN-NY'),(23,'河南省','HN','新鄉市','HN-XX'),(24,'河南省','HN','安陽市','HN-AY'),(25,'河南省','HN','洛陽市','HN-LY'),
(26,'上海市','SH','松江區','SH-SJ'),(27,'上海市','SH','寶山區','SH-BS'),(28,'上海市','SH','金山區','SH-JS'),(29,'上海市','SH','嘉定區','SH-JD'),(30,'上海市','SH','南匯區','SH-NH'),
(31,'河北省','HB','石家莊市','HB-SJZ'),(32,'河北省','HB','唐山市','HB-TS'),(33,'河北省','HB','保定市','HB-BD'),(34,'河北省','HB','邯鄲市','HB-HD'),(35,'河北省','HB','邢臺市','HB-XT'),
(36,'浙江省','ZJ','溫州市','ZJ-WZ'),(37,'浙江省','ZJ','寧波市','ZJ-NB'),(38,'浙江省','ZJ','杭州市','ZJ-HZ'),(39,'浙江省','ZJ','臺州市','ZJ-TZ'),(40,'浙江省','ZJ','嘉興市','ZJ-JX'),
(41,'陜西省','SX','西安市','SX-XA'),(42,'陜西省','SX','咸陽市','SX-XY'),(43,'陜西省','SX','寶雞市','SX-BJ'),(44,'陜西省','SX','漢中市','SX-HZ'),(45,'陜西省','SX','渭南市','SX-WN'),
(46,'湖南省','HN','長沙市','HN-CS'),(47,'湖南省','HN','邵陽市','HN-SY'),(48,'湖南省','HN','常德市','HN-CD'),(49,'湖南省','HN','衡陽市','HN-HY'),(50,'湖南省','HN','株洲市','HN-JZ'),
(51,'重慶市','CQ','江北區','CQ-JB'),(52,'重慶市','CQ','渝北區','CQ-YB'),(53,'重慶市','CQ','沙坪壩區','CQ-SPB'),(54,'重慶市','CQ','九龍坡區','CQ-JLP'),(55,'重慶市','CQ','萬州區','CQ-WZ'),
(56,'福建省','FJ','漳州市','FJ-ZZ'),(57,'福建省','FJ','廈門市','FJ-XM'),(58,'福建省','FJ','泉州市','FJ-QZ'),(59,'福建省','FJ','福州市','FJ-FZ'),(60,'福建省','FJ','莆田市','FJ-PT'),
(61,'天津市','TJ','和平區','TJ-HP'),(62,'天津市','TJ','北辰區','TJ-BC'),(63,'天津市','TJ','河北區','TJ-HB'),(64,'天津市','TJ','河西區','TJ-HX'),(65,'天津市','TJ','西青區','TJ-XQ'),
(66,'云南省','YN','昆明市','YN-KM'),(67,'云南省','YN','紅河州','YN-HH'),(68,'云南省','YN','大理州','YN-DL'),(69,'云南省','YN','文山州','YN-WS'),(70,'云南省','YN','德宏州','YN-DH'),
(71,'四川省','SC','成都市','SC-CD'),(72,'四川省','SC','綿陽市','SC-MY'),(73,'四川省','SC','廣元市','SC-GY'),(74,'四川省','SC','達州市','SC-DZ'),(75,'四川省','SC','南充市','SC-NC'),
(76,'廣西','GX','貴港市','GX-GG'),(77,'廣西','GX','玉林市','GX-YL'),(78,'廣西','GX','北海市','GX-BH'),(79,'廣西','GX','南寧市','GX-NN'),(80,'廣西','GX','柳州市','GX-LZ'),
(81,'安徽省','AH','蕪湖市','AH-WH'),(82,'安徽省','AH','合肥市','AH-HF'),(83,'安徽省','AH','六安市','AH-LA'),(84,'安徽省','AH','宿州市','AH-SZ'),(85,'安徽省','AH','阜陽市','AH-FY'),
(86,'海南省','HN','三亞市','HN-SY'),(87,'海南省','HN','海口市','HN-HK'),(88,'海南省','HN','瓊海市','HN-QH'),(89,'海南省','HN','文昌市','HN-WC'),(90,'海南省','HN','東方市','HN-DF'),
(91,'江西省','JX','南昌市','JX-NC'),(92,'江西省','JX','贛州市','JX-GZ'),(93,'江西省','JX','上饒市','JX-SR'),(94,'江西省','JX','吉安市','JX-JA'),(95,'江西省','JX','九江市','JX-JJ'),
(96,'湖北省','HB','武漢市','HB-WH'),(97,'湖北省','HB','宜昌市','HB-YC'),(98,'湖北省','HB','襄樊市','HB-XF'),(99,'湖北省','HB','荊州市','HB-JZ'),(100,'湖北省','HB','恩施州','HB-NS'),
(101,'山西省','SX','太原市','SX-TY'),(102,'山西省','SX','大同市','SX-DT'),(103,'山西省','SX','運城市','SX-YC'),(104,'山西省','SX','長治市','SX-CZ'),(105,'山西省','SX','晉城市','SX-JC'),
(106,'遼寧省','LN','大連市','LN-DL'),(107,'遼寧省','LN','沈陽市','LN-SY'),(108,'遼寧省','LN','丹東市','LN-DD'),(109,'遼寧省','LN','遼陽市','LN-LY'),(110,'遼寧省','LN','葫蘆島市','LN-HLD'),
(111,'臺灣省','TW','臺北市','TW-TB'),(112,'臺灣省','TW','高雄市','TW-GX'),(113,'臺灣省','TW','臺中市','TW-TZ'),(114,'臺灣省','TW','新竹市','TW-XZ'),(115,'臺灣省','TW','基隆市','TW-JL'),
(116,'黑龍江','HLJ','齊齊哈爾市','HLJ-QQHE'),(117,'黑龍江','HLJ','哈爾濱市','HLJ-HEB'),(118,'黑龍江','HLJ','大慶市','HLJ-DQ'),(119,'黑龍江','HLJ','佳木斯市','HLJ-JMS'),(120,'黑龍江','HLJ','雙鴨山市','HLJ-SYS'),
(121,'內蒙古自治區','NMG','赤峰市','NMG-CF'),(122,'內蒙古自治區','NMG','包頭市','NMG-BT'),(123,'內蒙古自治區','NMG','通遼市','NMG-TL'),(124,'內蒙古自治區','NMG','呼和浩特市','NMG-FHHT'),(125,'內蒙古自治區','NMG','烏海市','NMG-WH'),
(126,'貴州省','GZ','貴陽市','GZ-GY'),(127,'貴州省','GZ','黔東南州','GZ-QDN'),(128,'貴州省','GZ','黔南州','GZ-QN'),(129,'貴州省','GZ','遵義市','GZ-ZY'),(130,'貴州省','GZ','黔西南州','GZ-QXN'),
(131,'甘肅省','GS','蘭州市','GS-LZ'),(132,'甘肅省','GS','天水市','GS-TS'),(133,'甘肅省','GS','慶陽市','GS-QY'),(134,'甘肅省','GS','武威市','GS-WW'),(135,'甘肅省','GS','酒泉市','GS-JQ'),
(136,'青海省','QH','西寧市','QH-XN'),(137,'青海省','QH','海西州','QH-HX'),(138,'青海省','QH','海東地區','QH-HD'),(139,'青海省','QH','海北州','QH-HB'),(140,'青海省','QH','果洛州','QH-GL'),
(141,'新疆','XJ','烏魯木齊市','XJ-WLMQ'),(142,'新疆','XJ','伊犁州','XJ-YL'),(143,'新疆','XJ','昌吉州','XJ-CJ'),(144,'新疆','XJ','石河子市','XJ-SHZ'),(145,'新疆','XJ','哈密地區','XJ-HM'),
(146,'西藏自治區','XZ','拉薩市','XZ-LS'),(147,'西藏自治區','XZ','山南地區','XZ-SN'),(148,'西藏自治區','XZ','林芝地區','XZ-LZ'),(149,'西藏自治區','XZ','日喀則地區','XZ-RKZ'),(150,'西藏自治區','XZ','阿里地區','XZ-AL'),
(151,'吉林省','JL','吉林市','JL-JL'),(152,'吉林省','JL','長春市','JL-CC'),(153,'吉林省','JL','白山市','JL-BS'),(154,'吉林省','JL','白城市','JL-BC'),(155,'吉林省','JL','延邊州','JL-YB'),
(156,'寧夏','NX','銀川市','NX-YC'),(157,'寧夏','NX','吳忠市','NX-WZ'),(158,'寧夏','NX','中衛市','NX-ZW'),(159,'寧夏','NX','石嘴山市','NX-SZS'),(160,'寧夏','NX','固原市','NX-GY');
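A minimal way to run the two scripts from the shell on the MySQL node, assuming both files sit in the current directory (each script creates and selects the advertise database itself):
# Load the ad metadata and the province/city code table
mysql -u root -p < advertiseinfo.sql
mysql -u root -p < distinctcode.sql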
3.3 Simulating Data Generation
- Write the simulator: a Java class, AnalogData_v2, reads the dataset file and writes it to an output file at a fixed rate, simulating ad click logs arriving in real time.
import java.io.*;

public class AnalogData_v2 {
    public static void main(String[] args) {
        // Validate arguments
        if (args.length < 2) {
            System.err.println("Usage: java AnalogData_v2 <input file> <output file>");
            System.exit(1);
        }
        String inputFile = args[0];
        String outputFile = args[1];
        try {
            readData(inputFile, outputFile);
        } catch (FileNotFoundException e) {
            System.err.println("Error: file not found - " + e.getMessage());
        } catch (UnsupportedEncodingException e) {
            System.err.println("Error: unsupported encoding - " + e.getMessage());
        } catch (IOException e) {
            System.err.println("IO exception: " + e.getMessage());
        } catch (InterruptedException e) {
            System.err.println("Interrupted: " + e.getMessage());
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    public static void readData(String inputFile, String outputFile)
            throws IOException, InterruptedException {
        // try-with-resources closes both streams automatically
        try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(new FileInputStream(inputFile), "GBK"));
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(new FileOutputStream(outputFile, true)))) {
            String line;
            int counter = 1;
            while ((line = reader.readLine()) != null) {
                System.out.printf("Line %d: %s%n", counter, line);
                writer.write(line);
                writer.newLine(); // platform-independent line separator
                writer.flush();   // push each record to disk immediately
                counter++;
                Thread.sleep(1000); // throttle to one record per second
            }
        }
    }
}
[root@kafka01 sparkKS]> javac AnalogData_v2.java
[root@kafka01 sparkKS]> java AnalogData_v2 ./ad.log /opt/apache-flume-1.9.0-bin/logs/ad.log
Line 1: 1516609143867 6 7 64 16
Line 2: 1516609143869 9 4 75 18
Line 3: 1516609143869 1 7 87 12
Line 4: 1516609143869 2 8 92 9
Line 5: 1516609143869 6 7 84 24
Line 6: 1516609143869 1 8 95 5
- Package the project: build it in IDEA into bigdata.jar and upload it to /root/sparkKS/lib on the MySQL node.
- Write the shell scripts (a sketch follows):
- Create ad.sh in /root/sparkKS/ to run the data simulator
- Create common.sh to define environment variables and other shared settings
- Make ad.sh executable:
chmod u+x ad.sh
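The document does not show the script bodies. A hypothetical sketch, assuming the compiled AnalogData_v2 class ends up on the classpath via bigdata.jar and reusing the paths from the run above (the class name, variable names, and paths are assumptions):
#!/bin/bash
# common.sh -- shared environment variables (hypothetical values)
export APP_HOME=/root/sparkKS
export FLUME_LOG=/opt/apache-flume-1.9.0-bin/logs/ad.log

#!/bin/bash
# ad.sh -- replay the sample log into the file Flume tails
source /root/sparkKS/common.sh
java -cp $APP_HOME/lib/bigdata.jar AnalogData_v2 $APP_HOME/ad.log $FLUME_LOG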
3.4 Implementing the Business Logic
- Add the project dependencies: declare the MySQL connector, Spark Streaming, and Kafka dependencies in pom.xml.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>data</groupId>
    <artifactId>data</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.12.15</scala.version>
        <spark.version>3.3.0</spark.version>
        <kafka.version>3.6.1</kafka.version>
        <mysql.version>8.0.27</mysql.version>
    </properties>

    <dependencies>
        <!-- MySQL 8 driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly plugin: builds a fat jar with the main class in the manifest -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>data.KafkaSparkStreamingMysql</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Target Java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
- Develop the Spark Streaming application:
- Configure the Spark Streaming and Kafka connection parameters
- Read data from Kafka and filter out malformed records
- Count clicks per ad, per province, and per city
- Write each batch's counts to MySQL via foreachRDD and foreachPartition, updating existing rows or inserting new ones
package data

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.{HashMap => JHashMap}

object KafkaSparkStreamingMysql {

  // MySQL 8 connection settings
  private val mysqlUrl = "jdbc:mysql://192.168.100.153:3306/advertise?useSSL=false&serverTimezone=UTC"
  private val mysqlUser = "root"
  private val mysqlPassword = "123456"

  def main(args: Array[String]): Unit = {
    // Load the MySQL 8 driver
    Class.forName("com.mysql.cj.jdbc.Driver")

    val sparkConf = new SparkConf()
      .setAppName("advertise")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Use a Java HashMap instead of a Scala Map to work around a cyclic-inheritance issue
    val kafkaParams = new JHashMap[String, Object]()
    kafkaParams.put("bootstrap.servers", "192.168.100.150:9092,192.168.100.151:9092,192.168.100.152:9092")
    kafkaParams.put("key.deserializer", classOf[StringDeserializer])
    kafkaParams.put("value.deserializer", classOf[StringDeserializer])
    kafkaParams.put("group.id", "advertise")
    kafkaParams.put("auto.offset.reset", "earliest")
    kafkaParams.put("enable.auto.commit", false.asInstanceOf[Object])

    // Create the Kafka direct stream
    val topics = Array("advertise")
    val topicsAsList = java.util.Arrays.asList(topics: _*)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsAsList, kafkaParams))

    // Print each batch for debugging
    val lines = stream.map(record => record.value)
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.foreach(println)
      }
    }

    // Drop malformed records: a valid line has exactly 5 fields
    val filter = lines.map(_.split("\\s+")).filter(_.length == 5)

    // Ad click counts (field 4 = ad ID)
    processCounts(filter.map(x => (x(4), 1)).reduceByKey(_ + _), "adversisecount", "adname")
    // Province click counts (field 1 = province ID)
    processCounts(filter.map(x => (x(1), 1)).reduceByKey(_ + _), "provincecount", "province")
    // City click counts (field 2 = city ID)
    processCounts(filter.map(x => (x(2), 1)).reduceByKey(_ + _), "citycount", "city")

    ssc.start()
    ssc.awaitTermination()
  }

  /** Generic per-batch count writer. */
  private def processCounts(counts: org.apache.spark.streaming.dstream.DStream[(String, Int)],
                            tableName: String,
                            idColumn: String): Unit = {
    counts.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.foreachPartition { records =>
          updateOrInsertToMysql(records, tableName, idColumn)
        }
      }
    }
  }

  /** Update or insert counts in MySQL (prepared statements guard against SQL injection). */
  private def updateOrInsertToMysql(records: Iterator[(String, Int)],
                                    tableName: String,
                                    idColumn: String): Unit = {
    var conn: Connection = null
    var checkStmt: PreparedStatement = null
    var updateStmt: PreparedStatement = null
    var insertStmt: PreparedStatement = null
    try {
      conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword)

      // Prepare the SQL statements once per partition
      val checkSql = s"SELECT 1 FROM $tableName WHERE $idColumn = ?"
      val updateSql = s"UPDATE $tableName SET count = count + ? WHERE $idColumn = ?"
      val insertSql = s"INSERT INTO $tableName($idColumn, count) VALUES(?, ?)"
      checkStmt = conn.prepareStatement(checkSql)
      updateStmt = conn.prepareStatement(updateSql)
      insertStmt = conn.prepareStatement(insertSql)

      records.foreach { case (name, count) =>
        // Does the row already exist?
        checkStmt.setString(1, name)
        val resultSet = checkStmt.executeQuery()
        if (resultSet.next()) {
          // Accumulate into the existing row
          updateStmt.setInt(1, count)
          updateStmt.setString(2, name)
          updateStmt.executeUpdate()
        } else {
          // First sighting: insert a new row
          insertStmt.setString(1, name)
          insertStmt.setInt(2, count)
          insertStmt.executeUpdate()
        }
        // Close the result set
        if (resultSet != null) resultSet.close()
      }
    } catch {
      case e: Exception =>
        println(s"Error while writing to table $tableName: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      // Release all JDBC resources
      if (checkStmt != null) checkStmt.close()
      if (updateStmt != null) updateStmt.close()
      if (insertStmt != null) insertStmt.close()
      if (conn != null) conn.close()
    }
  }
}
3.5 Running the End-to-End Pipeline
- Start the MySQL service
- Start the Kafka cluster and create the advertise topic (see the command sketch below)
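The topic can be created with the standard Kafka CLI; a sketch, assuming Kafka 3.x and the three brokers listed in the Flume configuration below (the partition and replication counts are illustrative):
kafka-topics.sh --create --topic advertise \
  --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 \
  --partitions 3 --replication-factor 3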
- Start the Spark Streaming application: run it locally in IDEA, or package it and launch it with spark-submit (see the sketch below)
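The document leaves the submit command implicit. A minimal sketch, assuming the fat jar produced by the assembly plugin above; note that the application pins setMaster("local[2]") in code, so it runs with local threads even when launched this way:
spark-submit \
  --class data.KafkaSparkStreamingMysql \
  data-1.0-SNAPSHOT-jar-with-dependencies.jar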
- Start the Flume aggregation agents: configure avro-file-selector-kafka.properties on the kafka02 and kafka03 nodes and launch them (a launch sketch follows the listing)
[root@kafka02 apache-flume-1.9.0-bin]# cat conf/avro-file-selector-kafka.properties
# Name the source, channel, and sink
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# Define and configure an avro source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# Define and configure a file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpointDir
agent1.channels.c1.dataDirs = /opt/apache-flume-1.9.0-bin/dataDirs
# Define and configure a Kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = advertise
agent1.sinks.k1.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1
################################################################################################
[root@kafka03 ~]# cat /opt/apache-flume-1.9.0-bin/conf/avro-file-selector-kafka.properties
# (identical to the kafka02 configuration above)
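A launch sketch using Flume's standard launcher, run on both kafka02 and kafka03:
cd /opt/apache-flume-1.9.0-bin
bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console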
- Start the Flume collector agent: configure taildir-file-selector-avro.properties on the kafka01 node and launch it (launch sketch after the listing)
[root@kafka01 sparkKS]# cat /opt/apache-flume-1.9.0-bin/conf/taildir-file-selector-avro.properties
# Name the sources, channels, sink group, and sinks
agent1.sources = taildirSource
agent1.channels = fileChannel
agent1.sinkgroups = g1
agent1.sinks = k1 k2
# Define and configure a TAILDIR source
agent1.sources.taildirSource.type = TAILDIR
agent1.sources.taildirSource.positionFile = /opt/apache-flume-1.9.0-bin/taildir_position.json
agent1.sources.taildirSource.filegroups = f1
agent1.sources.taildirSource.filegroups.f1 = /opt/apache-flume-1.9.0-bin/logs/ad.log
agent1.sources.taildirSource.channels = fileChannel
# Define and configure a file channel
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpointDir
agent1.channels.fileChannel.dataDirs = /opt/apache-flume-1.9.0-bin/dataDirs
# Define and configure a sink group
agent1.sinkgroups.g1.sinks = k1 k2
# Attach a processor to the sink group: load_balance = load balancing, failover = failover
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# How the processor distributes events: round_robin = round-robin, random = random
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# First sink: forward events to the kafka02 node
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = fileChannel
agent1.sinks.k1.batchSize = 1
agent1.sinks.k1.hostname = kafka02
agent1.sinks.k1.port = 1234
# Second sink: forward events to the kafka03 node
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = fileChannel
agent1.sinks.k2.batchSize = 1
agent1.sinks.k2.hostname = kafka03
agent1.sinks.k2.port = 1234
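And the matching launch sketch for the collector agent on kafka01:
cd /opt/apache-flume-1.9.0-bin
bin/flume-ng agent -n agent1 -c conf -f conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console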
- Generate data: run the ad.sh script to replay records into the monitored file, simulating a live log
3.6 Data Visualization with Davinci
- Start the Davinci service and log in
- Create a new project and a data source connection (pointing at the advertise database in MySQL)
- Create views (example view SQL is sketched after this list):
  - Top 10 ads by clicks: joins adversisecount with advertiseinfo
  - Top 10 provinces by ad clicks: joins provincecount with distinctcode
  - Top 10 cities by ad clicks: joins citycount with distinctcode
- Create widgets: a bar chart for each of the three views
- Create the dashboard:
  - Add the widgets created above
  - Set the data refresh mode and interval (timed refresh, every 30 seconds)
  - Finish the dashboard for a live visualization of ad click data
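The view SQL is not shown in the document. A minimal sketch of the ad Top 10 view, assuming adname holds the ad ID as text (as written by the Spark job) so the join relies on MySQL's implicit cast; the province and city views would join provincecount and citycount against distinctcode in the same fashion, with the exact join key depending on how the numeric log IDs map to that table:
-- Top 10 ads by clicks (sketch; join key is an assumption)
SELECT a.name AS ad, c.count AS clicks
FROM adversisecount c
JOIN advertiseinfo a ON a.aid = c.adname
ORDER BY c.count DESC
LIMIT 10;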
Restart the Davinci server after adjusting its configuration:
bin/stop-server.sh
bin/start-server.sh
##################################
cat /opt/davinci/config/application.yml
##################################
server:
  protocol: http
  address: 192.168.100.150
  port: 38080
  servlet:
    context-path: /

jwtToken:
  secret: secret
  timeout: 1800000
  algorithm: HS512

source:
  initial-size: 2
  min-idle: 1
  max-wait: 6000
  max-active: 10
  break-after-acquire-failure: true
  connection-error-retry-attempts: 0
  query-timeout: 600000
  validationQueryTimeout: 30000
  enable-query-log: false
  result-limit: 1000000

spring:
  mvc:
    async:
      request-timeout: 30s
  datasource:
    url: jdbc:mysql://192.168.100.153:3306/advertise?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    initial-size: 2
    min-idle: 1
    max-wait: 60000
    max-active: 10
  redis:
    isEnable: false
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    timeout: 1000
    jedis:
      pool:
        max-active: 8
        max-wait: 1
        max-idle: 8
        min-idle: 0
  mail:
    host: smtp.163.com
    port: 465
    username: a351719672@163.com
    fromAddress:
    password: xxxxx
    nickname: luobozi
    properties:
      smtp:
        starttls:
          enable: true
          required: true
        auth: true
      mail:
        smtp:
          ssl:
            enable: true
  ldap:
    urls:
    username:
    password:
    base:
    domainName: # domainName is the corporate mail suffix, e.g. for xxx@example.com the value is '@example.com'

screenshot:
  default_browser: PHANTOMJS # PHANTOMJS or CHROME
  timeout_second: 600
  phantomjs_path: /opt/davinci/phantomjs
  chromedriver_path: $your_chromedriver_path$

data-auth-center:
  channels:
    - name:
      base-url:
      auth-code:

statistic:
  enable: false
  elastic_urls:
  elastic_user:
  elastic_index_prefix:
  mysql_url:
  mysql_username:
  mysql_password:
  kafka.bootstrap.servers:
  kafka.topic:
  java.security.krb5.conf:
  java.security.keytab:
  java.security.principal: