Future Pattern
在Thread-Per-Message Pattern中,我们研究过“收到每个请求建立一个线程”的做法,但这样的请求是不要求有返回值的。如果需要返回值,但由于后台处理需要很久,返回值不能马上获取,那么就可以使用Future Pattern。Future Pattern同样会为每个请求建立一个线程处理,同时会马上返回一个对象,但该对象并不是真正的返回值,真正的返回值可能现在还没有准备好,不过客户端可以根据这个返回对象,在之后的时间来获取真正的返回值。
/**
 * Common view of a piece of data whose content can be read as a string.
 *
 * <p>Both the real value and its stand-in ("future") implement this
 * interface so that callers can treat them uniformly.
 */
public interface Data {
    /**
     * Returns the content of this data object.
     *
     * @return the content string
     */
    String getContent();
}
publicclassRealDataimplementsData{
privateStringcontent;
publicRealData(intcount,charc){
System.out.println("makingRealData("+count+","+c+")Begin.");
char[]buffer=newchar[count];
for(inti=0;i
buffer[i]=c;
slowly();
}
this.content=String.valueOf(buffer);
System.out.println("makingRealData("+count+","+c+")End.");
}
@Override
publicStringgetContent(){
returnthis.content;
}
privatevoidslowly(){
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
}
}
publicclassFutureDataimplementsData{
privateRealDatarealData;
privatebooleanready=false;
publicsynchronizedvoidsetRealData(RealDatarealData){
if(ready){
return;
}
this.realData=realData;
this.ready=true;
notifyAll();
}
@Override
publicsynchronizedStringgetContent(){
while(!ready){
try{
wait();
}catch(InterruptedExceptione){
}
}
returnthis.realData.getContent();
}
}
publicclassHost{
publicDatahandle(finalintcount,finalcharc){
System.out.println("handle("+count+","+c+")Begin.");
finalFutureDatafutureData=newFutureData();
newThread(){
@Override
publicvoidrun(){
RealDatarealData=newRealData(count,c);
futureData.setRealData(realData);
}
}.start();
System.out.println("handle("+count+","+c+")End.");
returnfutureData;
}
}
publicclassMain{
publicstaticvoidmain(String[]args){
System.out.println("mainBegin.");
Hosthost=newHost();
Datadata1=host.handle(10,'a');
Datadata2=host.handle(20,'b');
Datadata3=host.handle(30,'c');
System.out.println("mainotherjobBegin.");
try{
Thread.sleep(2000);
}catch(InterruptedExceptione){
}
System.out.println("mainotherjobEnd.");
System.out.println("data1="+data1.getContent());
System.out.println("data2="+data2.getContent());
System.out.println("data3="+data3.getContent());
System.out.println("mainEnd.");
}
}
在Worker Thread Pattern中,我们讨论过“方法调用”和“方法执行”的分离。而Future Pattern分离了“准备返回值”和“使用返回值”。我们在Future Pattern中,可以看到设计模式Proxy Pattern的实现。
Two-Phase Termination Pattern
Two-Phase Termination
Pattern很简单,但该模式提供了一种结束线程的优雅方法。java.lang.Thread类有一个用来强制结束掉线程的stop()方法。但是
stop方法已经不建议使用(deprecated),原因是stop()方法会使实例丧失安全性的保障。使用stop()方法时,线程会抛出
java.lang.ThreadDeath异常而马上结束,即使该线程现在正在执行临界区间(例如synchronized方法的中间),也会马上结
束。
/**
 * A thread that counts up until it is asked to shut down, then runs its
 * shutdown step before terminating.
 *
 * <p>Shutdown is requested by setting a flag and interrupting the thread, so
 * that a thread blocked in {@code sleep} reacts promptly.
 */
public class CountupThread extends Thread {
    // volatile: written by the requesting thread, read by this thread's loop.
    private volatile boolean isShutdown = false;
    private int count = 0;

    /**
     * Repeats {@link #doWork()} until shutdown is requested, then always runs
     * {@link #doShutdown()}.
     */
    @Override
    public void run() {
        try {
            // Keep working while shutdown has NOT been requested. The original
            // condition was inverted, so the loop body never executed.
            while (!isShutdown) {
                doWork();
            }
        } catch (InterruptedException e) {
            // The interrupt is the shutdown signal; fall through to finally.
        } finally {
            doShutdown();
        }
    }

    /**
     * Requests termination: sets the shutdown flag and interrupts this thread.
     */
    public void shutdownRequest() {
        this.isShutdown = true;
        interrupt();
    }

    /**
     * Misspelled name retained for existing callers; same as
     * {@link #shutdownRequest()}.
     */
    public void shutdownReqeust() {
        shutdownRequest();
    }

    /** Shutdown step: prints the final counter value. */
    private void doShutdown() {
        System.out.println("doShutdown:currentcountis" + this.count);
    }

    /**
     * One unit of work: increments and prints the counter, then sleeps 500 ms.
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    private void doWork() throws InterruptedException {
        System.out.println("currencountis" + ++count);
        Thread.sleep(500);
    }

    /**
     * Demo: starts the counter thread, waits, requests shutdown and joins.
     */
    public static void main(String[] args) {
        System.out.println("mainBegin.");
        CountupThread countupThread = new CountupThread();
        countupThread.start();
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("main:shutdownrequest.");
        countupThread.shutdownReqeust();
        System.out.println("main:join");
        // Wait for the thread to finish.
        try {
            countupThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("mainEnd.");
    }
}
Thread-Specific Storage Pattern
Thread-Specific Storage
Pattern就是“线程独有的存储库”、“针对每个线程提供的内存空间”的意义。java.lang.ThreadLocal的实例可以想象成一种集合
架构(collection)或许会比较好理解。ThreadLocal的实例只有一个,管理多个对象。
publicclassLog{
privatestaticfinalThreadLocaltsLogCollection=newThreadLocal();
publicstaticvoidprintln(Strings){
getTSLog().printWrite(s);
}
publicstaticvoidclose(){
getTSLog().close();
}
privatestaticTSLoggetTSLog(){
TSLogtsLog=tsLogCollection.get();
//如果线程时第一次调用,新建立新文件并注册log
if(tsLog==null){
tsLog=newTSLog(Thread.currentThread().getName()+"-log.txt");
tsLogCollection.set(tsLog);
}
returntsLog;
}
}
importjava.io.FileNotFoundException;
importjava.io.PrintWriter;
/**
 * A simple log that writes lines to a single file through a
 * {@link PrintWriter}.
 */
public class TSLog {
    private final PrintWriter writer;

    /**
     * Opens (creating or truncating) the given file for logging.
     *
     * @param filename path of the log file
     * @throws java.io.UncheckedIOException if the file cannot be opened;
     *         previously the failure was swallowed and the first write failed
     *         with a NullPointerException instead
     */
    public TSLog(String filename) {
        try {
            this.writer = new PrintWriter(filename);
        } catch (FileNotFoundException e) {
            throw new java.io.UncheckedIOException("cannot open log file: " + filename, e);
        }
    }

    /**
     * Writes one line to the log.
     *
     * @param s the line to write
     */
    public void printWrite(String s) {
        writer.println(s);
    }

    /** Writes the end-of-log marker and closes the underlying writer. */
    public void close() {
        writer.println("===========Endoflog===========");
        writer.close();
    }
}
publicclassClientThreadextendsThread{
publicClientThread(Stringname){
super(name);
}
@Override
publicvoidrun(){
System.out.println(getName()+"Begin.");
for(inti=0;i
Log.println("i="+i);
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
}
Log.close();
System.out.println(getName()+"End.");
}
publicstaticvoidmain(String[]args){
newClientThread("Alice").start();
newClientThread("Bobby").start();
newClientThread("Chris").start();
}
}
Active Object Pattern
Active Object
Pattern其实可以看作是多个多线程模式和多个设计模式组合成的一种更高级的模式,里面多个对象各司其职,共同协作。Active Object
Pattern里面使用到了Producer-Consumer Pattern、Thread-Per-Message Pattern、Future
Pattern和设计模式的Proxy Pattern、Command Pattern等。
Server端代码:
publicinterfaceActiveObject{
publicResultmakeString(intcount,charfillchar);
publicvoiddisplayString(Stringstring);
}
publicclassProxyimplementsActiveObject{
privateSchedulerThreadscheduler;
privateServantservant;
publicProxy(SchedulerThreadscheduler,Servantservant){
this.scheduler=scheduler;
this.servant=servant;
}
@Override
publicResultmakeString(intcount,charfillchar){
FutureResultfuture=newFutureResult();
MakeStringRequestrequest=newMakeStringRequest(servant,future,count,fillchar);
this.scheduler.invoke(request);
returnfuture;
}
@Override
publicvoiddisplayString(Stringstring){
DisplayStringRequestrequest=newDisplayStringRequest(servant,string);
this.scheduler.invoke(request);
}
}
publicclassServantimplementsActiveObject{
@Override
publicResultmakeString(intcount,charfillchar){
char[]buffer=newchar[count];
for(inti=0;i
buffer[i]=fillchar;
try{
Thread.sleep(500);
}catch(InterruptedExceptione){
}
}
RealResultresult=newRealResult(String.valueOf(buffer));
returnresult;
}
@Override
publicvoiddisplayString(Stringstring){
System.out.println("displayString("+string+")");
try{
Thread.sleep(10);
}catch(InterruptedExceptione){
}
}
}
/**
 * A result whose value can be read as a string.
 */
public interface Result {
    /**
     * Returns the value of this result.
     *
     * @return the result value
     */
    String getResultValue();
}
publicclassFutureResultimplementsResult{
privateResultresult;
privatebooleanisReady=false;
publicsynchronizedvoidsetResult(Resultresult){
if(isReady){
return;
}
this.result=result;
this.isReady=true;
notifyAll();
}
@Override
publicsynchronizedStringgetResultValue(){
while(!isReady){
try{
wait();
}catch(InterruptedExceptione){
}
}
returnresult.getResultValue();
}
}
publicclassRealResultimplementsResult{
privateStringresultValue;
publicRealResult(StringresultValue){
this.resultValue=resultValue;
}
@Override
publicStringgetResultValue(){
returnthis.resultValue;
}
}
publicabstractclassMethodRequest{
protectedfinalServantservant;
protectedfinalFutureResultfuture;
publicMethodRequest(Servantservant,FutureResultfuture){
this.servant=servant;
this.future=future;
}
publicabstractvoidexecute();
}
publicclassMakeStringRequestextendsMethodRequest{
privateintcount;
privatecharfillchar;
publicMakeStringRequest(Servantservant,FutureResultfuture,intcount,charfillchar){
super(servant,future);
this.count=count;
this.fillchar=fillchar;
}
@Override
publicvoidexecute(){
Resultresult=this.servant.makeString(count,fillchar);
future.setResult(result);
}
}
publicclassDisplayStringRequestextendsMethodRequest{
privateStringstring;
publicDisplayStringRequest(Servantservant,Stringstring){
super(servant,null);
this.string=string;
}
@Override
publicvoidexecute(){
this.servant.displayString(string);
}
}
publicclassSchedulerThreadextendsThread{
privateActivationQueuequeue=newActivationQueue();
publicvoidinvoke(MethodRequestrequest){
this.queue.putRequest(request);
}
@Override
publicvoidrun(){
while(true){
this.queue.takeRequest().execute();
}
}
}
packageactiveobject.server;
importjava.util.LinkedList;
publicclassActivationQueue{
privatefinalLinkedListrequestQueue=newLinkedList();
privatefinalintqueueSize=100;
publicsynchronizedvoidputRequest(MethodRequestrequest){
while(this.requestQueue.size()>=queueSize){
try{
wait();
}catch(InterruptedExceptione){
}
}
this.requestQueue.addLast(request);
notifyAll();
}
publicsynchronizedMethodRequesttakeRequest(){
while(this.requestQueue.size()==0){
try{
wait();
}catch(InterruptedExceptione){
}
}
MethodRequestrequest=this.requestQueue.removeFirst();
notifyAll();
returnrequest;
}
}
publicclassActiveObjectFactory{
publicstaticActiveObjectcreateActiveObjcet(){
Servantservant=newServant();
SchedulerThreadscheduler=newSchedulerThread();
Proxyproxy=newProxy(scheduler,servant);
scheduler.start();
returnproxy;
}
}
UML如下图:
客户端代码:
importactiveobject.server.ActiveObject;
publicclassDisplayClientThreadextendsThread{
privateActiveObjectactiveObj;
publicDisplayClientThread(Stringname,ActiveObjectactiveObj){
super(name);
this.activeObj=activeObj;
}
@Override
publicvoidrun(){
inti=0;
while(true){
i++;
Stringstring=getName()+"No."+i;
activeObj.displayString(string);
}
}
}
importactiveobject.server.ActiveObject;
importactiveobject.server.Result;
publicclassMakerClientThreadextendsThread{
privatefinalActiveObjectactiveObj;
privatefinalcharfillchar;
publicMakerClientThread(Stringname,ActiveObjectactiveObj){
super(name);
this.activeObj=activeObj;
this.fillchar=name.charAt(0);
}
@Override
publicvoidrun(){
inti=0;
while(true){
i++;
Resultresult=activeObj.makeString(i,fillchar);
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
}
StringresultValue=result.getResultValue();
System.out.println(Thread.currentThread().getName()+":value="+resultValue);
}
}
}
importactiveobject.server.ActiveObject;
importactiveobject.server.ActiveObjectFactory;
publicclassMain{
publicstaticvoidmain(String[]args){
ActiveObjectactiveObj=ActiveObjectFactory.createActiveObjcet();
newMakerClientThread("Alice",activeObj).start();
newMakerClientThread("Bobby",activeObj).start();
newDisplayClientThread("Chris",activeObj).start();
}
}