下面设计实现的是:交换机Hlr指令处理任务模块。当然,在后续的业务发展过程中,还可能出现,其他类型指令的任务处理,所以根据“开闭”原则的定义,要抽象出一个接口类:BusinessEvent
/** * @filename:BusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * @Description:业务事件任务接口定义 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; public interface BusinessEvent { // 执行具体批处理的任务 public int execute(Integer userId); }然后具体的Hlr指令发送任务模块HlrBusinessEvent要实现这个接口类的方法,完成用户停复机Hlr指令的派发。代码如下:
/** * @filename:HlrBusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派发任务接口定义 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import org.apache.commons.lang.math.RandomUtils; public class HlrBusinessEvent implements BusinessEvent { // 交换机上的指令执行成功失败标识0表示成功 1表示失败 public final static int TASKSUCC = 0; public final static int TASKFAIL = 1; private final static int ELAPSETIME = 1000; @Override public int execute(Integer userId) { // 这里为了举例,随机产生1000以内的随机数 int millis = RandomUtils.nextInt(ELAPSETIME); // 简单模拟往交换机发送停机/复机的指令 try { Thread.sleep(millis); String strContent = String.format( "线程标识[%s]用户标识:[%d]执行交换机指令工单耗时:[%d]毫秒", Thread .currentThread().getName(), userId, millis); System.out.println(strContent); // 这里为了演示直接简单根据随机数是不是偶数简单模拟交换机指令执行的结果 return (millis % 2 == 0) ? TASKSUCC : TASKFAIL; } catch (InterruptedException e) { e.printStackTrace(); return TASKFAIL; } } }实际运行情况中,我们可能要监控一下指令发送的时长,于是再设计一个:针对Hlr指令发送任务模块HlrBusinessEvent,切面嵌入代理的Hlr指令时长计算代理类:HlrBusinessEventAdvisor,具体的代码如下:
/** * @filename:HlrBusinessEventAdvisor.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派发时长计算代理类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.lang.time.StopWatch; public class HlrBusinessEventAdvisor implements MethodInterceptor { public HlrBusinessEventAdvisor() { } @Override public Object invoke(MethodInvocation invocation) throws Throwable { // 计算一下指令派发时长 StopWatch sw = new StopWatch(); sw.start(); Object obj = invocation.proceed(); sw.stop(); System.out.println("执行交换机指令工单耗时: [" + sw.getTime() + "] 毫秒"); return obj; } }剩下的,我们由于是要,异步并行计算得到执行结果,于是我们设计一个:批处理Hlr任务执行模块HlrBusinessEventTask,它要实现java.util.concurrent.Callable接口的方法call,它会返回一个异步任务的执行结果。
/** * @filename:HlrBusinessEventTask.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派任务执行类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import java.util.concurrent.Callable; import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; public class HlrBusinessEventTask implements Callable<Integer> { private NotifyUsers user = null; private final static String MAPPERMETHODNAME = "execute"; public HlrBusinessEventTask(NotifyUsers user) { this.user = user; } @Override public Integer call() throws Exception { synchronized (this) { ProxyFactory weaver = new ProxyFactory(new HlrBusinessEvent()); NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(); advisor.setMappedName(MAPPERMETHODNAME); advisor.setAdvice(new HlrBusinessEventAdvisor()); weaver.addAdvisor(advisor); BusinessEvent proxyObject = (BusinessEvent) weaver.getProxy(); Integer result = new Integer(proxyObject.execute(user.getUserId())); // 返回执行结果 return result; } } }- 接下来,我们要把并行异步加载的查询结果,和并行异步处理任务执行的模块,给它组合起来使用,故重新封装一个,通知用户批处理任务管理类模块:NotifyUsersBatchTask。它的主要功能是:批量并行异步加载查询待停复机的手机用户,然后把它放入并行异步处理的线程池中,进行异步处理。然后我们打印出,本次批处理的任务一共有多少,成功数和失败数分别是多少(当然,本文还给出了另外一种JMX方式的监控)。NotifyTaskSuccCounter类,主要是统计派发的任务中执行成功的任务的数量,而与之相对应的类NotifyTaskFailCounter,是用来统计执行失败的任务的数量。具体的代码如下
/** * @filename:NotifyUsersBatchTask.java * * Newland Co. Ltd. All rights reserved. * * @Description:通知用户批处理任务管理类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import javax.sql.DataSource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.commons.collections.Closure; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.IfClosure; import org.apache.commons.lang.StringUtils; import newlandframework.batchtask.jmx.BatchTaskMonitor; import newlandframework.batchtask.model.NotifyUsers; import newlandframework.batchtask.parallel.BatchQueryLoader; import newlandframework.batchtask.parallel.BatchTaskReactor; public class NotifyUsersBatchTask { public NotifyUsersBatchTask() { } private ArrayList<DataSource> dataSource; // 基于JMX的任务完成情况监控计数器 private BatchTaskMonitor monitor = new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME); // 支持同时加载多个数据源 public NotifyUsersBatchTask(ArrayList<DataSource> dataSource) { this.dataSource = dataSource; } // 批处理任务执行成功计数器 class NotifyTaskSuccCounter implements Closure { public static final String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER"; private int numberSucc = 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER); numberSucc++; } public int getSuccNumber() { return numberSucc; } } // 批处理任务执行失败计数器 class NotifyTaskFailCounter implements Closure { public static final String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER"; private int numberFail = 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER); numberFail++; } public int getFailNumber() { return numberFail; } } // 并行加载查询多个水平分库的数据集合 public List<NotifyUsers> query() throws SQLException { BatchQueryLoader loader = new BatchQueryLoader(); String strSQL = "select home_city, msisdn, user_id from notify_users"; for (int i = 0; i < dataSource.size(); i++) { Connection con = dataSource.get(i).getConnection(); Statement st = con.createStatement(); loader.attachLoadEnv(strSQL, st, con); } List<ResultSet> list = loader.executeQuery(); System.out.println("查询出记录总数为:" + list.size()); final List<NotifyUsers> listNotifyUsers = new ArrayList<NotifyUsers>(); for (int i = 0; i < list.size(); i++) { ResultSet rs = list.get(i); while (rs.next()) { NotifyUsers users = new NotifyUsers(); users.setHomeCity(rs.getInt(1)); users.setMsisdn(rs.getInt(2)); users.setUserId(rs.getInt(3)); listNotifyUsers.add(users); } } // 释放连接资源 loader.close(); return listNotifyUsers; } // 批处理数据集合,任务分派 public void batchNotify(List<NotifyUsers> list, final ExecutorService excutor) { System.out.println("处理记录总数为:" + list.size()); System.out.println(StringUtils.center("记录明细如下", 40, "-")); NotifyTaskSuccCounter cntSucc = new NotifyTaskSuccCounter(); NotifyTaskFailCounter cntFail = new NotifyTaskFailCounter(); BatchTaskPredicate predicate = new BatchTaskPredicate(excutor); Closure batchAction = new IfClosure(predicate, cntSucc, cntFail); CollectionUtils.forAllDo(list, batchAction); System.out.println("批处理一共处理:" + list.size() + "记录,处理成功:" + cntSucc.getSuccNumber() + "条记录,处理失败:" + cntFail.getFailNumber() + "条记录"); } }