HBase Java API 代码开发
1. API 介紹
幾個主要 HBase API 類和數據模型之間的對應關系:
| Java 類 | HBase 數據模型 |
| Admin | 數據庫(Database) |
| HBaseConfiguration | |
| Table | 表(Table) |
| HTableDescriptor | 列簇(Column Family) |
| HColumnDescriptor | |
| Put | 列修飾符(Column Qualifier) |
| Get | |
| Delete | |
| Result | |
| Scan | |
| ResultScanner |
?
1.1 Admin
關系:org.apache.hadoop.hbase.client.Admin
作用:提供了一個接口來管理 HBase 數據庫的表信息。它提供的方法包括:創建表、刪除表、列出表項,使表有效或無效,以及添加或刪除表列簇成員等。
| 返回值 | 函數 | 描述 |
| void | addColumn(TableName tableName, HColumnDescriptor column) | 向一個已經存在的表添加列 |
| checkHBaseAvaliable(HBaseConfiguration conf) | 靜態函數,查看 HBase 是否處于運行狀態 | |
| createTable(HTableDescriptor desc) | 創建一個表,同步操作 | |
| deleteTable(TableName tableName) | 刪除一個已經存在的表 | |
| enabelTable(TableName tableName) | 使表處于有效狀態 | |
| disableTable(TableName tableName) | 使表處于無效狀態(在刪除一個表時候,要先使這個表處于無效狀態) | |
| HTableDescriptor[] | listTables() | 列出所有用戶控件表項 |
| void | modifyTable(byte[] tableName, HTableDescriptor htd) | 修改表的模式,是異步的操作,可能需要花費一定的時間 |
| boolean | tableExists(String tableName) | 檢查表是否存在 |
?
1.2 HBaseConfiguration
關系:org.apache.hadoop.hbase.HBaseConfiguration
作用:對 HBase 進行配置
| 返回值 | 函數 | 描述 |
| void | addResource(Path file) | 通過給定的路徑所指的文件來添加資源 |
| void | clear() | 清空所有已設置的屬性 |
| String | get(String name) | 獲取屬性名對應的值 |
| String | getBoolean(String name, boolean defaultValue) | 獲取為 boolean 類型的屬性值,如果其屬性值類型不為 boolean,則返回默認屬性值 |
| void | set(String name, String value) | 通過屬性名來設置值 |
| void | setBoolean(String name, boolean value) | 設置 boolean 類型的屬性值 |
?
1.3?HTableDescriptor
關系:org.apache.hadoop.hbase.HTableDescriptor
作用:包含了表的名字及其對應表的列簇
| 返回值 | 函數 | 描述 |
| void | addFamily(HColumnDescriptor family) | 添加一個列簇 |
| HColumnDescriptor | removeFamily(byte[] column) | 移除一個列簇 |
| byte[] | getName() | 獲取表的名字 |
| byte[] | getValue(byte[] key) | 獲取屬性的值 |
| void | setValue(String key, String value) | 設置屬性的值 |
?
1.4?HColumnDescriptor
關系:org.apache.hadoop.hbase.HColumnDescriptor
作用:維護著關于列簇的信息,例如版本號,壓縮設置等。它通常在創建表或者為表添加列簇的時候使用。列簇被創建后不能直接修改,只能通過刪除然后重新創建的方式。列簇被刪除的時候,列簇里面的數據也同時被刪除。
| 返回值 | 函數 | 描述 |
| byte[] | getName() | 獲取列簇的名字 |
| byte[] | getValue(byte[] key) | 獲取對應的屬性的值 |
| void | setValue(String key, String value) | 設置對應屬性的值 |
?
1.5 Table
關系:org.apache.hadoop.hbase.client.Table
作用:可以用來和 HBase 表來直接通信,此方法對于更新操作來說是非線程安全的。
| 返回值 | 函數 | 描述 |
| void | checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) | 自動的檢查 row, family, qualifier 是否與給定的值匹配 |
| void | close() | 釋放所有的資源或掛起內部的緩沖區中的更新 |
| boolean | exists(Get get) | 檢查 Get 實例所指定的值是否存在于 HTable 的列中 |
| Result | get(Get get) | 獲取指定行的某些單元格所對應的值 |
| byte[][] | getEndKeys() | 獲取當前打開的表每個區域的結束鍵值 |
| ResultScanner | getScanner(byte[] family) | 獲取當前給定列簇的 Scanner 實例 |
| HTableDescriptor | getTableDescriptor() | 獲取當前表的 HTableDescriptor 實例 |
| byte[] | getTableName() | 獲取表名 |
| static boolean | isTableEnabled(TableName tableName) | 檢查表是否有效 |
| void | put(Put put) | 向表中添加值 |
?
1.6 Put
關系:org.apache.hadoop.hbase.client.Put
作用:用來對單個行執行添加操作
| 返回值 | 函數 | 描述 |
| Put | add(byte[] family, byte[] qualifier, byte[] value) | 將指定的列和對應的值添加到 Put 實例中 |
| Put | add(byte[] family, byte[] qualifier, long timeStamp, byte[] value) | 將指定的列和對應的值及時間戳添加到 Put 實例中 |
| byte[] | getRow() | 獲取 Put 實例的行 |
| RowLock | getRowLock() | 獲取 Put 實例的行鎖 |
| long | getTimeStamp() | 獲取 Put 實例的時間戳 |
| boolean | isEmpty() | 檢查 familyMap 是否為空 |
| Put | setTimeStamp(long timeStamp) | 設置 Put 實例的時間戳 |
用法示例:
Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName)); Put p = new Put(row); // 為指定行創建一個 Put 操作 put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column), Bytes.toBytes(value)); table.put(p);?
1.7?Get
關系:org.apache.hadoop.hbase.client.Get
作用:用來獲取單個行的相關信息
| 返回值 | 函數 | 描述 |
| Get | addColumn(byte[] family, byte[] qualifier) | 獲取指定列簇和列修飾符對應的列 |
| Get | addFamily(byte[] family) | 通過指定的列簇獲取其對應列的所有列 |
| Get | setTimeRange(long minStamp, long maxStamp) | 獲取指定范圍的列的版本號 |
| Get | setFilter(Filter filter) | 當執行 Get 操作時設置服務器端的過濾器 |
用法示例:
Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(row));?
1.8 Delete
關系:org.apache.hadoop.hbase.client.Delete
作用:用來封裝一個要刪除的信息
?
1.9 Scan
關系:org.apache.hadoop.hbase.client.Scan
作用:用來封裝一個作為查詢條件的信息
?
1.10 Result
關系:org.apache.hadoop.hbase.client.Result
作用:存儲 Get 或 Scan 操作后獲取表的單行值。使用此類提供的方法可以直接獲取值或各種 Map 結構(Key-Value 對)
| 返回值 | 函數 | 描述 |
| boolean | containsColumn(byte[] family, byte[] qualifier) | 檢查指定的列是否存在 |
| NavigableMap<byte[], byte[]> | getFamilyMap(byte[] family) | 獲取對應列簇所包含的修飾符與值的鍵值對 |
| byte[] | getValue(byte[] family, byte[] qualifier) | 獲取對應列的最新值 |
?
1.11 ResultScanner
關系:org.apache.hadoop.hbase.client.ResultScanner
作用:存儲 Scan 操作后獲取表的單行值
?
2. 基本增刪改查的代碼實現
Note: 切記,在將 string 類型轉換為 byte[] 時,要使用 HBase 提供的工具類 Bytes.toBytes() 方法來轉換。同樣,在將 byte[] 轉換為 string 類型時,要使用 Bytes.toString() 方法來進行轉換操作。
package cn.gldwolf.hbase;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException; import java.io.UnsupportedEncodingException;/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2019/6/20 10:41*/public class HBaseDemo {public static Configuration conf;private static final String ZK_CONNECT_STR = "hdp01:2181,hdp02:2181,hdp03:2181";static {conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);}/*** 創建表* @param tableName 表名* @param family 列簇列表* @throws IOException*/public static void createTable(String tableName, String[] family) throws IOException {Admin admin = ConnectionFactory.createConnection(conf).getAdmin();TableName tbName = TableName.valueOf(tableName); // 表的名稱HTableDescriptor desc = new HTableDescriptor(tbName); // 創建一個表的描述信息對象for (int i = 0; i < family.length; i++) {desc.addFamily(new HColumnDescriptor(family[i])); // 將表的列簇信息添加到 HTableDescriptor 中}if (admin.tableExists(tbName)) { // 判斷表是否已經存在System.out.println(tbName.getNameAsString() + " is exists!");System.exit(0);} else { // 如果不存在:創建一個表admin.createTable(desc);System.out.println("Congratulations, " + tbName.getNameAsString() + " created successfully!");}}/*** 往表中按照 rowKey 添加數據* @param rowKey* @param tableName 表名* @param column1 第一個列簇列表* @param value1 第一個列的值的列表* @param column2 第二個列簇* @param value2 第二個列的值的列表* @throws IOException*/public static void addData(String rowKey, String tableName, String[] column1, String[] value1,String[] column2, String[] value2) throws IOException {// 設置 rowKeyPut put = new Put(Bytes.toBytes(rowKey)); // Put 用來對單個行執行添加操作// HTable 即 HBase 中的表,負責跟蹤記錄相關的操作:如增刪改查等Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));// 獲取所有的列簇HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();for (int i = 0; i < columnFamilies.length; i++) {// 獲取列簇名String familyName = columnFamilies[i].getNameAsString();// 往 article 列簇中 put 數據if (familyName.equals("article")) {for (int j = 0; j < column1.length; j++) {put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));}}// 往 author 列簇中添加數據if (familyName.equals("author")) {for (int j = 0; j < column2.length; j++) {put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));}}}table.put(put);System.out.println("Add data success!");}/*** 根據 rowKey 獲取表中的數據* @param tableName 表名* @param rowKey* @return Result 一個 rowKey 對應的結果集合* @throws IOException*/public static Result getResult(String tableName, String rowKey) throws IOException {Get get = new Get(Bytes.toBytes(rowKey)); // Get 是用來獲取單行數據的對象// 獲取表對象Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));// 獲取結果集Result result = table.get(get);for (Cell cell : result.listCells()) {printCell(cell);}return result;}/*** 打印單個 cell 信息* @param cell 一個數據單元*/public static void printCell(Cell cell) throws UnsupportedEncodingException {byte[] rowArray = cell.getRowArray();for (byte row : rowArray) {}System.out.println("RowKey: " + Bytes.toString(cell.getRowArray()));System.out.println("Family: " + Bytes.toString(cell.getFamilyArray()));System.out.println("Qualifier: " + Bytes.toString(cell.getQualifierArray()));System.out.println("Value: " + Bytes.toString(cell.getValueArray()));System.out.println("TimeStamp: " + cell.getTimestamp());System.out.println("-------------------------");}/*** 獲取整個表的所有行數據,并打印所有的 Cell* @param tableName 表名* @throws IOException*/public static void getResultScan(String tableName) throws IOException {Scan scan = new Scan(); // 在創建對象的時候要傳入掃描的起始行和結束行,如果不給參數,那么就會 Scan 整個表Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));// 獲取 scan 后的結果對象ResultScanner rs = table.getScanner(scan);// 結果對象包含所有行的數據,而每個行中又有許多的 Cellfor (Result r : rs) {for (Cell cell : r.listCells()) {printCell(cell);}}rs.close();}/*** 獲取從 startRowKey 到 stopRowKey 的所有的行的數據,并打印包含的所有 Cell* @param tableName 表名* @param startRowKey 起始行* @param stopRowKey 結束行* @throws IOException*/public static void getResultScan(String tableName, String startRowKey, String stopRowKey) throws IOException {Scan scan = new Scan(Bytes.toBytes(startRowKey), Bytes.toBytes(stopRowKey)); // 在創建對象的時候傳入掃描的起始行和結束行Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));// 獲取 scan 后的結果對象ResultScanner rs = table.getScanner(scan);// 結果對象包含所有行的數據,而每個行中又有許多的 Cellfor (Result r : rs) {for (Cell cell : r.listCells()) {printCell(cell);}}rs.close();}/*** 查詢表的某一列的數據* @param tableName 表名* @param rowKey rowKey* @param familyName 列簇名* @param columName 列名* @throws IOException*/public static void getResultByColumn(String tableName, String rowKey, String familyName, String columName) throws IOException {Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));// 獲取指定列簇和列修飾符對應的列get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columName));Result results = table.get(get);for (Cell cell : results.listCells()) {printCell(cell);}}/*** 更新一個單元格中的數據* @param tableName 表名* @param rowKey* @param familyName 列簇名* @param columnName 列名* @param value 值* @throws IOException*/public static void updateTable(String tableName, String rowKey, String familyName, String columnName, String value) throws IOException {Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(rowKey));// 創建一個 Cell,需要用 CellUtil 的 createCell 方法來創建Cell cell = CellUtil.createCell(Bytes.toBytes(rowKey),Bytes.toBytes(familyName), // 列簇名Bytes.toBytes(columnName), // 列名System.currentTimeMillis(), // 時間戳Type.Put.getCode(), // 類型Bytes.toBytes(value)); // 值// 住 Put 對象中添加 Cellput.add(cell);// 執行 update 操作table.put(put);System.out.println("Update table success!");}/*** 查詢某列數據的多個版本* @param tableName 表名* @param rowKey* @param familyName 列簇名* @param columnName 列名* @throws IOException*/public static void getResultByVersion(String tableName, String rowKey, String familyName, String columnName) throws IOException {Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName.getBytes()));Get get = new Get(Bytes.toBytes(rowKey));// 往 Get 對象中添加列簇信息和列信息get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));// 設置 version 為 5get.setMaxVersions(5);Result results = table.get(get);for (Cell cell : results.listCells()) {printCell(cell);}}/*** 刪除一行中的一個指定的列* @param tableName 表名* @param rowKey* @param familyName 列簇名* @param columnName 列名* @throws IOException*/public static void deleteOneColumnOfOneRow(String tableName, String rowKey, String familyName, String columnName) throws IOException {Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));// 創建一個行級的 Delete 對象Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));// 指定要刪除的列deleteColumn.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName));table.delete(deleteColumn);System.out.println(rowKey + ": " + familyName + ": " + columnName + "is deleted!");}/*** 刪除一行的所有列* @param tableName 表名* @param rowKey* @throws IOException*/public static void deleteAllColumnOfOneRow(String tableName, String rowKey) throws IOException {Table table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));Delete deleteColumn = new Delete(Bytes.toBytes(rowKey)); // 此時就不用添加列簇名和列名了,因為要刪除這一行中的所有數據table.delete(deleteColumn);System.out.println(rowKey + " all columns are deleted!");}/*** 刪除一個表* @param tableName 表名* @throws IOException*/public static void deleteTable(String tableName) throws IOException {Admin admin = ConnectionFactory.createConnection(conf).getAdmin();// 在刪除表之前要先將表停admin.disableTable(TableName.valueOf(tableName));admin.deleteTable(TableName.valueOf(tableName));System.out.println(tableName + " is deleted!");}public static void main(String[] args) throws IOException {// 創建表String tableName = "blog";String[] family = { "article", "author" };createTable(tableName, family);// 為表添加數據String[] column1 = {"title", "content", "tag"};String[] value1 = {"Head First HBase","HBase is the Hadoop database","Hadoop, HBase, NoSQL"};String[] column2 = {"name", "nickname"};String[] value2 = {"nicholas", "lee"};String[] value3 = {"lilaoshi", "malaoshi"};addData("rowkey1", "blog", column1, value1, column2, value2);addData("rowkey1", "blog", column1, value1, column2, value3);addData("rowkey2", "blog", column1, value1, column2, value2);addData("rowkey3", "blog", column1, value1, column2, value2);// 遍歷查詢, 根據 row key 范圍遍歷查詢getResultScan("blog", "rowkey2", "rowkey3");// 查詢getResult("blog", "rowkey1");// 查詢某一列的值getResultByColumn("blog", "rowkey1", "author", "name");// 更新列updateTable("blog", "rowkey1", "author", "name", "bin");// 查詢某一列的值getResultByColumn("blog", "rowkey1", "author", "name");// 查詢某列的多版本getResultByVersion("blog", "rowkey1", "author", "name");// 刪除一列deleteOneColumnOfOneRow("blog", "rowkey1", "author", "nickname");// 刪除所有列deleteAllColumnOfOneRow("blog", "rowkey1");// 刪除表 // deleteTable("blog");} }?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
總結
以上是生活随笔為你收集整理的HBase Java API 代码开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VS 中配置使用Visual SVN系列
- 下一篇: java实验10流_实验9 Java输入