场景
在一些流处理中,比如 kafka 消费等,我们需要不停的解析处理消息,然后进行入库。
有时候消息需要进行落库,每次都是单个落库,对数据库的压力比较大。
可不可以把单个操作变化为批量入库,来提升性能呢?
单个落库改为批量
目的
比如 100 条数据要落库,单个调用数据库 100 次,比一次批量入库耗时要多。
所以我们可以想办法把单个调用进行合并,然后调用入库。
单个同步落库
最基础的单个同步落库流程如下:
根据固定数量批量
我们可以固定一个内存大小,比如满足 100 个才进行入库,否则就放在内存中。
流程如下:
定时触发批量入库
固定数量适合触发比较多的情况。
如果命中的数据不多, 比如一些异常匹配处理等,但是对实时性要求又比较高。
可以通过定时任务来触发批量信息的入库,其他没变。
核心实现代码
单个数据的落库
每一次数据处理时,直接放入内存队列。
// 感觉这里可以直接替换为 COW,保持高并发。
// 或者使用 concurrentHashMap,不过需要处理 key
protected synchronized void addToList(final T object) {
this.innerList.add(object);
// 事后处理
this.addToListAfter(object);
}
固定数量的场景
如果是固定数量的批量入库,我们可以在每一次加入内存队列之后,判断是否满足入库条件。
/**
* 触发是否满足 fixed size?
* @param object 对象
*/
protected void addToListAfter(final T object) {
if(this.innerList.size() >= this.batchConfig.getBatchSize()) {
log.debug("[Stream2Batch] addToListAfter fired save start...");
// 真正触发保存
fireBatchSave();
log.debug("[Stream2Batch] addToListAfter fired save end...");
}
}
真正的批量保存逻辑
批量保存时,为了提升性能,也可以转换为异步入库。
protected void fireBatchSave() {
try {
log.info("[Stream2Batch] Fire batch save start...");
// 资源加锁
synchronized (innerList) {
// 拷贝资源
final List<T> copyList = new ArrayList<>(innerList);
// 执行保存
// 同步保存
if(batchConfig.isBatchSaveAsyncFlag()) {
actualSaveThread.submit(new Runnable() {
@Override
public void run() {
actualBatchSave(copyList);
}
});
} else {
this.actualBatchSave(copyList);
}
// 资源清空
innerList.clear();
}
log.info("[Stream2Batch] Fire batch save end...");
} catch (Exception e) {
log.error("[Stream2Batch] FireBatchSave meet ex", e);
batchConfig.getFireBatchSaveErrorHandler().onError(e);
}
}
定时调度触发
定时调度触发批量的方式,可以定时触发是否入库。
/**
* 初始化保存调度线程池
*/
private void initSaveFireThread() {
saveFireThread.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 触发保存
fireBatchSave();
}
}, batchConfig.getBatchFixedTimeInterval(), batchConfig.getBatchFixedTimeInterval(), TimeUnit.SECONDS);
}
其他不变。
已有的实现
说明
当然,我们每次自己定义也比较麻烦。有一些已有的实现可以直接使用。
maven 依赖
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>stream2batch-core</artifactId>
<version>0.1.0</version>
</dependency>
固定时间间隔
适合场景:匹配的数量一般,对实时性有一定的要求。可以通过定时调度的方式驱动。
我们只需要定义好对应的 storeSingle/storeBatch 即可。
其他的很多属性也都可以自定义配置。
IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance()
// 单个保存策略
.storeSingle(new FakeStoreSingle<>())
// 批量保存策略
.storeBatch(new FakeStoreBatch<>())
.fixedTime();
for(int i = 0; i < 100; i++) {
UserInfo userInfo = new UserInfo();
userInfo.setUsername("u-"+i);
stream2Batch.execute(userInfo);
}
TimeUnit.SECONDS.sleep(10);
// 资源关闭
stream2Batch.shutdown();
固定大小
使用场景:匹配的数量会比较多,为了避免内存压力过大,采用固定数量的方式驱动。
public static void main(String[] args) throws InterruptedException {
IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance()
// 单个保存策略
.storeSingle(new FakeStoreSingle<>())
// 批量保存策略
.storeBatch(new FakeStoreBatch<>())
.batchSize(10)
.fixedSize();
for(int i = 0; i < 100; i++) {
UserInfo userInfo = new UserInfo();
userInfo.setUsername("u-"+i);
stream2Batch.execute(userInfo);
}
TimeUnit.SECONDS.sleep(10);
// 资源关闭
stream2Batch.shutdown();
}
小结
流式处理转批量处理是一个比较场景的优化方式。
异步入库也可以大幅度提升吞吐量,在一个实时链路场景可以考虑使用。
拓展阅读
JVM FULL GC 生产问题 II-如何定位内存泄露? 线程通用实现
JVM FULL GC 生产问题 III-多线程执行队列的封装实现,进一步抽象
java 多线程实现通用方法 threadpool implement in java