欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程语言 > java >内容正文

java

spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名

发布时间:2025/4/17 java 70 豆豆
生活随笔 收集整理的这篇文章主要介绍了 spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

我正在使用多个文件加载到

JavaRDD中

JavaRDD allLines = sc.textFile(hdfs://path/*.csv);

加载文件后,我修改每条记录并想保存它们.但是,我还需要将原始文件名(ID)与记录一起保存以供将来参考.无论如何我可以从RDD中的单个记录中获取原始文件名吗?

谢谢

您可以尝试执行以下代码段中的操作:

JavaPairRDD javaPairRDD = sc.newAPIHadoopFile(

"hdfs://path/*.csv",

TextInputFormat.class,

LongWritable.class,

Text.class,

new Configuration()

);

JavaNewHadoopRDD hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;

JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {

FileSplit fileSplit = (FileSplit) inputSplit;

String fileName = fileSplit.getPath().getName();

Stream> stream =

StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)

.map(line -> {

String lineText = line._2().toString();

// emit file name as key and line as a value

return new Tuple2(fileName, lineText);

});

return stream.iterator();

}, true);

更新(适用于java7)

JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(

new Function2>, Iterator>>() {

@Override

public Iterator> call(InputSplit inputSplit, final Iterator> lines) throws Exception {

FileSplit fileSplit = (FileSplit) inputSplit;

final String fileName = fileSplit.getPath().getName();

return new Iterator>() {

@Override

public boolean hasNext() {

return lines.hasNext();

}

@Override

public Tuple2 next() {

Tuple2 entry = lines.next();

return new Tuple2(fileName, entry._2().toString());

}

};

}

},

true

);

总结

以上是生活随笔为你收集整理的spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。