基于Redis、Storm的实时数据查询实践
通過(guò)算法小組給出的聚合文件,我們需要實(shí)現(xiàn)一種業(yè)務(wù)場(chǎng)景,通過(guò)用戶(hù)的消費(fèi)地點(diǎn)的商戶(hù)ID與posId,查詢(xún)出他所在的商圈,并通過(guò)商圈地點(diǎn)查詢(xún)出與該區(qū)域的做活動(dòng)的商戶(hù),并與之進(jìn)行消息匹配,推送相應(yīng)活動(dòng)信息到用戶(hù)手機(jī)。
那么整個(gè)流程分為兩步,第一步,將整個(gè)聚合文件刷入緩存,文件數(shù)據(jù)格式如下:
29 1:1 102100156910958 10015691?X有限公司 0 1 29 1:1 102100156910958 10015691 X有限公司?0 1字段含義分別是 地區(qū)編號(hào)、商圈編號(hào)、商戶(hù)編號(hào)、Pos編號(hào)、商戶(hù)名稱(chēng)、合作商戶(hù)標(biāo)志。那么我們可以通過(guò) 商戶(hù)編號(hào)+Post編號(hào) 來(lái)定位 其所在的商圈, 可以通過(guò) 地區(qū)編號(hào)+商圈編號(hào) 來(lái)獲取該商圈的所有商戶(hù)信息(Redis中直接set)。于是導(dǎo)入Redis是可使用key:商戶(hù)編號(hào)+Post編號(hào) value:地區(qū)編號(hào)+商圈編號(hào) 。 隨之第二個(gè)key 為 地區(qū)編號(hào)+商圈編號(hào) 從而得到 該商圈的所有商戶(hù)(Redis中使用hset)。
將聚合文件導(dǎo)入Redis,,部分代碼如下
String merchantId = StringUtils.join("V_",content[2].trim(),content[3].trim());String areabiz = StringUtils.join(content[0].trim(),content[1].trim());String merchantName = StringUtils.join(content[4].trim());String flag = StringUtils.join(content[5].trim());Map<String,String> MerchantMap = new HashMap<String,String>();MerchantMap.put(merchantName, merchantId);try {for (int i = 0; i < jedisvPools.size(); i++) {JedisPool jp = jedisvPools.get(i);Jedis jedis = null;try {jedis = jp.getResource();//key為商戶(hù)編號(hào)+PosId value為地區(qū)編號(hào)area+商圈編號(hào)bizAreaId jedis.set(merchantId, areabiz);//key為商圈編號(hào)+PosId value為商戶(hù)名稱(chēng),使用sadd添加相同商圈編號(hào)+PosId的商戶(hù)if("1".equals(flag)){jedis.hmset(areabiz, MerchantMap);}} catch (Exception e) {logger.error("", e);} finally {jedis.close();}}將需要匹配的活動(dòng)商戶(hù)文件及信息導(dǎo)入redis,,部分代碼如下
if (StringUtils.isEmpty(content[4]) || StringUtils.isEmpty(content[5])||StringUtils.isEmpty(content[6])) {logger.warn("數(shù)據(jù)格式有誤,內(nèi)容為:{}", line);return;}String merchantId = "";String posIds = StringUtils.join(content[5]);String address = StringUtils.join(content[3]);String[] posIdArray = posIds.split("、");String url = content[6];Map<String,String> MerchantUrlAdress = new HashMap<String,String>();MerchantUrlAdress.put(address,url);for(String posId : posIdArray){merchantId = StringUtils.join("Vir_",content[4].trim(),posId.trim());try {for (int i = 0; i < jedisPools.size(); i++) {JedisPool jp = jedisPools.get(i);Jedis jedis = null;try {jedis = jp.getResource();//key為商戶(hù)編號(hào)+PosId value為地區(qū)編號(hào)area+商圈編號(hào)bizAreaId jedis.hmset(merchantId,MerchantUrlAdress);} catch (Exception e) {logger.error("", e);} finally {//jedis.close(); jp.returnResourceObject(jedis);}}接入用戶(hù)實(shí)時(shí)刷卡消費(fèi)信息,流入storm,匹配該用戶(hù)所在商圈的活動(dòng)商戶(hù),并匹配獲取該活動(dòng)商戶(hù)的地址及url信息 通過(guò)http的形式推送至支付寶或微信渠道,部分代碼如下:
String bizAreaName = "";String bizAreaUrl = "";String address = "";//根據(jù)活動(dòng)商戶(hù)ID與postId 查詢(xún)所在商圈String areabiz = virtualBusinessService.getAreaBiz(MerchantId);if(null == areabiz){resultSets.addValue(ResultSets.OpType.INSERT,"BIZAREALISTNAME",bizAreaName);resultSets.addValue(ResultSets.OpType.INSERT, "BIZAREAURL", bizAreaUrl);resultSets.addValue(ResultSets.OpType.INSERT, "BIZADDRESS", address);logger.info("VirtualTradeAreaAlgorithm="+MerchantId);return resultSets;}//根據(jù)活動(dòng)ID,獲取該活動(dòng)配置的商戶(hù)IdString activityMerchantCode = virtualBusinessService.getActivityConf(activityConfId);//查詢(xún)所在商圈的所有商戶(hù)信息Map<String,String> bizAreaNameMap = virtualBusinessService.getbizAreaNameSet(areabiz);if(!bizAreaNameMap.isEmpty()){//匹配活動(dòng)配置的商戶(hù)for(String bizName : bizAreaNameMap.keySet()){String mapvalue = bizAreaNameMap.get(bizName).replace("V_", "");if(activityMerchantCode.contains(mapvalue)){bizAreaName = bizName;//根據(jù)活動(dòng)商戶(hù)名稱(chēng)查詢(xún)?cè)撋虘?hù)對(duì)應(yīng)的商戶(hù)IDactivityMerchantId = bizAreaNameMap.get(bizName).replace("V_","Vir_");//根據(jù)活動(dòng)商戶(hù)Id,查詢(xún)?cè)摶顒?dòng)商戶(hù)的url Vir_89811144816144501080209Map<String,String> bizAreaUrlAdree = virtualBusinessService.getBizUrl(activityMerchantId);if(null == bizAreaUrlAdree){address = "";bizAreaUrl = "";}else{for(String bizAdress : bizAreaUrlAdree.keySet()){address = bizAdress;bizAreaUrl = bizAreaUrlAdree.get(bizAdress);}}break;}}}?具體還在整理,后續(xù)將其補(bǔ)全~
總結(jié)
以上是生活随笔為你收集整理的基于Redis、Storm的实时数据查询实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: oracle 磁盘不分区吗,LINUX停
- 下一篇: linux mysql 最小安装,Lin