跳至主要內容

9.java线程池

引领潮流大约 3 分钟java并发编程艺术juc

用到知识点

  1. 降低资源消耗
  2. 提高响应速度
  3. 提高线程的可管理性

线程池原理

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法

线程池参数

  1. corePoolSize(线程池的基本大小)
  2. maximumPoolSize(线程池最大数量)
  3. keepAliveTime(空闲线程活动保持时间):线程池工作线程空闲后,保持存活时间
  4. TimeUnit(时间单位)
  5. workQueue(任务队列):用于保存等待执行任务的阻塞队列 ArrayBlockQueue LinkedBlockQueue SynchronousQueue(不存储元素阻塞队列) PriorityBlockingQueue(优先级无限阻塞队列)
  6. threadFactory(线程工厂)
  7. RejectedExecutionHandler 饱和策略 1、AbortPolicy:直接抛出异常 2、CallerRunsPolicy:调用者所在线程来运行任务 3、DiscardOrderestPolicy:丢队列最近一个任务,并执行 4、DiscardPolicy:不处理,丢掉

关闭线程池

shutdownNow shutdown

线程池监控

taskCount:线程池需要执行的任务数量。

completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。

largestPoolSize:线程池里曾经创建过的最大线程数量。

getPoolSize:线程池的线程数量

getActiveCount:获取活动的线程数。

重写扩展方法

beforeExecute

afterExecute

terminated

线程池自定义实现

1、shutdown 优雅关闭线程 2、notify wait 实现生产者消费者

/**
 * 线程池技术
 * @author chentong
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    private static final int MAX_WORKER_NUM = 10;
    private static final int DEFAULT_WORKER_NUM = 5;
    private static final int MIN_WORKER_NUM = 1;

    private final List<Worker> workers = Collections.synchronizedList( new ArrayList<Worker>() );

    //lock同步
    private final Object lock = new Object();

    //原子变量操作
    private volatile AtomicInteger workerNum = new AtomicInteger( 0 );

    private final JobQueue<Job> queue = new JobQueue<>();

    public DefaultThreadPool() {
        addWorkers( DEFAULT_WORKER_NUM );
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            queue.enqueue( job );
        }
    }

    @Override
    public void addWorkers(int num) {

        synchronized (lock) {
            if (num + this.workerNum.get() > MAX_WORKER_NUM) {
                num = MAX_WORKER_NUM - this.workerNum.get();
                if (num <= 0) return;
            }

            for (int i = 0; i < num; i++) {
                Worker worker = new Worker( queue );
                workers.add( worker );
                Thread thread = new Thread( worker, "ThreadPool-Worker-" + workerNum.incrementAndGet() );
                thread.start();
            }
        }

    }

    @Override
    public void removeWorker(int num) {

        synchronized (lock) {
            //线程池最小线程数
            if (num >= this.workerNum.get()) {
                num = this.workerNum.get() - MIN_WORKER_NUM;
                if (num <= 0) return;
            }

            int count = 0;

            while (count < num) {

                Worker worker = workers.get( count );
                if (workers.remove( worker )) {
                    worker.shutdown();
                    count++;
                }
            }
            //减少线程
            workerNum.getAndAdd( -num );
        }

    }

    @Override
    public void shutdown() {
        synchronized (lock){
            for (Worker worker : workers) {
                worker.shutdown();
            }
            workers.clear();
        }
    }

    @Override
    public int getJobSize() {
        return queue.getJobSize();
    }

}

//优雅关闭线程
class Worker implements Runnable {

    private volatile boolean running = true;

    private JobQueue queue;

    public Worker(JobQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (running) {
            Runnable job = queue.dequeue();
            if (job != null) {
                try {
                    job.run();
                } catch (Exception e) {
                }
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}

//工作队列
//2、notify wait 实现生产者消费者
class JobQueue<Job extends Runnable> {

    private final LinkedList<Job> jobs = new LinkedList<>();

    private final Object lock = new Object();

    public void enqueue(Job job) {
        synchronized (lock) {
            lock.notifyAll();
            jobs.addLast( job );
        }
    }

    public Job dequeue() {

        synchronized (lock) {
            while (jobs.isEmpty()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            Job job = jobs.removeFirst();
            return job;
        }
    }

    public int getJobSize() {
        synchronized (lock) {
            return jobs.size();
        }
    }

}

//线程池
interface ThreadPool<Job extends Runnable> {

    //执行job
    void execute(Job job);

    //关闭连接池
    void shutdown();

    //增加works
    void addWorkers(int num);

    //减少工作线程
    void removeWorker(int num);

    int getJobSize();

}