Code Search for Developers
 
 
  

AgentInterceptor.java from GridBlocks at Krugle


Show AgentInterceptor.java syntax highlighted

/*
 * Copyright (c) 2006 
 * Helsinki Institute of Physics
 * see LICENSE file for details
 * 
 * AgentInterceptor.java
 * Created on 09-May-2006
 */
package fi.hip.gb.mobile;

import java.io.File;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.rmi.RemoteException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.aop.annotation.AnnotationElement;
import org.jboss.aop.joinpoint.MethodInvocation;

import fi.hip.gb.core.JobAttachment;
import fi.hip.gb.core.SerializedObject;
import fi.hip.gb.core.WorkDescription;
import fi.hip.gb.core.WorkResult;
import fi.hip.gb.core.WorkStatus;
import fi.hip.gb.net.ChangeListener;
import fi.hip.gb.net.ComputingInterface;
import fi.hip.gb.net.ClientSession;
import fi.hip.gb.server.scheduler.Scheduler;

/**
 * Interceptor for one agent instance. When using singleton agents,
 * this interceptor instance is stored for all agent instances.
 * 
 * @author Juho Karppinen
 */
public class AgentInterceptor implements ChangeListener
{
    /**
     * Session of the most recently intercepted agent. Written by
     * {@link #executeAgent(MethodInvocation)} and read by
     * {@link #resultChanged(WorkResult)}.
     */
    private ClientSession session = null;
    
    private static final Log log = LogFactory.getLog(AgentInterceptor.class);
    
    /**
     * Marker meaning "the result for our call has not arrived yet". A dedicated
     * sentinel is used instead of <code>null</code> so that a remote method which
     * returns <code>null</code> still terminates the wait loop.
     */
    private static final Object NOT_READY = new Object();
    
    /**
     * Intercepts a method call on an agent. If the target object has a session,
     * the call is packaged into a {@link WorkDescription}, dispatched through
     * the session, and the calling thread blocks until a matching result or an
     * error status arrives. If no session exists the invocation simply proceeds
     * down the interceptor chain.
     * 
     * @param invocation the intercepted method invocation
     * @return the result of the call; <code>Void.TYPE</code> when the
     *         intercepted method is declared <code>void</code>
     * @throws RemoteException if the session reports an error, if reading the
     *         result fails, or if the waiting thread is interrupted
     * @throws Throwable anything thrown by the rest of the interceptor chain
     *         when no session exists
     */
    public Object executeAgent(MethodInvocation invocation) throws Throwable {
        Method method = invocation.getMethod();
        log.debug("Executing " + invocation.getTargetObject().getClass().getName() + "." + method);
        try {
            // Work on a local snapshot: the field may be overwritten by another
            // invocation going through the same interceptor instance.
            final ClientSession current = AgentManager.getSession(invocation.getTargetObject());
            this.session = current;
            if(current == null) {
                // no session exists, return
                return invocation.invokeNext();
            }
            
            current.setChangeListener(current.getDescription().currentID(), this, Boolean.TRUE);
            
            // append new method call to existing agent
            WorkDescription wds = createMethodCall(current, invocation, method);
            current.getDescription().addChildren(wds);
            log.debug("appending job description " + wds);
            current.dispatch(wds);
            
            // wait until results are available or exception is thrown
            RemoteException exception = null;
            Object result = NOT_READY;
            synchronized (this) {
                while (exception == null && result == NOT_READY) {
                    try {
                        result = fetchResult(current, wds, method);
                        
                        String error = current.getStatus().getError();
                        if(error != null) {
                            exception = new RemoteException(error);
                        }
                        
                        if(exception == null && result == NOT_READY) {
                            // woken up by statusChanged() / resultChanged()
                            wait();
                        }
                    } catch(InterruptedException ie) {
                        // keep the interrupt visible to callers and stop waiting
                        Thread.currentThread().interrupt();
                        exception = new RemoteException(
                                "Interrupted while waiting for results of " + wds.currentID(), ie);
                    } catch(Exception e) {
                        // Retrying here without waiting would spin forever on a
                        // persistent failure, so report it to the caller instead.
                        log.error("Failed to fetch results", e);
                        if(e instanceof RemoteException) {
                            exception = (RemoteException) e;
                        } else {
                            exception = new RemoteException("Failed to fetch results", e);
                        }
                    }
                }
            }
            
            AgentUndeployer undeployer = (AgentUndeployer)AnnotationElement.getAnyAnnotation(method, AgentUndeployer.class);
            if(undeployer != null) {
                // we are in the deconstructor method, the method is called so its safe to remove the agent now
                current.abort(current.getDescription().currentID(), ComputingInterface.ABORT_AND_REMOVE);
                AgentManager.removeSession(invocation.getTargetObject());
            }

            if(exception != null) {
                throw exception;
            }
            return result;
        } catch (RemoteException re) {
            String message = "Operation failed because of " + re.getMessage();
            log.error(message, re);
            throw re;
        }
    }

    /**
     * Builds the work description for one intercepted method call and links
     * its job ID under the agent's own ID.
     * 
     * @param current session of the agent being called
     * @param invocation the intercepted invocation, source of the arguments
     * @param method the intercepted method
     * @return a new work description addressed to the chosen target service
     */
    private WorkDescription createMethodCall(ClientSession current, MethodInvocation invocation, Method method) {
        // scheduler
        String targetService = current.getDescription().getServiceURL();
        // NOTE(review): the default is "0" and Boolean.parseBoolean only accepts
        // "true" (ignoring case), so a flag value of "1" would not enable
        // migration - confirm which values the scheduler actually stores.
        String migratingSession = 
            current.getDescription().flags().getProperty(Scheduler.AUTO_MIGRATE, "0");
        if(Boolean.parseBoolean(migratingSession)) {
            // no sticky sessions, migrate the agent
            targetService = AgentManager.getRandomHost();
        }
        
        // work description creation for the method call
        WorkDescription wds = new WorkDescription(targetService);
        wds.getExecutable().setMethodName(method.getName());
        if(invocation.getArguments() != null) {
            wds.getExecutable().putParameters(invocation.getArguments());
            log.debug("method call " + wds.getExecutable().printCallDebug());
        }
        
        wds.setJobID(new Long[] {current.getDescription().currentID(), wds.currentID()});
        return wds;
    }

    /**
     * Searches the session's child results for the one belonging to the given
     * call and converts it into the value to hand back to the caller.
     * 
     * @param current session whose results are scanned
     * @param wds description of the dispatched call
     * @param method the intercepted method, used to detect <code>void</code>
     * @return the result value (possibly <code>null</code>),
     *         <code>Void.TYPE</code> for void methods, or {@link #NOT_READY}
     *         when no result with a matching ID exists yet
     * @throws Exception if the result attachment cannot be read or deserialised
     */
    private Object fetchResult(ClientSession current, WorkDescription wds, Method method) throws Exception {
        Object found = NOT_READY;
        // get results and search the correct ID
        for(WorkResult r : current.getResult().getChildren()) {
            if(!r.currentID().equals(wds.currentID())) {
                continue;
            }
            // our method call has returned something
            Object value;
            if(method.getReturnType().equals(Void.TYPE)) {
                value = Void.TYPE;
            } else {
                // we have got something to return, deserialise the object
                JobAttachment jr = r.firstResult();
                if(File.class.getName().equals(jr.getType())) {
                    value = new File(jr.fileURL().getPath());
                } else {
                    // every non-file attachment is treated as a serialized object
                    SerializedObject s = new SerializedObject();
                    s.setBytes(jr.readContent());
                    value = s.take();
                }
            }
            log.info("found result for " + wds.currentID() + " : " + value);
            // keep scanning: if several entries match, the last one wins
            found = value;
        }
        return found;
    }

    /*
     * Wakes up threads waiting in executeAgent once the status reports finished.
     * 
     * @see fi.hip.gb.net.ChangeListener#statusChanged(fi.hip.gb.core.WorkStatus)
     */
    public synchronized void statusChanged(WorkStatus status) throws RemoteException {
        log.debug("status changed");
        if(status.finished()) {
            notifyAll();
        }
    }

    /*
     * Copies the children of the incoming result into the session's result and
     * wakes up threads waiting in executeAgent.
     * 
     * @see fi.hip.gb.net.ChangeListener#resultChanged(fi.hip.gb.core.WorkResult)
     */
    public synchronized void resultChanged(WorkResult result) throws RemoteException {
        log.debug("result changed " + result);
        for(WorkResult sub : result.getChildren()) {
            this.session.getResult().insertSubResult(sub);
        }
        notifyAll();
        log.debug("result notified");
    }
}



See more files for this project here

GridBlocks

GridBlocks builds a grid application framework from easy-to-use building blocks for distributed environments. The framework offers components for Grid security, distributed storage, computing, and Portlet web interfaces.

Project homepage: http://sourceforge.net/projects/gridblocks
Programming language(s): Java,JSP,XML
License: other

  AgentApi.java
  AgentInterceptor.java
  AgentManager.java
  AgentUndeployer.java
  MobileAgent.java
  package.html