跳至主要內容

生产-消费者模式

引领潮流大约 1 分钟java高并发编程juc

生产消费模型

示例

public interface BlockQueue<E>{
    void add(E e); //添加元素
    E take();      //取走元素
    E take(int timeout);
    int size(); //队列size
}

public ArrayBlockQueue<E> implement BlockQueue<E>{
    
    private List<E> blockList = new ArrayList();
    private Object lock = new Object();
    private volaitle int size = 0;
    
    //生产者
    public void add(E e){
        synchionized(lock){
            blockList.add(e);
            lock.notifyAll();   
        }
    }
    
    //消费者
    public E take(int timeout){
    
        synchionized(lock){
            if(timeout <= 0){
                while(blockList.isEmpty()){
                    lock.wait();
                }
            }else{
                long future = System.currentTimeMills()+timeout;
                long remaining = timeout;
                while(blockList.isEmpty() && remain > 0){
                    lock.wait(remaining);            
                    remaining = future - System.currentTimeMills();
                }                
            }
        return blockList.get(0);
    }
    
    //消费者
    public E take(){
        return take(0);
    }
    
    //获得当前队列大小
    public int size(){
        synchionized(lock){
            size = blockList.size();  
        }
        return size;
    }
    
}

场景 生产者--消费者关系

juc使用

线程安全阻塞队列 put/take方法

BlockingQueue 阻塞队列 ArrayBlockingQueue 基于数组BlockingQueue LinkedBlockingQueue 基于链表BlockingQueue 个数不限 PriorityBlockingQueue 带有优先级的BlockingQueue DelayQueue 一定时间才可以take的BlockingQueue ConcurrentLinkedQueue 元素个数没有限制的安全队列

juc数据交换

Exchanger交换缓冲区

main(){
  final Exchanger exchanger = new Exchanger();
  new Thread( new Worker(exchanger, "chen")).start();
  new Thread( new Worker(exchanger, "tong")).start();
}

public Worker implement Runnable{
  private String data;
  private Exchanger changer;

  public Worker(Exchanger changer, String data){
    this. changer = changer;
    this.data = data;
  }

  public void run(){
    System.out.println(Thread.currentThread().getName() + "正在把数据 " + data + " 交换出去");

    String data2 = (String) exchanger.exchange(data);

    System.out.println(Thread.currentThread().getName() + "交换数据 到  " + data2);
}
}

当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。