1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > JAVA多线程之生产者消费者模型

JAVA多线程之生产者消费者模型

时间:2021-08-06 12:57:46

相关推荐

JAVA多线程之生产者消费者模型

生产者消费者模型

所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架满的时候,消费者可以从货架上拿走商品,生产者此时等待货架的空位,这样不断的循环。那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者模型。

总结一下:生产者消费者能够解决的问题如下:

生产与消费的速度不匹配软件开发过程中解耦

在具体实现生产者消费者模型之前需要先描述几个用到的方法:

wait()

先看一下wait()是干什么的?

1.wait()是Object里面的方法,而不是Thread里面的,这一点很容易搞错。它的作用是将当前线程置于预执行队列,并在wait()所在的代码处停止,等待唤醒通知。

2.wait()只能在同步代码块或者同步方法中执行,如果调用wait()方法,而没有持有适当的锁,就会抛出异常。

wait()方法调用后悔释放出锁,线程与其他线程竞争重新获取锁。

举个例子:

public class TestWait implements Runnable {private final Object object=new Object();@Overridepublic void run() {synchronized (object){System.out.println("线程执行开始。。。");try {object.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程执行结束。。。");}}public static void main(String[] args) {TestWait testWait=new TestWait();Thread thread=new Thread(testWait);thread.start();}}

结果如下:

从结果中我们可以看出线程调用了wait()方法后一直在等待,不会继续往下执行。这也就能解释上面说的wait()一旦执行,除非接收到唤醒操作或者是异常中断,否则不会继续往下执行。

notify()方法

在上面的代码中我们看到wait()调用以后线程一直在等待,在实际当中我们难免不希望是这样的,那么这个时候就用到了另一个方法notify方法:

1.notify()方法也是要在同步代码块或者同步方法中调用的,它的作用是使停止的线程继续执行,调用notify()方法后,会通知那些等待当前线程对象锁的线程,并使它们重新获取该线程的对象锁,如果等待线程比较多的时候,则有线程规划器随机挑选出一个呈wait状态的线程。

2.notify()调用之后不会立即释放锁,而是当执行notify()的线程执行完成,即退出同步代码块或同步方法时,才会释放对象锁。

还是上面的例子,刚才我们调用了wait()方法后,线程便一直在等待,接下来我们给线程一个唤醒的信号,代码如下:

public class TestWait implements Runnable {private final Object object=new Object();public void setFlag(boolean flag) {this.flag = flag;}private boolean flag=true;@Overridepublic void run() {if(flag){this.testwait();}else {this.testnotify();}}public void testwait(){synchronized (object){try {System.out.println("线程开始执行。。。");Thread.sleep(1000);object.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程执行结束。。。");}}public void testnotify(){synchronized (object){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}object.notify();}}public static void main(String[] args) {TestWait testWait=new TestWait();Thread thread=new Thread(testWait);thread.start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}testWait.setFlag(false);Thread thread1=new Thread(testWait);thread1.start();}}

结果如下:

我们看到在调用notify()方法之后,线程又继续了。

notifyAll()方法

从字面意思就可以看出notifyAll是唤醒所有等待的线程。

public class TestWait implements Runnable {private final Object object=new Object();private boolean flag=true;public void setFlag(boolean flag) {this.flag = flag;}@Overridepublic void run() {if(flag){this.testwait();}else {this.testnotify();}}public void testwait(){synchronized (object){try {System.out.println(Thread.currentThread().getName()+"线程开始执行。。。");Thread.sleep(1000);object.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"线程执行结束。。。");}}public void testnotify(){synchronized (object){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}object.notifyAll();}}public static void main(String[] args) {TestWait testWait=new TestWait();Thread thread=new Thread(testWait,"线程1");thread.start();Thread thread1=new Thread(testWait,"线程2");thread1.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}testWait.setFlag(false);Thread thread2=new Thread(testWait);thread2.start();}}

结果如下:

可见notifyAll()方法确实唤醒了所有等待的线程。

小结

出现阻塞的情况大体分为如下5种:

线程调用 sleep方法,主动放弃占用的处理器资源。线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞。线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。线程等待某个通知。程序调用了 suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。

run()方法运行结束后进入销毁阶段,整个线程执行完毕。

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

生产者消费者模型代码示例

商品类

public class Goods {private int id;private String name;public Goods(int id, String name) {this.id = id;this.name = name;}}

生产者类

public class Producer implements Runnable {private Goods goods;@Overridepublic void run() {while (true) {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (TestPC.queue) {goods=new Goods(1,"商品");if (TestPC.queue.size()<MAX_POOL) {TestPC.queue.add(goods);System.out.println(Thread.currentThread().getName()+"生产商品");} else {try {TestPC.queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}}

消费者类

public class Consumer implements Runnable {@Overridepublic void run() {while (true){try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (TestPC.queue){if(!TestPC.queue.isEmpty()){TestPC.queue.poll();System.out.println(Thread.currentThread().getName()+"消费商品");}else {TestPC.queue.notify();}}}}}

测试类

public class TestPC {public static final int MAX_POOL=10;public static final int MAX_PRODUCER=5;public static final int MAX_CONSUMER=4;public static Queue<Goods> queue=new ArrayBlockingQueue<>(MAX_POOL);public static void main(String[] args) {Producer producer=new Producer();Consumer consumer=new Consumer();for(int i=0;i<MAX_PRODUCER;i++) {Thread threadA = new Thread(producer, "生产者线程"+i);threadA.start();}for(int j=0;j<MAX_CONSUMER;j++) {Thread threadB = new Thread(consumer, "消费者线程"+j);threadB.start();}}}

部分结果展示:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。