HBase API 详细例子(封装的DAO类)
生活随笔
收集整理的这篇文章主要介绍了
HBase API 详细例子(封装的DAO类)
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
HBase中没有库的概念
HBase lib目录下所有JAR包复制到项目中,Hbase 版本0.98.5
package com.zxing.imgQRCode;import java.io.IOException; import java.util.LinkedList; import java.util.List;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;public class HbaseConnection {private String rootDir;private String zkServer;private String port;private Configuration conf;private HConnection hConn=null;public HbaseConnection(String rootDir, String zkServer, String port) {super();this.rootDir = rootDir;this.zkServer = zkServer;this.port = port;conf=HBaseConfiguration.create();conf.set("hbase.rootdir", rootDir);conf.set("hbase.zookeeper.quorum ", zkServer);conf.set("hbase.zookeeper.property.clientPort", port);try {hConn=HConnectionManager.createConnection(conf);} catch (IOException e) {e.printStackTrace();}}//创建表public void crateTable(String tableName,List<String> cols){try {HBaseAdmin admin=new HBaseAdmin(conf);if(admin.tableExists(tableName))throw new IOException("table exists");else{HTableDescriptor tableDesc=new HTableDescriptor(tableName);for(String col:cols){HColumnDescriptor colDesc=new HColumnDescriptor(col);colDesc.setCompressionType(Algorithm.GZ);colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);tableDesc.addFamily(colDesc);}admin.createTable(tableDesc);}} catch (MasterNotRunningException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (ZooKeeperConnectionException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}//插入数据public void saveData(String tableName,List<Put> puts){try {HTableInterface table =hConn.getTable(tableName);table.put(puts);table.setAutoFlush(false);table.flushCommits();} catch (IOException e) {e.printStackTrace();}}//得到数据public Result getData(String tableName,String rowkey){try {HTableInterface table =hConn.getTable(tableName);Get get=new Get(rowkey.getBytes());return table.get(get);} catch (IOException e) {e.printStackTrace();}return null;} //输出result结果public void format(Result result){String rowkey=Bytes.toString(result.getRow());KeyValue[] kvs=result.raw();for (KeyValue kv:kvs){String family= Bytes.toString(kv.getFamily());String qualifier= Bytes.toString(kv.getQualifier());System.out.println("rowkey->"+rowkey+"family->"+family+"qualifier->"+qualifier);}}//全表扫描public void hbaseScan(String tableName){Scan scan=new Scan();//扫描器scan.setCaching(1000);//缓存1000条数据,一次读取1000条try {HTableInterface table =hConn.getTable(tableName);ResultScanner scanner=table.getScanner(scan);//返回迭代器for(Result res:scanner){format(res);}} catch (IOException e) {e.printStackTrace();}}//比较过滤器public void filterTest(String tableName){Scan scan=new Scan();//扫描器scan.setCaching(1000);//缓存1000条数据,一次读取1000条RowFilter filter =new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator("Jack".getBytes()));RowFilter filter1 =new RowFilter(CompareFilter.CompareOp.EQUAL,new RegexStringComparator("J\\w+"));scan.setFilter(filter);try {HTableInterface table =hConn.getTable(tableName);ResultScanner scanner=table.getScanner(scan);//返回迭代器for(Result res:scanner){format(res);}} catch (IOException e) {e.printStackTrace();}}//PageFilter分页public void pageFilterTest(String tableName){PageFilter filter = new PageFilter(4);byte[] lastRow=null;int pageCount=0; //记录第几页try {HTableInterface table =hConn.getTable(tableName);while(++pageCount>0){System.out.println("pageCount = "+ pageCount);Scan scan=new Scan();scan.setFilter(filter);if(lastRow!=null){scan.setStartRow(lastRow);}ResultScanner scanner=table.getScanner(scan);int count=0;//计数器for(Result res:scanner){lastRow=res.getRow();if(++count>3)break;format(res);if(count<3){break;}}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {String rootDir="hdfs://ns1/hbase";String zkServer="10.128.129.230";//集群内网IPString port="2181";//HbaseConnection conn=new HbaseConnection(rootDir, zkServer, port);List<String> cols=new LinkedList<String>();cols.add("basicInfo");cols.add("moreInfo");conn.crateTable("students", cols);//List<Put> puts=new LinkedList<Put>();Put put1=new Put("Tom".getBytes());//rowkeyput1.add("basicInfo".getBytes(), "age".getBytes(), "27".getBytes());put1.add("moreInfo".getBytes(), "tel".getBytes(), "110".getBytes());Put put2=new Put("Jim".getBytes());put2.add("basicInfo".getBytes(), "age".getBytes(), "28".getBytes());put2.add("moreInfo".getBytes(), "tel".getBytes(), "111".getBytes());puts.add(put1);puts.add(put2);conn.saveData("students", puts);//Result result= conn.getData("students", "Tom");conn.format(result);//conn.hbaseScan("students");//conn.filterTest("students");//conn.pageFilterTest("students");}}常用接口
package test;import hbase.HbaseUtils;import java.io.IOException; import java.util.Calendar; import java.util.Date; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.TimeUnit;import net.sf.json.JSONObject;import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test;import com.xd.iis.se.common.Constants; import com.xd.iis.se.hbase.CommHbaseUtils; import com.xd.iis.se.hbutils.MeUtils;import commn.CommonConstants;public class SyncTestUtils {//hbase表名(hbaseapi包中的Constants类中定义了表名和数字的映射关系) private final static String wz_content="wz_content";//1 private final static String lt_content="lt_content";//4 private final static String wb_content="wb_content";//2 private static final String wb_comment="wb_comment";//45private static final String sinawb_user="sinawb_user";// 微博用户表/* TitleHs的定义在hbaseapi包中SwitchBeanAndJsonUtils类中jsonToDocument方法里* * 从326行代码开始* * hbase表字段定义hbaseapi包中HIContentCommon类* * pfsearch包中IContentCommon类* */@Testpublic void hbaseTableNameToDigitalMapping() {for(Entry<String, String> entry: Constants.rstypemp.entrySet()){System.out.println(entry.getKey()+":"+entry.getValue());}}@Testpublic void seconds(){System.out.println(new Date().getTime());System.out.println(System.nanoTime());System.out.println(System.currentTimeMillis());//时间转换System.out.println(TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS));}//毫秒转换成日期@Testpublic void millsToDate(){String mills="1460459403324";Date date=new Date(Long.parseLong(mills));System.out.println(date);System.out.println(date.getTime());}//手工干预生成19位的全网微博评论tokenKey(键,rowkey)=wbcomment_key//TokenTable=hotmanwb_token/** hbase(main):003:0> scan 'hotmanwb_token',LIMIT=>2ROW COLUMN+CELL hotmanwb_key column=content:date, timestamp=1459310036375, value=1459310031972331086 hotmanwbcomment_key column=content:date, timestamp=1460600117890, value=1460600079542140091 ltcomment_key column=content:date, timestamp=1460600483717, value=1460600441668719114 wzcomment_key column=content:date, timestamp=1460599817280, value=1460599777817713930 */@Testpublic void generateTokenKeyForWeiboComment(){Calendar calendar = Calendar.getInstance();calendar.setTime(new Date());//三十天前的时间calendar.add(calendar.DATE, -30);Date date = calendar.getTime();//第一位表示星期System.out.println(date);//13位加6位拼成19位String startTime=date.getTime()+"000000";System.out.println("startTime:"+startTime);//插入或者更新时间HbaseUtils.insertData("hotmanwb_token", "wbcomment_key", startTime);//1458109165143000000 19位}//hbase根据表名和rowkey查询一条数据(tokenkey)@Testpublic void findByRowKey(){String startTime=HbaseUtils.QueryByCondition1("hotmanwb_token", "wbcomment_key");System.out.println(startTime);}//hbase返回前几条数据/*key:014604646505869913352145key:014604646505954866550445key:014604988869079841915645key:014605014502935460283945key:014605014503712711041745*/@Testpublic void scanTopRowComment(){ResultScanner resultScanner = null;HTableInterface table = HbaseUtils.pool.getTable(wz_content);try {Scan scan = new Scan();//设置过滤器,只返回20条Filter filter = new PageFilter(5); scan.setFilter(filter);//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为falsescan.setCacheBlocks(true);/*简而言之就是 batch 是qualifier column级别的 caching是row级别的batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管它, 大批量读取可以考虑用它改善性能这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier(默认一个row的所有qualifier会在一个Result中)*//*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突//setCaching发给scanners的缓存的Row的数量scan.setCaching(100);scan.setMaxVersions(1);resultScanner = table.getScanner(scan);/* for (Result r : rs) {return new String(r.getRow());}*/Iterator<Result> res = resultScanner.iterator();// 返回查询遍历器while (res.hasNext()) {Result result = res.next();System.out.println(result);System.out.println("key:" + new String(result.getRow()));//date列存的是json字符串String value = new String(result.getValue(CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS.getBytes(),CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2.getBytes()), "ISO8859-1");System.out.println("value:" + value);JSONObject js = JSONObject.fromObject(value);System.out.println(js);}} catch (Exception e) {e.printStackTrace();}finally{//这样一定要记住 用完closeif(resultScanner!=null) resultScanner.close();}}//根据rowkey范围扫描@Testpublic void scanByRowKeyRangeComment(){ResultScanner resultScanner = null;HTableInterface table = HbaseUtils.pool.getTable(wb_comment);String startRow="01420459403324297147";//String stopRow="014605014503712711";//20位try {Scan scan = new Scan();//设置过滤器,只返回20条Filter filter = new PageFilter(5); scan.setFilter(filter);scan.setStartRow(startRow.getBytes());scan.setStopRow(stopRow.getBytes());//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为falsescan.setCacheBlocks(true);/*简而言之就是 batch 是qualifier column级别的 caching是row级别的batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管它, 大批量读取可以考虑用它改善性能这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier(默认一个row的所有qualifier会在一个Result中)*//*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突//setCaching发给scanners的缓存的Row的数量scan.setCaching(100);scan.setMaxVersions(1);resultScanner = table.getScanner(scan);/* for (Result r : rs) {return new String(r.getRow());}*/Iterator<Result> res = resultScanner.iterator();// 返回查询遍历器while (res.hasNext()) {Result result = res.next();System.out.println(result);System.out.println("key:" + new String(result.getRow()));//date列存的是json字符串String value = new String(result.getValue(CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS.getBytes(),CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2.getBytes()), "ISO8859-1");System.out.println("value:" + value);JSONObject js = JSONObject.fromObject(value);System.out.println(js);}} catch (Exception e) {e.printStackTrace();}finally{//这样一定要记住 用完closeif(resultScanner!=null) resultScanner.close();}}@Test//hbase生成行键(hbaseApi包) 第一个url参数无用public void createRowKey(){//typemp.put("wb_comment", "45");// 微博评论表对应编码最后两位String newRowKey=MeUtils.createKeyCode("", "wb_comment");System.out.println(newRowKey);/*String oldRowKey=MeUtils.createKeyCode_oid("http://www.baidu.com", "wb_comment");System.out.println(oldRowKey);*///rowkey=114606860784008157170445 24位//1+19位时间戳+2位随机数+2位表名}/*TimestampHBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”(对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。Value每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value,例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。*//*大Solr(192.168.20.190对应三个域名)# 24 indexsolr_24h=http://solr-24h.wyq.cn/solr# month indexsolr_month=http://solr-month.wyq.cn/solr# week indexsolr_week=http://solr-week.wyq.cn/solr*/ @Testpublic void scanTopRowContent(){ResultScanner resultScanner = null;HTableInterface table = HbaseUtils.pool.getTable(wz_content);try {Scan scan = new Scan();//设置过滤器,只返回20条Filter filter = new PageFilter(5); scan.setFilter(filter);//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为falsescan.setCacheBlocks(true);/*简而言之就是 batch 是qualifier column级别的 caching是row级别的batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管它, 大批量读取可以考虑用它改善性能这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier(默认一个row的所有qualifier会在一个Result中)*//*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突//setCaching发给scanners的缓存的Row的数量scan.setCaching(100);scan.setMaxVersions(1);resultScanner = table.getScanner(scan);// 返回查询遍历器Iterator<Result> res = resultScanner.iterator();while (res.hasNext()) {System.out.println("--------------行分割线-------------");Result result = res.next();System.out.println("\n"+"------单个result--------");System.out.println(result);System.out.println("\n"+"------result中Cells--------");//由{row key, Family:Qualifier, version} 唯一确定的单元。cell中的数据是没有类型的,全部是以字节的形式进行存储的for (Cell cell : result.rawCells()) {//rowkeySystem.out.println("Rowkey : " +Bytes.toString (CellUtil.cloneRow(cell)));//列簇+列(Family是第一级列,Qualifier是第二级列)System.out.println("Familiy:Quilifier : " +Bytes.toString (CellUtil.cloneFamily(cell))+":"+Bytes.toString (CellUtil.cloneQualifier (cell))); //值System.out.println ("Value : " +Bytes.toString (CellUtil.cloneValue (cell)));System.out.println("TimeStamp : " +cell.getTimestamp());}/* //老APISystem.out.println("\n"+ "------result中KeyValues--------"); for( KeyValue kv:result.list()){ System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.", Bytes.toString(kv.getRow()), Bytes.toString(kv.getFamily()), Bytes.toString(kv.getQualifier()), Bytes.toString(kv.getValue()), kv.getTimestamp())); } */ }} catch (Exception e) {e.printStackTrace();}finally{//这样一定要记住 用完closeif(resultScanner!=null) resultScanner.close();}}//SecureCRT上传下载文件//sz 下载命令//rz -be 上传文件 单独用rz会有两个问题:上传中断、上传文件变化(md5不同),/*解决办法是上传是用rz -be,并且去掉弹出的对话框中“Upload files as ASCII”前的勾选。-a, –ascii-b, –binary 用binary的方式上传下载,不解释字符为ascii-e, –escape 强制escape 所有控制字符,比如Ctrl+x,DEL等rar,gif等文件文件采用 -b 用binary的方式上传。文件比较大而上传出错的话,采用参数 -e*///根据rowkey查找数据@Testpublic void select(){String ID="114615497672016941968326";try {String json=CommHbaseUtils.select(ID);System.out.println(json);JSONObject js = JSONObject.fromObject(json);System.out.println(js);} catch (IOException e) {e.printStackTrace();}}/*//血和泪的经验教训ArrayList非线程安全,即使使用Collections.synchronizedList(new ArrayList<SolrInputDocument>())访问方法size()方法得出来的大小也是错的,还是推荐使用vector代替//因为solrserver服务的url配置文件pfs.properties未打包进去,找不到url发生空指针异常ScheduledExecutorService对于线程中发生的http服务方面的异常无法捕获,jstack -l命令打印信息java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)- parking to wait for <0x0000000712e8e7e8> (a 解决方案:异常可以替代使用Timer定时器来捕获*//*修改properties文件编码* 全局修改:* window-> preference -> general -> content types 找到右边的 java properties file ,将其编码改为 utf-8 单个文件修改:右击该properties文件--properties--Resource--Text file encoding,选中other,选择其它编码方式,如UTF-8或GBK,这样就能在properties里面输入中文,而不会自动转成Unicode了。* */}本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1698822
总结
以上是生活随笔为你收集整理的HBase API 详细例子(封装的DAO类)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: Linux下的tr编辑器命令详解
- 下一篇: js事件监听器用法实例详解