跳至主要內容

8.java并发工具

引领潮流大约 2 分钟java并发编程艺术juc

CountDownLatch 等待多线程完成

等待其他线程完成

倒数计数

计数器只能使用1次 系统启动


public class CountDownLatchTest {

    private static CountDownLatch latch = new CountDownLatch( 2 );

    public static void main(String[] args) throws InterruptedException {

        new Thread( new Runnable() {
            @Override
            public void run() {
                latch.countDown();
                System.out.println("1");

            }
        } ).start();


        new Thread( new Runnable() {
            @Override
            public void run() {
                latch.countDown();
                System.out.println("2");
            }
        } ).start();

        latch.await();
        System.out.println("3");
    }
}    

同步屏障CyclicBarrier

循环使用(Cyclic)的屏障(Barrier)

让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

合并计算结果

//循环计数
// reset重置次数 ,重复使用
public class CyclicBarrierTest {

    public static void main(String[] args) {
        testBarrierAction();
    }

    static void testBarrierAction() {
        final CyclicBarrier barrier = new CyclicBarrier( 2, new BarrierAction() );
        new Thread( new Runnable() {
            @Override
            public void run() {
                System.out.println( 1 );
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } ).start();

        new Thread( new Runnable() {
            @Override
            public void run() {
                System.out.println( 2 );
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } ).start();
    }
    
    static class BarrierAction implements Runnable {
        @Override
        public void run() {
            System.out.println( "======BarrierAction=======" );
        }
    }
}

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,

而CyclicBarrier的计数器可以使用reset()方法重置

控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以 保证合理的使用公共资源。

流量控制

//公用有限资源限制
//一次性只能有限资源并行执行
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors.newFixedThreadPool( THREAD_COUNT );

    private static Semaphore semaphore = new Semaphore( 10 );


    public static void main(String[] args){

        for (int i = 0; i<THREAD_COUNT;i++){

            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        //资源许可
                        semaphore.acquire();
                        System.out.println("----work----");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } );
        }

        threadPool.shutdown();
    }

}

线程间交换数据的Exchanger

Exchanger也可以用于两人校对工作

public class ExchangerTest {

    private static Exchanger<String> exchanger = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool( 2 );

    public static void main(String[] args){

        threadPool.execute( new Runnable() {
            @Override
            public void run() {
                String strB = "this is bank B";
                try {
                    String strA = exchanger.exchange( strB);
                    System.out.println("threadPool-1 equal "+strA.equals( strB ));
                    System.out.println("threadPool-1 A:"+strA);
                    System.out.println("threadPool-1 B:"+strB);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        } );

        threadPool.execute( new Runnable() {
            @Override
            public void run() {

                String strA = "this is bank A";
                try {
                   String strB =  exchanger.exchange( strA );
                    System.out.println("threadPool-2 equal "+strA.equals( strB ));
                    System.out.println("threadPool-2 A:"+strA);
                    System.out.println("threadPool-2 B:"+strB);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } );

    }

}