// AgentInterceptor.java
/*
* Copyright (c) 2006
* Helsinki Institute of Physics
* see LICENSE file for details
*
* AgentInterceptor.java
* Created on 09-May-2006
*/
package fi.hip.gb.mobile;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.rmi.RemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.aop.annotation.AnnotationElement;
import org.jboss.aop.joinpoint.MethodInvocation;
import fi.hip.gb.core.JobAttachment;
import fi.hip.gb.core.SerializedObject;
import fi.hip.gb.core.WorkDescription;
import fi.hip.gb.core.WorkResult;
import fi.hip.gb.core.WorkStatus;
import fi.hip.gb.net.ChangeListener;
import fi.hip.gb.net.ComputingInterface;
import fi.hip.gb.net.ClientSession;
import fi.hip.gb.server.scheduler.Scheduler;
/**
 * Interceptor for one agent instance. When using singleton agents,
 * this interceptor instance is stored for all agent instances.
 * <p>
 * Every intercepted method call is converted into a {@link WorkDescription},
 * dispatched through the agent's {@link ClientSession}, and the calling
 * thread then blocks on this object's monitor until either a result with the
 * matching job ID or a session error is available. The {@link ChangeListener}
 * callbacks implemented here wake the waiting thread up.
 * <p>
 * NOTE(review): the session is held in an instance field that every call to
 * {@link #executeAgent(MethodInvocation)} overwrites. If one interceptor is
 * shared by agents with different sessions and used concurrently, the calls
 * would interfere with each other - confirm against the deployment model.
 *
 * @author Juho Karppinen
 */
public class AgentInterceptor implements ChangeListener
{
    /** pause before polling again after fetching the results has failed */
    private static final long RETRY_DELAY_MILLIS = 1000L;

    private static final Log log = LogFactory.getLog(AgentInterceptor.class);

    /** session object of the most recently intercepted call */
    private ClientSession session = null;

    /**
     * Advice executed around an agent method. When a session is registered
     * for the target object the call is dispatched as a new child job of that
     * session and this method blocks until the result arrives; otherwise the
     * invocation simply continues down the interceptor chain.
     *
     * @param invocation the intercepted method invocation
     * @return the result of the call; a {@link File} when the attachment type
     *         is <code>java.io.File</code>, the deserialised object otherwise,
     *         or <code>Void.TYPE</code> for methods declared <code>void</code>
     * @throws RemoteException if the session reports an error or the waiting
     *         thread is interrupted
     * @throws Throwable anything thrown by the rest of the invocation chain
     */
    public Object executeAgent(MethodInvocation invocation) throws Throwable {
        Method method = invocation.getMethod();
        log.debug("Executing " + invocation.getTargetObject().getClass().getName() + "." + method);
        try {
            this.session = AgentManager.getSession(invocation.getTargetObject());
            if (this.session == null) {
                // no session exists, just continue with the invocation chain
                return invocation.invokeNext();
            }
            this.session.setChangeListener(this.session.getDescription().currentID(), this, Boolean.TRUE);

            // scheduler
            String targetService = this.session.getDescription().getServiceURL();
            // NOTE(review): Boolean.parseBoolean is true only for "true"
            // (ignoring case); the default "0" and a value such as "1" both
            // disable migration - confirm the expected format of the flag.
            String migratingSession =
                this.session.getDescription().flags().getProperty(Scheduler.AUTO_MIGRATE, "0");
            if (Boolean.parseBoolean(migratingSession)) {
                // no sticky sessions, migrate the agent
                targetService = AgentManager.getRandomHost();
            }

            // work description creation for the method call
            WorkDescription wds = new WorkDescription(targetService);
            wds.getExecutable().setMethodName(method.getName());
            if (invocation.getArguments() != null) {
                wds.getExecutable().putParameters(invocation.getArguments());
                log.debug("method call " + wds.getExecutable().printCallDebug());
            }

            // append new method call to existing agent
            wds.setJobID(new Long[] {this.session.getDescription().currentID(), wds.currentID()});
            this.session.getDescription().addChildren(wds);
            log.debug("appending job description " + wds);
            session.dispatch(wds);

            // wait until results are available or exception is thrown
            RemoteException exception = null;
            Object result = null;
            // tracked separately from the value: null is a legal return value
            // and must not keep the caller waiting forever
            boolean resultFound = false;
            boolean backOff = false;
            synchronized (this) {
                while (exception == null && !resultFound) {
                    try {
                        if (backOff) {
                            // the previous round failed; pause (releasing the
                            // monitor) instead of spinning on the same error
                            backOff = false;
                            wait(RETRY_DELAY_MILLIS);
                        }
                        // get results and search the correct ID
                        for (WorkResult r : this.session.getResult().getChildren()) {
                            if (r.currentID().equals(wds.currentID())) {
                                // our method call has returned something
                                result = readResult(method, r);
                                resultFound = true;
                                log.info("found result for " + wds.currentID() + " : " + result);
                            }
                        }
                        if (this.session.getStatus().getError() != null) {
                            exception = new RemoteException(this.session.getStatus().getError());
                        }
                        if (exception == null && !resultFound) {
                            // woken up by statusChanged() / resultChanged()
                            wait();
                        }
                    } catch (InterruptedException ie) {
                        // keep the interrupt visible to the caller and give up
                        Thread.currentThread().interrupt();
                        exception = new RemoteException(
                            "Interrupted while waiting for result of " + wds.currentID(), ie);
                    } catch (Exception e) {
                        log.error("Failed to fetch results", e);
                        backOff = true;
                    }
                }
            }

            AgentUndeployer undeployer =
                (AgentUndeployer) AnnotationElement.getAnyAnnotation(method, AgentUndeployer.class);
            if (undeployer != null) {
                // we are in the deconstructor method, the method is called so its safe to remove the agent now
                this.session.abort(this.session.getDescription().currentID(), ComputingInterface.ABORT_AND_REMOVE);
                AgentManager.removeSession(invocation.getTargetObject());
            }

            if (exception != null) {
                throw exception;
            }
            return result;
        } catch (RemoteException re) {
            String message = "Operation failed because of " + re.getMessage();
            log.error(message, re);
            throw re;
        }
    }

    /**
     * Converts the result of a finished job into the return value of the
     * intercepted method.
     *
     * @param method the intercepted method, used for its declared return type
     * @param workResult result whose job ID matched the dispatched call
     * @return <code>Void.TYPE</code> for void methods, a {@link File} for
     *         file attachments, otherwise the deserialised object (may be
     *         <code>null</code>)
     * @throws Exception if the first attachment cannot be read or deserialised
     */
    private Object readResult(Method method, WorkResult workResult) throws Exception {
        if (Void.TYPE.equals(method.getReturnType())) {
            return Void.TYPE;
        }
        // we have got something to return, deserialise the object
        JobAttachment jr = workResult.firstResult();
        if (File.class.getName().equals(jr.getType())) {
            return new File(jr.fileURL().getPath());
        }
        SerializedObject s = new SerializedObject();
        s.setBytes(jr.readContent());
        return s.take();
    }

    /**
     * Wakes up threads waiting in {@link #executeAgent(MethodInvocation)}
     * once the reported status is finished.
     *
     * @see fi.hip.gb.net.ChangeListener#statusChanged(fi.hip.gb.core.WorkStatus)
     */
    public synchronized void statusChanged(WorkStatus status) throws RemoteException {
        log.debug("status changed");
        if (status.finished()) {
            notifyAll();
        }
    }

    /**
     * Inserts every child of the reported result into the result of the
     * current session and wakes up threads waiting in
     * {@link #executeAgent(MethodInvocation)}.
     *
     * @see fi.hip.gb.net.ChangeListener#resultChanged(fi.hip.gb.core.WorkResult)
     */
    public synchronized void resultChanged(WorkResult result) throws RemoteException {
        log.debug("result changed " + result);
        for (WorkResult sub : result.getChildren()) {
            this.session.getResult().insertSubResult(sub);
        }
        notifyAll();
        log.debug("result notified");
    }
}
// See more files for this project here