需求
我们希望指定线程池的数量,比如固定一个线程,然后往里面不断添加任务。
实现 1
最简单的,我们通过下面的方式
1
2
3
4
5
6
7ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
@Override
public void run() {
//todo
}
});
坑在哪里
这个在 jdk 中的工具方法,实际上存在一个大坑。
那就是默认的创建是一个无界队列,如果任务执行的很慢,每个任务又比较占用内存,可能把内存打爆。
1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
默认的队列,大小就是:
1
2
3
4
5
6
7/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
改进版 V1
实现
吐槽一下,这种写法真的非常不优雅,写了一大堆。
但是这样才能保证队列不会一直扩大。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public static void main(String[] args) {
int corePoolSize = 1; // 核心线程数
int maxPoolSize = 1; // 最大线程数
long keepAliveTime = 0L; // 线程空闲时间
int queueSize = 2; // 队列大小
// 自定义线程池名字
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("prefix-" + t.getId());
return t;
}
};
// 使用有界队列 LinkedBlockingQueue
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueSize);
// 创建线程池
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
queue,
threadFactory
);
// 使用 executorService 执行任务
for(int i = 0; i < 10; i++) {
final int val = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-" + val);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 关闭线程池
// executorService.shutdown();
}
测试日志:
1
2
3
4
5
6
7
8
9prefix-20-0
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@677327b6 rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.example.CreateThreadDefineTest.main(CreateThreadDefineTest.java:39)
prefix-20-1
prefix-20-2
这样,当队列满的时候会直接进行拒绝。
定时任务
需求
如果一个定时执行的任务,会怎么样?
实现 v1
1
2
3
4
5
6
7
8
9
10
11
12
13
14public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, 1, 1, TimeUnit.MILLISECONDS);
}
我们本意事项 1mS 触发一次任务,不过任务执行比较慢。
这个时候,任务是直接 1ms 一次,还是等任务执行完成?
测试日志
测试发现,会等任务执行完成。
1
2
31704208560633
1704208565640
1704208570640
这种还好,不会导致内存爆炸。
拒绝时的内容进一步优化?
场景
上面的拒绝策略,我们可以发现并不知道什么内容被拒绝了。
如何自定义线程池?
在 Java 中,自定义线程池可以通过使用 ThreadPoolExecutor
类来实现。
ThreadPoolExecutor
提供了丰富的配置选项,使得你可以根据具体需求定制线程池的行为。
以下是一些关键步骤和示例代码,帮助你创建自定义的线程池:
1. 定义线程池参数
线程池的关键参数包括核心线程数、最大线程数、线程空闲时间、时间单位、任务队列、线程工厂和拒绝策略。
2. 创建线程池
使用 ThreadPoolExecutor
类来创建线程池。下面是一个示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPool {
public static void main(String[] args) {
// 核心线程数
int corePoolSize = 5;
// 最大线程数
int maximumPoolSize = 10;
// 线程空闲保持时间
long keepAliveTime = 60;
// 时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 任务队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
// 线程工厂
ThreadFactory threadFactory = new CustomThreadFactory();
// 拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 提交任务
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(new Task());
}
// 关闭线程池
threadPoolExecutor.shutdown();
}
// 自定义线程工厂
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "custom-pool-thread-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
// 任务
static class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is executing task.");
try {
// 模拟任务执行时间
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
3. 参数解释
- 核心线程数(corePoolSize): 线程池中始终保持的线程数,即使线程处于空闲状态。
- 最大线程数(maximumPoolSize): 线程池中允许的最大线程数。
- 线程空闲保持时间(keepAliveTime): 超过核心线程数的空闲线程在终止前等待新任务的最长时间。
- 时间单位(unit):
keepAliveTime
参数的时间单位。 - 任务队列(workQueue): 用于保存等待执行的任务的队列。
- 线程工厂(threadFactory): 用于创建新线程的工厂。
- 拒绝策略(handler): 当任务无法提交时的处理策略。常用策略包括:
AbortPolicy
: 抛出RejectedExecutionException
。CallerRunsPolicy
: 由提交任务的线程处理该任务。DiscardPolicy
: 丢弃任务,不予处理。DiscardOldestPolicy
: 丢弃队列中最老的任务,然后重新提交被拒绝的任务。
4. 关闭线程池
调用 shutdown()
方法来关闭线程池,以确保所有已提交的任务执行完毕后线程池才会终止。
如果需要立即终止,可以调用 shutdownNow()
。
通过以上步骤,你可以创建一个功能强大的自定义线程池,并根据需要调整其行为和参数配置。
丢弃时,如何看到丢弃的内容呢?
要具体识别被拒绝的任务,你需要在任务中重写 toString
方法,以便在拒绝策略中可以输出有意义的信息。
以下是一个具体实现的示例,展示如何在任务被拒绝时输出详细信息:
1. 自定义任务类
首先,定义一个自定义的任务类,并重写 toString
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 // 自定义任务
static class Task implements Runnable {
private final int taskId;
public Task(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is executing task " + taskId);
try {
// 模拟任务执行时间
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public String toString() {
return "Task{" + "taskId=" + taskId + '}';
}
}
// 自定义拒绝策略
static class CustomDiscardPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task " + r.toString() + " is rejected.");
}
}
}
2. 解释
- 自定义任务类:
Task
类实现了Runnable
接口,并添加了一个taskId
字段来标识每个任务。- 重写了
toString
方法,返回任务的详细信息,包括taskId
。
- 自定义拒绝策略:
CustomDiscardPolicy
类实现了RejectedExecutionHandler
接口。- 在
rejectedExecution
方法中,使用r.toString()
打印被拒绝任务的信息。
- 示例代码说明:
- 创建一个线程池,核心线程数为 2,最大线程数为 4,任务队列容量为 2。
- 提交 10 个任务到线程池,每个任务都有一个唯一的
taskId
。 - 由于任务数量超过了线程池和任务队列的容量,多余的任务将会被拒绝,并触发自定义的拒绝策略,输出被拒绝任务的详细信息。
这样,当任务被拒绝时,你可以在控制台看到具体是哪个任务被拒绝了,通过输出的信息可以识别每个被拒绝的任务。
参考资料
chat