Spark _27_ Custom Functions: UDF and UDAF

UDF: user-defined function.

You can define a UDF as a class implementing one of the UDFX interfaces (UDF1, UDF2, ... UDF22, one per argument count).
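As a minimal Scala sketch of that approach, assuming the class name StrLenUdf is our own invention (the full demos below use anonymous classes instead):

import org.apache.spark.sql.api.java.UDF1

// Illustrative named class implementing the one-argument UDF1 interface:
// it maps one String column to an Integer result.
class StrLenUdf extends UDF1[String, Integer] {
  override def call(s: String): Integer = s.length
}

// Registration takes an instance plus the SQL return type, for example:
//   sqlContext.udf.register("StrLen", new StrLenUdf(), DataTypes.IntegerType)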

Java API:

package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description UDF demo
 */
public class Udf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("udf");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("George", "GeorgeDage", "kangkang"));
        JavaRDD<Row> map = rdd.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = sqlContext.createDataFrame(map, schema);
        df.show();
        /*
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |GeorgeDage|
         * |  kangkang|
         * +----------+
         */
        df.registerTempTable("user");
        /*
         * The number of UDF arguments decides which interface to implement:
         * UDF1, UDF2, ... up to UDF22.
         */
        sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
            public Integer call(String s) throws Exception {
                return s.length();
            }
        }, DataTypes.IntegerType);
        sqlContext.sql("select name,StrLen(name) as length from user").show();
        /*
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|     6|
         * |GeorgeDage|    10|
         * |  kangkang|     8|
         * +----------+------+
         */
        // Registering "StrLen" again replaces the previous definition.
        sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {
            public Integer call(String s, Integer integer) throws Exception {
                return s.length() + integer;
            }
        }, DataTypes.IntegerType);
        sqlContext.sql("select name,StrLen(name,10) as length from user").show();
        /*
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|    16|
         * |GeorgeDage|    20|
         * |  kangkang|    18|
         * +----------+------+
         */
        sc.stop();
    }
}

Scala API:

package com.udf

import org.apache.spark.sql.SparkSession

/**
 * UDF: user-defined function.
 */
object UdfScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local").appName("udf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk")
    import sparkSession.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")
    frame.show()
    /*
     * +------+
     * |  name|
     * +------+
     * |George|
     * |  lucy|
     * |    kk|
     * | lmdhk|
     * +------+
     */
    sparkSession.udf.register("STRLEN", (n: String) => n.length)
    sparkSession.sql("select name,STRLEN(name) as length from students sort by length desc").show(100)
    /*
     * +------+------+
     * |  name|length|
     * +------+------+
     * |George|     6|
     * | lmdhk|     5|
     * |  lucy|     4|
     * |    kk|     2|
     * +------+------+
     */
    sparkSession.stop()
  }
}
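Registering a name is only needed when the function is called from a SQL string. As a brief sketch, assuming we are still inside the main method above (so frame and the implicits import are in scope), the same logic can be applied directly through the DataFrame API:

import org.apache.spark.sql.functions.udf

// The same length logic as a column expression; no SQL registration needed.
val strLen = udf((n: String) => n.length)
frame.withColumn("length", strLen($"name")).show()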

 

UDAF: user-defined aggregate function.

  • To implement a UDAF with your own class, extend the UserDefinedAggregateFunction class (a plain-Scala sketch of the aggregation lifecycle follows below).
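Before the full demos, here is a plain-Scala simulation of how the aggregation buffer moves through its lifecycle. This is illustrative only, not Spark API: the object name UdafLifecycleSketch and the two-partition split are invented for the example.

// Illustrative simulation of the UDAF buffer lifecycle for a count.
object UdafLifecycleSketch extends App {
  def initialize: Int = 0
  def update(buffer: Int, input: String): Int = buffer + 1
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def evaluate(buffer: Int): Int = buffer

  // The same group ("George") seen by two partitions independently:
  val partial1 = Seq("George", "George").foldLeft(initialize)(update)
  val partial2 = Seq("George").foldLeft(initialize)(update)

  // Partial buffers are merged across partitions, then evaluated.
  println(evaluate(merge(partial1, partial2))) // prints 3
}

The real methods below follow the same order: initialize creates the per-group buffer, update folds rows in on each partition, merge combines partial buffers across partitions, and evaluate returns the final value.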

Java API:

package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * @author George
 * @description
 * UDAF: user-defined aggregate function.
 * To implement a UDAF with a custom class, extend UserDefinedAggregateFunction.
 */
public class Udaf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("udaf");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> parallelize = sc.parallelize(
                Arrays.asList("George", "kangkang", "GeorgeDage", "limu", "George", "GeorgeDage"));
        JavaRDD<Row> map = parallelize.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> frame = sqlContext.createDataFrame(map, schema);
        frame.show();
        /*
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |  kangkang|
         * |GeorgeDage|
         * |      limu|
         * |    George|
         * |GeorgeDage|
         * +----------+
         */
        frame.registerTempTable("user");
        /*
         * Register a UDAF that counts how many times each value occurs.
         * Note: a named class extending UserDefinedAggregateFunction works
         * just as well as this anonymous class.
         */
        sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
            /**
             * The name and type of the input field(s).
             */
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(Arrays.asList(
                        DataTypes.createStructField("name", DataTypes.StringType, true)));
            }

            /**
             * The return type of the aggregate function.
             */
            @Override
            public DataType dataType() {
                return DataTypes.IntegerType;
            }

            /**
             * The same input always produces the same output.
             */
            @Override
            public boolean deterministic() {
                return true;
            }

            /**
             * update: the values of a group are passed in one at a time.
             * buffer.getInt(0) is the value from the previous partial aggregation.
             * Think of it as a map-side combiner: each task pre-aggregates its own
             * slice, and the global aggregation happens on the reduce side. In other
             * words, whenever a new value arrives for a group, this defines how the
             * partial aggregate is advanced.
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row input) {
                buffer.update(0, buffer.getInt(0) + 1);
            }

            /**
             * The schema of the intermediate aggregation buffer.
             */
            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(Arrays.asList(
                        DataTypes.createStructField("bf", DataTypes.IntegerType, true)));
            }

            /**
             * merge: update may only see part of a group's data on one node, while
             * the rest of the group is processed on other nodes. merge combines the
             * partial results produced on each node into the global result.
             * buffer1.getInt(0): the running value of the global aggregation.
             * buffer2.getInt(0): an incoming partial result produced by update.
             */
            @Override
            public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
            }

            /**
             * Initialize the buffer: the starting value for each group
             * before aggregation begins.
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                buffer.update(0, 0);
            }

            /**
             * Return the final result of the UDAF; its type must match dataType().
             */
            @Override
            public Object evaluate(Row buffer) {
                return buffer.getInt(0);
            }
        });
        sqlContext.sql("select name, StringCount(name) from user group by name").show();
        /*
         * +----------+------+
         * |      name|(name)|
         * +----------+------+
         * |      limu|     1|
         * |    George|     2|
         * |GeorgeDage|     2|
         * |  kangkang|     1|
         * +----------+------+
         */
        sc.stop();
    }
}

Scala API:

package com.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF extends UserDefinedAggregateFunction {
  // The schema of the intermediate data processed during aggregation.
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }

  // The return type of the function's final value.
  def dataType: DataType = {
    DataTypes.IntegerType
  }

  // The same input always produces the same output.
  def deterministic: Boolean = {
    true
  }

  // Return the final aggregated value; it must match dataType.
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }

  // Initialize the buffer value for each group.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0
  }

  // The type of the input data.
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }

  // At the end, the partial aggregates computed on each node are merged together.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  // Whenever a new value arrives for a group, advance that group's running aggregate.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }
}

 

package com.udf

import org.apache.spark.sql.SparkSession

object UdafScalaDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("udaf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk", "kk")
    import session.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")

    // Register the UDAF.
    session.udf.register("NAMECOUNT", new MyUDAF())
    session.sql("select name,NAMECOUNT(name) as count from students group by name").show(100)
    /*
     * +------+-----+
     * |  name|count|
     * +------+-----+
     * |  lucy|    1|
     * |    kk|    2|
     * |George|    1|
     * | lmdhk|    1|
     * +------+-----+
     */
    session.stop()
  }
}
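One caveat worth noting: UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of the typed Aggregator API registered through functions.udaf. A minimal sketch of the same count under that API, assuming Spark 3.x (the object name CountAgg is our own):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Typed replacement for MyUDAF: counts the rows in each group.
object CountAgg extends Aggregator[String, Int, Int] {
  def zero: Int = 0                              // per-group initial buffer
  def reduce(b: Int, a: String): Int = b + 1     // fold one input row into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2     // combine partial buffers
  def finish(reduction: Int): Int = reduction    // final result for the group
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Registration would replace `new MyUDAF()` in the demo above:
//   session.udf.register("NAMECOUNT", functions.udaf(CountAgg))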

UDAF diagram: (image not preserved in this copy)
