java 原生方式,创建指定个数定时执行的线程?如何把独立的 kafka 消息,基于内存聚合批量操作?
2023年12月6日大约 3 分钟
需求 1
需要多个线程同时进行,并发执行。同时要求每个线程定时执行,并且线程之间存在一定的时间差。
实现
package org.example;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FixThreadScheduleService {
public FixThreadScheduleService(int threadNum, Runnable runnable) {
// 初始化
int actualThreadNum = Math.max(1, threadNum);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(actualThreadNum);
// 初始化固定线程,执行对应的任务。
for(int i = 0; i QUEUE = new ArrayBlockingQueue<>(SIZE+1);
public synchronized boolean add(String message) {
// 如果满了
if(QUEUE.size() >= SIZE) {
batchSave();
}
QUEUE.add(message);
return true;
}
private synchronized void batchSave() {
System.out.println("队列已经满了,开始执行批量入库操作....");
// 构建结果
List textList = new ArrayList<>();
for(String text : QUEUE) {
textList.add(text);
}
System.out.println("------------------ 批量入库开始 size=" + textList.size());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------------------ 批量入库完成");
// 清空
QUEUE.clear();
System.out.println("队列已经满了,完成执行批量入库操作....");
}
}
调用方
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class MessageStoreMain {
public static void main(String[] args) throws InterruptedException {
// 全局唯一
MessageStore messageStore = new MessageStore();
while (true) {
List kafkaList = pullDataFromKafka(500);
for(String kafka : kafkaList) {
messageStore.add(kafka);
}
TimeUnit.SECONDS.sleep(5);
}
}
// 模拟从 kafka 取消息
private static List pullDataFromKafka(int size) {
List list = new ArrayList<>();
for(int i = 0; i < size; i++) {
list.add(i+"");
}
return list;
}
}
测试日志
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
参考资料
chat
贡献者
binbin.hou