Shawn's Blog

蒸汽兔

多线程高并发编程 - 线程池框架(三)

字数:3352 字 阅读时长:约 18 分钟 阅读

线程池:由系统维护的容纳线程的容器,由CLR控制的所有AppDomain共享。线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。

Executor & ExecutorService

Executor

首先我们来认识一下它Excecutor[ɪɡˈzekjətə(r)] ,该接口是Java线程实现中的最顶层接口,通过传入一个 Runnable接口,由Runnable接口来定义一项执行任务,再交由 Executor.execute() 方法定义如何执行任务。

package c_026;

import java.util.concurrent.Executor;

public class T01_MyExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
        // new Thread(command).star(); 
    }
}

当然,此接口只需要了解并知道如何使用即可,毕竟在日常开发中Java给我们封装了极其多的线程池来供我们使用(^_^)。

ExecutorService

该接口继承了Executor接口,service 一般都是后台线程,跑在后台提供服务,在这里呢它则代表执行器服务,ExecutorService就是这种线程,在启动后一直在后台等待任务并将其扔到容器中执行;

既然该接口是继承于Executor接口,那么就代表该接口也具有execute 方法,并且还在其基础上添加了自已一些实现的封装方法,具体如下:

Future submit(Runnable): 提交Runnable任务以执行并返回表示该任务的Future。

Future submit(Runnable, result): 提交可运行的任务以供执行并返回成功完成后返回给定结果。

Future submit(Callable): 提交一个有返回值的callable任务,并将返回值作为future对象包装返回

execute 和 submit 的区别: 没有本质区别,只不过execute只能执行Runnable接口;

Runnable & CallAble

共同点: 接口都是设计出来给一个线程来调用的

不同点:

​ Runnable的run()没有返回值,并且不能抛出异常!

​ Callable的call()方法具有返回值,并且可以抛出异常!

Executors

用来操作Executor的工具类

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

1、public static ExecutorService newFiexedThreadPool(int Threads) 创建固定数目线程的线程池。

2、public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

3、public static ExecutorService newSingleThreadExecutor():创建一个单线程化的Executor。

4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

ThreadPool

ThreadPool线程池,其内部维护了一堆线程执行任务,一般用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。

用一个并行计算来演示线程池的作用,贴代码:

package c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * 计算 1-200000 之间的质数
 */
public class T07_ParallelComputing {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        List<Integer> result = getPrime(1, 20_0000);
        long end = System.currentTimeMillis();
        System.out.println(end - start); // 使用单线程计算的时间

        ExecutorService service = Executors.newFixedThreadPool(4);
        ComputeTask t1 = new ComputeTask(1, 8_0000);
        ComputeTask t2 = new ComputeTask(8_0001, 13_0000);
        ComputeTask t3 = new ComputeTask(13_0001, 17_0000);
        ComputeTask t4 = new ComputeTask(17_0001, 20_0000);  // 这里为什么不均分? 因为数字越大, 质数的数量就越多
        // 提交任务给ExecutorService执行
        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
        // 执行开始
        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
        service.shutdown();
    }
    
    static class ComputeTask implements Callable<List<Integer>> {

        private int start, end;
        
        ComputeTask (int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public List<Integer> call() throws Exception {
            return getPrime(start, end);
        }
    }
    
    static boolean isPrime(int num) {
        for (int i = 2; i < num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    /**
     * 返回指定范围的质数列表
     */
    static List<Integer> getPrime(int start, int end) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < end; i++) {
            if (isPrime(i)) list.add(i);
        }
        return list;
    }
}

运行结果(ps: 运行时长差距可能会因cpu单线程运算能力而有所误差):

5300
2829

FixedThreadPool

固定线程的线程池,在执行任务过程中,无须启动新的线程,如遇到所有线程都在执行状态下,空闲任务会在其内部维护的一个任务队列等待(BlockingQueue);这样就让我们在处理任务时无须再多起线程来消耗系统多余的资源

线程池中维护了两个任务队列: 1. 未执行的任务队列 2. 已执行的任务队列

示例代码:

package c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPool 
 * 线程池
 */
public class T05_ThreadPool {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(5); // 固定长度的线程池
        for (int i = 0; i < 6; i++) { // 执行六个任务,  在只有五个固定容量的线程池中
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println(service); // [Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
        // 内部一般是BlockingQueue
        // pool size =  5  线程池的容量
        // active thread = 5 激活的线程队列长度
        // queued tasks = 1 等待处理任务长度
        // completed task = 0 完成执行的任务数量
        
        // 关闭线程池
        service.shutdown(); // 未执行完毕,不会停止,只会进入停止中状态
        System.out.println(service.isTerminated()); // false 判断任务是否结束
        System.out.println(service.isShutdown()); // true 判断线程池状态是否关闭
        System.out.println(service);  //java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
        

        TimeUnit.SECONDS.sleep(5); // 5s 后肯定执行完成了

        System.out.println(service.isTerminated()); // true 
        System.out.println(service.isShutdown()); // true
        System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
   
    }
}

使用linkedBlockingQueue当容器

CatchedThreadPool

可缓存的线程,弹性的

当有个请求进入线程池内, 线程池将会启用一个线程。

当再次有个请求进入线程池内, 并且上个线程未结束, 仍然会启用一个线程。

当有线程执行完毕后,这个线程不会被清除, 而是被缓存,当有请求进入时, 直接使用缓存线程调用。

跟 fixedThreadPool 类似, 只不过没有上限(最多Integer最大值), 是一种弹性操作。

当线程一直不被使用, 缓存最多持续1分钟(AliveTime默认值),就会被线程池销毁。

示例代码:

package c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class T08_CachedThreadPool {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        // pool size 为0
        System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]cu'

        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        // pool size 变为2 
        System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]


        try {
            TimeUnit.SECONDS.sleep(80); // 最多持续1分钟,这里sleep80s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // pool size 变为0
        System.out.println(service); // java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]

    } 
}

SingleThreadPool

线程池中只有一个线程,作用:保证线程执行的时序性。

使用场景:让任务按前后顺序执行;

内部实现:由一个阻塞队列维护;

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

示例代码:

package c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * SingleThreadPool
 */
public class T09_SingleThreadPool {

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            service.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    } 
}

运行结果:

0 pool-1-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1

这个线程没什么好讲的,理解使用就好啦;

ScheduleThreadPool

Scheduled: 计划中的,定时的;

固定时间执行的线程池,一般用来做定时任务

示例代码:

package c_026;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * ScheduledPool
 * 执行定时的任务,类似Delay, 可以替代Timer
 */
public class T10_ScheduledPool {

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        // 使用固定的频率执行某个任务
        // 四个参数
        // command: 执行的任务
        // initialDelay: 第一次执行延时多久执行
        // period: 每隔多久执行一次这个任务
        // unit: 时间单位
        service.scheduleAtFixedRate(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);  // 每隔500ms打印一下线程名称
        // 线程执行1000ms,而每sleep 500 就要新启动一个线程
        // 上个线程未执行完毕,会启用新的线程执行
        // 如果线程池已满,只有延时
    } 
}

WorkStealingThreadPool

工作窃取线程池,启动的线程是后台线程(也称精灵线程,守护线程),运行效果与FixedThreadPool差不多,区别就是FixedThreadPool启动的为普通线程,WorkStealingThreadPool启动的为 后台线程

内部实现:

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

可以看出他的实现其实只是new了一个ForkJoinPool线程池,所以本质上其实与ForkJoinPool并没有什么区别,只是在其之上的一层封装,方便使用。

示例代码:

package c_026;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * WorkStealingPool
 */
public class T11_WorkStealingPool {

    public static void main(String[] args) throws IOException {
        // CPU 核数
        System.out.println(Runtime.getRuntime().availableProcessors());
        
        // workStealingPool 会自动启动cpu核数个线程去执行任务
        ExecutorService service = Executors.newWorkStealingPool();
        service.execute(new R(1000));  // 我的cpu核数为12 启动13个线程,其中第一个是1s执行完毕,其余都是2s执行完毕,
                                                // 有一个任务会进行等待,当第一个执行完毕后,会再次偷取第十三个任务执行
        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
            service.execute(new R(2000));
        }
        
        // 因为work stealing 是deamon线程,即后台线程,精灵线程,守护线程
        // 所以当main方法结束时, 此方法虽然还在后台运行,但是无输出
        // 可以通过对主线程阻塞解决
        System.in.read();
    }
    
    static class R implements Runnable {

        int time;

        R(int time) {
            this.time = time;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "  " + time);
        }
    }
}

ForkJoinPool

将一个任务拆分多个任务执行(理论可以无限切分,递归),然后再将结果合并;

使用场景: 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程

ForkJoinPool执行的是ForkJoinTask,即RecursiveAction,RecursiveTask;

RecursiveAction与RecursiveTask的区别就是RecursiveAction执行后没有返回值,反之RecursiveTask执行后有返回值。

示例代码:

package c_026;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

/**
 * T12_ForkJoinPool
 */
public class T12_ForkJoinPool {

    static int[] nums = new int[100_0000];
    static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
    static Random random = new Random();
    
    // 初始化这100_000个数字, 每个数字范围在100之内
    static {
        
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(100);
        }
        // 所有数字和, 事先计算:
        //System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
    }
    
	/*
	* RecursiveAction任务
	*/
    static class AddTask extends RecursiveAction {
        
        int start, end;
        
        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            
            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("sum = " + sum);
            } else {
                int middle = (end - start) / 2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }
    
	/*
	* RecursiveTask
	*/
    static class AddTask2 extends RecursiveTask<Long> {

        int start, end;
        
        AddTask2(int start, int end) {
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            } else {
                int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
                AddTask2 subTask1 = new AddTask2(start, middle);
                AddTask2 subTask2 = new AddTask2(middle, end);
                subTask1.fork();
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    // 运行
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask2 task = new AddTask2(0, nums.length);
        fjp.execute(task);
        System.out.println(task.join());        
        //System.in.read();
    }

}

ThreadPoolExecutor

自定义线程池,线程池的实现原理,除了ForkJoinPool与WorkStealingPool线程池,其他线程池大部分线程池背后都是ThreadPoolExecutor

构造 ThreadPoolExecutor:

​ corePoolSize 线程池核心线程数,最小线程数

​ maximumPoolSize 最大线程数

​ keepAlive 线程空闲后存活时间, 0代表永远不会消失

​ timeUnit 单位

​ BlockingQueue workQueue 任务容器具体查看每个线程池

举个栗子:

SingleThreadPool创建实现,内部是由ThreadPoolExecutor来创建的。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

其他几个就不都一一介绍了,可以自己去查看源码实现。。。

parallelStreamAPI

Java8的新增特性

平行流API,运用多线程的流API.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.

还是用质数运算举例,来感受下效果,示例代码如下:

package c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class T14_ParallelStreamAPI {

    public static void main(String[] args) {

        List<Integer> nums = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1_0000; i++) {
            nums.add(100_0000 + random.nextInt(100_0000));
        }

        long start, end;

        start = System.currentTimeMillis();
        nums.stream().forEach(v -> isPrime(v));
        end =System.currentTimeMillis();
        
        System.out.println(end - start);
        
        // 使用parallel stream api
        start = System.currentTimeMillis();
        nums.parallelStream().forEach(v -> isPrime(v));
        end =System.currentTimeMillis();

        System.out.println(end - start);
    }
    static boolean isPrime(int num) {
        for (int i = 2; i < num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }
}

运行结果:

1511
675

总结

莫道君行早,更有早行人。

此博文是本人对于多线程学习之余的一些总结….如有什么错误麻烦请及时指教,在此提前谢过!另外学习资料来源于马士兵老师的多线程编程系列,有兴趣的可以自行去瞧瞧哦(^_^)

本文项目地址:https://github.com/xf616510229/java-concurrent


© Shawn Jim. All rights reserved. 本站总访问量 次, 访客数 人次.