拓展阅读

JVM FULL GC 生产问题 I-多线程通用实现

JVM FULL GC 生产问题 II-如何定位内存泄露? 线程通用实现

JVM FULL GC 生产问题 III-多线程执行队列的封装实现,进一步抽象

java 多线程实现通用方法 threadpool implement in java

情景回顾

我们在上一篇 JVM FULL GC 生产问题笔记 中提出了如何更好的实现一个多线程消费的实现方式。

没有看过的小伙伴建议看一下。

本来以为一切都可以结束的,不过又发生了一点点意外,这里记录一下,避免自己和小伙伴们踩坑。

但是上一篇的文章还是存在一点不足,原来的实现无法直接复用

为什么无法复用呢?因为不够抽象!

java 实现

接口定义

  [java]
1
2
3
4
5
public interface IQueueService<T> { void handle(final Request request); }

Request 就是我们业务中的入参对象。

抽象类实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package com.github.houbb.queue.service.queue; import com.github.houbb.queue.service.dto.Request; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** * @author binbin.hou * @since 1.0.0 */ public abstract class AbstractQueueService<T> implements IQueueService<T> { private ArrayBlockingQueue<T> queue = null; /** * 计算总数 */ private AtomicLong counter = null; /** * 分页大小 * @return 大小 */ protected int getPageSize() { return 10000; } /** * 线程数量 * @return 数量 */ protected int getThreadNum() { return 10; } /** * 等待的毫秒数 * @return 等待 */ protected long getAwaitMills() { return 3000; } public AbstractQueueService() { final int threadNum = getThreadNum(); //1. 初始化 Executor executor = Executors.newFixedThreadPool(threadNum); this.queue = new ArrayBlockingQueue<T>(2 * getPageSize(), true); this.counter = new AtomicLong(0); //2. 初始化消费者线程 for(int i = 0; i < threadNum; i++) { ConsumerTask task = new ConsumerTask(); executor.execute(task); System.out.println(this.getClass().getSimpleName() + "消费者线程-" + i + " 启动完成"); } } /** * 查询总数 * @param request 条件 * @return 结果 */ protected abstract int queryCount(Request request); /** * 查询列表 * @param pageNum 当前页 * @param pageSize 大小 * @param request 请求 * @return 结果 */ protected abstract List<T> queryList(int pageNum, int pageSize, Request request); /** * 查询列表 * @param request 请求 * @return 结果 */ protected abstract List<T> queryByIds(Request request); /** * 单个处理逻辑 * @param t 单个实体 */ protected abstract void doHandle(T t); /** * 核心处理逻辑 * @param request 请求参数 */ public void handle(Request request) { System.out.println("接收到参数:" + request); counter = new AtomicLong(0); //1. 是否为多线程模式 boolean threadFlag = request.isThread(); if(threadFlag) { // 分页查询 int pageSize = getPageSize(); int total = this.queryCount(request); int totalPage = total / pageSize; for(int i = 1; i <= totalPage; i++) { // 等待消费者处理已有的信息 awaitQueue(); System.out.println("开始查询第"+i+"页"); List<T> list = this.queryList(i, pageSize, request); System.out.println("完成查询第"+i+"页"); // 直接往队列里面扔 queue.addAll(list); } } else { // 根据传入的列表判断 List<String> ids = request.getList(); List<T> queryList = this.queryByIds(request); // 如果列表为空。 for(T t : queryList) { this.doHandle(t); } } } // 消费线程任务 private class ConsumerTask implements Runnable { @Override public void run() { while (true) { try { T t = queue.take(); doHandle(t); long count = counter.incrementAndGet(); System.out.println(this.getClass().getSimpleName()+" 已完成:" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 等待,直到 queue 的小于等于 limit,才进行生产处理 */ private void awaitQueue() { int limit = this.getPageSize(); while (true) { // 获取阻塞队列的大小 int size = queue.size(); if(size >= limit) { try { // 根据实际的情况进行调整 Thread.sleep(getAwaitMills()); } catch (InterruptedException e) { e.printStackTrace(); } } else { break; } } } }

实现类

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.github.houbb.queue.service.queue; import com.github.houbb.queue.service.dal.entity.Menu; import com.github.houbb.queue.service.dto.Request; import java.util.ArrayList; import java.util.List; import java.util.UUID; /** * @author binbin.hou * @since 1.0.0 */ public class MenuQueueService extends AbstractQueueService<Menu> { @Override protected int queryCount(Request request) { return 50000; } @Override protected List<Menu> queryList(int pageNum, int pageSize, Request request) { List<Menu> list = new ArrayList<Menu>(); for(int i = 0; i < pageSize; i++) { Menu menu = new Menu(); menu.setName(UUID.randomUUID().toString()); list.add(menu); } return list; } @Override protected List<Menu> queryByIds(Request request) { return null; } @Override protected void doHandle(Menu menu) { System.out.println("开始处理:" + menu); } }

UserQueueService

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.github.houbb.queue.service.queue; import com.github.houbb.queue.service.dal.entity.User; import com.github.houbb.queue.service.dto.Request; import java.util.ArrayList; import java.util.List; import java.util.UUID; /** * @author binbin.hou * @since 1.0.0 */ public class UserQueueService extends AbstractQueueService<User> { @Override protected int queryCount(Request request) { return 50000; } @Override protected List<User> queryList(int pageNum, int pageSize, Request request) { List<User> list = new ArrayList<User>(); for(int i = 0; i < pageSize; i++) { User user = new User(); user.setName(UUID.randomUUID().toString()); list.add(user); } return list; } @Override protected List<User> queryByIds(Request request) { return null; } @Override protected void doHandle(User user) { System.out.println("开始处理:" + user); } }

测试类

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.github.houbb.queue.service.queue; import com.github.houbb.queue.service.dto.Request; /** * @author binbin.hou * @since 1.0.0 */ public class Main { public static void main(String[] args) { Request request = new Request(); request.setThread(true); UserQueueService userQueueService = new UserQueueService(); userQueueService.handle(request); MenuQueueService menuQueueService = new MenuQueueService(); menuQueueService.handle(request); } }

这样 2 个类实际上是完全独立的实现。

小结

经过这样的抽象之后,省去了我们很多写代码的时间。

也避免了 copy 的时间消耗。

架构,就是抽象。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次相遇。