使用java多线程分批处理数据工具类
生活随笔
收集整理的這篇文章主要介紹了
使用java多线程分批处理数据工具类
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
最近由于業務需要,數據量比較大,需要使用多線程來分批處理,提高處理效率和能力,于是就寫了一個通用的多線程處理工具,只需要實現自己的業務邏輯就可以正常使用,現在記錄一下
主要是針對大數據量list,將list劃分多個線程處理
ResultBean類:?返回結果統一bean
package com.ts.common.model;import java.io.Serializable;import com.alibaba.fastjson.JSON;/*** 返回結果統一bean* * ResultBean<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:49:46 <BR>* @version 2.0**/ public class ResultBean<T> implements Serializable {private static final long serialVersionUID = 1L;// 成功狀態public static final int SUCCESS = 1;// 處理中狀態public static final int PROCESSING = 0;// 失敗狀態public static final int FAIL = -1;// 描述private String msg = "success";// 狀態默認成功private int code = SUCCESS;// 備注private String remark;// 返回數據private T data;public ResultBean() {super();}public ResultBean(T data) {super();this.data = data;}/*** 使用異常創建結果*/public ResultBean(Throwable e) {super();this.msg = e.toString();this.code = FAIL;}/*** * 實例化結果默認成功狀態<BR>* 方法名:newInstance<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:51:26 <BR>* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance() {ResultBean<T> instance = new ResultBean<T>();//默認返回信息instance.code = SUCCESS;instance.msg = "success";return instance;}/*** * 實例化結果默認成功狀態和數據<BR>* 方法名:newInstance<BR>* 創建人:wangbeidou <BR>* 時間:2018年5月10日-下午2:13:16 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(T data) {ResultBean<T> instance = new ResultBean<T>();//默認返回信息instance.code = SUCCESS;instance.msg = "success";instance.data = data;return instance;}/*** * 實例化返回結果<BR>* 方法名:newInstance<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午4:00:53 <BR>* @param code* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg) {ResultBean<T> instance = new ResultBean<T>();//默認返回信息instance.code = code;instance.msg = msg;return instance;}/*** * 實例化返回結果<BR>* 方法名:newInstance<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午4:00:35 <BR>* @param code* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg, T data) {ResultBean<T> instance = new ResultBean<T>();//默認返回信息instance.code = code;instance.msg = msg;instance.data = data;return instance;}/*** * 設置返回數據<BR>* 方法名:setData<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:52:01 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setData(T data){this.data = data;return this;}/*** * 設置結果描述<BR>* 方法名:setMsg<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:52:34 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setMsg(String msg){this.msg = msg;return this;}/*** * 設置狀態<BR>* 方法名:setCode<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午4:17:56 <BR>* @param code* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setCode(int code){this.code = code;return this;}/*** * 設置備注)<BR>* 方法名:setRemark<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午5:47:29 <BR>* @param remark* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setRemark(String remark){this.remark = remark;return this;}/*** * 設置成功描述和返回數據<BR>* 方法名:success<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:52:58 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg, T data){ this.code = SUCCESS;this.data = data;this.msg = msg;return this; } /*** * 設置成功返回結果描述<BR>* 方法名:success<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg){ this.code = SUCCESS;this.msg = msg;return this; }/*** * 設置處理中描述和返回數據<BR>* 方法名:success<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:52:58 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING;this.data = data;this.msg = msg;return this; } /*** * 設置處理中返回結果描述<BR>* 方法名:success<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg){ this.code = PROCESSING;this.msg = msg;return this; }/*** * 設置失敗返回描述和返回數據<BR>* 方法名:fail<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:54:04 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg, T data){ this.code = FAIL;this.data = data;this.msg = msg;return this; } /*** * 設置失敗返回描述<BR>* 方法名:fail<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午3:54:32 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg){ this.code = FAIL;this.msg = msg;return this; }public T getData() { return data; } public String getMsg() { return msg; } public int getCode() { return code; } public String getRemark() { return remark; } /*** * 生成json字符串<BR>* 方法名:json<BR>* 創建人:wangbeidou <BR>* 時間:2018年4月12日-下午4:42:28 <BR>* @return String<BR>* @exception <BR>* @since 2.0*/public String json(){return JSON.toJSONString(this);} } View CodeITask接口: 實現自己的業務
package com.ts.common.multi.execute;import java.util.Map;/*** 任務處理接口* 具體業務邏輯可實現該接口* T 返回值類型* E 傳入值類型* ITask<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月4日-下午6:12:32 <BR>* @version 2.0**/ public interface ITask<T, E> {/*** * 任務執行方法接口<BR>* 方法名:execute<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月4日-下午6:13:44 <BR>* @param e 傳入對象* @param params 其他輔助參數* @return T<BR> 返回值類型* @exception <BR>* @since 2.0*/T execute(E e, Map<String, Object> params); } View CodeHandleCallable類:?實現Callable接口,來處理任務
package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import com.ts.common.model.ResultBean;/*** * * HandleCallable<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月4日-上午11:55:41 <BR>* * @version 2.0**/ @SuppressWarnings("rawtypes") public class HandleCallable<E> implements Callable<ResultBean> {private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);// 線程名稱 private String threadName = "";// 需要處理的數據private List<E> data;// 輔助參數private Map<String, Object> params;// 具體執行任務private ITask<ResultBean<String>, E> task;public HandleCallable(String threadName, List<E> data, Map<String, Object> params,ITask<ResultBean<String>, E> task) {this.threadName = threadName;this.data = data;this.params = params;this.task = task;}@Overridepublic ResultBean<List<ResultBean<String>>> call() throws Exception {// 該線程中所有數據處理返回結果ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();if (data != null && data.size() > 0) {logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size());// 返回結果集List<ResultBean<String>> resultList = new ArrayList<>();// 循環處理每個數據for (int i = 0; i < data.size(); i++) {// 需要執行的數據E e = data.get(i);// 將數據執行結果加入到結果集中 resultList.add(task.execute(e, params));logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1));}logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size());resultBean.setData(resultList);}return resultBean;}} View CodeMultiThreadUtils類: 多線程工具類
package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import com.ts.common.model.ResultBean;/*** * * MultiThreadUtils<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月8日-下午8:20:42 <BR>* @version 2.0**/ public class MultiThreadUtils<T> {private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);// 線程個數,如不賦值,默認為5private int threadCount = 5;// 具體業務任務private ITask<ResultBean<String>, T> task;// 線程池管理器private CompletionService<ResultBean> pool = null;/*** * 初始化線程池和線程個數<BR>* 方法名:newInstance<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月8日-下午8:22:00 <BR>* @param threadCount* @return MultiThreadUtils<BR>* @exception <BR>* @since 2.0*/public static MultiThreadUtils newInstance(int threadCount) {MultiThreadUtils instance = new MultiThreadUtils();threadCount = threadCount;instance.setThreadCount(threadCount);return instance;}/*** * 多線程分批執行list中的任務<BR>* 方法名:execute<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月8日-下午8:22:31 <BR>* @param data 線程處理的大數據量list* @param params 處理數據是輔助參數傳遞* @param task 具體執行業務的任務接口* @return ResultBean<BR> * @exception <BR>* @since 2.0*/@SuppressWarnings("rawtypes")public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {// 創建線程池ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);// 根據線程池初始化線程池管理器pool = new ExecutorCompletionService<ResultBean>(threadpool);// 開始時間(ms)long l = System.currentTimeMillis();// 數據量大小int length = data.size();// 每個線程處理的數據個數int taskCount = length / threadCount;// 劃分每個線程調用的數據for (int i = 0; i < threadCount; i++) {// 每個線程任務數據listList<T> subData = null;if (i == (threadCount - 1)) {subData = data.subList(i * taskCount, length);} else {subData = data.subList(i * taskCount, (i + 1) * taskCount);}// 將數據分配給各個線程HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);// 將線程加入到線程池 pool.submit(execute);}// 總的返回結果集List<ResultBean<String>> result = new ArrayList<>();for (int i = 0; i < threadCount; i++) {// 每個線程處理結果集ResultBean<List<ResultBean<String>>> threadResult;try {threadResult = pool.take().get();result.addAll(threadResult.getData());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}// 關閉線程池 threadpool.shutdownNow();// 執行結束時間long end_l = System.currentTimeMillis();logger.info("總耗時:{}ms", (end_l - l));return ResultBean.newInstance().setData(result);}public int getThreadCount() {return threadCount;}public void setThreadCount(int threadCount) {this.threadCount = threadCount;}} View Code測試類TestTask
package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;import com.ts.common.model.ResultBean;/*** * 具體執行業務任務 需要 實現ITask接口 在execute中重寫業務邏輯* TestTask<BR>* 創建人:wangbeidou <BR>* 時間:2018年8月8日-下午8:40:32 <BR>* @version 2.0**/ public class TestTask implements ITask<ResultBean<String>, Integer> {@Overridepublic ResultBean execute(Integer e, Map<String, Object> params) {/*** 具體業務邏輯:將list中的元素加上輔助參數中的數據返回*/int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));e = e + addNum;ResultBean<String> resultBean = ResultBean.newInstance();resultBean.setData(e.toString());return resultBean;}public static void main(String[] args) {// 需要多線程處理的大量數據listList<Integer> data = new ArrayList<>(10000);for(int i = 0; i < 10000; i ++){data.add(i + 1);}// 創建多線程處理任務MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);ITask<ResultBean<String>, Integer> task = new TestTask();// 輔助參數 加數Map<String, Object> params = new HashMap<>();params.put("addNum", 4);// 執行多線程處理,并返回處理結果ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task);}}?
轉載于:https://www.cnblogs.com/qixing/p/9451714.html
總結
以上是生活随笔為你收集整理的使用java多线程分批处理数据工具类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring 嵌套方法AOP不生效问题
- 下一篇: 防火墙及其功能(转)