需求 1
需要多个线程同时进行,并发执行。同时要求每个线程定时执行,并且线程之间存在一定的时间差。
实现
package org.example;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FixThreadScheduleService {
public FixThreadScheduleService(int threadNum, Runnable runnable) {
// 初始化
int actualThreadNum = Math.max(1, threadNum);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(actualThreadNum);
// 初始化固定线程,执行对应的任务。
for(int i = 0; i < threadNum; i++) {
//1+threadNum * 2L 把每一个线程的时间错开
scheduledExecutorService.scheduleAtFixedRate(runnable, 1+threadNum * 2L, 2, TimeUnit.SECONDS);
}
}
public static void main(String[] args) {
FixThreadScheduleService scheduleService = new FixThreadScheduleService(3, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-hello");
}
});
}
}
测试日志
pool-1-thread-3-hello
pool-1-thread-2-hello
pool-1-thread-1-hello
pool-1-thread-1-hello
pool-1-thread-2-hello
pool-1-thread-3-hello
pool-1-thread-1-hello
pool-1-thread-2-hello
pool-1-thread-3-hello
pool-1-thread-1-hello
pool-1-thread-2-hello
pool-1-thread-3-hello
pool-1-thread-1-hello
pool-1-thread-3-hello
pool-1-thread-2-hello
需求 2
说明
模拟程序每次从 kafka 拉取 2W 的数据。
然后解处理完这些数据,以前执行的时候,直接单个处理落库,但是数据库比较慢,所以需要优化。
将数据合并到内存队列中,然后指定大小后一次入库。入库可以做成同步阻塞,这样方便一些,避免异步压垮数据库。
代码实现
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class MessageStore {
// 设置为可以变化的量
private static final int SIZE = 100;
private static final BlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(SIZE+1);
public synchronized boolean add(String message) {
// 如果满了
if(QUEUE.size() >= SIZE) {
batchSave();
}
QUEUE.add(message);
return true;
}
private synchronized void batchSave() {
System.out.println("队列已经满了,开始执行批量入库操作....");
// 构建结果
List<String> textList = new ArrayList<>();
for(String text : QUEUE) {
textList.add(text);
}
System.out.println("------------------ 批量入库开始 size=" + textList.size());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("------------------ 批量入库完成");
// 清空
QUEUE.clear();
System.out.println("队列已经满了,完成执行批量入库操作....");
}
}
调用方
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class MessageStoreMain {
public static void main(String[] args) throws InterruptedException {
// 全局唯一
MessageStore messageStore = new MessageStore();
while (true) {
List<String> kafkaList = pullDataFromKafka(500);
for(String kafka : kafkaList) {
messageStore.add(kafka);
}
TimeUnit.SECONDS.sleep(5);
}
}
// 模拟从 kafka 取消息
private static List<String> pullDataFromKafka(int size) {
List<String> list = new ArrayList<>();
for(int i = 0; i < size; i++) {
list.add(i+"");
}
return list;
}
}
测试日志
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
------------------ 批量入库完成
队列已经满了,完成执行批量入库操作....
队列已经满了,开始执行批量入库操作....
------------------ 批量入库开始 size=100
参考资料
chat