HBase:为客户行为生成搜索点击事件统计信息
在本文中,我們將探索HBase來存儲客戶搜索點擊事件數據,并利用其基于搜索查詢字符串和構面過濾器點擊來獲取客戶行為信息。 我們將介紹如何使用MiniHBaseCluster,HBase Schema設計,使用HBaseSink與Flume集成以存儲JSON數據。
在之前的文章的基礎上,
- 客戶產品搜索使用大數據進行點擊分析 ,
- Flume:使用Apache Flume收集客戶產品搜索點擊數據 ,
- Hive:使用Apache Hive查詢客戶最喜歡的搜索查詢和產品視圖計數 ,
- ElasticSearch-Hadoop:將產品視圖計數和從Hadoop到ElasticSearch的客戶頂部搜索查詢建立索引 ,
- Oozie:為Hive分區和ElasticSearch索引安排協調器/捆綁作業 ,
- Spark:大數據的實時分析,可用于熱門搜索查詢和熱門產品視圖
我們已經探索了將搜索點擊事件數據存儲在Hadoop中并使用不同的技術對其進行查詢。 在這里,我們將使用HBase實現相同的目的:
- HBase小型集群設置
- 使用Spring Data的HBase模板
- HBase模式設計
- 使用HBaseSink進行Flume集成
- HBaseJsonSerializer序列化json數據
- 查詢過去一個小時的前10個搜索查詢字符串
- 查詢過去一個小時的前10個搜索方面過濾器
- 獲取最近30天內客戶的最近搜索查詢字符串
HBase的
HBase “是Hadoop數據庫,一個分布式,可擴展的大數據存儲。”
HBaseMiniCluster / MiniZookeperCluster
要設置和啟動小型集群,請檢查HBaseServiceImpl.java
...miniZooKeeperCluster = new MiniZooKeeperCluster();miniZooKeeperCluster.setDefaultClientPort(10235);miniZooKeeperCluster.startup(new File("taget/zookeper/dfscluster_" + UUID.randomUUID().toString()).getAbsoluteFile());...Configuration config = HBaseConfiguration.create();config.set("hbase.tmp.dir", new File("target/hbasetom").getAbsolutePath());config.set("hbase.master.port", "44335");config.set("hbase.master.info.port", "44345");config.set("hbase.regionserver.port", "44435");config.set("hbase.regionserver.info.port", "44445");config.set("hbase.master.distributed.log.replay", "false");config.set("hbase.cluster.distributed", "false");config.set("hbase.master.distributed.log.splitting", "false");config.set("hbase.zookeeper.property.clientPort", "10235");config.set("zookeeper.znode.parent", "/hbase");miniHBaseCluster = new MiniHBaseCluster(config, 1);miniHBaseCluster.startMaster();...MiniZookeeprCluster在客戶端端口10235上啟動,所有客戶端連接都將在此端口上。 確保將hbase服務器端口配置為不與其他本地hbase服務器沖突。 在這里,我們僅在測試案例中啟動一臺hbase區域服務器。
使用Spring數據的HBase模板
我們將使用Spring hbase模板連接到HBase集群:
<hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration" stop-proxy="false" delete-connection="false" zk-quorum="localhost" zk-port="10235"></hdp:hbase-configuration><bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HBaseTemplate" p:configuration-ref="hbaseConfiguration" />HBase表架構設計
我們具有以下格式的搜索點擊事件JSON數據,
{"eventid":"24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7","hostedmachinename":"192.168.182.1330","pageurl":"http://blahblah:/5" ;,"customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]}處理數據的一種方法是將其直接存儲在一個列族和json列下。 這樣掃描json數據將變得不那么容易和靈活。 另一種選擇是將其存儲在一個列族下,但具有不同的列。 但是將過濾器數據存儲在單列中將很難進行掃描。 下面的混合方法是將其劃分為多個列族,并動態生成用于過濾器數據的列。
轉換后的架構為:
{ "client:eventid" => "24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7", "client:eventidsuffix" => "629e9b5f-ff4a-4168-8664-6c8df8214aa7", "client:hostedmachinename" => "192.168.182.1330", "client:pageurl" => "http://blahblah:/5", "client:createdtimestampinmillis" => 1399386809805, "client:cutomerid" => 24, "client:sessionid" => "648a011d-570e-48ef-bccc-84129c9fa400", "search:querystring" => null, "search:sortorder" => desc, "search:pagenumber" => 3, "search:totalhits" => 28, "search:hitsshown" => 7, "search:clickeddocid" => "41", "search:favourite" => null, "filters:searchfacettype_color_level_2" => "Blue", "filters:searchfacettype_age_level_2" => "12-18 years" }將創建以下三列系列:
- client :存儲事件的客戶和客戶數據特定信息。
- search :與查詢字符串和分頁信息有關的搜索信息存儲在此處。
- 過濾器:為了將來支持更多構面等,并更靈活地掃描數據,將基于構面名稱/代碼動態創建列名稱,并將列值存儲為構面過濾器值。
要創建hbase表,
...TableName name = TableName.valueOf("searchclicks");HTableDescriptor desc = new HTableDescriptor(name);desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES));desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES));desc.addFamily(new HColumnDescriptor(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES));try {HBaseAdmin hBaseAdmin = new HBaseAdmin(miniHBaseCluster.getConf());hBaseAdmin.createTable(desc);hBaseAdmin.close();} catch (IOException e) {throw new RuntimeException(e);}...在創建表時已添加了相關列族,以支持新的數據結構。 通常,建議盡量減少列族的數量,并牢記如何根據使用情況來構造數據。 根據以上示例,我們將掃描場景保持為:
- 如果您想根據網站上的總訪問量信息來檢索客戶或客戶信息,請掃描客戶家庭。
- 掃描搜索信息以查看最終客戶正在尋找哪些免費文本搜索,而導航搜索卻無法滿足這些需求。 請參閱在哪個頁面上單擊了相關產品,您是否需要加強應用才能將產品推高。
- 掃描過濾器系列,以了解導航搜索如何為您工作。 是否為最終客戶提供他們想要的產品。 查看更多點擊哪些構面過濾器,您是否需要在訂購中提高一點以便于客戶輕松使用。
- 應避免在家庭之間進行掃描,而應使用行鍵設計來獲得特定的客戶信息。
行鍵設計信息
在我們的例子中,行鍵設計基于customerId-timestamp -randomuuid 。 由于所有列族的行鍵都相同,因此我們可以使用“前綴過濾器”對僅與特定客戶相關的行進行過濾。
final String eventId = customerId + "-" + searchQueryInstruction.getCreatedTimeStampInMillis() + "-" + searchQueryInstruction.getEventIdSuffix(); ... byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT); ... # 24-1399386809805-629e9b5f-ff4a-4168-8664-6c8df8214aa7這里的每個列族都有相同的行鍵,并且您可以使用前綴過濾器僅掃描特定客戶的行。
水槽整合
HBaseSink用于將搜索事件數據直接存儲到HBase。 檢查詳細信息FlumeHBaseSinkServiceImpl.java
...channel = new MemoryChannel();Map<String, String> channelParamters = new HashMap<>();channelParamters.put("capacity", "100000");channelParamters.put("transactionCapacity", "1000");Context channelContext = new Context(channelParamters);Configurables.configure(channel, channelContext);channel.setName("HBaseSinkChannel-" + UUID.randomUUID());sink = new HBaseSink();sink.setName("HBaseSink-" + UUID.randomUUID());Map<String, String> paramters = new HashMap<>();paramters.put(HBaseSinkConfigurationConstants.CONFIG_TABLE, "searchclicks");paramters.put(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY, new String(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES));paramters.put(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, "1000");paramters.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, HBaseJsonEventSerializer.class.getName());Context sinkContext = new Context(paramters);sink.configure(sinkContext);sink.setChannel(channel);sink.start();channel.start();...客戶端列系列僅用于HBaseSink的驗證。
HBaseJsonEventSerializer
創建自定義序列化器以存儲JSON數據:
public class HBaseJsonEventSerializer implements HBaseEventSerializer {public static final byte[] COLUMFAMILY_CLIENT_BYTES = "client".getBytes();public static final byte[] COLUMFAMILY_SEARCH_BYTES = "search".getBytes();public static final byte[] COLUMFAMILY_FILTERS_BYTES = "filters".getBytes();...byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT);Put put = new Put(rowKey);// Client Inforput.add(COLUMFAMILY_CLIENT_BYTES, "eventid".getBytes(), searchQueryInstruction.getEventId().getBytes());...if (searchQueryInstruction.getFacetFilters() != null) {for (SearchQueryInstruction.FacetFilter filter : searchQueryInstruction.getFacetFilters()) {put.add(COLUMFAMILY_FILTERS_BYTES, filter.getCode().getBytes(),filter.getValue().getBytes());}}...檢查更多詳細信息, HBaseJsonEventSerializer.java
事件主體從Json轉換為Java bean,并進一步處理數據以在相關的列系列中進行序列化。
查詢原始單元格數據
要查詢原始單元格數據:
...Scan scan = new Scan();scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES);scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES);scan.addFamily(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES);List<String> rows = hbaseTemplate.find("searchclicks", scan,new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {return Arrays.toString(result.rawCells());}});for (String row : rows) {LOG.debug("searchclicks table content, Table returned row: {}", row);}檢查HBaseServiceImpl.java以獲得詳細信息。
數據以以下格式存儲在hbase中:
searchclicks table content, Table returned row: [84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:createdtimestampinmillis/1404832918166/Put/vlen=13/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:customerid/1404832918166/Put/vlen=2/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:eventid/1404832918166/Put/vlen=53/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:hostedmachinename/1404832918166/Put/vlen=16/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:pageurl/1404832918166/Put/vlen=19/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/client:sessionid/1404832918166/Put/vlen=36/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/filters:searchfacettype_product_type_level_2/1404832918166/Put/vlen=7/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:hitsshown/1404832918166/Put/vlen=2/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:pagenumber/1404832918166/Put/vlen=1/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:querystring/1404832918166/Put/vlen=13/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:sortorder/1404832918166/Put/vlen=3/mvcc=0, 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923/search:totalhits/1404832918166/Put/vlen=2/mvcc=0]查詢過去一個小時的前10個搜索查詢字符串
要僅查詢搜索字符串,我們只需要搜索列族。 要在時間范圍內進行掃描,我們可以使用client列系列創建的timestampinmillis列,但這將是擴展掃描。
...Scan scan = new Scan();scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES, Bytes.toBytes("createdtimestampinmillis"));scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("querystring"));List<String> rows = hbaseTemplate.find("searchclicks", scan,new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {String createdtimestampinmillis = new String(result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_CLIENT_BYTES, Bytes.toBytes("createdtimestampinmillis")));byte[] value = result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("querystring"));String querystring = null;if (value != null) {querystring = new String(value);}if (new DateTime(Long.valueOf(createdtimestampinmillis)).plusHours(1).compareTo(new DateTime()) == 1 && querystring != null) {return querystring;}return null;}});...//sort the keys, based on counts collection of the query strings.List<String> sortedKeys = Ordering.natural().onResultOf(Functions.forMap(counts)).immutableSortedCopy(counts.keySet());...查詢過去一個小時的前10個搜索方面過濾器
基于動態列創建,您可以掃描數據以返回點擊次數最高的構面過濾器。
動態列將基于您的方面代碼,這些代碼可以是以下任意一種:
#searchfacettype_age_level_1#searchfacettype_color_level_2#searchfacettype_brand_level_2#searchfacettype_age_level_2for (String facetField : SearchFacetName.categoryFacetFields) {scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES, Bytes.toBytes(facetField));}檢索到:
...hbaseTemplate.find("searchclicks", scan, new RowMapper<String>() {@Overridepublic String mapRow(Result result, int rowNum) throws Exception {for (String facetField : SearchFacetName.categoryFacetFields) {byte[] value = result.getValue(HBaseJsonEventSerializer.COLUMFAMILY_FILTERS_BYTES, Bytes.toBytes(facetField));if (value != null) {String facetValue = new String(value);List<String> list = columnData.get(facetField);if (list == null) {list = new ArrayList<>();list.add(facetValue);columnData.put(facetField, list);} else {list.add(facetValue);}}}return null;}});...您將獲得所有構面的完整列表,可以進一步處理數據以計算頂面并對其進行排序。 有關完整的詳細信息,請檢查HBaseServiceImpl.findTopTenSearchFiltersForLastAnHour
獲取客戶的最近搜索查詢字符串
如果需要檢查客戶當前正在尋找什么,我們可以在“客戶”和“搜索”之間的兩個列族之間創建掃描。 或者,另一種方式是設計行鍵,以便為您提供相關信息。 在我們的例子中,行鍵設計基于CustomerId_timestamp _randomuuid。 由于所有列族的行鍵都相同,因此我們可以使用“前綴過濾器”對僅與特定客戶相關的行進行過濾。
final String eventId = customerId + "-" + searchQueryInstruction.getCreatedTimeStampInMillis() + "-" + searchQueryInstruction.getEventIdSuffix(); ... byte[] rowKey = searchQueryInstruction.getEventId().getBytes(CHARSET_DEFAULT); ... # 84-1404832902498-7965306a-d256-4ddb-b7a8-fd19cdb99923要掃描特定客戶的數據,
...Scan scan = new Scan();scan.addColumn(HBaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES, Bytes.toBytes("customerid"));Filter filter = new PrefixFilter(Bytes.toBytes(customerId + "-"));scan.setFilter(filter);...有關詳細信息,請檢查HBaseServiceImpl.getAllSearchQueryStringsByCustomerInLastOneMonth
希望這可以幫助您入門HBase模式設計和處理數據。
翻譯自: https://www.javacodegeeks.com/2014/07/hbase-generating-search-click-events-statistics-for-customer-behavior.html
總結
以上是生活随笔為你收集整理的HBase:为客户行为生成搜索点击事件统计信息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 跳舞机设置(跳舞机设置怎么使用)
- 下一篇: 在线电脑系统修复教程(快速修复电脑)