多线程高并发编程 - 线程池框架(三)
线程池:由系统维护的容纳线程的容器,由CLR控制的所有AppDomain共享。线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。
Executor & ExecutorService
Executor
首先我们来认识一下它Excecutor[ɪɡˈzekjətə(r)]
,该接口是Java线程实现中的最顶层接口,通过传入一个 Runnable
接口,由Runnable
接口来定义一项执行任务,再交由 Executor.execute() 方法定义如何执行任务。
package c_026;
import java.util.concurrent.Executor;
public class T01_MyExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
// new Thread(command).star();
}
}
当然,此接口只需要了解并知道如何使用即可,毕竟在日常开发中Java给我们封装了极其多的线程池来供我们使用(^_^)。
ExecutorService
该接口继承了Executor
接口,service
一般都是后台线程,跑在后台提供服务,在这里呢它则代表执行器服务,ExecutorService
就是这种线程,在启动后一直在后台等待任务并将其扔到容器中执行;
既然该接口是继承于Executor
接口,那么就代表该接口也具有execute 方法,并且还在其基础上添加了自已一些实现的封装方法,具体如下:
Future submit(Runnable): 提交Runnable任务以执行并返回表示该任务的Future。
Future submit(Runnable, result): 提交可运行的任务以供执行并返回成功完成后返回给定结果。
Future submit(Callable): 提交一个有返回值的callable任务,并将返回值作为future对象包装返回
execute 和 submit 的区别: 没有本质区别,只不过execute只能执行Runnable接口;
Runnable & CallAble
共同点: 接口都是设计出来给一个线程来调用的
不同点:
Runnable的run()没有返回值,并且不能抛出异常!
Callable的call()方法具有返回值,并且可以抛出异常!
Executors
用来操作Executor的工具类
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
1、public static ExecutorService newFiexedThreadPool(int Threads) 创建固定数目线程的线程池。
2、public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
3、public static ExecutorService newSingleThreadExecutor():创建一个单线程化的Executor。
4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
ThreadPool
ThreadPool
线程池,其内部维护了一堆线程执行任务,一般用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。
用一个并行计算来演示线程池的作用,贴代码:
package c_026;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 计算 1-200000 之间的质数
*/
public class T07_ParallelComputing {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<Integer> result = getPrime(1, 20_0000);
long end = System.currentTimeMillis();
System.out.println(end - start); // 使用单线程计算的时间
ExecutorService service = Executors.newFixedThreadPool(4);
ComputeTask t1 = new ComputeTask(1, 8_0000);
ComputeTask t2 = new ComputeTask(8_0001, 13_0000);
ComputeTask t3 = new ComputeTask(13_0001, 17_0000);
ComputeTask t4 = new ComputeTask(17_0001, 20_0000); // 这里为什么不均分? 因为数字越大, 质数的数量就越多
// 提交任务给ExecutorService执行
Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);
// 执行开始
start = System.currentTimeMillis();
f1.get();
f2.get();
f3.get();
f4.get();
end = System.currentTimeMillis();
System.out.println(end - start);
service.shutdown();
}
static class ComputeTask implements Callable<List<Integer>> {
private int start, end;
ComputeTask (int start, int end) {
this.start = start;
this.end = end;
}
@Override
public List<Integer> call() throws Exception {
return getPrime(start, end);
}
}
static boolean isPrime(int num) {
for (int i = 2; i < num / 2; i++) {
if (num % i == 0) return false;
}
return true;
}
/**
* 返回指定范围的质数列表
*/
static List<Integer> getPrime(int start, int end) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < end; i++) {
if (isPrime(i)) list.add(i);
}
return list;
}
}
运行结果(ps: 运行时长差距可能会因cpu单线程运算能力而有所误差):
5300
2829
FixedThreadPool
固定线程的线程池,在执行任务过程中,无须启动新的线程,如遇到所有线程都在执行状态下,空闲任务会在其内部维护的一个任务队列等待(BlockingQueue);这样就让我们在处理任务时无须再多起线程来消耗系统多余的资源
线程池中维护了两个任务队列: 1. 未执行的任务队列 2. 已执行的任务队列
示例代码:
package c_026;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* ThreadPool
* 线程池
*/
public class T05_ThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(5); // 固定长度的线程池
for (int i = 0; i < 6; i++) { // 执行六个任务, 在只有五个固定容量的线程池中
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println(service); // [Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
// 内部一般是BlockingQueue
// pool size = 5 线程池的容量
// active thread = 5 激活的线程队列长度
// queued tasks = 1 等待处理任务长度
// completed task = 0 完成执行的任务数量
// 关闭线程池
service.shutdown(); // 未执行完毕,不会停止,只会进入停止中状态
System.out.println(service.isTerminated()); // false 判断任务是否结束
System.out.println(service.isShutdown()); // true 判断线程池状态是否关闭
System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
TimeUnit.SECONDS.sleep(5); // 5s 后肯定执行完成了
System.out.println(service.isTerminated()); // true
System.out.println(service.isShutdown()); // true
System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
}
}
使用linkedBlockingQueue当容器
CatchedThreadPool
可缓存的线程,弹性的
当有个请求进入线程池内, 线程池将会启用一个线程。
当再次有个请求进入线程池内, 并且上个线程未结束, 仍然会启用一个线程。
当有线程执行完毕后,这个线程不会被清除, 而是被缓存,当有请求进入时, 直接使用缓存线程调用。
跟 fixedThreadPool 类似, 只不过没有上限(最多Integer最大值), 是一种弹性操作。
当线程一直不被使用, 缓存最多持续1分钟(AliveTime默认值),就会被线程池销毁。
示例代码:
package c_026;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class T08_CachedThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
// pool size 为0
System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]cu'
for (int i = 0; i < 2; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
// pool size 变为2
System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
try {
TimeUnit.SECONDS.sleep(80); // 最多持续1分钟,这里sleep80s
} catch (InterruptedException e) {
e.printStackTrace();
}
// pool size 变为0
System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
}
}
SingleThreadPool
线程池中只有一个线程,作用:保证线程执行的时序性。
使用场景:让任务按前后顺序执行;
内部实现:由一个阻塞队列维护;
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
示例代码:
package c_026;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* SingleThreadPool
*/
public class T09_SingleThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
service.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
}
}
运行结果:
0 pool-1-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1
这个线程没什么好讲的,理解使用就好啦;
ScheduleThreadPool
Scheduled: 计划中的,定时的;
固定时间执行的线程池,一般用来做定时任务
示例代码:
package c_026;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* ScheduledPool
* 执行定时的任务,类似Delay, 可以替代Timer
*/
public class T10_ScheduledPool {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
// 使用固定的频率执行某个任务
// 四个参数
// command: 执行的任务
// initialDelay: 第一次执行延时多久执行
// period: 每隔多久执行一次这个任务
// unit: 时间单位
service.scheduleAtFixedRate(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS); // 每隔500ms打印一下线程名称
// 线程执行1000ms,而每sleep 500 就要新启动一个线程
// 上个线程未执行完毕,会启用新的线程执行
// 如果线程池已满,只有延时
}
}
WorkStealingThreadPool
工作窃取线程池,启动的线程是后台线程(也称精灵线程,守护线程),运行效果与FixedThreadPool差不多,区别就是FixedThreadPool启动的为普通线程,WorkStealingThreadPool启动的为 后台线程
内部实现:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
可以看出他的实现其实只是new了一个ForkJoinPool线程池,所以本质上其实与ForkJoinPool并没有什么区别,只是在其之上的一层封装,方便使用。
示例代码:
package c_026;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* WorkStealingPool
*/
public class T11_WorkStealingPool {
public static void main(String[] args) throws IOException {
// CPU 核数
System.out.println(Runtime.getRuntime().availableProcessors());
// workStealingPool 会自动启动cpu核数个线程去执行任务
ExecutorService service = Executors.newWorkStealingPool();
service.execute(new R(1000)); // 我的cpu核数为12 启动13个线程,其中第一个是1s执行完毕,其余都是2s执行完毕,
// 有一个任务会进行等待,当第一个执行完毕后,会再次偷取第十三个任务执行
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
service.execute(new R(2000));
}
// 因为work stealing 是deamon线程,即后台线程,精灵线程,守护线程
// 所以当main方法结束时, 此方法虽然还在后台运行,但是无输出
// 可以通过对主线程阻塞解决
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + time);
}
}
}
ForkJoinPool
将一个任务拆分多个任务执行(理论可以无限切分,递归),然后再将结果合并;
使用场景: 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程
ForkJoinPool执行的是ForkJoinTask,即RecursiveAction,RecursiveTask;
RecursiveAction与RecursiveTask的区别就是RecursiveAction执行后没有返回值,反之RecursiveTask执行后有返回值。
示例代码:
package c_026;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
/**
* T12_ForkJoinPool
*/
public class T12_ForkJoinPool {
static int[] nums = new int[100_0000];
static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
static Random random = new Random();
// 初始化这100_000个数字, 每个数字范围在100之内
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = random.nextInt(100);
}
// 所有数字和, 事先计算:
//System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
}
/*
* RecursiveAction任务
*/
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("sum = " + sum);
} else {
int middle = (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
/*
* RecursiveTask
*/
static class AddTask2 extends RecursiveTask<Long> {
int start, end;
AddTask2(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
} else {
int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
AddTask2 subTask1 = new AddTask2(start, middle);
AddTask2 subTask2 = new AddTask2(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
// 运行
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask2 task = new AddTask2(0, nums.length);
fjp.execute(task);
System.out.println(task.join());
//System.in.read();
}
}
ThreadPoolExecutor
自定义线程池,线程池的实现原理,除了ForkJoinPool与WorkStealingPool线程池,其他线程池大部分线程池背后都是ThreadPoolExecutor
构造 ThreadPoolExecutor:
corePoolSize 线程池核心线程数,最小线程数
maximumPoolSize 最大线程数
keepAlive 线程空闲后存活时间, 0代表永远不会消失
timeUnit 单位
BlockingQueue workQueue 任务容器具体查看每个线程池
举个栗子:
SingleThreadPool创建实现,内部是由ThreadPoolExecutor来创建的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其他几个就不都一一介绍了,可以自己去查看源码实现。。。
parallelStreamAPI
Java8的新增特性
平行流API,运用多线程的流API.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.
还是用质数运算举例,来感受下效果,示例代码如下:
package c_026;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class T14_ParallelStreamAPI {
public static void main(String[] args) {
List<Integer> nums = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 1_0000; i++) {
nums.add(100_0000 + random.nextInt(100_0000));
}
long start, end;
start = System.currentTimeMillis();
nums.stream().forEach(v -> isPrime(v));
end =System.currentTimeMillis();
System.out.println(end - start);
// 使用parallel stream api
start = System.currentTimeMillis();
nums.parallelStream().forEach(v -> isPrime(v));
end =System.currentTimeMillis();
System.out.println(end - start);
}
static boolean isPrime(int num) {
for (int i = 2; i < num / 2; i++) {
if (num % i == 0) return false;
}
return true;
}
}
运行结果:
1511
675
总结
莫道君行早,更有早行人。
此博文是本人对于多线程学习之余的一些总结….如有什么错误麻烦请及时指教,在此提前谢过!另外学习资料来源于马士兵老师的多线程编程系列,有兴趣的可以自行去瞧瞧哦(^_^)