日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据导入HBase最常用的三种方式及实践分析

發布時間:2025/7/14 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据导入HBase最常用的三种方式及实践分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

????要使用Hadoop,數據合并至關重要,HBase應用甚廣。一般而言,需要針對不同情景模式將現有的各種類型的數據庫或數據文件中的數據轉入至HBase中。

????常見方式為:1.使用HBase的API中的Put方法;

??????????????? 2.使用HBase 的bulk load 工具;

??????????????? 3.使用定制的MapReduce Job方式。

????《HBase Administration Cookbook》一書對這三種方式有著詳盡描述,由ImportNew?的陳晨進行了編譯,很有收獲,推薦給大家。


HBase數據遷移(1)-使用HBase的API中的Put方法 ? ?

????使用HBase的API中的Put是最直接的方法,用法也很容易學習。但針對大部分情況,它并非都是最高效的方式。當需要將海量數據在規定時間內載 入HBase中時,效率問題體現得尤為明顯。待處理的數據量一般都是巨大的,這也許是為何我們選擇了HBase而不是其他數據庫的原因。在項目開始之前, 你就該思考如何將所有能夠很好的將數據轉移進HBase,否則之后可能面臨嚴重的性能問題。

????HBase有一個名為bulk load的功能支持將海量數據高效地裝載入HBase中。Bulk load是通過一個MapReduce Job來實現的,通過Job直接生成一個HBase的內部HFile格式文件來形成一個特殊的HBase數據表,然后直接將數據文件加載到運行的集群中。 使用bulk load功能最簡單的方式就是使用importtsv 工具。importtsv 是從TSV文件直接加載內容至HBase的一個內置工具。它通過運行一個MapReduce ? ?Job,將數據從TSV文件中直接寫入HBase的表或者寫入一個HBase的自有格式數據文件。盡管importtsv 工具在需要將文本數據導入HBase的時候十分有用,但是有一些情況,比如導入其他格式的數據,你會希望使用編程來生成數據,而MapReduce是處理 海量數據最有效的方式。這可能也是HBase中加載海量數據唯一最可行的方法了。當然我們可以使用MapReduce向HBase導入數據,但海量的數據 集會使得MapReduce Job也變得很繁重。若處理不當,則可能使得MapReduce的job運行時的吞吐量很小。

????在HBase中數據合并是一項頻繁執行寫操作任務,除非我們能夠生成HBase的內部數據文件,并且直接加載。這樣盡管HBase的寫入速度一直很快,但是若合并過程沒有合適的配置,也有可能造成寫操作時常被阻塞。寫操作很重的任務可能引起的另一個問題就是將數據寫入了相同的族群服務器 (region ? ? server),這種情況常出現在將海量數據導入到一個新建的HBase中。一旦數據集中在相同的服務器,整個集群就變得不平衡,并且寫速度會顯著的降 低。我們將會在本文中致力于解決這些問題。我們將從一個簡單的任務開始,使用API中的Put方法將MySQL中的數據導入HBase。接著我們會描述如 何使用importtsv 和 bulk load將TSV數據文件導入HBase。我們也會有一個MapReduce樣例展示如何使用其他數據文件格式來導入數據。上述方式都包括將數據直接寫入 HBase中,以及在HDFS中直接寫入HFile類型文件。本文中最后一節解釋在向HBase導入數據之前如何構建好集群。本文代碼均是以Java編 寫,我們假設您具有基本Java知識,所以我們將略過如何編譯與打包文中的Java示例代碼,但我們會在示例源碼中進行注釋。

通過單個客戶端導入MySQL數據

????數據合并最常見的應用場景就是從已經存在的關系型數據庫將數據導入到HBase中。對于此類型任務,最簡單直接的方式就是從一個單獨的客戶端獲取數據,然后通過HBase的API中Put方法將數據存入HBase中。這種方式適合處理數據不是太多的情況。

????本節描述的是使用Put方法將MySQL數據導入HBase中的方式。所有的操作均是在一個單獨的客戶端執行,并且不會使用到MapReduce。本節將會帶領你通過HBase ? ?Shell創建HBase表格,通過Java來連接集群,并將數據導入HBase。

準備

????公共數據集合是個練習HBase數據合并的很好數據源?;ヂ摼W上有很多公共數據集合。我們在本文中獎使用 “美國國家海洋和大氣管理局 1981-2010氣候平均值”的公共數據集合。訪問http://www1.ncdc.noaa.gov/pub/data/normals /1981-2010/下載。

這些氣候報表數據是由美國國家海洋和大氣管理局(NOAA)生成的。在本文中,我們使用在目錄 products | hourly 下的小時溫度數據(可以在上述鏈接頁面中找到)。下載hly-temp-normal.txt文件。 ? ?
需要一個MySQL實例,在MySQL數據庫中創建hly_temp_normal表格,使用如下的SQL命令:

[sql] view plaincopy

  • create?table?hly_temp_normal?(?? id?INT?NOT?NULL?AUTO_INCREMENT?PRIMARY?KEY,?? stnid?CHAR(11),?? month?TINYINT,?? day?TINYINT,?? value1?VARCHAR(5),?? value2?VARCHAR(5),?? value3?VARCHAR(5),?? value4?VARCHAR(5),?? value5?VARCHAR(5),?? value6?VARCHAR(5),?? value7?VARCHAR(5),?? value8?VARCHAR(5),?? value9?VARCHAR(5),?? value10?VARCHAR(5),?? value11?VARCHAR(5),?? value12?VARCHAR(5),?? value13?VARCHAR(5),?? value14?VARCHAR(5),?? value15?VARCHAR(5),?? value16?VARCHAR(5),?? value17?VARCHAR(5),?? value18?VARCHAR(5),?? value19?VARCHAR(5),?? value20?VARCHAR(5),?? value21?VARCHAR(5),?? value22?VARCHAR(5),?? value23?VARCHAR(5),?? value24?VARCHAR(5)?? );
  • ????本文提供了一些腳本將txt中的數據導入到MySQL表中。你可以使用 insert_hly.py 來加載每小時的NOAA數據。只需要修改腳本中的主機(host),用戶(user),密碼(password)以及數據名稱(database ? ?name)。完成修改后就能夠將下載的hly-temp-normal.txt數據導入到mysql的hly_temp_normal 表中,使用命令如下: ? ?
    $ python insert_hly.py -f hly-temp-normal.txt -t hly_temp_normal

    譯者注:此處給出python腳本下載地址(https://github.com/uprush/hac-book/blob/master/2-data-migration/script/insert_hly.py)

    譯者注:由于對于python的了解有限以及環境限制,所以單獨另寫了一段Java的代碼,可以直接使用的:

    [java] view plaincopy

    import?java.io.FileInputStream; import?java.io.IOException; import?java.io.InputStreamReader; import?java.io.Reader; import?java.sql.Connection; import?java.sql.DriverManager; import?java.sql.PreparedStatement; import?java.sql.SQLException; import?java.util.ArrayList; import?java.util.List;public?class?InsertHly?{static?String?user?=?"root";static?String?pwd?=?"root123";static?String?driver?=?"com.mysql.jdbc.Driver";static?String?url?=?"jdbc:mysql://127.0.0.1:3306/htom?useUnicode=true&characterEncoding=UTF-8";public?static?void?main(String[]?args)?throws?SQLException?{Connection?baseCon?=?null;String?sqlStr?=?"insert?into?hly_temp_normal?(stnid,month,day)?values?(?,?,?)";List?parasValues?=?new?ArrayList();try?{baseCon?=?DriverManager.getConnection(url,?user,?pwd);}?catch?(SQLException?e)?{//?TODO?Auto-generated?catch?block?e.printStackTrace();}?//?替換為文件地址String?allRowsStr?=?readFileByChars("d:\\TestZone\\hly-temp-normal.txt",?"gbk");String[]?rows?=?allRowsStr.split("\n");for?(String?row?:?rows)?{parasValues.add(row.split("\\s+"));}PreparedStatement?basePsm?=?null;try?{baseCon.setAutoCommit(false);basePsm?=?baseCon.prepareStatement(sqlStr);for?(int?i?=?0;?i?<?parasValues.size();?i++)?{Object[]?parasValue?=?parasValues.get(i);for?(int?j?=?0;?j?<?parasValue.length;?j++)?{basePsm.setObject(j?+?1,?parasValue[j]);}basePsm.addBatch();}basePsm.executeBatch();baseCon.commit();}?catch?(SQLException?e)?{baseCon.rollback();throw?e;}?finally?{if?(basePsm?!=?null)?{basePsm.close();basePsm?=?null;}if?(baseCon?!=?null)?{baseCon.close();}}}public?static?String?readFileByChars(String?fileName,?String?enc)?{StringBuffer?content?=?new?StringBuffer();Reader?reader?=?null;try?{?//?一次讀多個字符char[]?tempchars?=?new?char[30];int?charread?=?0;reader?=?new?InputStreamReader(new?FileInputStream(fileName),?enc);?//?讀入多個字符到字符數組中,charread為一次讀取字符數while?((charread?=?reader.read(tempchars))?!=?-1)?{?//?同樣屏蔽掉\r不顯示if?((charread?==?tempchars.length)&&?(tempchars[tempchars.length?-?1]?!=?'\r'))?{content.append(tempchars);}?else?{for?(int?i?=?0;?i?<?charread;?i++)?{if?(tempchars[i]?==?'\r')?{continue;}?else?{content.append(tempchars[i]);}}}}return?content.toString();}?catch?(Exception?e1)?{e1.printStackTrace();}?finally?{if?(reader?!=?null)?{try?{reader.close();}?catch?(IOException?e1)?{}}}return?null;} }

    為使得下一節中的Java源碼能夠編譯,你需要下列庫支持: ? ? ? ?
    hadoop-core-1.0.2.jar ? ? ? ?
    hbase-0.92.1.jar ? ? ? ?
    mysql-connector-java-5.1.18.jar

    你可以將他們手動加入classpath中,或者使用本文中的可用的示例代碼。

    在導入數據之前,確認HDFS, ZooKeeper,和HBase集群均正常運行。在HBase的客戶端節點記錄日志。

    如何實施 ? ?

    通過單節點客戶端將數據從MySQL導入HBase: ? ? ? ?
    1.從HBase的客戶端服務器從過HBase的Shell命令行,連接到HBase的集群。 ? ? ? ?
    hadoop$ $HBASE_HOME/bin/hbase shell ? ? ? ?
    2.在HBase中創建 hly_temp 表 ? ? ? ?
    hbase> create ‘hly_temp’, {NAME => ‘n’, VERSIONS => 1} ? ? ? ?
    3.寫一個Java程序將數據從MySQL中導入HBase,并將其打包成jar。在Java中按照下列步驟導入數據: ? ? ? ?
    i. 使用Java創建一個connectHBase() 方法來連接到指定的HBase表: ? ? ? ?
    $ vi Recipe1.java ? ?

    [java] view plaincopy

  • private?static?HTable?connectHBase(String?tablename)?\?? throws?IOException?{?? HTable?table?=?null;?? Configuration?conf?=?HBaseConfiguration.create();?? table?=?new?HTable(conf,?tablename);?? return?table;?? }
  • ii. 使用Java創建一個 connectDB() 方法來 MySQL : ? ? ? ?
    $ vi Recipe1.java ? ?

    [java] view plaincopy

  • private?static?Connection?connectDB()?\?? throws?Exception?{?? String?userName?=?"db_user";?? String?password?=?"db_password";?? String?url?=?"jdbc:mysql://db_host/database";?? Class.forName("com.mysql.jdbc.Driver").newInstance();?? Connection?conn?=?DriverManager.getConnection(url,?? userName,?password);?? return?conn;?? }
  • 此處是Java類中的main() 方法,在其中我們從MySQL獲取數據并存入HBase中: ? ? ? ?
    $ vi Recipe1.java ? ?

    [java] view plaincopy

  • public?class?Recipe1?{??public?static?void?main(String[]?args)?{??Connection?dbConn?=?null;??HTable?htable?=?null;??Statement?stmt?=?null;??String?query?=?"select?*?from?hly_temp_normal";??try?{??dbConn?=?connectDB();??htable?=?connectHBase("hly_temp");??byte[]?family?=?Bytes.toBytes("n");??stmt?=?dbConn.createStatement();??ResultSet?rs?=?stmt.executeQuery(query);??//?time?stamp?for?all?inserted?rows??//?所有插入數據的時間戳??long?ts?=?System.currentTimeMillis();??while?(rs.next())?{??String?stationid?=?rs.getString("stnid");??int?month?=?rs.getInt("month");??int?day?=?rs.getInt("day");??String?rowkey?=?stationid?+?Common.lpad(String.???valueOf(month),?2,????'0')?+?Common.lpad(String.valueOf(day),?2,?'0');??Put?p?=?new?Put(Bytes.toBytes(rowkey));??//?get?hourly?data?from?MySQL?and?put?into?hbase??//從MySQL中獲取小時數據并存入HBase??for?(int?i?=?5;?i?<?29;?i++)?{??String?columnI?=?"v"?+?Common.lpad???(String.valueOf(i?-?4),?2,?'0');??String?valueI?=?rs.getString(i);??p.add(family,?Bytes.toBytes(columnI),?ts,????Bytes.toBytes(valueI));??}??htable.put(p);??}??}?catch?(Exception?e)?{??e.printStackTrace();??}?finally?{??try?{??if?(stmt?!=?null)?{??stmt.close();??}??if?(dbConn?!=?null)?{??dbConn.close();??}??if?(htable?!=?null)?{??htable.close();??}??}?catch?(Exception?e)?{??//?ignore??}??}??}?? }
  • 4.運行導入任務,下面的腳本就是用于執行JAR文件:

    [java] view plaincopy

  • #/bin/bash?? bin=`dirname?$0`?? bin=`cd?$bin;pwd`?? cp=$HBASE_HOME/conf:$HBASE_HOME/hbase-0.92.1.jar:$bin/build/hac-?? chapter2.jar?? for?jar?in?$bin/lib/*.jar?? do??cp=$cp:$jar?? done?? for?jar?in?$HBASE_HOME/lib/*.jar?? do??cp=$cp:$jar?? done
  • $JAVA_HOME/bin/java -classpath $cp “hac.chapter2.Recipe1″ ? ?

    5.驗證HBase中導入的數據,通過HBase的Shell連接至HBase: ? ? ? ?
    hadoop$ $HBASE_HOME/bin/hbase shell ? ?

    6.驗證數據已經被導入了HBase的對應表中: ? ? ? ?
    hbase> count ‘hly_temp’ ? ? ? ?
    95630 row(s) in 8.9850 seconds ? ? ? ?
    hbase> scan ‘hly_temp’, {LIMIT => 10} ? ? ? ?
    … ? ? ? ?
    AQW000617050110 column=n:v23, ? ? ? ?
    timestamp=1322958813521, value=814S ? ? ? ?
    AQW000617050110 column=n:v24, ? ? ? ?
    timestamp=1322958813521, value=811C ? ? ? ?
    10 row(s) in 0.6730 seconds

    運行原理 ? ?

    ????在步驟1和2中,我們在HBase中創建了目標表用于插入數據。目標表名稱為hly_temp,且只有單個列族(column family) n。我們將列族名稱設計為一個字母的原因,是因為列族名稱會存儲在HBase的每個鍵值對中。使用短名能夠讓數據的存儲和緩存更有效率。我們只需要保留一 個版本的數據,所以為列族指定VERSION屬性。

    ????在Java代碼中,為了連接到HBase,我們首先創建一個配置(Configuration)對象,使用該對象創建一個HTable實例。這個HTable對象用于處理所有的客戶端API調用。如你所見,我們在代碼沒有設置任何 ZooKeeper或HBase的連接配置。所以程序該如何連接到運行的HBase集群呢?這或許是因為我們在步驟4中將$HBase/conf目錄添加到classpath中了。通過上述設置,HBase的客戶端API會classpath中的hbase- site.xml加載配置信息。連接配置信息在hbase-site.xml中設置。

    ????在使用JDBC中MySQL中獲取數據之后,我們循環讀取結果集,將MySQL中的一行映射為HBase表中的一行。此處我們使用stationid,月份和日期欄位來生成HBase數據的row key。我們在月份和日期左邊也填充0,補足2位數。這樣做很重要,因為HBase的row key是按照字典排序的,意味著12將排序在2之前,這樣可能會導致一些意外的情況發生。

    ????我們創建了Put對象,利用row key添加一行數據。每小時的數據的添加需要調用Put.add()方法,傳入參數包括列族(column family),限定符(qualifier),時間戳( timestamp), and 值(value)。再次聲明,我們使用很短的列族名稱能夠讓存儲數據更高效。所有的數據都被添加之后,我們調用HTable.put() 方法會將數據保存進HBase的table中。

    ????最后,所有打開的資源都需要手動關閉。我們在代碼中的final塊中結束了MySQL和HBase的連接,這樣確保即時導入動作中拋出異常仍然會被調用到。 ? ? ? ?
    ????你能夠通過對比MySQL和HBase的數據行數來驗證導入是否正確。你可以在掃描(scan)結果集中發現數據都準確的導入了HBase。


    轉載于:https://blog.51cto.com/xulongping/1438827

    總結

    以上是生活随笔為你收集整理的数据导入HBase最常用的三种方式及实践分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。