Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)
在前面个两篇博文中,我们使用Java模拟了AND型信号量和信号量集,本文将使用Java来模拟管程,关于管程的一些理论知识,可以参考另一篇博客。
对于管程,在这里我们不做具体的讨论了。不过对于Java和管程之间的一些事,还是很有意思的。Java中,每个对象其实都一个Monitor(java中翻译为监视器),Java中提供的synchronized关键字及wait()、notify()、notifyAll()方法,都是Monitor的一部分,或者说,在Jdk1.5之前也就是JUC没有出现之前,Java都是通过Monitor来实现并发的。Monitor在OS中或者别处的翻译是管程,我也更倾向于翻译为管程,Java中使用的是MESA管程,本文呢,我们就来模拟实现霍尔管和汉森管程,来加强我们对并发编程的能力和增加对线程同步问题的理解。
在实现之前,我们先对几种管程的区别来简单的说一下,这里还是使用我在进程同步机制中说到的问题:如果进程P1因x条件处于阻塞状态,那么当进程P2执行了x.signal操作唤醒P1后,进程P1和P2此时同时处于管程中了,这是不被允许的,那么如何确定哪个执行哪个等待?这个问题也很简单,可采用下面的两种方式之一进行处理:
P2等待,直至P1离开管程或者等待另一个条件;
P1等待,直至P2离开管程或者等待另一个条件。
Hoare管程采用了第一种处理方式;MESA管程采用第二种方式;Hansan管程采用了两者的折中,它规定管程中的所有过程执行的signal操作是过程体的最后一个操作,于是,进程P2执行完signal操作后立即退出管程,因此进程P1马上被恢复执行。
在下面不同管程的具体实现中,我们还是通过解决经典(一直都是他)的生产者–消费者问题来具体的解释。并且下面所有的实现,都是使用JUC中的ReentrantLock(可重入锁)+Condition(条件变量)来实现的。
1.霍尔管程
首先是霍尔管程,也是因为我们对其理论介绍的最多,并且在理论篇中给出了其wait()、signal()操作的伪代码,因此我们首先来实现霍尔管程,我们使用内部类来实现上一文中cond和interf的数据定义,其代码如下:
package XXX.util;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;/*** 霍尔管程*/ @Slf4j public class MonitorsUtil {//缓冲池中缓冲区static final Integer N = 50;//缓冲池static List<Integer> buffer = new ArrayList<>(N);//互斥锁,用以实现缓冲的互斥访问static Lock lock = new ReentrantLock();static cond notFull = new cond(lock, "notFull");static cond notEmpty = new cond(lock, "notEmpty");//用与存放因无法进入管程的阻塞队列&&因为调用signal阻塞自身的线程(Hoare)static interf IM = new interf(lock);public static void wait(String id, cond declar, interf IM) throws InterruptedException {//获取锁,需要在获得锁的情况下才可以操作conditiondeclar.count++;//log.info("当前condition中阻塞的线程数:【{},{},{},{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//判断是否有进程在高优先级队列中if (IM.nextCount > 0) {//唤醒因调用signal操作的线程IM.next.release();}log.info("线程【{}】调用wait被挂起到条件变量【{}】。", id, declar.name);//挂起时自动释放锁,等待进入管程的队列可以获得锁并进入管程declar.condition.await();log.info("被挂起的线程【{}】被唤醒执行。", id);declar.count--;}public static void signal(String id, cond declar, interf IM) throws InterruptedException {log.info("线程【{}】执行了释放资源", id);if (declar.count > 0) {//挂起自己后,因为调用signal挂起自己的进程数量加1IM.nextCount++;//唤醒因为条件变量而阻塞的线程declar.condition.signal();log.info("唤醒的条件变量为:【{}】", declar.name);//log.info("释放后所有condition中阻塞的线程数:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//释放资源后,立即把自己挂起,进入高优先级队列-------Hoare的处理方式log.info("线程【{}】调用signal被挂起。", id);//释放lock,不然别的线程无法进入管程lock.unlock();//将当前线程插入到next的阻塞队列中IM.next.acquire();//再次获取锁-->进入管程lock.lock();log.info("被挂起的线程【{}】被唤醒执行。", id);//恢复执行后,等待调用的管程的线程数量减1IM.nextCount--;}}static class interf {//等待着进入管程的队列Condition enter;//等待着进入管程的阻塞队列中线程的数量int enterCount;//发出signal的进程挂起自己的信号量,信号量中记录着等待调用管程的进程Semaphore next;//在next上等待的线程数int nextCount;interf(Lock lock) {enter = lock.newCondition();enterCount = 0;next = new Semaphore(0);nextCount = 0;}}static class cond {String name;Condition condition;int count;cond(Lock lock, String id) {condition = lock.newCondition();count = 0;name = id;}}//往缓冲区中投放消息public static void putMessage(String id, Integer item) throws InterruptedException {lock.lock();//如果缓冲池满了,就挂起到notFull的阻塞队列中log.info("执行了投放消息,缓冲池的消息的数量:【{}】", buffer.size());while (buffer.size() >= N) {log.info("缓冲池满,线程【{}】阻塞", id);wait(id, notFull, IM);}//保证互斥访问//IM.mutex.acquire();buffer.add(item);//IM.mutex.release();signal(id, notEmpty, IM);//... 一些别的操作lock.unlock();}//从缓冲区中取消息消费public static void getMessage(String id, Integer item) throws InterruptedException {//保证互斥访问lock.lock();//如果缓冲池满了,就挂起到notFull的阻塞队列中log.info("执行了消费消息,缓冲池的消息的数量:【{}】", buffer.size());while (buffer.size() <= 0) {wait(id, notEmpty, IM);}item = buffer.remove(0);log.info("消费了一条消息:【{}】", item);//IM.mutex.release();signal(id, notFull, IM);//... 一些别的操作lock.unlock();}} 上面的代码,我们通过ReenTrantLock来控制线程互斥的访问管程,管程提供的过程putMessage()、getMessage()通过先获取lock,保证可以进入管程线程只有一个,对于ReenTrantLock的强大功能来说,在这里成了我模拟霍尔管程的一大“阻碍”,因为ReenTrantLock+Condition,即使线程因调用了condition的wait而阻塞,当被唤醒再次执行时,需要重新去获取lock,如果获取不到就要被插入到阻塞队列中,只能等待lock被释放才有可能执行。因此在上面的signal中,为了保证霍尔管程的规定,我们在阻塞当前线程时,需要先释放lock锁,再次被唤醒时再次重新获得锁,这也是signal中重复的进行释放和获取的原因。
另外,在我们的整个实现中,interf中设计被用来作为互斥进入管程的条件变量没有用到,lock自身的强大帮我们把这部分工作做了,这里保留是为了和理论篇的伪代码保持一致。
2.MESA管程
因为Java实现并发参考的就是MESA模型的管程,因此其实现的ReenTrantLock+Condition就可以很完美的实现MESA管程,我们将霍尔管程中的signal操作进行修改,代码如下:
public static void signal(String id, cond declar, interf IM) throws InterruptedException {log.info("线程【{}】执行了释放资源", id);if (declar.count > 0) {//挂起自己后,因为调用signal挂起自己的进程数量加1IM.nextCount++;//唤醒因为条件变量而阻塞的线程declar.condition.signal();log.info("唤醒的条件变量为:【{}】", declar.name);//log.info("释放后所有condition中阻塞的线程数:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//释放资源,继续执行,直至线程退出管程后,别的线程才可进入-------MESA的处理方式log.info("被挂起的线程【{}】被唤醒执行。", id);//恢复执行后,等待调用的管程的线程数量减1IM.nextCount--;} } 我们可以看到,我们只需将Hoare上面那段为了让线程将自己挂起并释放锁的代码去掉就可以实现MESA管程了。
3.汉森管程
对于汉森管程,其实其实现和MESA是相同的,但是,根据其规定,signal应当是线程的最后一个操作,执行完signal操作后要立即退出管程,也就是说也释放lock,让别的线程可以进入管程,也就是其对应的操作代码要如下:
//从缓冲区中取消息消费 public static void getMessage(String id, Integer item) throws InterruptedException {//保证互斥访问lock.lock();//如果缓冲池满了,就挂起到notFull的阻塞队列中log.info("执行了消费消息,缓冲池的消息的数量:【{}】", buffer.size());while (buffer.size() <= 0) {wait(id, notEmpty, IM);}item = buffer.remove(0);log.info("消费了一条消息:【{}】", item);//IM.mutex.release();signal(id, notFull, IM);//signal操作应当是最后一个操作,此处不再允许有别的操作,应当立即退出管程,lock.unlock(); } 也就是说,在每个过程执行完sginal后,应当立即退出管程,而不允许再执行别的操作。
4.执行结果分析
这里呢,只对比霍尔管程和MESA管程(因为MESA管程在执行顺序上适合汉森管程一致的,只不过汉森管程在设计上变得更加严格,规定死了signal执行的位置),这里呢,我们先给生产者、消费者的代码:
/*** 管程测试*/ @Slf4j public class MonitorsTest {static Integer count = 0;static class Producer extends Thread {Producer(String name) {super.setName(name);}@Overridepublic void run() {do {try {log.info("生产了一条消息:【{}】", count);MonitorsUtil.putMessage(this.getName(), count++);//Thread.sleep(1000);} catch (InterruptedException e) {log.error("生产消息时产生异常!");}} while (true);}}static class Consumer extends Thread {Consumer(String name) {super.setName(name);}@Overridepublic void run() {do {try {Integer item = -1;MonitorsUtil.getMessage(this.getName(), item);//Thread.sleep(1000);} catch (InterruptedException e) {log.error("消费消息时产生异常!");}} while (true);}}public static void main(String[] args) {Producer p1 = new Producer("p1");Producer p2 = new Producer("p2");Producer p3 = new Producer("p3");Producer p4 = new Producer("p4");Consumer c1 = new Consumer("c1");Consumer c2 = new Consumer("c2");Consumer c3 = new Consumer("c3");Consumer c4 = new Consumer("c4");p1.start();p2.start();p3.start();p4.start();c1.start();c2.start();c3.start();c4.start();} } 这里因为为了简单模拟消息,知识使用了整形的变量代替消息,首先我们来看下霍尔管程的执行结果,如下图所示,我们可以看到,c4执行signal操作,在释放完走远后,立刻将自身阻塞。
下面我们来看下MESA管程的执行结果,如下图所示,对于执行结果来说,两种管程都可以保证并发的正确执行,但是MESA让线程的执行更顺畅,不会被频繁的阻塞,从结果中也能体现。
又到了分隔线以下,本文到此就结束了,本文内容和代码都是博主自己本人整理和编写,如有错误,还请批评指正。
本文的所有java代码都已通过测试,对其中有什么疑惑的,可以评论区留言,欢迎你的留言与讨论;另外原创不易,如果本文对你有所帮助,还请留下个赞,以表支持。
如有兴趣,还可以查看我的其他几篇博客,都是OS的干货(目录),喜欢的话还请点赞、评论加关注_。
总结
以上是生活随笔为你收集整理的Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 今日头条信息流投放:今日头条怎么开户?多
- 下一篇: Java开发 - Redis初体验