Spark _25_ Loading Data from Hive into a DataFrame/DataSet (Part 4)
Since Hive is not running locally, the process is slightly cumbersome. Still, with a bit of care and some error analysis it is manageable. If your Hadoop cluster is set up with HA, pay extra attention.
One error is worth pointing out here — if you hit a similar one, see: https://georgedage.blog.csdn.net/article/details/103086882
Loading Data from Hive into a DataFrame/DataSet
- HiveContext is a subclass of SQLContext; HiveContext is the recommended way to connect to Hive.
- Since there is no local Hive environment, the job has to be submitted to the cluster. Submit command:
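The exact submit command isn't reproduced in the text, so here is a minimal sketch of what it typically looks like — the jar path, main class, and master URL below are illustrative assumptions, not the author's actual values:

```shell
# Illustrative spark-submit invocation (substitute your own jar,
# main class, and master URL for your cluster):
spark-submit \
  --master spark://henu2:7077 \
  --class com.henu.HiveDemo \
  ./spark-hive-demo.jar
```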
Java API:
```java
package com.henu;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author George
 * @description
 * Load data from Hive into a DataFrame
 */
public class HiveDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("hive");
        SparkContext sc = new SparkContext(conf);
        // HiveContext is a subclass of SQLContext.
        /**
         * Friendly reminder: in 2.3.1, HiveContext has been
         * replaced by SparkSession.builder.enableHiveSupport
         */
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use spark");
        hiveContext.sql("drop table if exists student_infos");
        // Create the student_infos table in Hive
        hiveContext.sql("create table if not exists student_infos(name String,age Int) " +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos");

        hiveContext.sql("drop table if exists student_scores");
        hiveContext.sql("create table if not exists student_scores (name String,score Int) " +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores");
        /**
         * Query the tables to produce a Dataset
         */
        Dataset<Row> dataset = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name=ss.name "
                + "WHERE ss.score>=80");
//        dataset.show();
        hiveContext.sql("drop table if exists good_student_infos");
        dataset.registerTempTable("goodstudent");
//        Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
        Dataset<Row> sql = hiveContext.sql("select * from goodstudent");
        sql.show();
        /**
         * Save the result to the Hive table good_student_infos
         */
        dataset.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        sc.stop();
    }
}
```

Package it into a jar and run it on Linux:
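As the comment in the code notes, HiveContext is deprecated in Spark 2.x in favor of SparkSession. A minimal sketch of the same flow in the 2.x API (assuming the same `spark` database and tables already exist; `registerTempTable` is likewise replaced by `createOrReplaceTempView`):

```scala
// Minimal Spark 2.x sketch: SparkSession with Hive support
// replaces the deprecated HiveContext.
import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveDemo2x {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive")
      .enableHiveSupport() // replaces new HiveContext(sc)
      .getOrCreate()

    spark.sql("use spark")
    val dataset = spark.sql(
      "SELECT si.name, si.age, ss.score FROM student_infos si " +
      "JOIN student_scores ss ON si.name = ss.name WHERE ss.score >= 80")

    // createOrReplaceTempView replaces the deprecated registerTempTable
    dataset.createOrReplaceTempView("goodstudent")
    dataset.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    spark.stop()
  }
}
```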
Per the code, make sure the local data source files are in place:
Then don't forget to start the Hadoop cluster,
and run the submit command. [It will be a bit slow.]
Then go into Hive and check whether it worked:
```
0: jdbc:hive2://henu2:10000> use spark;
No rows affected (0.35 seconds)
0: jdbc:hive2://henu2:10000> show tables;
+-----------------+--+
|    tab_name     |
+-----------------+--+
| student_infos   |
| student_scores  |
+-----------------+--+
2 rows selected (0.81 seconds)
0: jdbc:hive2://henu2:10000> select * from student_infos;
+---------------------+--------------------+--+
| student_infos.name  | student_infos.age  |
+---------------------+--------------------+--+
| George              | 22                 |
| kangkang            | 20                 |
| GeorgeDage          | 28                 |
| limu                | 1                  |
+---------------------+--------------------+--+
4 rows selected (0.967 seconds)
```

Scala API:
```scala
package com.henu

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hive")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists student_infos")
    // Create the student_infos table in Hive
    hiveContext.sql("create table if not exists student_infos(name String,age Int) " +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos")

    hiveContext.sql("drop table if exists student_scores")
    hiveContext.sql("create table if not exists student_scores (name String,score Int) " +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")

    val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    hiveContext.sql("drop table if exists good_student_infos")
    /**
     * Write the result into the Hive table
     */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    sc.stop()
  }
}
```

To be honest, I don't know where that new table ended up — the step that writes the result into a Hive table [did not succeed]. Corrections welcome.
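One possible explanation (an assumption on my part, not verified against the author's cluster) is that `saveAsTable` creates a Spark-managed table in Spark SQL's own Parquet layout, which an older Hive client may not list or read as expected. A hedged alternative is to create the target table with plain Hive DDL first and then append the rows with `insertInto`, which writes into the existing Hive table definition:

```scala
// Sketch: create the target table via Hive DDL, then write into it
// with insertInto instead of saveAsTable. insertInto matches columns
// by position against the existing table definition.
hiveContext.sql("drop table if exists good_student_infos")
hiveContext.sql(
  "create table good_student_infos (name String, age Int, score Int) " +
  "row format delimited fields terminated by ' '")
df.write.mode(SaveMode.Overwrite).insertInto("good_student_infos")
```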
Then I came up with a workaround and modified the code:
The modified code is identical to the Java version above, except that the CREATE TABLE AS SELECT statement is now actually executed instead of the plain SELECT — that is, this one line:

```java
Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
```

It seemed to have succeeded:
Then I checked the table again... however!!!
```
0: jdbc:hive2://henu2:10000> show tables;
+---------------------+--+
|      tab_name       |
+---------------------+--+
| good_student_infos  |
| student_infos       |
| student_scores      |
+---------------------+--+
3 rows selected (0.653 seconds)
0: jdbc:hive2://henu2:10000> select * from good_student_infos;
+--------------------------+-------------------------+---------------------------+--+
| good_student_infos.name  | good_student_infos.age  | good_student_infos.score  |
+--------------------------+-------------------------+---------------------------+--+
+--------------------------+-------------------------+---------------------------+--+
```

But at least the table was created; the remaining problem is getting the data inserted.
I made further changes, but I won't post that code here, because the data still did not get loaded. Until next time...
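One more thing worth trying (a sketch only — I cannot verify it against the author's setup) is to populate the already-created table with an explicit INSERT ... SELECT issued through the same HiveContext that registered the temporary table, so the SELECT can resolve `goodstudent`:

```scala
// Sketch: register the query result as a temp table, then fill the
// existing Hive table with an explicit INSERT ... SELECT through the
// same context, so "goodstudent" is visible to the statement.
dataset.registerTempTable("goodstudent")
hiveContext.sql(
  "insert into table good_student_infos " +
  "select name, age, score from goodstudent")
```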