需求 1

需要多个线程同时进行,并发执行。同时要求每个线程定时执行,并且线程之间存在一定的时间差。

实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package org.example; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class FixThreadScheduleService { public FixThreadScheduleService(int threadNum, Runnable runnable) { // 初始化 int actualThreadNum = Math.max(1, threadNum); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(actualThreadNum); // 初始化固定线程,执行对应的任务。 for(int i = 0; i < threadNum; i++) { //1+threadNum * 2L 把每一个线程的时间错开 scheduledExecutorService.scheduleAtFixedRate(runnable, 1+threadNum * 2L, 2, TimeUnit.SECONDS); } } public static void main(String[] args) { FixThreadScheduleService scheduleService = new FixThreadScheduleService(3, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "-hello"); } }); } }

测试日志

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pool-1-thread-3-hello pool-1-thread-2-hello pool-1-thread-1-hello pool-1-thread-1-hello pool-1-thread-2-hello pool-1-thread-3-hello pool-1-thread-1-hello pool-1-thread-2-hello pool-1-thread-3-hello pool-1-thread-1-hello pool-1-thread-2-hello pool-1-thread-3-hello pool-1-thread-1-hello pool-1-thread-3-hello pool-1-thread-2-hello

需求 2

说明

模拟程序每次从 kafka 拉取 2W 的数据。

然后解处理完这些数据,以前执行的时候,直接单个处理落库,但是数据库比较慢,所以需要优化。

将数据合并到内存队列中,然后指定大小后一次入库。入库可以做成同步阻塞,这样方便一些,避免异步压垮数据库。

代码实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package org.example; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class MessageStore { // 设置为可以变化的量 private static final int SIZE = 100; private static final BlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(SIZE+1); public synchronized boolean add(String message) { // 如果满了 if(QUEUE.size() >= SIZE) { batchSave(); } QUEUE.add(message); return true; } private synchronized void batchSave() { System.out.println("队列已经满了,开始执行批量入库操作...."); // 构建结果 List<String> textList = new ArrayList<>(); for(String text : QUEUE) { textList.add(text); } System.out.println("------------------ 批量入库开始 size=" + textList.size()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("------------------ 批量入库完成"); // 清空 QUEUE.clear(); System.out.println("队列已经满了,完成执行批量入库操作...."); } }

调用方

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.example; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class MessageStoreMain { public static void main(String[] args) throws InterruptedException { // 全局唯一 MessageStore messageStore = new MessageStore(); while (true) { List<String> kafkaList = pullDataFromKafka(500); for(String kafka : kafkaList) { messageStore.add(kafka); } TimeUnit.SECONDS.sleep(5); } } // 模拟从 kafka 取消息 private static List<String> pullDataFromKafka(int size) { List<String> list = new ArrayList<>(); for(int i = 0; i < size; i++) { list.add(i+""); } return list; } }

测试日志

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100 ------------------ 批量入库完成 队列已经满了,完成执行批量入库操作.... 队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100 ------------------ 批量入库完成 队列已经满了,完成执行批量入库操作.... 队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100 ------------------ 批量入库完成 队列已经满了,完成执行批量入库操作.... 队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100 ------------------ 批量入库完成 队列已经满了,完成执行批量入库操作.... 队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100 ------------------ 批量入库完成 队列已经满了,完成执行批量入库操作.... 队列已经满了,开始执行批量入库操作.... ------------------ 批量入库开始 size=100

参考资料

chat