博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java - ReentrantLock和Condition实现生产者-消费者
阅读量:7095 次
发布时间:2019-06-28

本文共 5346 字,大约阅读时间需要 17 分钟。

java - ReentrantLock和Condition实现生产者-消费者

ReentrantLock

重入锁(ReentrantLock)是一种递归无阻塞的同步机制。

一个可重入的互斥锁(Lock),它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。

  1. ReentrantLock 将由当前已经成功获得锁,并且还没有释放锁的线程所拥有。
  2. 使用lock获得锁,unlock释放锁。
  3. 可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查当前线程是否获得锁及调用了几次

lock

  1. 当锁没有被任何线程获得时,调用lock方法的线程将成功获的锁定并返回,将锁的保持计数设置为 1。
  2. 如果当前线程已经获得该锁(重复获取锁),则将保持计数加 1,并且该方法立即返回。
  3. 如果该锁已经被另一个线程获得,则出于线程调度的目的,将当前线程阻塞,并且在获得锁之前,该线程将一直处于休眠状态。

Condition

参考文章:

任何一个Java对象,都拥有一组监视器方法,主要包括wait()、notify()、notifyAll()方法,这些方法与synchronized关键字配合使用可以实现等待/通知机制。类似地,Condition接口也提供类似的Object的监视器的方法,主要包括await()、signal()、signalAll()方法,这些方法与Lock锁配合使用也可以实现等待/通知机制。

Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

相比Object实现的监视器方法,Condition接口的监视器方法具有一些Object所没有的特性:

  1. Condition接口可以支持多个等待队列,在前面已经提到一个Lock实例可以绑定多个Condition,所以自然可以支持多个等待队列了
  2. Condition接口支持响应中断,前面已经提到过
  3. Condition接口支持当前线程释放锁并进入等待状态到将来的某个时间,也就是支持定时功能

和Object监视器区别

Condition支持在await时不响应Thread的interrupt请求(awaitUninterruptibly方法):

Condition支持的定时唤醒功能(不同于Object的带参wait方法,这里的定时指在指定日期唤醒):

boolean awaitUntil(Date deadline) throws InterruptedException;复制代码

实现生产者-消费者

示例使用 jdk 1.8,用Consumer接口表示消费者,Supplier[]数组表示待生产的产品

关键成员变量

private final Lock lock = new ReentrantLock();    //要生产的“货物”及其生产方式    private final Supplier
[] suppliers; //消费者 private final Consumer
consumer; //生产出的产品 private T product; //生产者是否已经生产出一件产品 private boolean isProducted = false; //条件:生产的产品还没有被消费者消费,需要等待消费者消费 private Condition whenProductWaitConsume = lock.newCondition(); //消费者是否已经消费了生产者生产的产品 private boolean isConsumed = true; //条件:没有产品可以消费,需要等待生产者 private Condition whenHaveProductToConsume = lock.newCondition(); //生产者是否已经生产完所有的产品 private boolean finished = false;复制代码

构造器

public SuppliersAndConsumerPlus(Consumer
consumer, Supplier
... suppliers) { this.suppliers = suppliers; this.consumer = consumer; }复制代码

生产过程

private void doProduct(Supplier
supplier) throws InterruptedException { lock.lock(); try { while (!isConsumed) { P.out("supplier wait"); // 等待生产条件: // 我发现刚生产的产品还没被消费掉,等到产品被消费之后提醒我 whenProductWaitConsume.await(); } // 收到可以生产的信号 product = supplier.get(); // 模拟生产耗时 TimeUnit.SECONDS.sleep(2); P.out("+ " + product); isProducted = true; isConsumed = false; // 产品生产好了,可以消费了 whenHaveProductToConsume.signal(); } finally { lock.unlock(); } }复制代码

遍历所有待生产的产品进行生产:

private void product() throws InterruptedException {        // 遍历生产产品        for (Supplier
supplier : suppliers) { doProduct(supplier); } finished = true; }复制代码

消费过程

private void consume() throws InterruptedException {        P.out("begin consumer");        while (!finished) {            lock.lock();            try {                while (!isProducted) {                    P.out("consumer wait");                    // 等待消费条件:                    // 我发现现在还没有产品可以消费,所以等有产品可以消费了就提醒我                    whenHaveProductToConsume.await();                }                // 收到可以消费的信号                consumer.accept(product);                // 模拟消费耗时                TimeUnit.SECONDS.sleep(2);                P.out("- " + product);                isConsumed = true;                isProducted = false;                // 产品被消费了,可以继续生产了                whenProductWaitConsume.signal();            } finally {                lock.unlock();            }        }    }复制代码

启动生产者和消费者

public void start() {        // 开启消费者线程        new Thread(() -> {            try {                consume();            } catch (InterruptedException e) {                e.printStackTrace();            }        }).start();        //确保生产者在消费者之后启动        try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {            e.printStackTrace();        }        // 开启生产者线程        new Thread(() -> {            try {                product();            } catch (InterruptedException e) {                e.printStackTrace();            }        }).start();    }复制代码

测试

public static void main(String[] args) {        @SuppressWarnings("unchecked")        SuppliersAndConsumerPlus
consumer = new SuppliersAndConsumerPlus<>( String::getBytes, () -> "a", () -> "b", () -> "c", () -> "d"); consumer.start(); }复制代码

测试结果:

05/01/2018 09:47:53-794 begin consumer05/01/2018 09:47:53-795 consumer wait05/01/2018 09:47:56-762 + a05/01/2018 09:47:56-762 supplier wait05/01/2018 09:47:58-763 - a05/01/2018 09:47:58-763 consumer wait05/01/2018 09:48:00-763 + b05/01/2018 09:48:00-763 supplier wait05/01/2018 09:48:02-764 - b05/01/2018 09:48:02-764 consumer wait05/01/2018 09:48:04-764 + c05/01/2018 09:48:04-764 supplier wait05/01/2018 09:48:06-764 - c05/01/2018 09:48:06-764 consumer wait05/01/2018 09:48:08-765 + d05/01/2018 09:48:10-765 - dProcess finished with exit code 0复制代码

代码已上传GitHub,你可以在找到


---END---

转载地址:http://yiaql.baihongyu.com/

你可能感兴趣的文章
HTML5 本地存储 localStorage、sessionStorage 的遍历、存储大小限制处理
查看>>
【leetcode】688. Knight Probability in Chessboard
查看>>
【leetcode】Maximum Product of Word Lengths
查看>>
C 工具库 GLib --- 提供多种高级的数据结构,如内存块、双向和单向链表、哈希表、动态字符串等...
查看>>
SQL中format()函数对应的格式
查看>>
svn command
查看>>
职业插画之路
查看>>
Java入门篇(五)——字符串/String类
查看>>
python 的StringIO
查看>>
第三个阶段事后诸葛亮
查看>>
java中的sql语句中如果有like怎么写
查看>>
【原创】驱动加载之StartService
查看>>
1751: [Usaco2005 qua]Lake Counting
查看>>
【BZOJ】4753: [Jsoi2016]最佳团体 01分数规划+树上背包
查看>>
iOS 获取设备信息之UIDevice的使用,Swift 基于 API
查看>>
IntelliJ cannot log in to GitHub上传github报错解决
查看>>
PlayWithHeyCoder
查看>>
Kotlin入门(27)文件读写操作
查看>>
设计模式-单例模式
查看>>
[开源]KJFramework.Message 智能二进制消息框架 -- 对于数组的极致性优化
查看>>