欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)

发布时间:2024/7/23 编程问答 41 豆豆
生活随笔 收集整理的这篇文章主要介绍了 spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

使用 saveAsHadoopDataset 写入数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

//初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的

val jobConf = new JobConf(hbaseConf)

jobConf.setOutputFormat(classOf[TableOutputFormat])

val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))

val rdd = indataRDD.map(_.split(',')).map{ arr=>

/*一个Put对象就是一行记录,在构造方法中指定主键

* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换

* Put.addColumn 方法接收三个参数:列族,列名,数据*/

val put = new Put(Bytes.toBytes(arr(0)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))

(new ImmutableBytesWritable, put)

}

rdd.saveAsHadoopDataset(jobConf)

spark.stop()

}

}

使用 newAPIHadoopRDD 读取数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

// 如果表不存在则创建表

val admin = new HBaseAdmin(hbaseConf)

if (!admin.isTableAvailable(tablename)) {

val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))

admin.createTable(tableDesc)

}

//读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result])

hBaseRDD.foreach{ case (_ ,result) =>

//获取行键

val key = Bytes.toString(result.getRow)

//通过列族和列名获取列

val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))

val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))

println("Row key:"+key+" Name:"+name+" Age:"+age)

}

admin.close()

spark.stop()

}

}

Spark DataFrame 通过 Phoenix 读写 HBase

需要添加的依赖如下:

org.apache.phoenix

phoenix-core

${phoenix.version}

org.apache.phoenix

phoenix-spark

${phoenix.version}

下面老规矩,直接上代码。

package com.ai.spark

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.{SaveMode, SparkSession}

/**

* Created by blockchain on 18-9-9 下午8:33 in Beijing.

*/

object SparkHBaseDataFrame {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

val url = s"jdbc:phoenix:localhost:2181"

val dbtable = "PHOENIXTEST"

//spark 读取 phoenix 返回 DataFrame 的 第一种方式

val rdf = spark.read

.format("jdbc")

.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

.option("url", url)

.option("dbtable", dbtable)

.load()

rdf.printSchema()

//spark 读取 phoenix 返回 DataFrame 的 第二种方式

val df = spark.read

.format("org.apache.phoenix.spark")

.options(Map("table" -> dbtable, "zkUrl" -> url))

.load()

df.printSchema()

//spark DataFrame 写入 phoenix,需要先建好表

df.write

.format("org.apache.phoenix.spark")

.mode(SaveMode.Overwrite)

.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))

.save()

spark.stop()

}

}

参考链接:

总结

以上是生活随笔为你收集整理的spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。