1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Executor多线程框架学习笔记(五):FutureTask

Executor多线程框架学习笔记(五):FutureTask

时间:2024-07-20 21:01:19

相关推荐

Executor多线程框架学习笔记(五):FutureTask

FutureTask

ScheduledThreadPoolExecutor里面用到了FutureTask,提前将对其进行讲述

public class FutureTask<V> implements RunnableFuture<V> {//当前任务的运行状态 volatile修饰了他的可见性//因为任务在运行中是使用子线程运行的在其他线程中可以对任务进行取消和中断等操作,所以这里修饰为了再多线程中显示private volatile int state;//以下是state的几个值//任务初始是newprivate static final int NEW= 0;//任务执行完成private static final int COMPLETING = 1;//任务正常private static final int NORMAL = 2;//任务执行时发生异常private static final int EXCEPTIONAL = 3;//任务取消private static final int CANCELLED = 4;//任务中断中private static final int INTERRUPTING = 5;//任务中断private static final int INTERRUPTED = 6;//下面是任务状态的转换顺序/*NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED*///最终执行的回调方法此接口内部只有一个call方法并且可以获取执行的返回值private Callable<V> callable;//任务执行的返回值,也就是callable执行的结果//此结果并没有可见性修饰是因为对他的操作都是原子操作private Object outcome; //此线程用于运行callableprivate volatile Thread runner;//等待任务执行结果节点private volatile WaitNode waiters;//获取最终的执行结果,可以看出是private修饰的是它内部使用的方法@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {//获取callable的返回结果Object x = outcome;//如果传入的状态是正常结束则返回获取的结果if (s == NORMAL)return (V)x;//如果获取的结果是取消或者中断则抛出取消异常if (s >= CANCELLED)throw new CancellationException();//可以通过上方的结果进行排除还有异常状态、初始状态、执行完成状态//但是这里只处理了异常状态并且包装成ExecutionException//因为其与两个状态并不会进入此方法,在下面会讲述throw new ExecutionException((Throwable)x);}//构造函数传入的Callable的实现对象//并且初始化状态为newpublic FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // 这里操作state将会叫所有操作此对象的线程得知state状态初始化为了new,也是volatile修饰的原因}//第二个构造器,支持传入runnable实现,但是任务执行的试callable所以需要第二个参数传入runnable的返回值public FutureTask(Runnable runnable, V result) {//通过传入的两个参数调用了Excetors用来创建callable实现//返回时RunnableAdapter的实例,此对象非常简单就是调用runnable的run方法并且返回传入resultthis.callable = Executors.callable(runnable, result);this.state = NEW;}//判断任务是否取消,中断也是取消所以此处大于等于取消public boolean isCancelled() {return state >= CANCELLED;}//判断是否完成,只要不是NEW都算完成,所以此方法判断是否执行完毕得到的结果是错误的。//因为运行中状态并不能算是完成因为并没有确切的结果public boolean isDone() {return state != NEW;}//取消执行任务//mayInterruptIfRunning 参数比较有含义,具体含义需要根据代码讲解public boolean cancel(boolean mayInterruptIfRunning) {//进入此方法先判断state==new 当前状态是new并且对state进行原子操作。//pareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)//传入当前对象并且传入state在当前对象中的内存偏移传入预计的值代表我认为当前state值是new,然后判断mayInterruptIfRunning如果为true则代表终端中,否则是取消状态//在最外层有个取反动作,代表如果当前任务状态是new并且成功设置为了中断中或者取消状态则执行下面的语句进行后续处理,如果当前状态已经不是new则返回false代表取消失败//这里有点绕,可以理解为如果当前状态是NEW并没有运行则代表取消失败如果当前状态是其他状态则会继续向下运行。if (!(state == NEW &&pareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { //进入此处需要注意的是当前任务已经在运行中//而mayInterruptIfRunning变量继续发挥作用如果是true//上面代码也看到是true则设置为中断中状态所以此处获取当前任务的执行线程并且调用中断if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final state//最终结果一定是一个具体的结果,而上方设置为了中断中,所以在线程调用中断后再次设置状态为中断UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//当中断操作完成则进行完成操作finishCompletion();}// 这里做个小结: 此方法取消状态为false 则代表此方法并未运行,为true则代表此方法运行中已经取消,而mayInterruptIfRunning参数则代表是立即中断线程取消,还是等待线程正常执行完,不管哪一种最终的状态都是取消//到此处则代表取消成功return true;}//获取任务执行结果//抛出了两个异常 线程中断异常,肯定是调用了上方的取消或者线程被意外干掉,执行异常,是FutureTask的结果异常代表在任务运行期间发生了异常,之前在将report方法的时候又看到抛出了CancellationException异常,这里并没有抛出,因为此异常是运行时异常public V get() throws InterruptedException, ExecutionException {//获取当前任务运行状态int s = state;//如果任务是未运行NEW或者执行完成COMPLETING状态则if (s <= COMPLETING)//当前线程等待完成传入参数后面讲解等待的时候会进行说明s = awaitDone(false, 0L);//不管是否执行完最终都会得到结果,如果执行完则直接返回结果//如果未执行完则等待执行直到结束所以s永远都会有值//而此方法也可以叫做死等待,他会一直等待直到结束所以此方法阻塞return report(s);}//此方法是上方方法的重构,传入两个参数1、超时时间类型long2、传入这串时间的时间类型是秒还是毫秒或者说纳秒public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;//不管传入的单位是什么最终他会以纳秒进行判断//如果当前状态是未运行或者运行中状态则等待与方法调用不同的是传入了true,并且传入了超时的纳秒数,如果等到超时则抛出超时异常if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}//丢给子类的钩子在任务完成的时候进行调用的protected void done() { }//此处是设置具体的执行结果protected void set(V v) {//之前说过执行结果是原子操作,所以此处使用了原子操作进行对结果操作//但是并不是直接原子操作操作执行结果而且使用了状态值//如果当前状态是NEW则修改为执行完成,修改失败则代表此任务已经被别的线程干预了取消或者中断if (pareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//到此处说明并没有其他线程干预结果从而设置当前值为传入的voutcome = v;//此处设置当前状态为正常,并且防止执行为重排序UNSAFE.putOrderedInt(this, stateOffset, NORMAL);//当到了最终状态则调用最后的处理方法finishCompletion();}} //设置当前任务结果为异常,操作与上方设置值一样只不过最终设置结果不是正常而是异常protected void setException(Throwable t) {if (pareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}//任务的执行方法public void run() {//如果当前任务不是NEW代表已经有人在执行了所以直接return//但是只根据这个状态判断是不准确的再具体完成时一直是NEW所以在此设置当前的运行线程为当前线程先判断当前任务的运行线程是否为null如果是则设置为当前线程,如果设置失败则代表runner已经有值了所以也是reutrnif (state != NEW ||!pareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//设置成功则获取当前的callabelCallable<V> c = callable;//如果当前任务不是null并且当前状态是NEW//这是为了防止在这一段代码执行时其他线程进行取消操作if (c != null && state == NEW) {V result;boolean ran;try {//因为进入这里的都是执行线程了所以直接调用call方法//这里可以看到使用了try捕获执行异常如果正常则设置ran为trueresult = c.call();ran = true;} catch (Throwable ex) {//如果发生了异常则设置当前值为执行一次并且设置ran为falseresult = null;ran = false;setException(ex);}//如果ran则代表正常执行完成则设置获取到的结果if (ran)set(result);}} finally {//到此处runner设置为null,runner最终的用处就是确保run方法在并发下只执行一次,而之所以在这设置为null因为结果和最终状态已经确定了runner = null;//获取当前状态,如果在运行到此处被中断或者去取消则进行之后的处理int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}//此方法与run方法最大的区别在于run方法只执行一次,而此方法会将当前任务重置可以再次执行,用于周期任务的执行protected boolean runAndReset() {//同样使用当前状态和当前执行线程用来锁住方法在同一次执行只执行一次,因为可能多个线程同时执行此方法if (state != NEW ||!pareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;//获取执行结果和当前任务的状态boolean ran = false;int s = state;try {//获取当前需要执行的具体任务callableCallable<V> c = callable;//任务不是null并且当前状态是NEW则调用call方法并且设置允运行结果状态为true,运行失败则设置结果内容为执行异常if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {//与run方法一致runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}//因为此处没有设置结果为正常 所以当正常执行完成状态应该还是NEW//如果不是NEW则代表运行时发生可异常从而运行失败返回falsereturn ran && s == NEW;}//所谓的处理中断就是等待取消方法的中断,因为在讲取消方法的时候先会将当前状态设置为INTERRUPTING,然后进过一系列处理再将它设置为INTERRUPTED(这里是指mayInterruptIfRunning为true如果为false则此方法不会有执行操作因为只有s为INTERRUPTING的时候才会进入while)//而由于cpu执行速度非常快并不一定执行到这里会将cancel方法执行完。private void handlePossibleCancellationInterrupt(int s) {//所以此处判断当前状态如果是INTERRUPTING则告诉cpu我没有什么可以执行了放弃当前占用的cpu资源,如果cpu忽略你的放弃或者并没有其他资源用到cpu从而会是死循环不听的说我放弃cpu资源//直到状态修改为最终的INTERRUPTED此方法结束if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); }//等待节点,在获取结果数据的时候可能会有很多线程等待获取从而不确定数量所以java采用了链表作为等待返回的唤醒,在讲解创建的时候将会详细介绍static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}//之前在设置结果的时候不管是正常结束还是异常结束设置结果或者是 取消都使用到了此方法//而此方法的目的就是释放等待结果的线程,在get中有awateDone方法用于当前获取结果的线程等待执行结果,而此方法就是对线程的唤醒private void finishCompletion() {// assert state > COMPLETING;//声明q临时变量,并且将当前的等待链表赋值给他进行for遍历条件是q不等null等于null说明链表遍历完毕for (WaitNode q; (q = waiters) != null;) {//赋值过后紧接着给当前的等待列表赋值为null,可以看出采用了cas比较并且交换如果在赋值后等待列表发生了改变那么将会使此处比较失败从而不进if继续for循环进行复制操作。if (pareAndSwapObject(this, waitersOffset, q, null)) {//如果比较成功并且设置为null则代表再次之间等待链表并没有发生改变,从而进入这个死循环for (;;) {//获取当前节点的线程Thread t = q.thread;if (t != null) {//不等于null则唤醒当前线程q.thread = null;LockSupport.unpark(t);}//然后获取当前节点的nextWaitNode next = q.next;//如果等于null则跳出死循环if (next == null)break;//并且将q的next赋值为null//此处是为了方便GC回收从而断开q.next的引用q.next = null; //将next节点赋值给q//直到next==null的时候跳出循环,再次查看当前的等待链表是否又有新的等待线程如果有则继续重复否则此方法结束q = next;}break;}}//调用完成方法,此方法在这里是空的如果有特殊需要实现子类继承即可done();//将当前的任务设置为null,以便减少对内存的引用这样方便回收callable = null; }//此方法和上方方法对应,此方法是用来给链表里添加等待线程的//如果有线程在任务未执行完的时候调用了get方法将会进入此方法等待private int awaitDone(boolean timed, long nanos)throws InterruptedException {//获取他的死线值,死线值意思是死等结果的时间//如果设置了等待时间则timed是true则用当前时间的纳秒值加上传入的纳秒值获取等待的最终时间如果是timed是false则返回0Lfinal long deadline = timed ? System.nanoTime() + nanos : 0L;//创建一个链表节点WaitNode q = null;//是否已经加入到当前队列boolean queued = false;//进入死循环进行操作for (;;) {//如果当前线程已经被中断if (Thread.interrupted()) {//则从等待列表中移除并且抛出异常removeWaiter(q);throw new InterruptedException();}//获取当前状态int s = state;//如果大于COMPLETING运行完成状态则代表已经有具体的结果值了从而清空当前节点中的线程引用方便GC回收if (s > COMPLETING) {if (q != null)q.thread = null;return s;}//如果当前状态是运行完成则告诉系统当前cpu资源可以不用了,因为下面执行都是对线程等待的操作,既然已经知道结果了所以就跳过等待的操作从而进入上一个if直接返回结果else if (s == COMPLETING) // cannot time out yetThread.yield();//走到这里状态肯定是NEW等待执行或者执行中从而创建一个节点new 此节点的构造自动赋值thread为当前线程else if (q == null)q = new WaitNode();//到此处则判断是否加入队列而这里的queued代表是否已经加入队列//false代表并为加入所以取反进入ifelse if (!queued)//因为说是链表所以他的next属性指向原先的列表的头从而成为新的头部//然后比较当前运行时间点的等待列表是否进行了更新如果更新则继续循环判断上方的if否则将当前的等待列表进行更新queued = pareAndSwapObject(this, waitersOffset,q.next = waiters, q);//如果到此说明已经加入到了等待列表并且当前等待是有设置超时的else if (timed) {//从而判断在循环中循环的时间是否达到了用户设置的超时时间nanos = deadline - System.nanoTime();//如果达到则是小于等于0 if (nanos <= 0L) {//从而删除节点并且返回当前的状态,因为等待时间已经超过用户的设置时间removeWaiter(q);return state;}//否则则用互斥锁将当前的线程进行阻塞并且限制时间LockSupport.parkNanos(this, nanos);}else//如果没有设置时间则永久等待直到唤醒LockSupport.park(this);//这里要注意park、parkNanos两个方法是阻塞代表如果唤醒将还会在此处代码,说明唤醒后还在此处死循环这样再次经过上方的判断从而得到结果,当然写入等待节点只会执行一次}}//删除等待节点private void removeWaiter(WaitNode node) {//当前传入节点不是nullif (node != null) {//设置当前节点的线程引用nullnode.thread = null;//设置死循环的别名叫retryretry:for (;;) {//声明三个变量pred q s,这三个变量都是WaitNode类型//pred 代表头node//q 当前链表头//s 当前链表头的next//条件是q != null//而操作是q = s代表每次循环q都已经修改为q.nextfor (WaitNode pred = null, q = waiters, s; q != null; q = s) {//给s赋值当前的链表头s = q.next;//如果q的线程不是null则将q设置为链表头//如果等于null则代表此节点需要删除因为在进入删除节点方法的时候就设置节点线程值为null了if (q.thread != null)pred = q;//如果头节点不是null则代表删除节点已经不是头节点了所以讲头结点的next设置为之前链表头的next节点else if (pred != null) {pred.next = s;//进行检查是否当前节点被移除 如果是则重新进入死循环if (pred.thread == null) // check for racecontinue retry;}//如果第一个节点就是q则通过cas进行移除第一个节点,否则通过上方逻辑进行移除else if (!pareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}//此方法运行案例//链表值[1,2,3,4,5]//如果要移除3//第一次循环值为 pred = 1 q = 1 s = 2//第二次循环值为 pred = 2 q = 2 s = 3//第三次循环值为 pred = 3 q = 2 q.next = 4 此处链表结果为:[1,2,4,5]//第四次循环值为 pred = 4 q = 4 s = 5//第五次循环置为 pred = 5 q = 5 s = null从而跳出循环进入外层死循环//如果要移除1//第一次循环 pred = null q = 1 s = 2 通过cas替换当前等待链表的头从而结果为[2,3,4,5]}}// 文章中有很多cas操作而cas操作需要使用到内存中的地址偏移值而此处则是通过静态代码块获取三个属性的偏移值一遍操作的时候使用。//因为类的成员偏移是不会变的不管你是否赋值都会有哪一段空间所以此处使用静态获取并且操作的时候传入了this进行了优化否则个对象都需要获取偏移。private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。