拓展阅读
JVM FULL GC 生产问题 II-如何定位内存泄露? 线程通用实现
JVM FULL GC 生产问题 III-多线程执行队列的封装实现,进一步抽象
java 多线程实现通用方法 threadpool implement in java
业务说明
有时候业务需要全量做一些事情。
比如给全量用户推送数据、全量抓取数据。
一般而言,如果处理的信息可以过滤,那最好先做一层过滤。
如果无法过滤,数量又比较多的情况下,那使用多线程处理不失为一种解决方式。
抽象目标方法
java 实现如下:
import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* 说明:全量推送,当数量较多时,推送较慢。所以采用多线程的方式。
*/
@Component
@Slf4j
public abstract class AbstractThreadBiz {
/**
* 初始化的线程池
*/
private final ExecutorService executorService = new ThreadPoolExecutor(getThreadNum(), getThreadNum(), 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(getThreadNum()),
new ThreadFactory() {
private int i = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, String.format("THREAD-POOL-%s", i++));
t.setDaemon(true);
return t;
}
});
/**
* 线程数量
* @return 数量
*/
protected int getThreadNum() {
return 5;
}
/**
* 是否需要推送标识
* @return 推送标识
*/
protected boolean needPushFlag() {
return true;
}
/**
* 处理业务
* @param bizId 业务标识
* @param transTime 交易时间
*/
protected abstract void doHandleBiz(String bizId, String transTime);
/**
* 查询所有的业务列表
*/
protected abstract List<String> doQueryAllBizList();
/**
* 对外的处理入口
* @param transTime 执行时间
*/
public void execute(String transTime) {
boolean needPushFlag = needPushFlag();
if(!needPushFlag) {
log.warn("{} 推送标识关闭,忽略推送处理", transTime);
return;
}
//0. 插入初始化任务
String taskId = IdUtil.uuid32();
try {
//1. 查询所有的业务标识
List<String> bizIdList = doQueryAllBizList();
//2. 多线程处理
this.threadHandleBizList(bizIdList, transTime);
//3. 任务结束
} catch (Exception exception) {
//4. 任务失败
log.error("任务执行异常", exception);
}
}
/**
* 多线程处理业务列表
* @param list 所有的列表
* @param transTime 交易时间
*/
protected void threadHandleBizList(List<String> list,
String transTime) {
if(CollectionUtil.isEmpty(list)) {
log.warn("绑定列表为空,完成处理");
return;
}
int threadNum = getThreadNum();
int count = list.size();
log.info("总业务数量 {}, 总线程数量 {}", count, threadNum);
// 每个线程处理量
final int countPerThread = count / threadNum;
final String mdcId = LogUtil.getMdcId();
// 数量较少时
if(countPerThread <= 0) {
log.warn("数量较少直接单线程执行 count {}, threadNum {}", count, threadNum);
handleBizList(list, transTime, mdcId);
return;
}
int startIndex = 0;
int endIndex = 0;
CountDownLatch countDown = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
startIndex = endIndex;
endIndex = startIndex + countPerThread;
// 最后一个分剩余的所有
if (i == threadNum - 1) {
endIndex = list.size();
}
List<String> subList = list.subList(startIndex, endIndex);
executorService.submit(() -> {
handleBizList(subList, transTime, mdcId);
// 计数递减1
countDown.countDown();
});
}
try {
// 等待所有线程结束
countDown.await();
} catch (Exception e) {
log.error("主线程等待异常", e);
}
}
/**
* 处理业务列表
* @param list 列表
* @param mdcId 日志标识
*/
protected void handleBizList(List<String> list,
String transTime,
String mdcId) {
try {
// 异步线程注意设置 mdcId
LogUtil.putMdc(mdcId);
final int size = list.size();
log.info("开始处理子列表 {}", size);
for(String bizId : list) {
log.info("开始处理业务标识:{}", bizId);
//TODO: 业务处理逻辑
doHandleBiz(bizId, transTime);
log.info("完成处理业务标识:{}", bizId);
}
log.info("完成处理子列表 {}", size);
} catch (Exception exception) {
log.error("处理异常", exception);
}
}
}
分页优化
有时候我们不适合直接查出所有的信息。
可以把一次性查出所有,改成分页查询。
注意结合业务。
小结
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次相遇。