
Flume: A Custom EventSerializer for the HBase Sink

Published: 2023/12/15 · Programming Q&A · 豆豆

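Recently I needed to analyze our company's logs, which meant shipping them from Flume into HBase. Our logs used to be stored in MongoDB, so they are all in JSON format, and Flume's built-in SimpleHbaseEventSerializer and RegexHbaseEventSerializer cannot handle them. So I worked my way through the source code and wrote my own serializer class. (Note: if you write the code inside Flume's HBase sink package, you must keep the license header, the block of English text at the top of every source file; otherwise you will get an error.) It is fairly simple: once the class is written, compile and package it, copy the jar into Flume's lib directory, and set the sink's serializer to your class when configuring the agent. Because this class implements AsyncHbaseEventSerializer, it is meant for the asynchronous HBase sink (AsyncHBaseSink). The code is below (class-level comments omitted, sorry):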

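// Paste the standard Apache License 2.0 header here (required when building inside Flume's HBase sink package, see the note above).
// Assumed package: Flume's HBase sink package, as described in the text.
package org.apache.flume.sink.hbase;

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);

    private byte[] table;           // HBase table
    private byte[] cf;              // column family
    private byte[][] payloadColumn; // column qualifiers (one per JSON key)
    private byte[][] payload;       // column values (one per JSON value)
    private byte[] incrementColumn;
    private byte[] incrementRow;
    private String rowSuffix;       // full rowkey: reversed pcode + ":" + path + ":" + time
    private KeyType keyType;        // rowkey suffix type (set but not used further)

    @Override
    public void configure(Context context) {
        // Rowkey suffix type; a timestamp is used here
        keyType = KeyType.TS;
        // iCol was referenced but never declared in the original; read it from the sink config
        String iCol = context.getString("incrementColumn", "iCol");
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(StandardCharsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void configure(ComponentConfiguration conf) {
        // Nothing to configure
    }

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }

    /**
     * Reads one log event and extracts the HBase column names and their values.
     * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)
     */
    @Override
    public void setEvent(Event event) {
        // Log body: a flat JSON object
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        // The headers carry the project code (pcode) and host of the log
        Map<String, String> headers = event.getHeaders();
        Map<String, String> kv = new HashMap<String, String>();
        String path = "";
        // Parse the JSON key/value pairs into the map
        try (JsonReader jsonReader = new JsonReader(new StringReader(log))) {
            jsonReader.beginObject();
            while (jsonReader.hasNext()) {
                String name = jsonReader.nextName();
                // nextString() accepts strings and numbers only; skip booleans, nulls and nested values
                JsonToken token = jsonReader.peek();
                if (token != JsonToken.STRING && token != JsonToken.NUMBER) {
                    jsonReader.skipValue();
                    continue;
                }
                String value = jsonReader.nextString();
                // "uri" is assumed to look like "GET /some/path?x=1 HTTP/1.1"; keep the second token
                if (name.equals("uri")) {
                    String[] parts = value.split(" ");
                    path = parts.length > 1 ? parts[1] : value;
                }
                kv.put(name, value);
            }
            jsonReader.endObject();
        } catch (IOException | IllegalStateException e) {
            logger.error("Could not parse event body as JSON: " + log, e);
        }
        // Drop the query string from the path
        if (path.contains("?")) {
            path = path.substring(0, path.indexOf('?'));
        }
        // Add the project code and server host from the headers (empty string if a header is missing)
        String pcode = headers.get("pcode");
        String host = headers.get("host");
        kv.put("pcode", pcode == null ? "" : pcode);
        kv.put("host", host == null ? "" : host);
        // One HBase column per map entry
        this.payloadColumn = new byte[kv.size()][];
        this.payload = new byte[kv.size()][];
        int i = 0;
        for (Map.Entry<String, String> entry : kv.entrySet()) {
            this.payloadColumn[i] = entry.getKey().getBytes(StandardCharsets.UTF_8);
            this.payload[i] = entry.getValue().getBytes(StandardCharsets.UTF_8);
            i++;
        }
        // Rowkey: reversed project code + ":" + path + ":" + the log's time field
        this.rowSuffix = new StringBuilder(kv.get("pcode")).reverse().toString() + ":" + path + ":" + kv.get("time");
    }

    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey = rowSuffix.getBytes(StandardCharsets.UTF_8);
            // One PutRequest per column/value pair, all under the same rowkey
            for (int i = 0; i < this.payload.length; i++) {
                actions.add(new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]));
            }
        }
        return actions;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            actions.add(new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn));
        }
        return actions;
    }

    @Override
    public void cleanUp() {
        // Nothing to release
    }
}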
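The rowkey logic in setEvent() can be pulled out into a small pure function, which makes it easy to check without a running Flume agent or HBase cluster. This is a minimal sketch, assuming the uri field has the form "METHOD path PROTOCOL" as the split(" ")[1] call implies; the class RowKeys and its methods pathFromUri and buildRowKey are hypothetical names, not part of Flume.

```java
import java.util.Map;

// Hypothetical helper mirroring the rowkey built in setEvent():
// reversed project code + ":" + path without query string + ":" + time field
public final class RowKeys {
    private RowKeys() {}

    // Extracts the path from a uri value assumed to look like "GET /a/b?x=1 HTTP/1.1"
    static String pathFromUri(String uri) {
        if (uri == null) {
            return "";
        }
        String[] parts = uri.split(" ");
        String path = parts.length > 1 ? parts[1] : uri;
        int q = path.indexOf('?');
        return q >= 0 ? path.substring(0, q) : path;
    }

    // A missing "time" field becomes the string "null", as in the serializer above
    static String buildRowKey(String pcode, Map<String, String> fields) {
        String reversed = new StringBuilder(pcode == null ? "" : pcode).reverse().toString();
        return reversed + ":" + pathFromUri(fields.get("uri")) + ":" + fields.get("time");
    }

    public static void main(String[] args) {
        Map<String, String> fields = Map.of(
                "uri", "GET /order/list?page=2 HTTP/1.1",
                "time", "1459500000000");
        // prints "10smtrp:/order/list:1459500000000"
        System.out.println(buildRowKey("prtms01", fields));
    }
}
```

The source does not say why the project code is reversed; in general, reversing a key prefix is a common way to avoid region hotspotting when many keys share the same leading characters.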



Reposted from: https://my.oschina.net/u/780876/blog/651566
