四、DataFrame存儲+Spark UDF函數
1、儲存DataFrame
1)、將DataFrame存儲為parquet文件
2)、將DataFrame存儲到JDBC數據庫
3)、將DataFrame存儲到Hive表
2、UDF:用戶自定義函數
可以自定義類實現UDFX接口
java:
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {
return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");/*** 根據UDF函數參數的個數來決定是實現哪一個UDF UDF1,UDF2。。。。UDF1xxx*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(String t1) throws Exception {return t1.length();}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
// /**
// *
// */
// private static final long serialVersionUID = 1L;
//
// @Override
// public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
// }
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();sc.stop();
scala:
1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)
五、UDAF函數
1、UDAF:用戶自定義聚合函數
1)、實現UDAF函數如果要自定義類要繼承
UserDefinedAggregateFunction類
2)、UDAF原理圖
java:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/*** 注冊一個UDAF函數,實現統計相同值得個數* 注意:這里可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** */private static final long serialVersionUID = 1L;/*** 更新 可以認為一個一個地將組內的字段值傳遞進來 實現拼接的邏輯* buffer.getInt(0)獲取的是上一次聚合后的值* 相當于map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合 * 大聚和發生在reduce端.* 這里即是:在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0)+1);}/*** 合并 update操作,可能是針對一個分組內的部分數據,在某個節點上發生的 但是可能一個分組內的數據,會分布在多個節點上處理* 此時就要用merge操作,將各個節點上分布式拼接好的串,合并起來* buffer1.getInt(0) : 大聚和的時候 上一次聚合后的值 * buffer2.getInt(0) : 這次計算傳入進來的update的結果* 這里即是:最后在分布式節點完成后需要進行全局級別的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));}/*** 指定輸入字段的字段及類型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));}/*** 初始化一個內部的自己定義的值,在Aggregate之前每組數據的初始化結果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一個和DataType的類型要一致的類型,返回UDAF最后的計算結果*/@Overridepublic Object evaluate(Row row) {return row.getInt(0);}@Overridepublic boolean deterministic() {//設置為truereturn true;}/*** 指定UDAF函數計算后返回的結果類型*/@Overridepublic DataType dataType() {return DataTypes.IntegerType;}/*** 在進行聚合操作的時候所要處理的數據的結果的類型*/@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();sc.stop();
scala:
1.class MyCount extends UserDefinedAggregateFunction{
2. //輸入數據的類型
3. override def inputSchema: StructType = StructType(List[StructField](StructField("xx",StringType,true)))
4.
5. //在聚合過程中處理的數據類型
6. override def bufferSchema: StructType = StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8. //最終返回值的類型,與evaluate返回的值保持一致
9. override def dataType: DataType = IntegerType
10.
11. //多次運行數據是否一致
12. override def deterministic: Boolean = true
13.
14. //每個分區中每組key 對應的初始值
15. override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17. //每個分區中,每個分組內進行聚合操作
18. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19. buffer.update(0,buffer.getInt(0) + 1)
20. }
21.
22. //不同的分區中相同的key的數據進行聚合
23. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24. buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25. }
26.
27. //聚合之后,每個分組最終返回的值,類型要和dataType 一致
28. override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32. def main(args: Array[String]): Unit = {
33. val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34. val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36. import session.implicits._
37. val frame = list.toDF("name")
38. frame.createTempView("mytable")
39.
40. session.udf.register("MyCount",new MyCount())
41.
42. val result = session.sql("select name,MyCount(name) from mytable group by name")
43. result.show()
44.
45. }
46.}
47.