多线程笔记 - provider-consumer

通过多线程实现一个简单的生产者-消费者案例(笔记). 首先定义一个要生产消费的数据类 :  public class Data { private String id; private String name; public Data(...

通过多线程实现一个简单的生产者-消费者案例(笔记).

首先定义一个要生产消费的数据类 : 

public class Data {

    private String id;

    private String name;

    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Data{" +
                "id='" + id + ''' +
                ", name='" + name + ''' +
                '}';
    }
}

生产者首先需要一个装载数据的容器, 产生的数据, 需要往里面放.

public class Provider implements Runnable {

    //1. 装载数据的容器
    private BlockingQueue<Data> queue;

    //2. 运行标志
    private volatile boolean isRunning = true;

    //3. 为数据产生id
    private static AtomicInteger count = new AtomicInteger();

    //4. 随机休眠
    private static Random r = new Random();

    public Provider(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while (isRunning){
            try {
                //随机休眠, 模拟产生数据逻辑耗时
                Thread.sleep(r.nextInt(100));
                //产生一个数据id, 为了避免多线程产生的id重复, 所以这里使用 AtomicInteger
                int id = count.incrementAndGet();
                //创建数据
                Data data = new Data(Integer.toString(id), "data" + id);
                //打印创建日志
                System.out.println( Thread.currentThread().getName() + " ++++++ " + id);
                //将数据加载到容器中
                if(!this.queue.offer(data, 1, TimeUnit.SECONDS)){
                    System.out.println(id + " 提交失败......");
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stop(){
        this.isRunning = false;
    }

    public void reset(){
        this.isRunning = true;
    }
}

消费者, 也需要知道数据存放的容器, 才能从里面拿取数据进行处理.

public class Consumer implements Runnable {

    //1. 数据容器, 从中取数据进行消费
    private BlockingQueue<Data> queue;

    public Consumer(BlockingQueue<Data> queue){
        this.queue = queue;
    }

    //随机休眠
    private static Random r = new Random();

    @Override
    public void run() {
        while (true){
            try {
                //从队列中取数据
                Data data = this.queue.take();
                //如果获取到的数据为空, 则不进行处理
                if(data == null){
                    continue;
                }
                //随机休眠, 模拟消费数据逻辑处理耗时
                Thread.sleep(r.nextInt(1000));
                //打印消费日志
                System.out.println( Thread.currentThread().getName() + " ------  " + data.getId());
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试方法:

public static void main(String[] args){
        //BlockingQueue<Data> queue = new LinkedBlockingQueue<>(16);
        BlockingQueue<Data> queue = new ArrayBlockingQueue<>(16);
        List<Provider> providers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Provider p = new Provider(queue);
            providers.add(p);
        }

        List<Consumer> consumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            Consumer c = new Consumer(queue);
            consumers.add(c);
        }

        ExecutorService pool = Executors.newCachedThreadPool();
        for (Provider provider : providers) {
            pool.execute(provider);
        }

        for (Consumer consumer : consumers) {
            pool.execute(consumer);
        }

        try {
            Thread.sleep(3000);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (Provider provider : providers) {
            provider.stop();
        }
    }

这里特意用了一个有界队列, 并且特意设置了生产者多于消费者.

打印日志:

 + 号代表产生了数据, 但是不一定加入到容器里了.

- 号代表消费成功, 数据从队列中删除

这里需要注意一个问题:

生产消费都是有速度的, 也就是说, 如果消费速度小于生产速度

1. 使用有界队列 - 会丢数据, 需要对这部分数据做特殊处理

2. 使用无界队列 - 可能会让电脑宕机, 数据积累越来越多, 导致内存不足, 卡死或者直接死机.

需要对应用场景进行分析, 才能决定使用哪种方式.

  • 发表于 2020-02-26 22:01
  • 阅读 ( 86 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除