1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Java多线程---线程通信(wait notifyAll 生产者消费者经典范式 owner wait set

Java多线程---线程通信(wait notifyAll 生产者消费者经典范式 owner wait set

时间:2024-02-16 04:44:49

相关推荐

Java多线程---线程通信(wait notifyAll 生产者消费者经典范式 owner wait set

转自:/qq_35995514/article/details/91128585

1 学习内容

notifyAll生产者、消费者经典范式线程休息室 wait setsynchronized关键字的缺陷自定义显式锁BooleanLock总结

2 具体内容

2.1 多线程通信

2.1.1 notifyAll方法

多线程的通信要用到Object的notifyAll方法,notifyAll方法可以同时唤醒阻塞的全部线程,当然了被唤醒的线程仍需要继续争抢monitor的锁。

2.1.2 生产者消费者

上一节中我们定义了一个EventQueue,此队列的在多线程的环境中会出现数据不一致的情况,其中的情形是:LinkedList中没有元素的时候仍然调用removeFirst方法;当LinkList中的元素超过10个的时候仍旧执行了addLast方法

LinkedList为空时执行removeFirst方法分析

其实我想着我们在EventQueue的方法中都加了synchronized数据同步,为什么还会出现不一致的情况?我们假设EventQueue的元素为空,两个线程在执行take方法时都陷入阻塞中,另外一个offer线程执行了addLast方法之后唤醒了其中一个阻塞的take线程,该线程消费了一个元素之后刚好唤醒了一个take线程,这时就会执行空LinkedList

2.1.3 改进EventQueue中的offer和take方法

/*** 如果事件队列没有满则添加到队尾,否则等待* @param event*/public void offer(Event event) {synchronized (eventQueue) {while (eventQueue.size() >= max) { //事件队列 > 队列定义的最大值try {console("the Queue is full.");eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}console("the new event is committed.");eventQueue.addLast(event); eventQueue.notifyAll(); //唤醒那些曾经执行monitor的wait方法而陷入阻塞的线程}}/*** 从队头获取数据,如果队列中可用的数据那么工作线程就会调用wait阻塞*/public Event take() {synchronized (eventQueue) {if (eventQueue.isEmpty()) {try {console("the queue is empty. 没有可以拿的要我怎么办!");eventQueue.wait(); } catch (InterruptedException e) {e.printStackTrace();}}Event event = eventQueue.removeLast();this.eventQueue.notifyAll(); // 将等待队列中的任务全部唤醒console("the event" + event + "is handled.");return event;}}

其实只需要将临界值的判断if更改为while,将notify改为notifyAll就可以了。

2.1.4 等待/通知经典范式

如上一节EventQueue例子如是也(有点勉强),范式分为两部份,分别针对等待方(生产者)和通知方(消费者)

等待方遵循如下原则获取对象的锁如果条件不满足,则调用对象的wait方法,被通知后仍要检查条件条件满足则执行相应的逻辑

对应伪代码如下

synchronized(对象){while(条件不满足){对象.wait();}对应的逻辑处理}

通知方遵循如下原则

获取对象的锁改变条件通知所有等待在对象上的线程

对应伪代码如下

synchronized(对象){改变条件对象.notifyAll();}

2.2 生产者消费者经典范式(notifyAll)

可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。

生产者消费示例(线程假死)

package com.thread.basicmethod.chapter05;import java.util.stream.Stream;/********************************* @Author: kangna* @Date: /8/23 22:39* @Version: 2.0* @Desc: 生产者 消费者********************************/public class ProduceConsumerVersion_2 {private int i = 0;final private Object LOCK = new Object();private volatile boolean isProduced = false; // 保证boolean 值可见public void produce() {synchronized (LOCK) {//如果已经生产过,别人还没有消费,那就等一下if (isProduced) {try {LOCK.wait(); // 也可以被打断} catch (InterruptedException e) {e.printStackTrace();}} else {i++;System.out.println("produce--->" + i);LOCK.notify(); // 唤醒wait set中的 消费者isProduced = true; // 生产者已经生产}}}public void consume() {synchronized (LOCK) {if (isProduced) { // 如果生产者生产System.out.println("consume--->" + i);// 消费者消费了之后,通知生产者,再生产LOCK.notify(); // 唤醒生产者(有多个生产者),赶紧给我 生产,我要isProduced = false; // 消费之后,将状态置为 false} else {try {LOCK.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {ProduceConsumerVersion_2 pc = new ProduceConsumerVersion_2();Stream.of("P1","P2").forEach(n ->new Thread("P") {@Overridepublic void run() {while (true) {pc.produce();}}}.start());Stream.of("C1","C2").forEach(n ->new Thread("S") {@Overridepublic void run() {while (true) {pc.consume();}}}.start());}}

运行的结果如下:

两个生产者P1,P2,两个消费者C1,C2,为什么线程会进入假死,不进行生产和消费了,我们进行了jstack 信息打印看了一下,也确定没有死锁。问题就出在notify,因为到后边生产者生产了一个货物之后,不知道notify谁,陷入了两难的境地,最后大家都blocked了。

接下来我们修改一下代码其实跟上边的EventQueue一样的

package com.thread.basicmethod.chapter05;import java.util.stream.Stream;/********************************* @Author: kangna* @Date: /8/23 23:31* @Version: 3.0* @Desc: 多线程状态下的生产者和消费者********************************/public class ProduceConsumerVersion_3 {private int i = 0; // 货物final private Object LOCK = new Object(); // 定义共享锁private volatile boolean isProduced = false;public void produce() {synchronized (LOCK) {while (isProduced) { // 避免生产者生产 重复数据try {LOCK.wait(); // 也可以被打断} catch (InterruptedException e) {e.printStackTrace();}}i++;System.out.println("product--->" + i);LOCK.notifyAll(); isProduced = true; // 生产者已经生产了。}}public void consume() {synchronized (LOCK) {while (!isProduced) { // 避免生产一次,消费多次的状况try {LOCK.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("consume--->" + i);LOCK.notifyAll();isProduced = false;}}public static void main(String[] args) {ProduceConsumerVersion_3 pc = new ProduceConsumerVersion_3();Stream.of("P1","P2").forEach(n ->new Thread("P") {@Overridepublic void run() {while (true) {pc.produce();}}}.start());Stream.of("C1","C2").forEach(n ->new Thread("S") {@Overridepublic void run() {while (true) {pc.consume();}}}.start());}}

运行结果

将循环判断 while 判断改为 if 看看运行结果

说明,为什么会出现生产一个数据,有两次消费的情况?

p1,p2只有一个可以抢到锁,首先P1抢到锁,生产了一个数据,P2已经进入wait队列中放弃monitor,p1生产完之后,也会进入wait Set队列中。会执notifyAll,这里就是问题,不会循环判断,进而会继续向下执行(i++),唤醒wait set中的P2,p2又生产一个,P1生产了一个消费者还没有消费,P2又生产了一个。

为什么使用while 而不是if

在循环(loop)里调用 wait 和 notify,不是在 If语句,wait应该在被synchronized的背景下和那个被多线程共享的对象上调用,应该永远在while循环,而不是if语句中调用wait。因为线程是在某些条件下等待的——在我们的例子里,即“如果消费者没有消费的话,那么生产者线程应该等待”,你可能直觉就会写一个if语句。但 if 语句存在一些微妙的小问题,我们使用 if 确实出现了问题。所以如果不在线程被唤醒后再次使用while 循环检查唤醒条件是否被满足(满足条件),程序就有可能会出错——例如上面例子中我们生产了重复相同的数据(覆盖),导致消费者消费重复数据。

基于以上认知,下面这个是使用wait和notify函数的规范代码模板:

// The standard idiom for calling the wait method in Javasynchronized(sharedObject) {while(condition) {sharedObject.wait();// (Releases lock, and reacquires on wakeup)}// do action based upon condition e.g. take or put into queue}

在while循环里使用 wait 的目的,是在线程被唤醒的前后都持续检查条件是否被满足。如果条件并未改变,wait被调用之前notify的唤醒通知就来了,那么这个线程并不能保证被唤醒,有可能会导致死锁问题。

注意:

永远在synchronized的方法或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成

IllegalMonitorStateException。永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。

2.2线程休息室wait set

在虚拟机规范中存在一个wait set的概念,线程调用了对象的wait方法之后线程会被加入与该对象monitor关联的wait set中,并且释放monitor的所有权,然后会进入可执行状态,继续加入抢锁的队伍中,如果抢到锁,就会继续执行,相反,则不能继续执行。

如图,是若干个线程调用wait方法之后被加入与monitor关联的wait set中,当另外一个线程调用该monitor的notify方法之后,其中一个线程会从wait set中弹出,至于是随机弹出还是以先进先出的方式弹出,虚拟机没说

而执行notifyAll则不需要考虑那个线程会被弹出,因为wait set中的所有wait线程都将被弹出

2.3 再看monitor(监视器和wait set)

接下来我们再看下在monitor在Java虚拟机(HotSpot)中的实现,其实现是基于C++的,由ObjectMonitor实现的,其主要数据结构如下:

ObjectMonitor() {_header = NULL;_count = 0;_waiters= 0,_recursions = 0;_object = NULL;_owner = NULL;_WaitSet= NULL;_WaitSetLock = 0 ;_Responsible = NULL ;_succ = NULL ;_cxq= NULL ;FreeNext= NULL ;_EntryList = NULL ;_SpinFreq= 0 ;_SpinClock = 0 ;OwnerIsThread = 0 ;}

ObjectMonitor.hpp源码地址:

在上面的源码我们可以看到ObjectMonitor中有几个关键属性:

_owner:指向持有ObjectMonitor对象的线程_WaitSet:存放处于wait状态的线程队列_EntryList:存放处于等待锁block状态的线程队列_recursions:锁的重入次数_count:用来记录该线程获取锁的次数

当多个线程同时访问一段同步代码时,首先会进入_EntryList队列中,当某个线程获取到对象的monitor后进入_Owner区域并把monitor中的_owner变量设置为当前线程,同时monitor中的计数器_count加1。即获得对象锁。

若持有monitor的线程调用wait()方法,将释放当前持有的monitor,_owner变量恢复为null,_count自减1,同时该线程进入_WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。如下图所示:

2.4 自定义显式锁BooleanLock

2.4.1 synchronized关键字的缺陷

synchronized关键字提供了一种排他式的数据同步机制,某个线程在获取monitor锁的时候可能陷入阻塞,而这种阻塞有两种缺陷:

无法控制阻塞时长阻塞不可中断

示例:

public class SynchronizedDefect {public synchronized void syncMethod(){try {TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String args[]) throws InterruptedException{SynchronizedDefect defect = new SynchronizedDefect();Thread t1 = new Thread(defect :: syncMethod, "T1");t1.start();TimeUnit.MILLISECONDS.sleep(2); //主线程休息2毫秒Thread t2 = new Thread(defect :: syncMethod, "T2");t2.start();}}

上面的代码T1线程会最先进入同步方法,而导致T2阻塞,T2的执行完全取决于T1何时释放,如果T2计划最后一分钟获得所有权,否则就放弃,很显然这种方式是做不到的,其实这就是前面说的阻塞时长无法控制。

另外一个缺陷就是T2线程会因争抢某个monitor的锁而进入阻塞状态,它是无法中断的,虽然T2可以设置中断interrupt标识,但是synchronized不像wait与sleep一样能够捕捉到中断信号。

2.4.2 显式锁BooleanLock

现在我们构造一个显式的BooleanLock,使其不仅具备synchronized关键字的所有功能同时具备可中断可超时的功能。

1.定义Lock接口

public interface Lock {/*** lock方法永远阻塞,除非获得了锁,这一点和synchronized相似,但是该 方法可以被中断的,中断时抛出异常InterruptedException* @throws InterruptedException*/public void lock() throws InterruptedException;/*** lock方法除了可以被中断外 还增加了超时功能* @param mills* @throws InterruptedException* @throws TimeoutException*/public void lock(long mills) throws InterruptedException, TimeoutException;/*** 该方法可用来进行锁的释放*/public void unlock();/*** @return 获取当前有哪些线程被阻塞*/public List<Thread> getBlockedThreads();}

2 实现BooleanLock

BooleanLock是Lock的一个Boolean实现,通过控制一个Boolean变量的开关来决定是否允许当前的线程获取该锁。

package com.kangna.concurrent.chapter05;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Optional;import java.util.concurrent.TimeoutException;import static java.lang.Thread.currentThread;import static java.lang.System.currentTimeMillis;public class BooleanLock implements Lock{//当前拥有锁的线程private Thread currentThread;//locked是一个Boolean开关, true代表该锁已经被某个线程获得private boolean locked = false;//存储拿些线程在获取当前线程时进入了阻塞状态private final List<Thread> blockedList = new ArrayList<>();@Overridepublic void lock() throws InterruptedException {synchronized(this){ // 同步代码块进行方法同步// 如果当前锁已经被某个线程获得,则该线程加入阻塞队列,并且使当前线程wait释放对this monitor的所有权while(locked){ //暂停当前线程final Thread tempThread = currentThread();try {if (!blockedList.contains(tempThread)) {blockedList.add(currentThread());this.wait(); //等待} } catch (InterruptedException e) {//如果当前线程在wait时被中断,则从blockedList中将其删除,避免内存泄漏blockedList.remove(currentThread());throw e;} }//如果当前锁没有被其它线程获得,则该线程将尝试从阻塞队列中删除自己,blockedList.remove(currentThread());//该锁已经被某个线程获得this.locked = true;//记录获取锁的线程this.currentThread = currentThread();}}@Overridepublic void lock(long mills) throws InterruptedException, TimeoutException {synchronized(this){if(mills <= 0){this.lock(); // 抛异常也可以} else {long remainingMills = mills;long endMills = currentTimeMillis() + remainingMills;while(locked){//如果remainingMills小于等于0,则意味着当前线程被其它线程唤醒或者在指定的wait时间后还没有获得锁,会抛出超时异常if(remainingMills <= 0){throw new TimeoutException("can not get the lock during " + mills + " ms.");}if(!blockedList.contains(currentThread)){blockedList.add(currentThread);//等待remainingMills的毫秒数,this.wait(remainingMills);remainingMills = endMills - currentTimeMillis();}//获得锁,并且从block列表中删除当前线程,将locked的状态改为true并且指定获得锁的线程就是当前线程blockedList.remove(currentThread());this.locked = true;this.currentThread = currentThread();}}}}/*** 此方法将locked状态改为false,并且唤醒在wait set中的其它线程,再次争抢锁资源,,注意哪个线程 加的锁只能由该线程来解锁*/@Overridepublic void unlock() {synchronized(this){//判断当前线程是否为获取锁的线程,只有加了锁的线程才有资格进行解锁,if(currentThread == currentThread()){this.locked = false;Optional.of(currentThread().getName() + "release the lock.").ifPresent(System.out :: println);//通知wait set 中的线程你们可以再次尝试抢锁了this.notifyAll();}}}@Overridepublic List<Thread> getBlockedThreads() {return Collections.unmodifiableList(blockedList);}}

3.使用BooleanLock

(1)多个线程通过lock()方法争抢锁

package com.kangna.concurrent.chapter05;import static java.lang.Thread.currentThread;import static java.util.concurrent.ThreadLocalRandom.current;import java.util.concurrent.TimeUnit;import java.util.stream.IntStream;public class BooleanLockTest {//定义BooleanLockprivate final Lock lock = new BooleanLock();//使用try finally 语句确保lock每次都能正确的释放public void synMethod(){try{lock.lock(); int randomInt = current().nextInt(10);System.out.println(currentThread() + "get the lock.");TimeUnit.SECONDS.sleep(randomInt);} catch(InterruptedException e) {e.printStackTrace();} finally {lock.unlock(); //释放锁}}public static void main(String args[]){BooleanLockTest blt = new BooleanLockTest();//定义一个线程并且启动IntStream.range(0, 10).mapToObj(i -> new Thread(blt :: synMethod)).forEach(Thread :: start);}}

测试结果

Thread[Thread-0,5,main]get the lock.Thread-0release the lock.Thread[Thread-9,5,main]get the lock.Thread-9release the lock.Thread[Thread-1,5,main]get the lock.Thread-1release the lock.Thread[Thread-8,5,main]get the lock.Thread-8release the lock.Thread[Thread-2,5,main]get the lock.Thread-2release the lock.Thread[Thread-7,5,main]get the lock.Thread-7release the lock.Thread[Thread-3,5,main]get the lock.Thread-3release the lock.Thread[Thread-6,5,main]get the lock.Thread-6release the lock.Thread[Thread-4,5,main]get the lock.Thread-4release the lock.Thread[Thread-5,5,main]get the lock.Thread-5release the lock.

从测试结果我们可以看出,每次确保只有一个线程能够获得锁的执行权限,这一点与synchronized很接近了。

(2)可中断被阻塞的线程

修改测试类

BooleanLockTest blt = new BooleanLockTest();new Thread(blt :: synMethod, "T1").start();TimeUnit.MICROSECONDS.sleep(2);Thread t2 = new Thread(blt :: synMethod, "T2");t2.start();TimeUnit.MICROSECONDS.sleep(10);t2.interrupt(); //T2线程运行10毫秒后主动将其打断

(3)阻塞的线程可超时

public class BooleanLockTest {//定义BooleanLockprivate final Lock lock = new BooleanLock();public void synMethodTimeoutable(){try{lock.lock(1000);System.out.println(currentThread() + "get the lock.");int randomInt = current().nextInt(10);TimeUnit.SECONDS.sleep(randomInt);} catch (InterruptedException | TimeoutException e){e.printStackTrace();} finally {lock.unlock();}}public static void main(String args[]) throws InterruptedException{BooleanLockTest blt = new BooleanLockTest();new Thread(blt :: synMethod, "T1").start();TimeUnit.MICROSECONDS.sleep(2);Thread t2 = new Thread(blt :: synMethodTimeoutable, "T2");t2.start();TimeUnit.MICROSECONDS.sleep(10);}}

运行结果

Thread[T1,5,main]get the lock.T1release the lock.java.util.concurrent.TimeoutException: can not get the lock during 1000 ms.T2release the lock.at com.kangna.concurrent.chapter05.BooleanLock.lock(BooleanLock.java:59)at com.kangna.concurrent.chapter05.BooleanLockTest.synMethodTimeoutable(BooleanLockTest.java:27)at java.lang.Thread.run(Unknown Source)

3 总结

生产者与消费者是最常用的模型之一wait set介绍针对synchronized缺陷开发显式锁BooleanLock

Java多线程---线程通信(wait notifyAll 生产者消费者经典范式 owner wait set 自定义显式锁BooleanLock)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。