日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

一段java并发编程代码

發(fā)布時間:2024/1/23 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 一段java并发编程代码 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1?snmpGetDevicesTask方法

public static Map<String,CapEbu> snmpGetDevicesTask(List<String> ipList, List<String> oidList) {if(ipList==null||ipList.size()==0 || oidList==null || oidList.size()==0) return null;Map<String,CapEbu> ebuMap = new HashMap<String,CapEbu>();List<CapEbu> ebulist = new Vector<CapEbu>();int ipsSize = ipList.size();//ip段大小//指定循環(huán)次數(shù),如果不指定循環(huán)次數(shù)且IP段大小是65536,那么線程池的一個任務(wù)就包含600多個ip,那么在極短時間內(nèi),當(dāng)前任務(wù)會有一個監(jiān)聽器線程,該線程可能會響應(yīng)不過來,因為要處理600多個響應(yīng)。//現(xiàn)在指定一次循環(huán),線程池就只處理1000個ip,同時該循環(huán)中只向線程池發(fā)布100個任務(wù),平均一個任務(wù)只處理10個ip。int cicleNumber = (ipsSize%ScanDeviceContants.DEAL_IPNUMBER == 0)?(ipsSize/ScanDeviceContants.DEAL_IPNUMBER):(ipsSize/ScanDeviceContants.DEAL_IPNUMBER+1);//創(chuàng)建線程執(zhí)行器ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(ScanDeviceContants.THREADPOOL_SIZE);List<Future<List<CapEbu>>> resultList = new ArrayList<Future<List<CapEbu>>>();for(int cicle=0; cicle<cicleNumber; cicle++){List<String> iptemp = new ArrayList<String>();iptemp.addAll(ipList.subList(cicle*ScanDeviceContants.DEAL_IPNUMBER, cicle==cicleNumber-1?ipList.size():(cicle+1)*ScanDeviceContants.DEAL_IPNUMBER));//創(chuàng)建ip分配器AssignIps assignIps = new AssignIps(iptemp);for(int i=0; i<assignIps.getTaskSize(); i++){SnmpEbuTask snmpEbuTask = new SnmpEbuTask(assignIps.getAssignedIps(i),oidList);Future<List<CapEbu>> result = executor.submit(snmpEbuTask);resultList.add(result);snmpEbuTask = null;}System.out.println("CorePoolSize:"+executor.getCorePoolSize()+"---"+"MaximumPoolSize:"+executor.getMaximumPoolSize()+"-----LargestPoolSize:"+executor.getLargestPoolSize()+"-------PoolSize:"+executor.getPoolSize());executor.allowCoreThreadTimeOut(false);System.out.println("executor.getKeepAliveTime(TimeUnit.MILLISECONDS):"+executor.getKeepAliveTime(TimeUnit.NANOSECONDS));//直到所有任務(wù)都已完成,則停止循環(huán)do {try {TimeUnit.MILLISECONDS.sleep(500);System.out.println("線程活躍數(shù)量:"+executor.getActiveCount());} catch (InterruptedException e) {e.printStackTrace();}} // while (executor.getCompletedTaskCount()<resultList.size()*(cicle+1)); while (executor.getActiveCount()!=0); System.out.println("線程活躍數(shù)量:"+executor.getActiveCount()+",執(zhí)行器執(zhí)行完的數(shù)量:"+executor.getCompletedTaskCount());System.out.println("執(zhí)行器"+(cicle+1)+"是否已終止:"+executor.isTerminated());}executor.shutdown();//關(guān)閉線程執(zhí)行器//所有任務(wù)已經(jīng)完成,開始取出任務(wù)for (int i=0; i<resultList.size(); i++) {Future<List<CapEbu>> result=resultList.get(i);if(result.isDone()){try {List<CapEbu> ebus = result.get();if(ebus!=null&&ebus.size()>0){for(int j = 0 ; j < ebus.size() ; j++){if(!ebuMap.containsKey(ebus.get(j).getEbuIp())){ebuMap.put(ebus.get(j).getEbuIp(), ebus.get(j));System.out.println("放入map中,測試結(jié)果為 : ip = " + ebus.get(j).getEbuIp());}}} // ebulist.removeAll(ebus); // ebulist.addAll(ebus);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}} return ebuMap;}
2?SnmpEbuTask.java

import java.util.ArrayList; import java.util.List; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;import org.snmp4j.CommunityTarget; import org.snmp4j.PDU; import org.snmp4j.Snmp; import org.snmp4j.event.ResponseEvent; import org.snmp4j.event.ResponseListener; import org.snmp4j.mp.SnmpConstants; import org.snmp4j.smi.Address; import org.snmp4j.smi.GenericAddress; import org.snmp4j.smi.OID; import org.snmp4j.smi.OctetString; import org.snmp4j.smi.VariableBinding; import org.snmp4j.transport.DefaultUdpTransportMapping;import com.nufront.euht.model.CapEbu; import com.nufront.euht.scanDevice.util.OidTranslatorUtil; import com.nufront.euht.util.Beans; import com.nufront.euht.util.StringUtil;public class SnmpEbuTask implements Callable<List<CapEbu>> {private int version = SnmpConstants.version2c;private String protocol = "udp";private int port = 161;private String community = "euhtpub";private List<String> ipsAddress;private List<String> oids;private List<CapEbu> capEbus = new Vector<CapEbu>();private Object lock = new Object();//同步鎖private int responseCounter = 0;//響應(yīng)計數(shù)器private ConcurrentHashMap<String,Object> distinctIp = new ConcurrentHashMap<String,Object>();//ip響應(yīng)去重器public SnmpEbuTask(List<String> ipsAddress, List<String> oids) {this.ipsAddress = ipsAddress;this.oids = oids;}private CommunityTarget createCommunityTarget(String address,String community, int version, long timeOut, int retry) {Address targetAddress = GenericAddress.parse(address);CommunityTarget target = new CommunityTarget();target.setCommunity(new OctetString(community));target.setAddress(targetAddress);target.setVersion(version);target.setTimeout(timeOut); // millisecondstarget.setRetries(retry);return target;}@Overridepublic List<CapEbu> call() throws Exception {String address = null;CommunityTarget target = null;DefaultUdpTransportMapping transport = null;Snmp snmp = null;try {transport = new DefaultUdpTransportMapping();transport.listen();snmp = new Snmp(transport);PDU pdu = new PDU();pdu.setType(PDU.GET);for (String oid : oids) {pdu.add(new VariableBinding(new OID(oid)));}ResponseListener listener = new ResponseListener() {public void onResponse(ResponseEvent event) {((Snmp) event.getSource()).cancel(event.getRequest(), this);PDU response = event.getResponse();PDU request = event.getRequest();System.out.println("[request]:" + request);String ip = null;if (Beans.isNotEmpty(event.getPeerAddress())) {ip = event.getPeerAddress().toString(); // 獲取IPif(!StringUtil.isNullOrBlank(ip)){ip = ip.split("/")[0];System.out.println("registerListener, ip=" + ip);}}if (response == null) {System.out.println("[ERROR]: response is null, ip is "+ip);} else if (response.getErrorStatus() != 0) {System.out.println("[ERROR]: response status"+ response.getErrorStatus() + " Text:"+ response.getErrorStatusText()+ " ip is "+ip);} else {if(ip.equals("192.168.22.226")){System.out.println();}System.out.println("Received response Success!!!"+" ip:"+ip);CapEbu capEbu = new CapEbu();capEbu.setEbuIp(ip);List<OidData> oidDatas = new ArrayList<OidData>();for (int i = 0; i < response.size(); i++) {VariableBinding vb = response.get(i);OidData oidData = new OidData();oidData.setOid(vb.getOid().toString());oidData.setValue(vb.getVariable().toString());oidDatas.add(oidData);}if(Beans.isNotEmpty(oidDatas) && oidDatas.size()>0){for(OidData oidData : oidDatas){OidTranslatorUtil.oidvalueConvertToName(oidData, capEbu);}}if(!distinctIp.containsKey(ip)){distinctIp.put(ip, new Object());capEbus.add(capEbu);System.out.println("capEbus.size():"+capEbus.size());}}synchronized (lock) {responseCounter++;System.out.println("responseCounter++:"+responseCounter);}}};for(int i=0; i<ipsAddress.size(); i++){address = protocol + ":" + ipsAddress.get(i) + "/" + port;target = createCommunityTarget(address, community, version, 1000L, 5);snmp.send(pdu, target, null, listener);System.out.println("asynchronous send pdu wait for response...");}long baselineTime = System.currentTimeMillis();while(responseCounter < ipsAddress.size()){long sendedTime = System.currentTimeMillis();//如果逗留時間超過5秒,有可能是網(wǎng)絡(luò)原因或者其他原因,導(dǎo)致響應(yīng)不會到達監(jiān)聽器,為避免死循環(huán),則應(yīng)馬上結(jié)束循環(huán)。if(sendedTime-baselineTime>5000){break;}TimeUnit.MILLISECONDS.sleep(1000);}return capEbus;} catch (Exception e) {System.out.println("SNMP GetNext Exception:" + e);}finally{snmp.close();//掃描結(jié)束后,得立刻關(guān)掉snmp,否則監(jiān)聽器的線程將一直處于wait狀態(tài),任務(wù)數(shù)過多的話,線程創(chuàng)建也將越來越多,最終導(dǎo)致系統(tǒng)崩潰transport.close();}return null;}}

總結(jié)

以上是生活随笔為你收集整理的一段java并发编程代码的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。