Mybatis PageHelper
如果你也在用 MyBatis,建议尝试该分页插件,这一定是最方便使用的分页插件。
分页插件支持任何复杂的单表、多表分页。
拓展阅读
基本的使用
// 查询条件
UserExample userExample = new UserExample;
long count = userService.count(userExample);
long totalPageSize = count / pageSize;
// 注意是否整除的问题
if(count % pageSize != 0) {
totalPageSize++;
}
// 分页处理
for(int i = 1; i <= totalPageSize; i++) {
List<User> list = userService.queryByPage(userExample, i, pageSize);
// 处理结果
}
通用的分页设计
接口
public interface IBizQueue {
/**
* 处理入参
* @param baseRequest 入参
*/
void handle(BaseRequest baseRequest);
}
基本实现
/**
* 抽象业务队列
*
* @author binbin.hou
*/
public abstract class AbstractBizQueue<T> implements IBizQueue {
private static final Logger logger = LoggerFactory.getLogger(AbstractBizQueue.class);
protected ArrayBlockingQueue<T> queue = null;
/**
* 计算总数
*/
private AtomicLong counter = null;
/**
* 请求入参
*/
private BaseRequest baseRequest = null;
/**
* 分页大小
*
* @return 大小
*/
protected int getPageSize() {
return 10000;
}
/**
* 线程数量
*
* @return 数量
*/
protected int getThreadNum() {
return 10;
}
/**
* 等待的毫秒数
*
* @return 等待
*/
protected long getAwaitMills() {
return 2000;
}
/**
* 获取任务名称
*
* @return 结果
*/
protected String getTaskName() {
return this.getClass().getSimpleName();
}
/**
* 单个处理逻辑
*
* @param request 入参
* @param t 单个实体
*/
protected abstract void doHandle(BaseRequest request, T t);
/**
* 查询总数
*
* @param request 条件
* @return 结果
*/
protected abstract long queryCount(BaseRequest request);
/**
* 分页查询
*
* @param pageNum 当前页
* @param pageSize 大小
* @param request 请求
* @return 结果
*/
protected abstract List<T> queryByPage(int pageNum, int pageSize, BaseRequest request);
/**
* 查询列表
*
* @param ids 指定查询的 id 列表
* @param request 请求
* @return 结果
*/
protected abstract List<T> queryByIds(List<String> ids, BaseRequest request);
public AbstractBizQueue() {
final int threadNum = getThreadNum();
final String taskName = getTaskName();
//1. 初始化
// 消费者线程池
final ExecutorService executor = Executors.newFixedThreadPool(threadNum, new ThreadFactory() {
int i = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, String.format(taskName + "-THREAD-%s", i++));
t.setDaemon(true);
return t;
}
});
logger.info("完成初始化线程池 {}", taskName);
this.queue = new ArrayBlockingQueue<>(2 * getPageSize(), true);
this.counter = new AtomicLong(0);
//2. 初始化消费者线程
for (int i = 0; i < threadNum; i++) {
ConsumerTask task = new ConsumerTask();
executor.execute(task);
logger.info("{}-消费者线程-{} 启动完成", taskName, i);
}
}
/**
* 处理多线程
* @param baseRequest 请求
*/
protected void handleMultiThread(BaseRequest baseRequest) {
final int pageSize = this.getPageSize();
long count = this.queryCount(baseRequest);
long totalNo = count % pageSize == 0 ? count / pageSize : (count / pageSize) + 1;
logger.info("开始处理信息,总数据量{},总共{}页", count, totalNo);
// 注意:采用分页一定要保证查询的条件不能在迭代中被修改,否则会导致有些更新不到。
for (int currentNo = 1; currentNo <= totalNo; currentNo++) {
// 生产者等待
awaitQueue();
logger.info("当前处理第 {} 页数据", currentNo);
List<T> pageList = this.queryByPage(currentNo, pageSize, baseRequest);
logger.info("完成查询第 {} 页,结果大小: {}", currentNo, pageList.size());
queue.addAll(pageList);
logger.info("添加到队列完成: {}", pageList.size());
}
}
@Override
public void handle(BaseRequest baseRequest) {
try {
counter = new AtomicLong(0);
this.baseRequest = baseRequest;
logger.info("开始处理入参:{}", baseRequest);
if (YesOrNoEnum.Y.getCode().equals(baseRequest.getFlag())) {
this.handleMultiThread(baseRequest);
} else {
List<String> idList = baseRequest.getMerIdList();
List<T> list = this.queryByIds(idList, baseRequest);
if (CollectionUtils.isEmpty(list)) {
logger.info("{} 对应的列表为空,忽略处理。", baseRequest);
return;
}
for (T entity : list) {
this.doHandle(baseRequest, entity);
}
}
logger.info("完成处理入参:{}", baseRequest);
} catch (Exception exception) {
logger.error("{} 处理异常", baseRequest, exception);
} finally {
TraceIdUtil.remove();
}
}
// 消费线程任务
private class ConsumerTask implements Runnable {
@Override
public void run() {
while (true) {
try {
// 会阻塞直到获取到元素
T entity = queue.take();
logger.info("开始处理元素:{}", JSON.toJSON(entity));
doHandle(baseRequest, entity);
long num = counter.incrementAndGet();
logger.info("当前已完成处理的总数: {}, queue.size {}", num, queue.size());
} catch (InterruptedException e) {
logger.error("消费者线程执行异常 999999", e);
}
}
}
}
/**
* 等待,直到 queue 的小于等于 limit,才进行生产处理
*/
protected void awaitQueue() {
while (true) {
// 获取阻塞队列的大小
int size = queue.size();
// 对于可变的更新,需要保证查询的内容不重复。
final int limitSize = 1;
final long awaitMills = getAwaitMills();
if (size >= limitSize) {
try {
logger.info("当前队列大小:{}, 限制大小: {},睡眠等待", size, limitSize);
// 根据实际的情况进行调整
Thread.sleep(awaitMills);
} catch (InterruptedException e) {
logger.error("等待队列异常", e);
}
} else {
logger.info("生产者等待结束。");
break;
}
}
}
}
抽象可变的实现
如果我们是一变查询,一边修改。
那么,就可以改为如下的实现:
/**
* 抽象业务队列-支持一遍查询,一遍修改的方式
*
* @author binbin.hou
*/
public abstract class AbstractBizModifyQueue<T> extends AbstractBizQueue<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractBizModifyQueue.class);
/**
* 处理多线程
*
* 一遍查询一遍修改,就是每一次都从第一页开始查询,边查询遍修改。
* @param baseRequest 请求
*/
@Override
protected void handleMultiThread(BaseRequest baseRequest) {
final int pageSize = this.getPageSize();
long count = this.queryCount(baseRequest);
long totalNo = count % pageSize == 0 ? count / pageSize : (count / pageSize) + 1;
logger.info("开始处理信息,总数据量{},总共{}页", count, totalNo);
// 注意:采用分页一定要保证查询的条件不能在迭代中被修改,否则会导致有些更新不到。
// 这里有一个小问题,因为数据一直在更新,所以不应该使用页数的方式
int pageCount = 1;
int handleCount = 0;
while (true) {
// 生产者等待
awaitQueue();
logger.info("当前处理第 {} 页数据", pageCount);
// 定时添加到阻塞队列中
List<T> list = this.queryByPage(1, pageSize, baseRequest);
if(CollectionUtils.isEmpty(list)) {
logger.info("当前队列信息为空,停止处理。");
break;
}
queue.addAll(list);
pageCount++;
// 避免死循环
handleCount += list.size();
logger.info("添加到队列完成: {}, 已添加总数: {}", list.size(), handleCount);
if(handleCount > count) {
logger.info("处理总数 {} 已超过 {},跳出循环。", handleCount, count);
break;
}
}
}
}