生产者消费者问题总结

生产者-消费者算是并发编程中常见的问题。依靠缓冲区我们可以实现生产者与消费者之间的解耦。生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西。这样我们避免生产者想要交付数据给...

生产者-消费者算是并发编程中常见的问题。依靠缓冲区我们可以实现生产者与消费者之间的解耦。生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西。这样我们避免生产者想要交付数据给消费者,但消费者此时还无法接受数据这样的情况发生。

wait notify

这个问题其实就是线程间的通讯,所以要注意的是不能同时读写。生产者在缓冲区满的时候不生产,等待;消费者在缓冲区为空的时候不消费,等待。比较经典的做法是waitnotify

生产者线程执行15次set操作

public class Producer implements Runnable{
    private Channel channel;

    public Producer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        for(int i=0;i<15;i++){
            channel.set(Thread.currentThread().getName()+" "+i);
        }
    }
}

消费者线程执行10次get操作

public class Consumer implements Runnable {
    private Channel channel;

    public Consumer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        for(int i=0;i<10;i++){
            System.out.println("Consumer "+Thread.currentThread().getName()+" get "+channel.get());
        }
    }
}

现在定义Channel类,并创建两个生产者线程和三个消费者线程

public class Channel {
    private List<String> buffer=new ArrayList<>();
    private final int MAX_SIZE=10;

    public synchronized String get(){
        while (buffer.size()==0){//不要用if,醒来了也要再次判断
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String str=buffer.remove(0);
        notifyAll();
        return str;
    }
    public synchronized void set(String str){
        while (buffer.size()==MAX_SIZE){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        buffer.add(str);
        notifyAll();
    }

    public static void main(String[] args) {
        Channel channel=new Channel();
        Producer producer=new Producer(channel);
        Consumer consumer=new Consumer(channel);
        for(int i=0;i<2;i++){
            new Thread(producer).start();
        }
        for (int i=0;i<3;i++){
            new Thread(consumer).start();
        }
    }
}

使用notifyAll而不是notify的原因是,notify有可能出现多次唤醒同类的情况,造成“假死”。我们可以使用Condition来实现更精确的唤醒。

Condition

将上面代码中的Channel类修改一下即可

public class Channel {
    private List<String> buffer=new ArrayList<>();
    private final int MAX_SIZE=10;
    private Lock lock=new ReentrantLock();
    private Condition producer=lock.newCondition();
    private Condition consumer=lock.newCondition();
    
    public String get(){
        String str=null;
        try {
            lock.lock();
            while (buffer.size()==0){
                consumer.await();
            }
            str=buffer.remove(0);
            producer.signalAll();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return str;
    }
    public void set(String str){
        try {
            lock.lock();
            while (buffer.size()==MAX_SIZE){
                producer.await(); 
            }
            buffer.add(str);
            consumer.signalAll();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

双缓冲与Exchanger

当同步的花销非常大时,我们可以采用双缓冲区的办法。双缓冲的一个好处就在于:因为生产者和消费者各自拥有一个缓冲区,所以他们不会同时对同一个缓冲区进行操作,那么我们就不需要为读写操作加锁,用空间换了时间。在Java中可以通过Exchanger来交换两个线程之间的数据结构。

public class Producer implements Runnable{
    private List<String> buffer;
    private Exchanger<List<String>> exchanger;
    public Producer(List<String> buffer, Exchanger<List<String>> exchanger){
        this.buffer=buffer;
        this.exchanger=exchanger;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            for (int j=0;j<10;j++)
            buffer.add("Thrad "+Thread.currentThread().getName()+" : "+i+" "+j);
            try {
                buffer=exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

public class Consumer implements Runnable {
    private Exchanger<List<String>> exchanger;
    private List<String> buffer;

    public Consumer(List<String> buffer,Exchanger<List<String>> exchanger) {
        this.exchanger = exchanger;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
             buffer=exchanger.exchange(buffer);
            } catch (InterruptedException e) {
             e.printStackTrace();
            }
            for(int j=0;j<10;j++){
                String message=buffer.get(0);
                System.out.println(message);
                buffer.remove(0);
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        List<String> buffer1=new ArrayList<>();
        List<String> buffer2=new ArrayList<>();
        Exchanger<List<String>> exchanger=new Exchanger<>();
        Producer producer=new Producer(buffer1,exchanger);
        Consumer consumer=new Consumer(buffer2,exchanger);
        Thread t1=new Thread(producer);
        Thread t2=new Thread(consumer);
        t1.start();
        t2.start();
    }
}

BlockingQueue

我们可以使用更为方便安全的阻塞式集合来实现生产消费者模型。

这类集合具有的特点是:当集合已满或者是为空的时候,被调用的方法不会立即执行,该方法将被阻塞,直到可以成功执行为止。

public class Channel {
    private BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(10);
    public String get(){
        String str=null;
        try {
            str=blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return str;
    }
    public void set(String str){
        try {
            blockingQueue.put(str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这次的Channel类是不是比之前的简洁了许多,有了BlockingQueue我们就不用再去写wait和notify了。

  • 发表于 2020-07-15 09:20
  • 阅读 ( 95 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除