欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

清洗弹幕数据,去不相关的列和空值,MapReduce

发布时间:2024/2/28 编程问答 41 豆豆
生活随笔 收集整理的这篇文章主要介绍了 清洗弹幕数据,去不相关的列和空值,MapReduce 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

原始数据:

话不多说,直接上代码!

老样子先pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.henu</groupId><artifactId>ETL</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-resourcemanager</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-nodemanager</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-shuffle</artifactId><version>2.6.0</version></dependency></dependencies></project>

ETLUtil

package com.wc;/*** @author George* @description etl工具类**/ public class ETLUtil {public static String oriString2ETLString(String ori){StringBuilder etlString = new StringBuilder();if (ori.startsWith("0")) {String[] splits = ori.split("\t");for (String split : splits) {if (!"25".equals(split) && !"".equals(split)) {etlString.append(split + "#");}}}return etlString.toString();} /*public static void main(String[] args) throws IOException {BufferedReader br = new BufferedReader(new FileReader("./data/test"));String str = "";while ((str = br.readLine())!=null){String string = oriString2ETLString(str);System.out.println(string);}br.close();}*/ }

BSETLMapper

package com.wc;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author George* @description map阶段**/ public class BSETLMapper extends Mapper<Object, Text, NullWritable,Text> {Text text = new Text();@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String etlString = ETLUtil.oriString2ETLString(value.toString());//检查字符串是否为空白、空("")或nullif (StringUtils.isBlank(etlString))return;text.set(etlString);context.write(NullWritable.get(),text);} }

BSETLRunner

 

package com.wc; import com.AccountRegisterETL.AccountRegisterETLMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/*** @author George* @description**/ public class BSETLRunner implements Tool {private Configuration conf = null;public void setConf(Configuration conf) {this.conf = conf;}public Configuration getConf() {return this.conf;}public int run(String[] args) throws Exception {conf = this.getConf();conf.set("inpath", args[0]);conf.set("outpath", args[1]);Job job = Job.getInstance(conf);job.setJarByClass(BSETLRunner.class);job.setMapperClass(BSETLMapper.class);job.setMapOutputKeyClass(NullWritable.class);job.setMapOutputValueClass(Text.class);job.setNumReduceTasks(0);this.initJobInputPath(job);this.initJobOutputPath(job);return job.waitForCompletion(true) ? 0 : 1;}private void initJobOutputPath(Job job) throws IOException {Configuration conf = job.getConfiguration();String outPathString = conf.get("outpath");FileSystem fs = FileSystem.get(conf);Path outPath = new Path(outPathString);if(fs.exists(outPath)){fs.delete(outPath, true);}FileOutputFormat.setOutputPath(job, outPath);}private void initJobInputPath(Job job) throws IOException {Configuration conf = job.getConfiguration();String inPathString = conf.get("inpath");FileSystem fs = FileSystem.get(conf);Path inPath = new Path(inPathString);if(fs.exists(inPath)){FileInputFormat.addInputPath(job, inPath);}else{throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString);}}public static void main(String[] args) {try {int resultCode = ToolRunner.run(new BSETLRunner(), args);if(resultCode == 0){System.out.println("Success!");}else{System.out.println("Fail!");}System.exit(resultCode);} catch (Exception e) {e.printStackTrace();System.exit(1);}} }

启动集群!!! 

上传jar包,

上传数据:

[root@henu2 ~]# hdfs dfs -put data.txt /

运行jar包: 

[root@henu2 ~]# hdfs dfs -mkdir /out [root@henu2 ~]# yarn jar ETL-1.0-SNAPSHOT.jar com.wc.BSETLRunner /data.txt /out/

得到结果数据:

结果展示:

 

总结

以上是生活随笔为你收集整理的清洗弹幕数据,去不相关的列和空值,MapReduce的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。