日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用java多线程分批处理数据工具类

發(fā)布時(shí)間:2024/9/5 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用java多线程分批处理数据工具类 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

最近由于業(yè)務(wù)需要,數(shù)據(jù)量比較大,需要使用多線程來分批處理,提高處理效率和能力,于是就寫了一個(gè)通用的多線程處理工具,只需要實(shí)現(xiàn)自己的業(yè)務(wù)邏輯就可以正常使用,現(xiàn)在記錄一下

主要是針對(duì)大數(shù)據(jù)量list,將list劃分多個(gè)線程處理

ResultBean類:?返回結(jié)果統(tǒng)一bean

package com.ts.common.model;import java.io.Serializable;import com.alibaba.fastjson.JSON;/*** 返回結(jié)果統(tǒng)一bean* * ResultBean<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:49:46 <BR>* @version 2.0**/ public class ResultBean<T> implements Serializable {private static final long serialVersionUID = 1L;// 成功狀態(tài)public static final int SUCCESS = 1;// 處理中狀態(tài)public static final int PROCESSING = 0;// 失敗狀態(tài)public static final int FAIL = -1;// 描述private String msg = "success";// 狀態(tài)默認(rèn)成功private int code = SUCCESS;// 備注private String remark;// 返回?cái)?shù)據(jù)private T data;public ResultBean() {super();}public ResultBean(T data) {super();this.data = data;}/*** 使用異常創(chuàng)建結(jié)果*/public ResultBean(Throwable e) {super();this.msg = e.toString();this.code = FAIL;}/*** * 實(shí)例化結(jié)果默認(rèn)成功狀態(tài)<BR>* 方法名:newInstance<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:51:26 <BR>* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance() {ResultBean<T> instance = new ResultBean<T>();//默認(rèn)返回信息instance.code = SUCCESS;instance.msg = "success";return instance;}/*** * 實(shí)例化結(jié)果默認(rèn)成功狀態(tài)和數(shù)據(jù)<BR>* 方法名:newInstance<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年5月10日-下午2:13:16 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(T data) {ResultBean<T> instance = new ResultBean<T>();//默認(rèn)返回信息instance.code = SUCCESS;instance.msg = "success";instance.data = data;return instance;}/*** * 實(shí)例化返回結(jié)果<BR>* 方法名:newInstance<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午4:00:53 <BR>* @param code* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg) {ResultBean<T> instance = new ResultBean<T>();//默認(rèn)返回信息instance.code = code;instance.msg = msg;return instance;}/*** * 實(shí)例化返回結(jié)果<BR>* 方法名:newInstance<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午4:00:35 <BR>* @param code* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg, T data) {ResultBean<T> instance = new ResultBean<T>();//默認(rèn)返回信息instance.code = code;instance.msg = msg;instance.data = data;return instance;}/*** * 設(shè)置返回?cái)?shù)據(jù)<BR>* 方法名:setData<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:52:01 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setData(T data){this.data = data;return this;}/*** * 設(shè)置結(jié)果描述<BR>* 方法名:setMsg<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:52:34 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setMsg(String msg){this.msg = msg;return this;}/*** * 設(shè)置狀態(tài)<BR>* 方法名:setCode<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午4:17:56 <BR>* @param code* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setCode(int code){this.code = code;return this;}/*** * 設(shè)置備注)<BR>* 方法名:setRemark<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午5:47:29 <BR>* @param remark* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setRemark(String remark){this.remark = remark;return this;}/*** * 設(shè)置成功描述和返回?cái)?shù)據(jù)<BR>* 方法名:success<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:52:58 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg, T data){ this.code = SUCCESS;this.data = data;this.msg = msg;return this; } /*** * 設(shè)置成功返回結(jié)果描述<BR>* 方法名:success<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg){ this.code = SUCCESS;this.msg = msg;return this; }/*** * 設(shè)置處理中描述和返回?cái)?shù)據(jù)<BR>* 方法名:success<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:52:58 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING;this.data = data;this.msg = msg;return this; } /*** * 設(shè)置處理中返回結(jié)果描述<BR>* 方法名:success<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg){ this.code = PROCESSING;this.msg = msg;return this; }/*** * 設(shè)置失敗返回描述和返回?cái)?shù)據(jù)<BR>* 方法名:fail<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:54:04 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg, T data){ this.code = FAIL;this.data = data;this.msg = msg;return this; } /*** * 設(shè)置失敗返回描述<BR>* 方法名:fail<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午3:54:32 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg){ this.code = FAIL;this.msg = msg;return this; }public T getData() { return data; } public String getMsg() { return msg; } public int getCode() { return code; } public String getRemark() { return remark; } /*** * 生成json字符串<BR>* 方法名:json<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年4月12日-下午4:42:28 <BR>* @return String<BR>* @exception <BR>* @since 2.0*/public String json(){return JSON.toJSONString(this);} } View Code

ITask接口: 實(shí)現(xiàn)自己的業(yè)務(wù)

package com.ts.common.multi.execute;import java.util.Map;/*** 任務(wù)處理接口* 具體業(yè)務(wù)邏輯可實(shí)現(xiàn)該接口* T 返回值類型* E 傳入值類型* ITask<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月4日-下午6:12:32 <BR>* @version 2.0**/ public interface ITask<T, E> {/*** * 任務(wù)執(zhí)行方法接口<BR>* 方法名:execute<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月4日-下午6:13:44 <BR>* @param e 傳入對(duì)象* @param params 其他輔助參數(shù)* @return T<BR> 返回值類型* @exception <BR>* @since 2.0*/T execute(E e, Map<String, Object> params); } View Code

HandleCallable類:?實(shí)現(xiàn)Callable接口,來處理任務(wù)

package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import com.ts.common.model.ResultBean;/*** * * HandleCallable<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月4日-上午11:55:41 <BR>* * @version 2.0**/ @SuppressWarnings("rawtypes") public class HandleCallable<E> implements Callable<ResultBean> {private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);// 線程名稱 private String threadName = "";// 需要處理的數(shù)據(jù)private List<E> data;// 輔助參數(shù)private Map<String, Object> params;// 具體執(zhí)行任務(wù)private ITask<ResultBean<String>, E> task;public HandleCallable(String threadName, List<E> data, Map<String, Object> params,ITask<ResultBean<String>, E> task) {this.threadName = threadName;this.data = data;this.params = params;this.task = task;}@Overridepublic ResultBean<List<ResultBean<String>>> call() throws Exception {// 該線程中所有數(shù)據(jù)處理返回結(jié)果ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();if (data != null && data.size() > 0) {logger.info("線程:{},共處理:{}個(gè)數(shù)據(jù),開始處理......", threadName, data.size());// 返回結(jié)果集List<ResultBean<String>> resultList = new ArrayList<>();// 循環(huán)處理每個(gè)數(shù)據(jù)for (int i = 0; i < data.size(); i++) {// 需要執(zhí)行的數(shù)據(jù)E e = data.get(i);// 將數(shù)據(jù)執(zhí)行結(jié)果加入到結(jié)果集中 resultList.add(task.execute(e, params));logger.info("線程:{},第{}個(gè)數(shù)據(jù),處理完成", threadName, (i + 1));}logger.info("線程:{},共處理:{}個(gè)數(shù)據(jù),處理完成......", threadName, data.size());resultBean.setData(resultList);}return resultBean;}} View Code

MultiThreadUtils類: 多線程工具類

package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import com.ts.common.model.ResultBean;/*** * * MultiThreadUtils<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月8日-下午8:20:42 <BR>* @version 2.0**/ public class MultiThreadUtils<T> {private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);// 線程個(gè)數(shù),如不賦值,默認(rèn)為5private int threadCount = 5;// 具體業(yè)務(wù)任務(wù)private ITask<ResultBean<String>, T> task;// 線程池管理器private CompletionService<ResultBean> pool = null;/*** * 初始化線程池和線程個(gè)數(shù)<BR>* 方法名:newInstance<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月8日-下午8:22:00 <BR>* @param threadCount* @return MultiThreadUtils<BR>* @exception <BR>* @since 2.0*/public static MultiThreadUtils newInstance(int threadCount) {MultiThreadUtils instance = new MultiThreadUtils();threadCount = threadCount;instance.setThreadCount(threadCount);return instance;}/*** * 多線程分批執(zhí)行l(wèi)ist中的任務(wù)<BR>* 方法名:execute<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月8日-下午8:22:31 <BR>* @param data 線程處理的大數(shù)據(jù)量list* @param params 處理數(shù)據(jù)是輔助參數(shù)傳遞* @param task 具體執(zhí)行業(yè)務(wù)的任務(wù)接口* @return ResultBean<BR> * @exception <BR>* @since 2.0*/@SuppressWarnings("rawtypes")public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {// 創(chuàng)建線程池ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);// 根據(jù)線程池初始化線程池管理器pool = new ExecutorCompletionService<ResultBean>(threadpool);// 開始時(shí)間(ms)long l = System.currentTimeMillis();// 數(shù)據(jù)量大小int length = data.size();// 每個(gè)線程處理的數(shù)據(jù)個(gè)數(shù)int taskCount = length / threadCount;// 劃分每個(gè)線程調(diào)用的數(shù)據(jù)for (int i = 0; i < threadCount; i++) {// 每個(gè)線程任務(wù)數(shù)據(jù)listList<T> subData = null;if (i == (threadCount - 1)) {subData = data.subList(i * taskCount, length);} else {subData = data.subList(i * taskCount, (i + 1) * taskCount);}// 將數(shù)據(jù)分配給各個(gè)線程HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);// 將線程加入到線程池 pool.submit(execute);}// 總的返回結(jié)果集List<ResultBean<String>> result = new ArrayList<>();for (int i = 0; i < threadCount; i++) {// 每個(gè)線程處理結(jié)果集ResultBean<List<ResultBean<String>>> threadResult;try {threadResult = pool.take().get();result.addAll(threadResult.getData());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}// 關(guān)閉線程池 threadpool.shutdownNow();// 執(zhí)行結(jié)束時(shí)間long end_l = System.currentTimeMillis();logger.info("總耗時(shí):{}ms", (end_l - l));return ResultBean.newInstance().setData(result);}public int getThreadCount() {return threadCount;}public void setThreadCount(int threadCount) {this.threadCount = threadCount;}} View Code

測(cè)試類TestTask

package com.ts.common.multi.execute;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;import com.ts.common.model.ResultBean;/*** * 具體執(zhí)行業(yè)務(wù)任務(wù) 需要 實(shí)現(xiàn)ITask接口 在execute中重寫業(yè)務(wù)邏輯* TestTask<BR>* 創(chuàng)建人:wangbeidou <BR>* 時(shí)間:2018年8月8日-下午8:40:32 <BR>* @version 2.0**/ public class TestTask implements ITask<ResultBean<String>, Integer> {@Overridepublic ResultBean execute(Integer e, Map<String, Object> params) {/*** 具體業(yè)務(wù)邏輯:將list中的元素加上輔助參數(shù)中的數(shù)據(jù)返回*/int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));e = e + addNum;ResultBean<String> resultBean = ResultBean.newInstance();resultBean.setData(e.toString());return resultBean;}public static void main(String[] args) {// 需要多線程處理的大量數(shù)據(jù)listList<Integer> data = new ArrayList<>(10000);for(int i = 0; i < 10000; i ++){data.add(i + 1);}// 創(chuàng)建多線程處理任務(wù)MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);ITask<ResultBean<String>, Integer> task = new TestTask();// 輔助參數(shù) 加數(shù)Map<String, Object> params = new HashMap<>();params.put("addNum", 4);// 執(zhí)行多線程處理,并返回處理結(jié)果ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task);}}

?

轉(zhuǎn)載于:https://www.cnblogs.com/qixing/p/9451714.html

總結(jié)

以上是生活随笔為你收集整理的使用java多线程分批处理数据工具类的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。