9.java线程池
大约 3 分钟
用到知识点
- 降低资源消耗
- 提高响应速度
- 提高线程的可管理性
线程池原理
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法
线程池参数
- corePoolSize(线程池的基本大小)
- maximumPoolSize(线程池最大数量)
- keepAliveTime(空闲线程活动保持时间):线程池工作线程空闲后,保持存活时间
- TimeUnit(时间单位)
- workQueue(任务队列):用于保存等待执行任务的阻塞队列 ArrayBlockQueue LinkedBlockQueue SynchronousQueue(不存储元素阻塞队列) PriorityBlockingQueue(优先级无限阻塞队列)
- threadFactory(线程工厂)
- RejectedExecutionHandler 饱和策略 1、AbortPolicy:直接抛出异常 2、CallerRunsPolicy:调用者所在线程来运行任务 3、DiscardOrderestPolicy:丢队列最近一个任务,并执行 4、DiscardPolicy:不处理,丢掉
关闭线程池
shutdownNow shutdown
线程池监控
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
largestPoolSize:线程池里曾经创建过的最大线程数量。
getPoolSize:线程池的线程数量
getActiveCount:获取活动的线程数。
重写扩展方法
beforeExecute
afterExecute
terminated
线程池自定义实现
1、shutdown 优雅关闭线程 2、notify wait 实现生产者消费者
/**
* 线程池技术
* @author chentong
*/
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
private static final int MAX_WORKER_NUM = 10;
private static final int DEFAULT_WORKER_NUM = 5;
private static final int MIN_WORKER_NUM = 1;
private final List<Worker> workers = Collections.synchronizedList( new ArrayList<Worker>() );
//lock同步
private final Object lock = new Object();
//原子变量操作
private volatile AtomicInteger workerNum = new AtomicInteger( 0 );
private final JobQueue<Job> queue = new JobQueue<>();
public DefaultThreadPool() {
addWorkers( DEFAULT_WORKER_NUM );
}
@Override
public void execute(Job job) {
if (job != null) {
queue.enqueue( job );
}
}
@Override
public void addWorkers(int num) {
synchronized (lock) {
if (num + this.workerNum.get() > MAX_WORKER_NUM) {
num = MAX_WORKER_NUM - this.workerNum.get();
if (num <= 0) return;
}
for (int i = 0; i < num; i++) {
Worker worker = new Worker( queue );
workers.add( worker );
Thread thread = new Thread( worker, "ThreadPool-Worker-" + workerNum.incrementAndGet() );
thread.start();
}
}
}
@Override
public void removeWorker(int num) {
synchronized (lock) {
//线程池最小线程数
if (num >= this.workerNum.get()) {
num = this.workerNum.get() - MIN_WORKER_NUM;
if (num <= 0) return;
}
int count = 0;
while (count < num) {
Worker worker = workers.get( count );
if (workers.remove( worker )) {
worker.shutdown();
count++;
}
}
//减少线程
workerNum.getAndAdd( -num );
}
}
@Override
public void shutdown() {
synchronized (lock){
for (Worker worker : workers) {
worker.shutdown();
}
workers.clear();
}
}
@Override
public int getJobSize() {
return queue.getJobSize();
}
}
//优雅关闭线程
class Worker implements Runnable {
private volatile boolean running = true;
private JobQueue queue;
public Worker(JobQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (running) {
Runnable job = queue.dequeue();
if (job != null) {
try {
job.run();
} catch (Exception e) {
}
}
}
}
public void shutdown() {
running = false;
}
}
//工作队列
//2、notify wait 实现生产者消费者
class JobQueue<Job extends Runnable> {
private final LinkedList<Job> jobs = new LinkedList<>();
private final Object lock = new Object();
public void enqueue(Job job) {
synchronized (lock) {
lock.notifyAll();
jobs.addLast( job );
}
}
public Job dequeue() {
synchronized (lock) {
while (jobs.isEmpty()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Job job = jobs.removeFirst();
return job;
}
}
public int getJobSize() {
synchronized (lock) {
return jobs.size();
}
}
}
//线程池
interface ThreadPool<Job extends Runnable> {
//执行job
void execute(Job job);
//关闭连接池
void shutdown();
//增加works
void addWorkers(int num);
//减少工作线程
void removeWorker(int num);
int getJobSize();
}