一、多线程与分布式两周介绍
两三年前,不了解分布式没关系;但今日,分布式已是基础内容。
1.1 本周内容
常用的工具
线程:
线程就是相互独立的任务。
比如:
如果把迅雷当作一个进程,当同时下载多个文件的时候,每个任务就被视为一个线程。
二、初始线程池
2.1 本章内容
体系课的部分内容,会跟实战课部分内容相同。因为重要啊,所以很有必要在体系课再梳理下。
2.2 “池”
就是提前准备好,先放在这个池子里。后续使用时,直接从这里拿,而不用重新收集。
比如:人才池,比如应急物资仓库,比如线程池。
“池”机制的核心:
- 重(chong)用
- 缓冲
- 限制最大数
2.3 线程池的必要性:如果不用线程池
通过如果不用线程池所带来的问题,来凸显使用线程池的必要性。
建立工程concurrency_tools_practice
:
concurrency:
the fact of two or more events happening or existing at the same time.
并发
新建类EveryTaskOneThread
:
public class EveryTaskOneThread {
public static void main(String[] args) {
Thread thread = new Thread(new Task());
thread.start();
}
// 任务
public static class Task implements Runnable{ // 实现Runnable接口
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
结果为:
执行了任务
新建类ForLoop
:
唯一不同,是线程的生产方式不同
package threadpool;
public class ForLoop {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Task());
thread.start();
}
}
// 任务
public static class Task implements Runnable{ // 实现Runnable接口
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
结果为:
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
执行了任务
表面上看,在初级阶段,一个任务一个线程,也能满足业务需求。
但是,当任务数量非常多,比如10000个呢?
上述方式也能得出结果,但有个明显的弊端:
因为Java编程语言中的每一个线程,都会对应到操作系统的一个线程。Java程序10000个线程,就会在操作系统创建10000个线程。
线程的创建和销毁,是消耗资源的,比如内存;且回收线程时,也会给GC垃圾回收器带来压力。(成本高)
就跟资本家招聘计件的员工一样:
10000个任务,如果1个任务就要招1个人,那么就要招聘10000个员工。用工成本太高,怎么办?
减少员工数为100人;
让这100人不停的工作,做完第一个任务,还要继续做第二个,反复做、直到每人做完100个。
2.4 线程池的好处:3个
2.5 适合的应用场景
三、创建线程池
线程池有很多的构造方法,因为它的参数有很多。
3.1 线程池构造方法的参数:6个
最主要的是前两个
3.1.1 参数:corePoolSize、maxPoolSize
设计巧妙。
由上可知:
线程池在初始化之后,里面是没有线程池的。
当在线程池放置任务时,就会创建新的线程来执行这个任务。
任务并非均匀固定,比如,今天忙、后天很闲。
所以,任务多时,就是用maxPoolSize;任务很少时,就使用corePoolSize。
核心线程数corePoolSize:
就是一直存活的线程数量
思想:
不写死,而是通过两个参数,营造一种区间范围。
2.添加线程的规则
图示:
举例:
3.增减线程的特点
3.1.2 参数:keep Alive Time
机制:当线程数过多、冗余空闲时,就没有存在的必要了
目的:减少资源消耗。
3.1.3 参数:ThreadFactory
看下源码:
3.1.4 参数:workQueue
3.2 线程池应该手动创建,还是自动创建
以下,介绍几种JDK内置的、提前设计好的的线程池。以及它们的问题。
3.2.1 newFixedThreadPool
特点:线程数是固定的,即core=max
代码示例1:
// 类1:演示newFixedThreadPool
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task()); // 传入下面的任务对象
}
}
}
// 类2:
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(500); // 休眠500ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()); // 打印出当前线程的名字
}
}
结果:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
...
看源码:
代码示例2:
因为模拟任务处理速度很慢,所以假设就1个核心线程数,
因为模拟队列中东西越来越多,内存占用过多,把内存设置小一些:
因为内存占用是太多,不能塞了
所以,报出异常OOM
OOM
Out Of Memory
内存不足、内存溢出
3.2.2 SingleThreadExecutor
就是上节newFixedThreadPool
的低配版,即core=max=1
代码示例:
源码:
3.2.3 newCachedThreadPool
可缓存线程池:具有自动回收多余线程的功能
代码示例:
源码:
core=0;
max没有上限
3.2.4 newScheduledThreadPool
特点:支持定时、周期性的执行任务
1.定时的执行任务:
周期性的执行任务:
每间隔3秒来执行任务
3.2.5 应该根据具体业务场景,手动创建
以上的四种JDK内置的、提前设计好的的线程池,各有它们的问题。
所以,一般与具体业务不是能完全契合。
3.3 线程池里的线程数量,应该设定多少合适
3.4 对比常见线程池的特点
1.
这一种用的不多:
2.构造方法的参数对比
3.存储队列分析
四、关闭线程池
4.1 5个方法
其中:
- 5是对于正在执行的任务:发起中断信号;对于在队列中等待执行的任务,会撤回。
4.2 代码示例
1.正常运行的状态:
2.shutdown
方法
通过方式1:通过提交增量任务是否报出异常,来判断
按照预期的报出异常,这说明已成功校验:线程池已shutdown
通过方式2:通过isShutDown
方法返回布尔值,来判断
按照预期的报出 true,这说明已成功校验:线程池已shutdown
3.isTerminated()
是否完全终止?
未完全终止,因为shutDown之后,一些存量任务还是在继续执行,把活儿干完
是否完全终止?
已完全终止,因为10s的时间内,存量任务已执行完成
4.awaitTermination
方法
3秒之内能否执行完毕?不能
即使加了shutdown方法,结果也是false:
如果是7秒之内:
就能 完全执行完毕
5.shutdownNow
方法:
一是会立马发起中断信号,立马中断正在执行的任务
二是撤回在队列中等待执行的任务
List<Runnable> runnables = executorService.shutdownNow(); // 返回的runnables,是队列中待执行的任务
五、暂停、恢复线程池
5.1 四种拒绝策略
1.不要给我加新任务了
2.我现在已经超负荷工作了
根据业务需求,进行选择:
discard
get rid of (someone or something) as no longer useful or desirable.
5.2 钩子方法,给线程池加点料
线程池本身功能强大,还可以通过钩子方法,进行功能扩展。
钩子:
实际上是一个处理消息的程序段,通过系统调用,把它挂入系统。每当特定的消息发出,在没有到达目的窗口前,钩子程序就先捕获该消息。
这时钩子函数即可以加工处理(改变)该消息,也可以不作处理而继续传递该消息,还可以强制结束消息的传递。
对每种类型的钩子由系统来维护一个钩子链,最近安装的钩子放在链的开始,而最先安装的钩子放在最后,也就是后加入的先获得控制权。
5.3 代码示例
新建类PauseableThreadPool
:
// 可暂停的线程池
// 演示每个任务执行前后,放钩子函数
public class PauseableThreadPool extends ThreadPoolExecutor {
// 标记位:用于标记线程是否处于暂停状态
private boolean isPaused;
// 为了让上述布尔值的并发修改是安全的,所以上一把锁
private final ReentrantLock lock = new ReentrantLock();
// 用于休眠线程
private Condition unpaused = lock.newCondition();
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
// 在执行每一个任务之前,都会调用该方法:通过识别标记位,来休眠暂停线程
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await(); // 休眠暂停线程
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
// 实例方法,用于暂停
public void pause(){
lock.lock();
try{
isPaused = true; // 确认暂停
}finally {
lock.unlock(); // 释放这把锁
}
}
// 恢复线程池
public void resume(){
lock.lock();
try{
isPaused = false;
unpaused.signalAll(); // 唤醒正在休眠的线程
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
// 1.创建线程池
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
// 2.新建Runnable对象,即任务
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10); // 防止运行的太快,进行短暂休息
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 3.往线程池中,添加10000个任务
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
// 4.暂停线程池
Thread.sleep(1500); // ?
pauseableThreadPool.pause(); // 暂停线程池
System.out.println("线程池被暂停了");
// 5.恢复线程池
Thread.sleep(1500); // ?
pauseableThreadPool.resume(); // 恢复线程池
System.out.println("线程池被恢复了");
}
}
结果为:
...
我被执行
我被执行
我被执行
我被执行
线程池被暂停了
// 中间等待了1.5s
线程池被恢复了
我被执行
我被执行
我被执行
...
以上,就是线程池,通过钩子函数,具备了暂停和恢复的扩展能力。
我的问题:
Thread.sleep(1500)啥意思?
让当前“正在执行的线程”休眠(暂停执行)
六、线程池的实现原理、源码分析
6.1 线程池的组成部分
6.2 Executor家族
关系图示:
以下是源码:
1.Executor
接口:
2.ExecutorService
接口:
是上面的子接口
3.Executors
工具类:
用于快速创建线程池;
线程池ThreadPoolExecutor
返回的就是上面的子接口ExecutorService
6.3 线程池实现线程复用的原理
七、线程池的状态
7.1 5种
7.2 execute
方法
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
Comments | NOTHING