8.java并发工具
大约 2 分钟
CountDownLatch 等待多线程完成
等待其他线程完成
倒数计数
计数器只能使用1次 系统启动
public class CountDownLatchTest {
private static CountDownLatch latch = new CountDownLatch( 2 );
public static void main(String[] args) throws InterruptedException {
new Thread( new Runnable() {
@Override
public void run() {
latch.countDown();
System.out.println("1");
}
} ).start();
new Thread( new Runnable() {
@Override
public void run() {
latch.countDown();
System.out.println("2");
}
} ).start();
latch.await();
System.out.println("3");
}
}
同步屏障CyclicBarrier
循环使用(Cyclic)的屏障(Barrier)
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。
合并计算结果
//循环计数
// reset重置次数 ,重复使用
public class CyclicBarrierTest {
public static void main(String[] args) {
testBarrierAction();
}
static void testBarrierAction() {
final CyclicBarrier barrier = new CyclicBarrier( 2, new BarrierAction() );
new Thread( new Runnable() {
@Override
public void run() {
System.out.println( 1 );
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
} ).start();
new Thread( new Runnable() {
@Override
public void run() {
System.out.println( 2 );
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
} ).start();
}
static class BarrierAction implements Runnable {
@Override
public void run() {
System.out.println( "======BarrierAction=======" );
}
}
}
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,
而CyclicBarrier的计数器可以使用reset()方法重置
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以 保证合理的使用公共资源。
流量控制
//公用有限资源限制
//一次性只能有限资源并行执行
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool( THREAD_COUNT );
private static Semaphore semaphore = new Semaphore( 10 );
public static void main(String[] args){
for (int i = 0; i<THREAD_COUNT;i++){
threadPool.execute( new Runnable() {
@Override
public void run() {
try {
//资源许可
semaphore.acquire();
System.out.println("----work----");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
}
threadPool.shutdown();
}
}
线程间交换数据的Exchanger
Exchanger也可以用于两人校对工作
public class ExchangerTest {
private static Exchanger<String> exchanger = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool( 2 );
public static void main(String[] args){
threadPool.execute( new Runnable() {
@Override
public void run() {
String strB = "this is bank B";
try {
String strA = exchanger.exchange( strB);
System.out.println("threadPool-1 equal "+strA.equals( strB ));
System.out.println("threadPool-1 A:"+strA);
System.out.println("threadPool-1 B:"+strB);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
threadPool.execute( new Runnable() {
@Override
public void run() {
String strA = "this is bank A";
try {
String strB = exchanger.exchange( strA );
System.out.println("threadPool-2 equal "+strA.equals( strB ));
System.out.println("threadPool-2 A:"+strA);
System.out.println("threadPool-2 B:"+strB);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
}
}