spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名
我正在使用多个文件加载到
JavaRDD中
JavaRDD allLines = sc.textFile(hdfs://path/*.csv);
加载文件后,我修改每条记录并想保存它们.但是,我还需要将原始文件名(ID)与记录一起保存以供将来参考.无论如何我可以从RDD中的单个记录中获取原始文件名吗?
谢谢
您可以尝试执行以下代码段中的操作:
JavaPairRDD javaPairRDD = sc.newAPIHadoopFile(
"hdfs://path/*.csv",
TextInputFormat.class,
LongWritable.class,
Text.class,
new Configuration()
);
JavaNewHadoopRDD hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;
JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
FileSplit fileSplit = (FileSplit) inputSplit;
String fileName = fileSplit.getPath().getName();
Stream> stream =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
.map(line -> {
String lineText = line._2().toString();
// emit file name as key and line as a value
return new Tuple2(fileName, lineText);
});
return stream.iterator();
}, true);
更新(适用于java7)
JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
new Function2>, Iterator>>() {
@Override
public Iterator> call(InputSplit inputSplit, final Iterator> lines) throws Exception {
FileSplit fileSplit = (FileSplit) inputSplit;
final String fileName = fileSplit.getPath().getName();
return new Iterator>() {
@Override
public boolean hasNext() {
return lines.hasNext();
}
@Override
public Tuple2 next() {
Tuple2 entry = lines.next();
return new Tuple2(fileName, entry._2().toString());
}
};
}
},
true
);
总结
以上是生活随笔为你收集整理的spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: java outofmemory jsp
- 下一篇: 算术运算符举例java_Java的算术运