【Project】kafka+flume+davinci廣告點擊實時分析系統

一、項目需求分析

某電商平臺需實現廣告實時點擊分析系統,核心需求為實時統計以下內容的Top10:

  • 各個廣告的點擊量
  • 各個省份的廣告點擊量
  • 各個城市的廣告點擊量

通過實時掌握廣告投放效果,為廣告投放策略調整和大規模投入提供依據,以實現公司經濟回報最大化。

二、數據流程設計

數據流程如下:

  1. 服務器產生的廣告點擊日志,由Flume進行實時采集
  2. Flume將采集到的數據寫入Kafka消息隊列
  3. Spark Streaming從Kafka消費數據并進行實時計算
  4. 計算結果一方面入庫到MySQL數據庫,另一方面通過連接Davinci進行BI分析,實現數據可視化展示

三、開發步驟

3.1 數據準備

  • 數據集文件名為ad.log,包含電商平臺廣告點擊日志,數據格式為:時間、省份ID、城市ID、用戶ID、廣告ID。
  • 樣本數據示例:
    1516609143867 6 7 64 16
    1516609143869 9 4 75 18
    1516609143869 1 7 87 12
    

3.2 業務建表

  1. 在MySQL節點創建advertise數據庫:create database advertise;
  2. 創建相關數據表:
    • adversisecount表(存儲廣告點擊量)
      CREATE TABLE adversisecount(adname VARCHAR(20) NOT NULL,COUNT INT(11) NOT NULL
      );
      
    • provincecount表(存儲省份廣告點擊量)
      create table provincecount(province varchar(20) not null,count int(11) not null
      );
      
    • citycount表(存儲城市廣告點擊量)
      CREATE TABLE citycount(city VARCHAR(20) NOT NULL,COUNT INT(11) NOT NULL
      );
      
    • 執行advertiseinfo.sql、distinctcode.sql腳本
CREATE DATABASE /*!32312 IF NOT EXISTS*/`advertise` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `advertise`;/*Table structure for table `advertiseinfo` */DROP TABLE IF EXISTS `advertiseinfo`;CREATE TABLE `advertiseinfo` (`aid` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(50) DEFAULT NULL,PRIMARY KEY (`aid`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;/*Data for the table `advertiseinfo` */insert  into `advertiseinfo`(`aid`,`name`) values (1,'論道云原生,且看大數據江湖'),(2,'首屆「奇想獎」元宇宙征文大賽'),(3,'你真的懂Web滲透測試碼?'),(4,'運維工程師,如何從每月3k漲到每月3w?'),(5,'Python人工智能全程套餐課'),(6,'Java入門到進階一卡通'),(7,'王者技術體系課立即搶購'),(8,'報考C認證得超值學習大禮包'),(9,'開魔盒贏豪禮'),(10,'超級實習生等你來拿'),(11,'Python機器學習'),(12,'2022年,為什么一定要學網絡安全'),(13,'月薪2萬,為啥找不到運維人才'),(14,'k8s從蒙圈到熟練:搞懂技術就靠他了!'),(15,'重要通知:網工想漲工資,可以考個證'),(16,'Java不懂這些核心技能,還想去大廠'),(17,'你真的懂網絡安全碼?'),(18,'數據分析師掌握這4點,大廠搶著要'),(19,'做運維,為什么Linux必須精通'),(20,'云計算正在\"殺死\"網工運維');
CREATE DATABASE /*!32312 IF NOT EXISTS*/`advertise` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `advertise`;/*Table structure for table `distinctcode` */DROP TABLE IF EXISTS `distinctcode`;CREATE TABLE `distinctcode` (`id` int(11) NOT NULL AUTO_INCREMENT,`province` varchar(50) CHARACTER SET utf8 DEFAULT NULL,`provinceCode` varchar(20) CHARACTER SET utf8 NOT NULL,`city` varchar(50) CHARACTER SET utf8 NOT NULL,`cityCode` varchar(20) CHARACTER SET utf8 NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=latin1;/*Data for the table `distinctcode` */insert  into `distinctcode`(`id`,`province`,`provinceCode`,`city`,`cityCode`) values (1,'北京','BJ','朝陽區','BJ-CY'),(2,'北京','BJ','海淀區','BJ-HD'),(3,'北京','BJ','通州區','BJ-TZ'),(4,'北京','BJ','豐臺區','BJ-FS'),(5,'北京','BJ','昌平區','BJ-FT'),(6,'廣東省','GD','東莞市','GD-DG'),(7,'廣東省','GD','廣州市','GD-GZ'),(8,'廣東省','GD','中山市','GD-ZS'),(9,'廣東省','GD','深圳市','GD-SZ'),(10,'廣東省','GD','惠州市','GD-HZ'),(11,'山東省','SD','濟南市','SD-JN'),(12,'山東省','SD','青島市','SD-QD'),(13,'山東省','SD','臨沂市','SD-LY'),(14,'山東省','SD','濟寧市','SD-JN'),(15,'山東省','SD','菏澤市','SD-HZ'),(16,'江蘇省','JS','蘇州市','JS-SZ'),(17,'江蘇省','JS','徐州市','JS-XZ'),(18,'江蘇省','JS','鹽城市','JS-YC'),(19,'江蘇省','JS','無錫市','JS-WX'),(20,'江蘇省','JS','南京市','JS-NJ'),(21,'河南省','HN','鄭州市','HN-ZZ'),(22,'河南省','HN','南陽市','HN-NY'),(23,'河南省','HN','新鄉市','HN-XX'),(24,'河南省','HN','安陽市','HN-AY'),(25,'河南省','HN','洛陽市','HN-LY'),(26,'上海市','SH','松江區','SH-SJ'),(27,'上海市','SH','寶山區','SH-BS'),(28,'上海市','SH','金山區','SH-JS'),(29,'上海市','SH','嘉定區','SH-JD'),(30,'上海市','SH','南匯區','SH-NH'),(31,'河北省','HB','石家莊市','HB-SJZ'),(32,'河北省','HB','唐山市','HB-TS'),(33,'河北省','HB','保定市','HB-BD'),(34,'河北省','HB','邯鄲市','HB-HD'),(35,'河北省','HB','邢臺市','HB-XT'),(36,'浙江省','ZJ','溫州市','ZJ-WZ'),(37,'浙江省','ZJ','寧波市','ZJ-NB'),(38,'浙江省','ZJ','杭州市','ZJ-HZ'),(39,'浙江省','ZJ','臺州市','ZJ-TZ'),(40,'浙江省','ZJ','嘉興市','ZJ-JX'),(41,'陜西省','SX','西安市','SX-XA'),(42,'陜西省','SX','咸陽市','SX-XY'),(43,'陜西省','SX','寶雞市','SX-BJ'),(44,'陜西省','SX','漢中市','SX-HZ'),(45,'陜西省','SX','渭南市','SX-WN'),(46,'湖南省','HN','長沙市','HN-CS'),(47,'湖南省','HN','邵陽市','HN-SY'),(48,'湖南省','HN','常德市','HN-CD'),(49,'湖南省','HN','衡陽市','HN-HY'),(50,'湖南省','HN','株洲市','HN-JZ'),(51,'重慶市','CQ','江北區','CQ-JB'),(52,'重慶市','CQ','渝北區','CQ-YB'),(53,'重慶市','CQ','沙坪壩區','CQ-SPB'),(54,'重慶市','CQ','九龍坡區','CQ-JLP'),(55,'重慶市','CQ','萬州區','CQ-WZ'),(56,'福建省','FJ','漳州市','FJ-ZZ'),(57,'福建省','FJ','廈門市','FJ-XM'),(58,'福建省','FJ','泉州市','FJ-QZ'),(59,'福建省','FJ','福州市','FJ-FZ'),(60,'福建省','FJ','莆田市','FJ-PT'),(61,'天津市','TJ','和平區','TJ-HP'),(62,'天津市','TJ','北辰區','TJ-BC'),(63,'天津市','TJ','河北區','TJ-HB'),(64,'天津市','TJ','河西區','TJ-HX'),(65,'天津市','TJ','西青區','TJ-XQ'),(66,'云南省','YN','昆明市','YN-KM'),(67,'云南省','YN','紅河州','YN-HH'),(68,'云南省','YN','大理州','YN-DL'),(69,'云南省','YN','文山州','YN-WS'),(70,'云南省','YN','德宏州','YN-DH'),(71,'四川省','SC','成都市','SC-CD'),(72,'四川省','SC','綿陽市','SC-MY'),(73,'四川省','SC','廣元市','SC-GY'),(74,'四川省','SC','達州市','SC-DZ'),(75,'四川省','SC','南充市','SC-NC'),(76,'廣西','GX','貴港市','GX-GG'),(77,'廣西','GX','玉林市','GX-YL'),(78,'廣西','GX','北海市','GX-BH'),(79,'廣西','GX','南寧市','GX-NN'),(80,'廣西','GX','柳州市','GX-LZ'),(81,'安徽省','AH','蕪湖市','AH-WH'),(82,'安徽省','AH','合肥市','AH-HF'),(83,'安徽省','AH','六安市','AH-LA'),(84,'安徽省','AH','宿州市','AH-SZ'),(85,'安徽省','AH','阜陽市','AH-FY'),(86,'海南省','HN','三亞市','HN-SY'),(87,'海南省','HN','海口市','HN-HK'),(88,'海南省','HN','瓊海市','HN-QH'),(89,'海南省','HN','文昌市','HN-WC'),(90,'海南省','HN','東方市','HN-DF'),(91,'江西省','JX','南昌市','JX-NC'),(92,'江西省','JX','贛州市','JX-GZ'),(93,'江西省','JX','上饒市','JX-SR'),(94,'江西省','JX','吉安市','JX-JA'),(95,'江西省','JX','九江市','JX-JJ'),(96,'湖北省','HB','武漢市','HB-WH'),(97,'湖北省','HB','宜昌市','HB-YC'),(98,'湖北省','HB','襄樊市','HB-XF'),(99,'湖北省','HB','荊州市','HB-JZ'),(100,'湖北省','HB','恩施州','HB-NS'),(101,'山西省','SX','太原市','SX-TY'),(102,'山西省','SX','大同市','SX-DT'),(103,'山西省','SX','運城市','SX-YC'),(104,'山西省','SX','長治市','SX-CZ'),(105,'山西省','SX','晉城市','SX-JC'),(106,'遼寧省','LN','大連市','LN-DL'),(107,'遼寧省','LN','沈陽市','LN-SY'),(108,'遼寧省','LN','丹東市','LN-DD'),(109,'遼寧省','LN','遼陽市','LN-LY'),(110,'遼寧省','LN','葫蘆島市','LN-HLD'),(111,'臺灣省','TW','臺北市','TW-TB'),(112,'臺灣省','TW','高雄市','TW-GX'),(113,'臺灣省','TW','臺中市','TW-TZ'),(114,'臺灣省','TW','新竹市','TW-XZ'),(115,'臺灣省','TW','基隆市','TW-JL'),(116,'黑龍江','HLJ','齊齊哈爾市','HLJ-QQHE'),(117,'黑龍江','HLJ','哈爾濱市','HLJ-HEB'),(118,'黑龍江','HLJ','大慶市','HLJ-DQ'),(119,'黑龍江','HLJ','佳木斯市','HLJ-JMS'),(120,'黑龍江','HLJ','雙鴨山市','HLJ-SYS'),(121,'內蒙古自治區','NMG','赤峰市','NMG-CF'),(122,'內蒙古自治區','NMG','包頭市','NMG-BT'),(123,'內蒙古自治區','NMG','通遼市','NMG-TL'),(124,'內蒙古自治區','NMG','呼和浩特市','NMG-FHHT'),(125,'內蒙古自治區','NMG','烏海市','NMG-WH'),(126,'貴州省','GZ','貴陽市','GZ-GY'),(127,'貴州省','GZ','黔東南州','GZ-QDN'),(128,'貴州省','GZ','黔南州','GZ-QN'),(129,'貴州省','GZ','遵義市','GZ-ZY'),(130,'貴州省','GZ','黔西南州','GZ-QXN'),(131,'甘肅省','GS','蘭州市','GS-LZ'),(132,'甘肅省','GS','天水市','GS-TS'),(133,'甘肅省','GS','慶陽市','GS-QY'),(134,'甘肅省','GS','武威市','GS-WW'),(135,'甘肅省','GS','酒泉市','GS-JQ'),(136,'青海省','QH','西寧市','QH-XN'),(137,'青海省','QH','海西州','QH-HX'),(138,'青海省','QH','海東地區','QH-HD'),(139,'青海省','QH','海北州','QH-HB'),(140,'青海省','QH','果洛州','QH-GL'),(141,'新疆','XJ','烏魯木齊市','XJ-WLMQ'),(142,'新疆','XJ','伊犁州','XJ-YL'),(143,'新疆','XJ','昌吉州','XJ-CJ'),(144,'新疆','XJ','石河子市','XJ-SHZ'),(145,'新疆','XJ','哈密地區','XJ-HM'),(146,'西藏自治區','XZ','拉薩市','XZ-LS'),(147,'西藏自治區','XZ','山南地區','XZ-SN'),(148,'西藏自治區','XZ','林芝地區','XZ-LZ'),(149,'西藏自治區','XZ','日喀則地區','XZ-RKZ'),(150,'西藏自治區','XZ','阿里地區','XZ-AL'),(151,'吉林省','JL','吉林市','JL-JL'),(152,'吉林省','JL','長春市','JL-CC'),(153,'吉林省','JL','白山市','JL-BS'),(154,'吉林省','JL','白城市','JL-BC'),(155,'吉林省','JL','延邊州','JL-YB'),(156,'寧夏','NX','銀川市','NX-YC'),(157,'寧夏','NX','吳忠市','NX-WZ'),(158,'寧夏','NX','中衛市','NX-ZW'),(159,'寧夏','NX','石嘴山市','NX-SZS'),(160,'寧夏','NX','固原市','NX-GY');

3.3 模擬生成數據

  1. 編寫模擬程序:使用Java編寫AnalogData類,實現從輸入文件讀取數據并按一定速度寫入輸出文件,模擬實時產生的廣告點擊日志。
import java.io.*;public class AnalogData_v2 {public static void main(String[] args) {// 參數校驗if (args.length < 2) {System.err.println("用法: java AnalogData <輸入文件路徑> <輸出文件路徑>");System.exit(1);}String inputFile = args[0];String outputFile = args[1];try {readData(inputFile, outputFile);} catch (FileNotFoundException e) {System.err.println("錯誤: 文件不存在 - " + e.getMessage());} catch (UnsupportedEncodingException e) {System.err.println("錯誤: 不支持的編碼 - " + e.getMessage());} catch (IOException e) {System.err.println("IO異常: " + e.getMessage());} catch (InterruptedException e) {System.err.println("操作被中斷: " + e.getMessage());Thread.currentThread().interrupt(); // 恢復中斷狀態}}public static void readData(String inputFile, String outputFile)throws IOException, InterruptedException {// 使用try-with-resources自動關閉輸入/輸出流try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile), "GBK"));BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputFile, true)))) {String line;int counter = 1;while ((line = reader.readLine()) != null) {System.out.printf("第%d行:%s%n", counter, line);writer.write(line);writer.newLine(); // 使用平臺無關的換行符writer.flush();   // 確保數據寫入磁盤counter++;Thread.sleep(1000); // 控制處理速度}}}
}
[root@kafka01 sparkKS]> javac  AnalogData_v2.java
[root@kafka01 sparkKS]> java AnalogData_v2 ./ad.log /opt/apache-flume-1.9.0-bin/logs/ad.log
第1行:1516609143867 6 7 64 16
第2行:1516609143869 9 4 75 18
第3行:1516609143869 1 7 87 12
第4行:1516609143869 2 8 92 9
第5行:1516609143869 6 7 84 24
第6行:1516609143869 1 8 95 5
  1. 項目打包編譯:在IDEA中編譯打包項目為bigdata.jar,上傳至MySQL節點的/root/sparkKS/lib目錄。
  2. 編寫shell腳本
    • 在/root/sparkKS/目錄創建ad.sh腳本,用于執行模擬數據程序
    • 創建common.sh腳本定義環境變量等配置
    • 給ad.sh腳本授權:chmod u+x ad.sh

3.4 業務代碼實現

  1. 引入項目依賴:在pom.xml文件中添加MySQL連接、Spark Streaming及Kafka相關依賴。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>data</groupId><artifactId>data</artifactId><version>1.0-SNAPSHOT</version><properties><scala.version>2.12.15</scala.version><spark.version>3.3.0</spark.version><kafka.version>3.6.1</kafka.version><mysql.version>8.0.27</mysql.version></properties><dependencies><!-- MySQL 8 驅動 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!-- Spark 依賴 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.0</version></dependency><!-- Kafka 客戶端依賴 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency><!-- Scala 庫 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
</dependencies><build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins><!-- Scala 編譯插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>4.8.1</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打包插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.6.0</version><configuration><archive><manifest><mainClass>data.kafka_sparkStreaming_mysql</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><!-- 確保使用Java 8 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin>
</plugins>
</build></project>
  1. 開發Spark Streaming應用程序
    • 配置Spark Streaming和Kafka連接參數
    • 從Kafka讀取數據并進行過濾處理
    • 分別統計各個廣告、省份、城市的點擊量
    • 通過foreachRDD和foreachPartition將統計結果寫入MySQL數據庫,實現數據的更新或插入操作
package dataimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.{HashMap => JHashMap}object KafkaSparkStreamingMysql {// MySQL 8 配置private val mysqlUrl = "jdbc:mysql://192.168.100.153:3306/advertise?useSSL=false&serverTimezone=UTC"private val mysqlUser = "root"private val mysqlPassword = "123456"def main(args: Array[String]): Unit = {// 加載MySQL 8驅動Class.forName("com.mysql.cj.jdbc.Driver")val sparkConf = new SparkConf().setAppName("advertise").setMaster("local[2]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val ssc = new StreamingContext(sparkConf, Seconds(1))// 使用Java HashMap替代Scala Map解決循環繼承問題val kafkaParams = new JHashMap[String, Object]()kafkaParams.put("bootstrap.servers", "192.168.100.150:9092,192.168.100.151:9092,192.168.100.152:9092")kafkaParams.put("key.deserializer", classOf[StringDeserializer])kafkaParams.put("value.deserializer", classOf[StringDeserializer])kafkaParams.put("group.id", "advertise")kafkaParams.put("auto.offset.reset", "earliest")kafkaParams.put("enable.auto.commit", false.asInstanceOf[Object])// 創建Kafka流val topics = Array("advertise")val topicsAsList = java.util.Arrays.asList(topics: _*)val stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsAsList, kafkaParams))// 處理數據流val lines = stream.map(record => record.value)lines.foreachRDD { rdd =>if (!rdd.isEmpty()) {rdd.foreach(println)}}// 過濾無效數據val filter = lines.map(_.split("\\s+")).filter(_.length == 5)// 統計廣告點擊量processCounts(filter.map(x => (x(4), 1)).reduceByKey(_ + _), "adversisecount", "adname")// 統計省份點擊量processCounts(filter.map(x => (x(1), 1)).reduceByKey(_ + _), "provincecount", "province")// 統計城市點擊量processCounts(filter.map(x => (x(2), 1)).reduceByKey(_ + _), "citycount", "city")ssc.start()ssc.awaitTermination()}/*** 通用的計數處理方法*/private def processCounts(counts: org.apache.spark.streaming.dstream.DStream[(String, Int)],tableName: String,idColumn: String): Unit = {counts.foreachRDD { rdd =>if (!rdd.isEmpty()) {rdd.foreachPartition { records =>updateOrInsertToMysql(records, tableName, idColumn)}}}}/*** 更新或插入MySQL數據 (使用預編譯語句防止SQL注入)*/private def updateOrInsertToMysql(records: Iterator[(String, Int)],tableName: String,idColumn: String): Unit = {var conn: Connection = nullvar checkStmt: PreparedStatement = nullvar updateStmt: PreparedStatement = nullvar insertStmt: PreparedStatement = nulltry {conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword)// 準備SQL語句val checkSql = s"SELECT 1 FROM $tableName WHERE $idColumn = ?"val updateSql = s"UPDATE $tableName SET count = count + ? WHERE $idColumn = ?"val insertSql = s"INSERT INTO $tableName($idColumn, count) VALUES(?, ?)"// 預編譯SQL語句checkStmt = conn.prepareStatement(checkSql)updateStmt = conn.prepareStatement(updateSql)insertStmt = conn.prepareStatement(insertSql)records.foreach { case (name, count) =>// 檢查記錄是否存在checkStmt.setString(1, name)val resultSet = checkStmt.executeQuery()if (resultSet.next()) {// 更新記錄updateStmt.setInt(1, count)updateStmt.setString(2, name)updateStmt.executeUpdate()} else {// 插入新記錄insertStmt.setString(1, name)insertStmt.setInt(2, count)insertStmt.executeUpdate()}// 關閉結果集if (resultSet != null) resultSet.close()}} catch {case e: Exception =>println(s"處理表 $tableName 時出錯: ${e.getMessage}")e.printStackTrace()} finally {// 關閉所有資源if (checkStmt != null) checkStmt.close()if (updateStmt != null) updateStmt.close()if (insertStmt != null) insertStmt.close()if (conn != null) conn.close()}}
}

3.5 打通整個項目流程

  1. 啟動MySQL服務

  2. 啟動Kafka集群,并創建advertise主題
    在這里插入圖片描述

  3. 啟動Spark Streaming應用程序,在IDEA中本地運行或打包提交到Spark集群

  4. 啟動Flume聚合服務:在kafka2和kafka3節點配置avro-file-selector-kafka.properties并啟動

[root@kafka02 apache-flume-1.9.0-bin]# cat conf/avro-file-selector-kafka.properties
#定義source、channel、sink的名稱
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定義和配置一個avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定義和配置一個file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpointDir
agent1.channels.c1.dataDirs = /opt/apache-flume-1.9.0-bin/dataDirs
# 定義和配置一個kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = advertise
agent1.sinks.k1.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1
################################################################################################
[root@kafka03 ~]#  cat /opt/apache-flume-1.9.0-bin/conf/avro-file-selector-kafka.properties
#定義source、channel、sink的名稱
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定義和配置一個avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定義和配置一個file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpointDir
agent1.channels.c1.dataDirs = /opt/apache-flume-1.9.0-bin/dataDirs
# 定義和配置一個kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = advertise
agent1.sinks.k1.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1
  1. 啟動Flume采集服務:在kafka1節點配置taildir-file-selector-avro.properties并啟動
[root@kafka01 sparkKS]cat /opt/apache-flume-1.9.0-bin/conf/taildir-file-selector-avro.properties
#定義source、channel、sink的名稱
agent1.sources = taildirSource
agent1.channels = fileChannel
agent1.sinkgroups = g1
agent1.sinks = k1 k2
# 定義和配置一個TAILDIR Source
agent1.sources.taildirSource.type = TAILDIR
agent1.sources.taildirSource.positionFile = /opt/apache-flume-1.9.0-bin/taildir_position.json
agent1.sources.taildirSource.filegroups = f1
agent1.sources.taildirSource.filegroups.f1 = /opt/apache-flume-1.9.0-bin/logs/ad.log
agent1.sources.taildirSource.channels = fileChannel
# 定義和配置一個file channel
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpointDir
agent1.channels.fileChannel.dataDirs = /opt/apache-flume-1.9.0-bin/dataDirs
#定義和配置一個 sink組
agent1.sinkgroups.g1.sinks = k1 k2
#為sink組定義一個處理器,load_balance表示負載均衡  failover表示故障切換
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
#定義處理器數據發送方式,round_robin表示輪詢發送  random表示隨機發送
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
#定義一個sink將數據發送給kafka02節點
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = fileChannel
agent1.sinks.k1.batchSize = 1
agent1.sinks.k1.hostname = kafka02
agent1.sinks.k1.port = 1234
#定義另一個sink將數據發送給kafka03節點
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = fileChannel
agent1.sinks.k2.batchSize = 1
agent1.sinks.k2.hostname = kafka03
agent1.sinks.k2.port = 1234
  1. 模擬產生數據:執行ad.sh腳本,將數據寫入指定文件,模擬實時日志
    在這里插入圖片描述

3.6 Davinci數據可視化分析

  1. 啟動Davinci服務并登錄
    在這里插入圖片描述

  2. 創建新項目和數據源連接(連接到MySQL的advertise數據庫)

  3. 創建視圖(view):

    • 廣告點擊前10統計:關聯adversisecount和advertiseinfo表
    • 廣告省份點擊前10:關聯provincecount和distinctcode表
    • 廣告城市點擊前10:關聯citycount和distinctcode表
  4. 創建圖表(Widget):為三個視圖分別創建柱狀圖

  5. 創建大屏(Dashboard):

    • 添加創建的圖表
    • 設置數據刷新模式和時長(定時刷新,30秒)
    • 完成大屏制作,實現廣告點擊數據的實時可視化展示

在這里插入圖片描述

bin/stop-server.sh
bin/start-server.sh
##################################
cat /opt/davinci/config/application.yml
##################################
server:protocol: httpaddress: 192.168.100.150port: 38080servlet:context-path: /
jwtToken:secret: secrettimeout: 1800000algorithm: HS512
source:initial-size: 2min-idle: 1max-wait: 6000max-active: 10break-after-acquire-failure: trueconnection-error-retry-attempts: 0query-timeout: 600000validationQueryTimeout: 30000enable-query-log: falseresult-limit: 1000000
spring:mvc:async:request-timeout: 30sdatasource:url: jdbc:mysql://192.168.100.153:3306/advertise?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=trueusername: rootpassword: 123456driver-class-name: com.mysql.jdbc.Driverinitial-size: 2min-idle: 1max-wait: 60000max-active: 10redis:isEnable: falsehost: 127.0.0.1port: 6379password:database: 0timeout: 1000jedis:pool:max-active: 8max-wait: 1max-idle: 8min-idle: 0mail:host: smtp.163.comport: 465username: a351719672@163.comfromAddress:password: xxxxxnickname: luoboziproperties:smtp:starttls:enable: truerequired: trueauth: truemail:smtp:ssl:enable: trueldap:urls:username:password:base:domainName:    # domainName 指 企業郵箱后綴,如企業郵箱為:xxx@example.com, 這里值為 '@example.com'
screenshot:default_browser: PHANTOMJS                    # PHANTOMJS or CHROMEtimeout_second: 600phantomjs_path: /opt/davinci/phantomjschromedriver_path: $your_chromedriver_path$
data-auth-center:channels:- name:base-url:auth-code:
statistic:enable: falseelastic_urls:elastic_user:elastic_index_prefix:mysql_url:mysql_username:mysql_password:kafka.bootstrap.servers:kafka.topic:java.security.krb5.conf:java.security.keytab:java.security.principal:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89721.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89721.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89721.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JAVA后端開發——success(data) vs toAjax(rows): 何時用

toAjax(int rows)用途&#xff1a;用于不返回任何數據的 “寫” 操作&#xff08;增、刪、改&#xff09;。工作原理&#xff1a;它只接收一個 int 類型的參數&#xff08;通常是數據庫操作影響的行數&#xff09;。它只關心這個數字是不是大于0&#xff0c;然后返回一個通用的…

pdf格式怎么提取其中一部分張頁?

想從PDF里提取幾個頁面&#xff0c;辦法還挺多的&#xff0c;下面給你嘮嘮常見的幾種&#xff0c;保準你一看就懂。一、用專業PDF編輯軟件提取 像Adobe Acrobat&#xff0c;這可是PDF編輯界的“老手”了。你先把要處理的PDF文件在Adobe Acrobat里打開&#xff0c;接著找到菜單欄…

Spring監聽器

1、監聽器的原理 ApplicationListener<T>是Spring框架中基于觀察者模式實現的事件監聽接口&#xff0c;用于監聽應用程序中特定類型的事件。該接口是一個函數式接口&#xff0c;從Spring 4.2開始支持Lambda表達式實現。 接口定義如下&#xff1a; FunctionalInterface …

基于Rust游戲引擎實踐(Game)

Rust游戲引擎推薦 以下是一些流行的Rust游戲引擎,適用于不同開發需求: Bevy 特點:數據驅動、模塊化設計,支持ECS架構,適合初學者和復雜項目。 適用場景:2D/3D游戲、原型開發。 Amethyst 特點:成熟的ECS框架,支持多線程,社區活躍。 適用場景:大型游戲或高性能應用。…

PyTorch 數據加載實戰:從 CSV 到圖像的全流程解析

目錄 一、PyTorch 數據加載的核心組件 1.1 Dataset 類的核心方法 1.2 DataLoader 的作用 二、加載 CSV 數據實戰 2.1 自定義 CSV 數據集 2.2 使用 TensorDataset 快速加載 三、加載圖像數據實戰 3.1 自定義圖像數據集 3.2 使用 ImageFolder 快速加載 四、加載官方數據…

程序人生,開啟2025下半年

時光匆匆&#xff0c;2025年已然過去一半。轉眼來到了7月份。 回望過去上半年&#xff0c;可能你也經歷了職場的浮沉、生活的跌宕、家庭的變故。 而下半年&#xff0c;生活依舊充滿了各種變數。 大環境的起起伏伏、生活節奏的加快&#xff0c;都讓未來的不確定性愈發凸顯。 在這…

在 .NET Core 中創建 Web Socket API

要在 ASP.NET Core 中創建 WebSocket API&#xff0c;您可以按照以下步驟操作&#xff1a;設置新的 ASP.NET Core 項目打開 Visual Studio 或您喜歡的 IDE。 創建一個新的 ASP.NET Core Web 應用程序項目。 選擇API模板&#xff0c;因為這將成為您的 WebSocket API 的基礎。在啟…

Python 之地址編碼識別

根據輸入地址&#xff0c;利用已有的地址編碼文件&#xff0c;構造處理規則策略識別地址的編碼。 lib/address.json 地址編碼文件&#xff08;這個文件太大&#xff0c;博客里放不下&#xff0c;需要的話可以到 gitcode 倉庫獲取&#xff1a;https://gitcode.com/TomorrowAndT…

kafka的部署

目錄 一、kafka簡介 1.1、概述 1.2、消息系統介紹 1.3、點對點消息傳遞模式 1.4、發布-訂閱消息傳遞模式 二、kafka術語解釋 2.1、結構概述 2.2、broker 2.3、topic 2.4、producer 2.5、consumer 2.6、consumer group 2.7、leader 2.8、follower 2.9、partition…

小語種OCR識別技術實現原理

小語種OCR&#xff08;光學字符識別&#xff09;技術的實現原理涉及計算機視覺、自然語言處理&#xff08;NLP&#xff09;和深度學習等多個領域的融合&#xff0c;其核心目標是讓計算機能夠準確識別并理解不同語言的印刷或手寫文本。以下是其關鍵技術實現原理的詳細解析&#…

GPT:讓機器擁有“創造力”的語言引擎

當ChatGPT寫出莎士比亞風格的十四行詩&#xff0c;當GitHub Copilot自動生成編程代碼&#xff0c;背后都源于同一項革命性技術——**GPT&#xff08;Generative Pre-trained Transformer&#xff09;**。今天&#xff0c;我們將揭開這項“語言魔術”背后的科學原理&#xff01;…

LeetCode|Day19|14. 最長公共前綴|Python刷題筆記

LeetCode&#xff5c;Day19&#xff5c;14. 最長公共前綴&#xff5c;Python刷題筆記 &#x1f5d3;? 本文屬于【LeetCode 簡單題百日計劃】系列 &#x1f449; 點擊查看系列總目錄 >> &#x1f4cc; 題目簡介 題號&#xff1a;14. 最長公共前綴 難度&#xff1a;簡單…

安全事件響應分析--基礎命令

----萬能密碼oror1 or # 1or11 1 or 11安全事件響應分析------***windoes***------方法開機啟動有無異常文件 【開始】?【運行】?【msconfig】文件排查 各個盤下的temp(tmp)相關目錄下查看有無異常文件 &#xff1a;Windows產生的 臨時文件 可以通過查看日志且通過篩…

基于C#+SQL Server實現(Web)學生選課管理系統

學生選課管理系統的設計與開發一、項目背景學生選課管理系統是一個學校不可缺少的部分&#xff0c;傳統的人工管理檔案的方式存在著很多的缺點&#xff0c;如&#xff1a;效率低、保密性差等&#xff0c;所以開發一套綜合教務系統管理軟件很有必要&#xff0c;它應該具有傳統的…

垃圾回收(GC)

內存管理策略&#xff0c;在業務進程運行的過程中&#xff0c;由垃圾收集器以類似守護協程的方式在后臺運行&#xff0c;按照指定策略回收不再被使用的對象&#xff0c;釋放內存空間進行回收 優勢&#xff1a; 屏蔽內存回收的細節&#xff1a;屏蔽復雜的內存管理工作&#xff0…

Datawhale AI夏令營-機器學習

比賽簡介 「用戶新增預測挑戰賽」是由科大訊飛主辦的一項數據科學競賽&#xff0c;旨在通過機器學習方法預測用戶是否為新增用戶 比賽屬于二分類任務&#xff0c;評價指標采用F1分數&#xff0c;分數越高表示模型性能越好。 如果你有一份帶標簽的表格型數據&#xff0c;只要…

Spring IOC容器在Web環境中是如何啟動的(源碼級剖析)?

文章目錄一、Web 環境中的 Spring MVC 框架二、Web 應用部署描述配置傳統配置&#xff08;web.xml&#xff09;&#xff1a;Java配置類&#xff08;Servlet 3.0&#xff09;&#xff1a;三、核心啟動流程詳解1. 啟動流程圖2. ★容器初始化入口&#xff1a;ContextLoaderListene…

18個優質Qt開源項目匯總

1&#xff0c;Clementine Music Player Clementine Music Player 是一個功能完善、跨平臺的開源音樂播放器&#xff0c;非常適合用于學習如何開發媒體類應用&#xff0c;尤其是跨平臺桌面應用。它基于 Qt 框架開發&#xff0c;支持多種操作系統&#xff0c;包括 Windows、macO…

計算機視覺:AI 的 “眼睛” 如何看懂世界?

1. 什么是計算機視覺&#xff1a;讓機器 “看見” 并 “理解” 的技術1.1 計算機視覺的核心目標計算機視覺&#xff08;CV&#xff09;是人工智能的一個重要分支&#xff0c;它讓計算機能夠 “看懂” 圖像和視頻 —— 不僅能捕捉像素信息&#xff0c;還能分析內容、提取語義&am…

華為OD刷題記錄

華為OD刷題記錄 刷過的題 入門 1、進制 2、NC61 doing 訂閱專欄