提到java多線程不免有些人會頭大,很多概念都是很理解但是真正到了實戰的時候又是不知道如何操作了,下面就結合實際項目來說說多線程的應用。
業務需求:
舉例:批量插入10萬條用戶的相關活動優惠券
操作方式:使用固定10個大小的線程池來做,並每次處理1000條插入數據
線程類:注實現Callable
public class InsertBatchThread implements Callable{ private int vdate; private int uid; private int count; private FundsInfoMapper fundsInfoMapper; private WmpsDayInterMapper wmpsDayInterMapper; private DataSource dataSource; public WmpsDayInterMapper getWmpsDayInterMapper() { if (null == wmpsDayInterMapper) { synchronized (this) { if (null == wmpsDayInterMapper) { wmpsDayInterMapper = SpringContextUtils.getBean("wmpsDayInterMapper"); } } } return wmpsDayInterMapper; } public FundsInfoMapper getProCommFundsInfoMapper() { if (null == fundsInfoMapper) { synchronized (this) { if (null == fundsInfoMapper) { fundsInfoMapper = SpringContextUtils.getBean("fundsInfoMapper"); } } } return fundsInfoMapper; } /** * 無參構造函數 */ public InsertBatchThread(){ } /** * 無參構造函數 */ public InsertBatchThread(int vdate,int uid,int count){ this.vdate=vdate; this.uid=uid; this.count=count; this.fundsInfoMapper=getFundsInfoMapper(); this.wmpsDayInterMapper=getWmpsDayInterMapper(); } /** * 多線程規定好的方法,如果是繼承Thread類則必須實現這個方法 */ public Integer call() { int result = -1; try { //操作 List listUsers = fundsInfoMapper.selectUserForInsertBatch(count * 1000, 1000); //批量插入用戶活動優惠券記錄表 List listOnePageDayInner = wmpsDayInterMapper.selectByInsertBatch(vdate,listUsers.get(0),listUsers.get(listUsers.size() - 1)); wmpsDayInterInsertBatch(listOnePageDayInner); }catch (Exception e){ result=count; } return result; } //批量插入用戶活動優惠券記錄表--JDBC public int[] wmpsDayInterInsertBatch(List listOnePage) throws Exception{ dataSource= SpringContextUtils.getBean("dataSource"); PreparedStatement ps = dataSource.getConnection().prepareStatement("INSERT ignore INTO t_a (uid, inter, cdate) values(?,?,?)" ); for(WmpsDayInter oneObj :listOnePage ){ ps.setInt(1,oneObj.getUid()); ps.setBigDecimal(2, oneObj.getInter()); ps.setBigDecimal(3,oneObj.getCdate()); ps.addBatch(); } return ps.executeBatch(); } }
對應的業務實現類:
/** * 活動優惠券 */ @Service("FundsInfoService") public class FundsInfoServiceImpl extends BaseServiceimplements IFundsInfoService { private final static Logger LOGGER = LoggerFactory.getLogger(FundsInfoServiceImpl.class); @Autowired private FundsInfoMapper fundsInfoMapper; public FundsInfoServiceImpl() { } @Override public void insertDayInter(Integer vdate, Integer uid) { //活動優惠券條件判斷 if (vdate == null || vdate.intValue() <= 0 || uid == null || uid.intValue() < 0) { LOGGER.error("=insertDayInter=>error,vdate=" + vdate + ",uid=" + uid); } else { //統計符合條件用戶 int totalNum = fundsInfoMapper.countForInsertBatchByUser(); List listException = new ArrayList (); //初始化第一批次起始值 int startRow = 0; try { //固定大小線程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); for (int i = 0; i <= Math.floor(totalNum / 1000); i++) { Future future = fixedThreadPool.submit(new InsertBatchThread(vdate, uid, i)); if( future.get()>=0 ){ listException.add( future.get()); } } //活動優惠券發完,驗證是否有遺漏,如果有則修復 if(listException.size()>0){ for (int i = 0; i <= listException.size(); i++) { Future future = fixedThreadPool.submit(new InsertBatchThread(vdate, uid, listException.get(i))); if( future.get()>=0 ){ listException.add( future.get()); } } } }catch (Exception e){ LOGGER.error("<=insertDayInter=>error",e); } } LOGGER.info("<=insertDayInter=>"); } }
問題在於,當我們需要使用多線程操作時,一般會先查詢,再進行插入,但是此時如果我們沒有一個合理的維度去分割數據的話,很容易造成死鎖。例如如果我們沒有將維度分配合理,就有可能批次一與批次二同時處理相同的數據,既要查詢又要更新,互相均需要對方的鎖釋放才可以繼續執行,導致死鎖。