

An initial end-to-end automated test implementation for a Storm project

Published: 2025/3/19

Driven by project needs, I wrote a first version of business-oriented automated test code. The motivation: as the business grows more complex, changes to shared code risk breaking existing behavior. The code hasn't been rewritten yet, but there are plenty of points worth sharing. First, an overview of the whole stream-processing business flow.

Data arrives in real time from the network management systems into Kafka. A preprocessing service consumes the messages; it runs as servlets on embedded Jetty rather than Tomcat, because Tomcat's concurrency was deemed insufficient. The preprocessed messages go back into Kafka, where different Storm topologies pick them up by message type and run rule matching. The rules live in the front-end system and are flushed into Redis on a schedule (once per minute). The topology then loads user card data, user profiles and so on; these come from wide tables produced nightly by MapReduce jobs and bulk-loaded into Redis, so lookups are fast enough for real-time use (on a Redis miss, the topology falls back to querying HBase). Business processing follows (sometimes calling the various network management interfaces for business data), and the assembled result is sent downstream to a notification topology, which delivers it via web services or RestTemplate to platforms such as WeChat, Alipay and SMS. Finally, the full run log is written to HBase.
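The Redis-first, HBase-fallback lookup described above can be sketched in isolation. This is a simplified stand-in, not the project's code: plain in-memory Maps play the role of the Redis cache and the HBase wide table, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CacheFallbackSketch {

    // Stand-ins for the Redis cache and the HBase wide table.
    static Map<String, String> redis = new HashMap<>();
    static Map<String, String> hbase = new HashMap<>();

    // Look the user record up in Redis first; on a miss, fall back to
    // HBase and backfill the cache so the next lookup is fast.
    static String loadUser(String cardNo) {
        String record = redis.get(cardNo);
        if (record != null) {
            return record;
        }
        record = hbase.get(cardNo);
        if (record != null) {
            redis.put(cardNo, record);
        }
        return record;
    }

    public static void main(String[] args) {
        hbase.put("6222000000000001", "user-A");
        System.out.println(loadUser("6222000000000001")); // served from HBase, then cached
        System.out.println(loadUser("6222000000000001")); // served from Redis
    }
}
```

The backfill step is what keeps the hot path on Redis; HBase is only touched once per cold key.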

First, prepare a few shared classes. KafkaClient:

public class KafkaClient<V> {

    private Properties properties;
    private String defaultTopic;
    private KafkaProducer<String, V> producer;

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public void setProducer(KafkaProducer<String, V> producer) {
        this.producer = producer;
    }

    public void init() {
        if (properties == null) {
            throw new NullPointerException("kafka properties is null.");
        }
        this.producer = new KafkaProducer<>(properties);
    }

    // Blocks until the broker acknowledges the record.
    public void syncSend(V value) {
        ProducerRecord<String, V> producerRecord = new ProducerRecord<>(defaultTopic, value);
        try {
            producer.send(producerRecord).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Fire-and-forget send.
    public void asyncSend(V value) {
        ProducerRecord<String, V> producerRecord = new ProducerRecord<>(defaultTopic, value);
        producer.send(producerRecord);
    }
}

HbaseUtil:

public class HbaseUtil {

    private static final Logger logger = LoggerFactory.getLogger(HbaseUtil.class);
    private Gson gson = new Gson();
    private HConnection connection = null;
    private Configuration conf = null;
    private String logFile = "D:/error.txt";

    public void init() throws IOException {
        logger.info("start init HBaseHelper...");
        conf = HBaseConfiguration.create();
        connection = HConnectionManager.createConnection(conf);
        logger.info("init HBaseHelper succeeded!");
    }

    public synchronized HConnection getConnection() throws IOException {
        if (connection == null) {
            connection = HConnectionManager.createConnection(conf);
        }
        return connection;
    }

    private synchronized void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                logger.error("failed to close HBase connection", e);
            }
        }
        connection = null;
    }
}

KafkaClient is responsible for sending the message payloads read from file to Kafka; the topologies then do their own processing, and at the end HbaseUtil is used to query and compare the relevant fields.

The automated test flow, step by step:

1. Import front-end activities. Since this is automated testing, we can't manually take activities on/offline or enable them through the UI on every run; instead we call the front-end system's import method directly, writing the activity configuration into MySQL and switching its state.

List<String> codeList = new ArrayList<>();

List<String> activityIdList = new ArrayList<>();

StringBuilder activityCode = new StringBuilder(); // declared here; used to build the quoted code list below

try {

    FileBody bin = new FileBody(new File("D:/activityTest/activity.ac"));
    InputStream in = bin.getInputStream();
    BufferedReader br = new BufferedReader(new InputStreamReader(in));
    String tr = null;

    while ((tr = br.readLine()) != null) {
        HttpPost httppost = new HttpPost("http://*********:8088/***/***/***/import");
        CloseableHttpClient httpclient = HttpClients.createDefault();
        ObjectMapper mapper = new ObjectMapper();
        ActivityConfig cloneActivity = mapper.readValue(tr, ActivityConfig.class);

        List<ActivityConfig> cloneActivitys = new ArrayList<>(); // holds all activities
        cloneActivitys.add(cloneActivity);
        for (ActivityConfig cloneActivity1 : cloneActivitys) {
            codeList.add(cloneActivity1.getCode());
        }

        // Upload the activity file through the front-end import endpoint.
        HttpEntity reqEntity = MultipartEntityBuilder.create()
                .addPart("file", bin)
                .build();
        httppost.setEntity(reqEntity);

        System.out.println("executing request " + httppost.getRequestLine());
        CloseableHttpResponse response = httpclient.execute(httppost);
        System.out.println(response.getStatusLine());

        HttpEntity resEntity = response.getEntity();
        if (resEntity != null) {
            System.out.println("Response content length: " + resEntity.getContentLength());
        }
        EntityUtils.consume(resEntity);
        response.close();
        httpclient.close();
    }

    // Build a quoted, comma-separated list of codes, e.g. 'A','B'.
    for (String code : codeList) {
        if (activityCode.length() > 0) {
            activityCode.append(",");
        }
        activityCode.append("'").append(code).append("'");
    }

} catch (IOException e) {
    throw new RuntimeException(e);
}

return activityIdList;
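The quoted, comma-separated code string built at the end of the import loop is easy to get subtly wrong (leading comma, missing quotes), so here it is as an isolated, testable sketch; the class and method names are mine, not the project's:

```java
import java.util.ArrayList;
import java.util.List;

public class ActivityCodeJoin {

    // Builds the 'A','B','C' style string the import step appends
    // code by code: quote each code, comma-separate, no leading comma.
    static String quoteAndJoin(List<String> codes) {
        StringBuilder sb = new StringBuilder();
        for (String code : codes) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append("'").append(code).append("'");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> codes = new ArrayList<>();
        codes.add("ACT001");
        codes.add("ACT002");
        System.out.println(quoteAndJoin(codes)); // 'ACT001','ACT002'
    }
}
```

A string in this shape can be dropped straight into a SQL IN (...) clause, which is presumably why the codes are quoted here.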

2. Read the prepared message files (XML payloads need parsing; delimiter-separated payloads are read and sent to Kafka as-is).

public String readTxt() throws IOException {

    StringBuffer sendMessage = new StringBuffer();
    BufferedReader br = null;

    try {
        br = new BufferedReader(new InputStreamReader(new FileInputStream(MessageText), "UTF-8"));
        String line = "";
        while ((line = br.readLine()) != null) {
            // Strip anything before the XML declaration.
            if (line.contains("<?xml ")) {
                int beginIndex = line.indexOf("<?xml");
                line = line.substring(beginIndex);
            }
            sendMessage.append(line);
        }
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } finally {
        if (br != null) { // guard against an NPE when the file failed to open
            br.close();
        }
    }

    return sendMessage.toString();
}
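The line handling inside readTxt() — drop any junk before the XML declaration, leave other lines alone — can be pulled out and verified on its own. A minimal sketch (class and method names are mine):

```java
public class XmlLineClean {

    // Mirrors the readTxt() line handling: drop any leading junk before
    // the XML declaration, keep all other lines untouched.
    static String stripBeforeXmlDecl(String line) {
        if (line.contains("<?xml ")) {
            return line.substring(line.indexOf("<?xml"));
        }
        return line;
    }

    public static void main(String[] args) {
        System.out.println(stripBeforeXmlDecl("junk<?xml version=\"1.0\"?>"));
        System.out.println(stripBeforeXmlDecl("<body>x</body>"));
    }
}
```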

3. Next, write the parsed message data into the corresponding user and card wide tables in HBase, so that the Storm topology can load the user data; the rowkeys here are pre-partitioned.

HbaseResult baseHelper = new HbaseResult();
baseHelper.init();

Map<String, String> data = new HashMap<>(); // column qualifiers are masked in this write-up
String tableName = "CARD****";
String rowkey = HTableManager.generatRowkey(cardNo);

data.put("*****", "10019");
data.put("*****", cardNo);
data.put("*****", certNo);
data.put("*****", "A");
data.put("*****", "1019");
data.put("*****", supplementCardNo);
data.put("*****", "10020");
data.put("*****", certNo);
data.put("*****", cardType);
data.put("*****", cardType);
data.put("*****", cardNo.substring(12, 16));
data.put("*****", "F");
data.put("*****", "ysy");

Put put = new Put(Bytes.toBytes(rowkey));
for (Entry<String, String> rs : data.entrySet()) {
    put.add(HTableManager.DEFAULT_FAMILY_NAME, Bytes.toBytes(rs.getKey()), Bytes.toBytes(rs.getValue()));
}

baseHelper.put(tableName, put);
System.out.println("rowkey:" + rowkey);
data.clear();
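The implementation of HTableManager.generatRowkey() isn't shown in this post, but for pre-split tables a common scheme is a salted rowkey: a fixed-width prefix derived from the key's hash spreads writes evenly across the pre-split regions. A hypothetical sketch of that idea (the bucket count and format are assumptions, not the project's values):

```java
public class RowkeySalt {

    static final int REGION_COUNT = 16; // assumed number of pre-split regions

    // Hypothetical salted rowkey: a two-digit, hash-derived prefix keeps
    // sequential card numbers from all landing in one region.
    static String saltedRowkey(String key) {
        int bucket = (key.hashCode() & 0x7fffffff) % REGION_COUNT;
        return String.format("%02d", bucket) + "_" + key;
    }

    public static void main(String[] args) {
        System.out.println(saltedRowkey("6222000000000001"));
    }
}
```

The salt must be recomputable from the key alone, so reads (as in getResult() below) can rebuild the same rowkey.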

4. Now the message can be sent (to the cluster's Kafka).

KafkaInit();

FSTConfiguration fstConf = FSTConfiguration.getDefaultConfiguration();

kafkaClient.syncSend(fstConf.asByteArray(kafkaData));
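FST (fast-serialization) turns the test data into a byte array before the Kafka send, and the topology decodes it on the other side. As a stand-in for that round trip, here is the same idea with standard Java serialization; FST itself is a drop-in third-party library, so this sketch only illustrates the encode-send-decode shape:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class SerDeSketch {

    // Stand-in for FSTConfiguration.asByteArray: object -> bytes.
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Stand-in for the topology-side decode: bytes -> object.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String> kafkaData = new HashMap<>();
        kafkaData.put("cardNo", "6222000000000001");
        byte[] payload = toBytes(kafkaData); // this is what syncSend() would carry
        System.out.println(fromBytes(payload)); // {cardNo=6222000000000001}
    }
}
```

Whatever serializer is used, producer and topology must agree on it, or the bolts will fail to decode the payload.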

5. Finally, compare the fields of the sent data (the preset fields in the message against the fields or results in the final output), appending the outcome to an output file.

Result result = baseHelper.getResult("EVENT_LOG_DH", messageKey); // fetch the fields to compare
baseHelper.compareData(dataMap, result, activityCode);

public Result getResult(String tableName, String rowKey) throws IOException {

    Get get = new Get(Bytes.toBytes(rowKey));
    Result result = null;
    HTableInterface tableInterface = null;

    try {
        tableInterface = getConnection().getTable(tableName);
        result = tableInterface.get(get);
        return result;
    } catch (Exception e) {
        closeConnection();
        logger.error("", e);
    } finally {
        if (tableInterface != null) {
            tableInterface.close();
        }
    }
    return null; // reached only on failure
}

public void compareData(Map<String, Object> messageData, Result res, List<String> activityCode) throws IOException {

    List<String> messages = new ArrayList<>();

    for (Cell cell : res.rawCells()) {
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));

        if (qualifier.equalsIgnoreCase("VARIABLESETS")) {
            System.out.println(qualifier + "[" + new Gson().fromJson(Bytes.toString(CellUtil.cloneValue(cell)), Map.class) + "]");

            @SuppressWarnings("unchecked")
            Map<String, Object> data = gson.fromJson(Bytes.toString(CellUtil.cloneValue(cell)), Map.class);

            String message = "";
            for (String datakey : data.keySet()) {
                if (messageData.containsKey(datakey)) {
                    String dataValue = getString(data, datakey);
                    String messageValue = getString(messageData, datakey);
                    if (datakey.equals("dh22")) {
                        // dh22 is an amount: compare only the integer part.
                        dataValue = dataValue.substring(0, dataValue.indexOf("."));
                        messageValue = messageValue.substring(0, messageValue.indexOf("."));
                    }
                    if (dataValue.equals(messageValue)) {
                        message = datakey + " = " + dataValue + " matches the message value " + messageValue;
                    } else {
                        message = datakey + " = " + dataValue + " does NOT match the message value " + messageValue + "!!!";
                    }
                    messages.add(message);
                }
            }
        }

        if (qualifier.equalsIgnoreCase("NOTIFY__")) {
            // notification fields: not compared yet
        }
    }

    if (messages.size() > 0) {
        StringBuffer sb = new StringBuffer();
        for (String error : messages) {
            sb.append(error).append("\n");
        }
        FileWriter fw = new FileWriter(logFile, true); // append mode
        fw.write("\n----------------------");
        fw.write(sb.toString());
        fw.flush();
        fw.close();
    } else {
        FileWriter fw = new FileWriter(logFile);
        fw.write("no fields to compare");
        fw.flush();
        fw.close();
    }
}
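The dh22 special case above — amounts compared on the integer part only — is the one piece of comparison logic with real behavior, so it's worth testing in isolation. A sketch (names are mine; unlike the original, it also tolerates values without a decimal point):

```java
public class AmountCompare {

    // Mirrors the dh22 branch in compareData(): amounts are compared on
    // the integer part only, so "12.30" and "12.99" count as equal.
    // The original assumes a "." is present; this sketch guards for it.
    static boolean sameIntegerPart(String a, String b) {
        String ai = a.contains(".") ? a.substring(0, a.indexOf(".")) : a;
        String bi = b.contains(".") ? b.substring(0, b.indexOf(".")) : b;
        return ai.equals(bi);
    }

    public static void main(String[] args) {
        System.out.println(sameIntegerPart("12.30", "12.99")); // true
        System.out.println(sameIntegerPart("12.30", "13.30")); // false
    }
}
```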

6. Clean up the imported data and other state, and the whole flow is complete.

public void delHbaseData(String cardNo, String certNo) throws IOException {

    String rowkeyCard = HTableManager.generatRowkey(cardNo);
    String rowKeyUse = HTableManager.generatRowkey(certNo);
    HTableInterface tableInterface = null;
    String tableName = ""; // table name masked in this write-up

    try {
        tableInterface = getConnection().getTable(tableName);
        // Delete the rows written for the card and user wide tables.
        tableInterface.delete(new Delete(Bytes.toBytes(rowkeyCard)));
        tableInterface.delete(new Delete(Bytes.toBytes(rowKeyUse)));
    } catch (Exception e) {
        closeConnection();
        logger.error("", e);
    } finally {
        if (tableInterface != null) {
            tableInterface.close();
        }
    }
}
