30.1 线程池——治理线程的法宝



一、多线程与分布式两周介绍

两三年前,不了解分布式没关系;但今日,分布式已是基础内容。

1.1 本周内容

常用的工具

线程:

线程就是相互独立的任务。

比如:
如果把迅雷当作一个进程,当同时下载多个文件的时候,每个任务就被视为一个线程。

二、初始线程池

2.1 本章内容

体系课的部分内容,会跟实战课部分内容相同。因为重要啊,所以很有必要在体系课再梳理下。

2.2 “池”

就是提前准备好,先放在这个池子里。后续使用时,直接从这里拿,而不用重新收集。

比如:人才池,比如应急物资仓库,比如线程池。

“池”机制的核心:

  1. 重(chong)用
  2. 缓冲
  3. 限制最大数

2.3 线程池的必要性:如果不用线程池

通过如果不用线程池所带来的问题,来凸显使用线程池的必要性。

建立工程concurrency_tools_practice

concurrency:

the fact of two or more events happening or existing at the same time.

并发

新建类EveryTaskOneThread

public class EveryTaskOneThread {

    public static void main(String[] args) {
        Thread thread = new Thread(new Task());
        thread.start();
    }


    // 任务
    public static class Task implements Runnable{             // 实现Runnable接口
        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
    
}

结果为:

执行了任务

新建类ForLoop

唯一不同,是线程的生产方式不同

package threadpool;


public class ForLoop {

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Task());
            thread.start();
        }

    }


    // 任务
    public static class Task implements Runnable{             // 实现Runnable接口

        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }


}

结果为:

执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务

表面上看,在初级阶段,一个任务一个线程,也能满足业务需求。

但是,当任务数量非常多,比如10000个呢?

上述方式也能得出结果,但有个明显的弊端:
因为Java编程语言中的每一个线程,都会对应到操作系统的一个线程。Java程序10000个线程,就会在操作系统创建10000个线程。

线程的创建和销毁,是消耗资源的,比如内存;且回收线程时,也会给GC垃圾回收器带来压力。(成本高)

就跟资本家招聘计件的员工一样:

10000个任务,如果1个任务就要招1个人,那么就要招聘10000个员工。用工成本太高,怎么办?

减少员工数为100人;

让这100人不停的工作,做完第一个任务,还要继续做第二个,反复做、直到每人做完100个。

2.4 线程池的好处:3个

2.5 适合的应用场景

三、创建线程池

线程池有很多的构造方法,因为它的参数有很多。

3.1 线程池构造方法的参数:6个

最主要的是前两个

3.1.1 参数:corePoolSize、maxPoolSize

设计巧妙。

由上可知:

线程池在初始化之后,里面是没有线程池的。

当在线程池放置任务时,就会创建新的线程来执行这个任务。

任务并非均匀固定,比如,今天忙、后天很闲。

所以,任务多时,就是用maxPoolSize;任务很少时,就使用corePoolSize。

核心线程数corePoolSize:

就是一直存活的线程数量

思想:

不写死,而是通过两个参数,营造一种区间范围。

2.添加线程的规则

图示:

举例:

3.增减线程的特点

3.1.2 参数:keep Alive Time

机制:当线程数过多、冗余空闲时,就没有存在的必要了

目的:减少资源消耗。

3.1.3 参数:ThreadFactory

看下源码:

3.1.4 参数:workQueue

3.2 线程池应该手动创建,还是自动创建

以下,介绍几种JDK内置的、提前设计好的的线程池。以及它们的问题。

3.2.1 newFixedThreadPool

特点:线程数是固定的,即core=max

代码示例1:

// 类1:演示newFixedThreadPool
public class FixedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());               // 传入下面的任务对象
        }
    }
}

// 类2:
class Task implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);                              // 休眠500ms
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());    // 打印出当前线程的名字
    }
}

结果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
...

看源码:

代码示例2:

因为模拟任务处理速度很慢,所以假设就1个核心线程数,

因为模拟队列中东西越来越多,内存占用过多,把内存设置小一些:

因为内存占用是太多,不能塞了

所以,报出异常OOM

OOM

Out Of Memory

内存不足、内存溢出

3.2.2 SingleThreadExecutor

就是上节newFixedThreadPool的低配版,即core=max=1

代码示例:

源码:

3.2.3 newCachedThreadPool

可缓存线程池:具有自动回收多余线程的功能

代码示例:

源码:

core=0;

max没有上限

3.2.4 newScheduledThreadPool

特点:支持定时、周期性的执行任务

1.定时的执行任务:

周期性的执行任务:

每间隔3秒来执行任务

3.2.5 应该根据具体业务场景,手动创建

以上的四种JDK内置的、提前设计好的的线程池,各有它们的问题。

所以,一般与具体业务不是能完全契合。

3.3 线程池里的线程数量,应该设定多少合适

3.4 对比常见线程池的特点

1.

这一种用的不多:

2.构造方法的参数对比

3.存储队列分析

四、关闭线程池

4.1 5个方法

其中:

  • 5是对于正在执行的任务:发起中断信号;对于在队列中等待执行的任务,会撤回。

4.2 代码示例

1.正常运行的状态:

2.shutdown方法

通过方式1:通过提交增量任务是否报出异常,来判断

按照预期的报出异常,这说明已成功校验:线程池已shutdown

通过方式2:通过isShutDown方法返回布尔值,来判断

按照预期的报出 true,这说明已成功校验:线程池已shutdown

3.isTerminated()

是否完全终止?

未完全终止,因为shutDown之后,一些存量任务还是在继续执行,把活儿干完

是否完全终止?

已完全终止,因为10s的时间内,存量任务已执行完成

4.awaitTermination方法

3秒之内能否执行完毕?不能

即使加了shutdown方法,结果也是false:

如果是7秒之内:

就能 完全执行完毕

5.shutdownNow方法:

一是会立马发起中断信号,立马中断正在执行的任务

二是撤回在队列中等待执行的任务

List<Runnable> runnables = executorService.shutdownNow();           // 返回的runnables,是队列中待执行的任务

五、暂停、恢复线程池

5.1 四种拒绝策略

1.不要给我加新任务了

2.我现在已经超负荷工作了

根据业务需求,进行选择:

discard

get rid of (someone or something) as no longer useful or desirable.

5.2 钩子方法,给线程池加点料

线程池本身功能强大,还可以通过钩子方法,进行功能扩展。

钩子:

实际上是一个处理消息的程序段,通过系统调用,把它挂入系统。每当特定的消息发出,在没有到达目的窗口前,钩子程序就先捕获该消息。

这时钩子函数即可以加工处理(改变)该消息,也可以不作处理而继续传递该消息,还可以强制结束消息的传递。

对每种类型的钩子由系统来维护一个钩子链,最近安装的钩子放在链的开始,而最先安装的钩子放在最后,也就是后加入的先获得控制权。

5.3 代码示例

新建类PauseableThreadPool

// 可暂停的线程池
// 演示每个任务执行前后,放钩子函数
public class PauseableThreadPool extends ThreadPoolExecutor {

    // 标记位:用于标记线程是否处于暂停状态
    private boolean isPaused;

    // 为了让上述布尔值的并发修改是安全的,所以上一把锁
    private final ReentrantLock lock = new ReentrantLock();

    // 用于休眠线程
    private Condition unpaused = lock.newCondition();


    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    // 在执行每一个任务之前,都会调用该方法:通过识别标记位,来休眠暂停线程
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();               // 休眠暂停线程
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }


    // 实例方法,用于暂停
    public void pause(){
        lock.lock();
        try{
            isPaused = true;                    // 确认暂停
        }finally {
            lock.unlock();                      // 释放这把锁
        }
    }


    // 恢复线程池
    public void resume(){
        lock.lock();
        try{
            isPaused = false;
            unpaused.signalAll();           // 唤醒正在休眠的线程
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 1.创建线程池
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // 2.新建Runnable对象,即任务
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);           // 防止运行的太快,进行短暂休息
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 3.往线程池中,添加10000个任务
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        // 4.暂停线程池
        Thread.sleep(1500);                 // ?
        pauseableThreadPool.pause();         // 暂停线程池
        System.out.println("线程池被暂停了");
        // 5.恢复线程池
        Thread.sleep(1500);                  // ?
        pauseableThreadPool.resume();        // 恢复线程池
        System.out.println("线程池被恢复了");
    }

}

结果为:

...
我被执行
我被执行
我被执行
我被执行
线程池被暂停了
                // 中间等待了1.5s
线程池被恢复了
我被执行
我被执行
我被执行
...

以上,就是线程池,通过钩子函数,具备了暂停和恢复的扩展能力。

我的问题:

Thread.sleep(1500)啥意思?

让当前“正在执行的线程”休眠(暂停执行)

六、线程池的实现原理、源码分析

6.1 线程池的组成部分

6.2 Executor家族

关系图示:

以下是源码:

1.Executor接口:

2.ExecutorService接口:

是上面的子接口

3.Executors工具类:

用于快速创建线程池;

线程池ThreadPoolExecutor返回的就是上面的子接口ExecutorService

6.3 线程池实现线程复用的原理

七、线程池的状态

7.1 5种

7.2 execute方法

public class ThreadPoolExecutor extends AbstractExecutorService { 
    ...
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
}

7.3 使用线程池的注意点

声明:Jerry's Blog|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 30.1 线程池——治理线程的法宝


Follow excellence, and success will chase you.