Code Search for Developers
 
 
  

DiscoveryService.java from GridBlocks at Krugle


Show DiscoveryService.java syntax highlighted

/*
 * Copyright (c) 2005
 * Helsinki Institute of Physics
 * see LICENSE file for details
 *
 * DiscoveryService.java
 * Created on Aug 5, 2004 
 */
package fi.hip.gb.net.discovery;

import java.io.File;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.URL;
import java.net.UnknownHostException;
import java.rmi.RemoteException;
import java.util.Enumeration;
import java.util.Hashtable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import fi.hip.gb.core.Config;
import fi.hip.gb.net.Discovery;
import fi.hip.gb.utils.ArrayUtils;
import fi.hip.gb.utils.ClassLoaderUtils;
import fi.hip.gb.utils.TextUtils;

/**
 * A default implementation of the {@link Discovery} interface
 * used for discovering GBAgent servers and services. This class
 * is accessible from outside through the {@link fi.hip.gb.net.rpc.DiscoveryService}
 * SOAP stub.
 * <p>
 * The service doesn't know its own address until it has been queried once
 * through the AXIS stack, because the URL under which the local service is
 * published to others cannot be discovered using the Servlet API.
 * 
 * @author Juho Karppinen
 */
public class DiscoveryService implements Discovery {
    /** Known servers: key is the service URL, value is the corresponding <code>DiscoveryPacket</code>. */
    private static final Hashtable<String,DiscoveryPacket> knownServices = new Hashtable<String,DiscoveryPacket>();

    /**
     * Cache of the agents found from JAR files: key is the agent (service) ID,
     * value is the corresponding <code>AgentPacket</code>.
     */
    private static final Hashtable<String,AgentPacket> knownAgents = new Hashtable<String,AgentPacket>();

    /**
     * Modified times of the JAR files already inspected: key is the file URL,
     * value is the last-modified timestamp.
     * NOTE(review): <code>java.net.URL</code> keys compare by resolved host, which
     * may trigger name resolution on every lookup; the key type is part of the
     * public API of this class, so it is left as is.
     */
    private static final Hashtable<URL,Long> modifiedTimes = new Hashtable<URL,Long>();

    /**
     * Local context URL, i.e. the URL published to others. Stays <code>null</code>
     * unless it is assigned; the only assignment in this class sits in the
     * commented-out part of the constructor.
     */
    private static String localContextURL = null;

    /** Time (milliseconds since the epoch) of the last service validation, -1 if never validated. */
    private static long lastValidation = -1;

    /** Static instance of the service, created on demand. */
    private static DiscoveryService instance;

    /** Logger shared by all instances. */
    private static final Log log = LogFactory.getLog(DiscoveryService.class);
    
    /**
     * Creates a new discovery service.
     * <p>
     * The constructor body is currently empty: the code that initialised the
     * local context URL from the Axis message context is commented out below
     * and kept only for reference.
     */
    public DiscoveryService() {
        /*
         * Original intent of the disabled code below:
         *
         * Add the localhost to the list of known servers.
         *
         * When the first contact comes, discover the local service context URL.
         * The reasons for doing it at that point are:
         * - we cannot discover the URL before the first contact
         * - because localhost isn't validated with a positive callback as remote
         *   services are, we don't want to rely on the port number in the config file.
         *
         * The published URL can be overridden with the DISCOVERY_PROXY option from the
         * config file. Calls from outside then go first to the proxy (such as the
         * frontend server of the cluster), which knows how to redirect messages to
         * the real server.
         */
        /*
        if(localContextURL == null) {
            MessageContext context = MessageContext.getCurrentContext();
            String proxyURL = Config.getInstance().getDiscoveryProxy();

            if(proxyURL != null) {
                // trust the config file
                localContextURL = proxyURL;
                log.info("Setting local context to proxy URL " + proxyURL);
            } else if (context != null) {
                // get the url and remove services/discovery out
                String url = context.getStrProp(MessageContext.TRANS_URL);
                if(url.toLowerCase().indexOf("localhost") == -1) {
                    // don't use localhost
                    int index = url.indexOf("/services/discovery");
                    localContextURL = url.substring(0, index);
                    log.info("Localhost context URL replaced with " + localContextURL);
                } else {
                    log.info("Skipping localhost context URL " + url);
                }
            }
            if(localContextURL != null) {
                DiscoveryPacket dp = new DiscoveryPacket(localContextURL);
                dp.setAgents(getSupportedAgents());
                dp.jars(DiscoveryService.getInstance().getDeployedJars());
                knownServices.put(localContextURL, dp);
            }
        }*/
    }

    /**
     * Returns the statically held discovery service, creating it on the
     * first call.
     * @return instance for the service
     */
    public static DiscoveryService getInstance() {
        if (instance != null) {
            return instance;
        }
        instance = new DiscoveryService();
        return instance;
    }
    
    /**
     * Gets the local context URL, i.e. the URL published to other servers.
     * <p>
     * If a local context URL has already been stored it is returned as is.
     * Otherwise the URL is built from a non-loopback IPv4 address found on the
     * network interfaces of this host and the context path taken from the
     * configuration. When several addresses qualify, the last one enumerated
     * is used.
     *
     * @return the local context URL, or <code>null</code> if no suitable
     *         address could be determined (the failure is logged)
     * @throws RemoteException declared by the interface; not thrown by this
     *         implementation
     * @see fi.hip.gb.net.Discovery#getLocalService()
     */
    public String getLocalService() throws RemoteException {
        if (localContextURL != null) {
            return localContextURL;
        }
        try {
            String address = null;
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            // getNetworkInterfaces() may return null when no interface is found
            while (interfaces != null && interfaces.hasMoreElements()) {
                Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress candidate = addresses.nextElement();
                    // pick IPv4 only and skip the whole loopback range; test the type
                    // instead of the textual form, which may contain a host name or
                    // an IPv6 scope id with dots in it
                    if (candidate instanceof Inet4Address && !candidate.isLoopbackAddress()) {
                        // the literal address only, never the "hostname/address" form
                        address = candidate.getHostAddress();
                    }
                }
            }
            if (address == null) {
                throw new UnknownHostException("No global IP in interfaces");
            }
            // NOTE(review): the discovery port is commented out, so whatever
            // getContextPath() returns follows the colon directly - confirm that
            // the context path is expected to start with the port.
            String url = "http://" + address + ":"
                //+ Config.getInstance().getDiscoveryPort() + "/"
                + Config.getInstance().getContextPath();
            log.debug("getLocalService returns " + url + " globala is " + address);
            return url;
        } catch (Exception e) {
            // keep the cause in the log; callers only see the null result
            log.error("Could not find local IP", e);
        }
        //throw new RemoteException("Local context URL cannot be found");
        return null;
    }
    
    /**
     * Validates a new service by calling it back and, when the call succeeds,
     * adds it to the known services together with the services it reported.
     * <p>
     * A reported sub-service replaces an already known entry only when the
     * known entry has a higher last-response value than the reported one.
     *
     * @param packet discovery packet describing the service to add
     * @throws RemoteException if the packet or its URL is missing, the URL
     *         contains "localhost", or the callback to the service fails; the
     *         original failure is attached as the cause
     * @see fi.hip.gb.net.Discovery#addService(DiscoveryPacket)
     */
    public void addService(DiscoveryPacket packet) throws RemoteException {
        if (packet == null || packet.getServiceURL() == null) {
            // fail with the declared exception type instead of a NullPointerException
            throw new RemoteException("Could not validate the service because the discovery packet"
                    + " or its service URL is missing");
        }
        try {
            if (packet.getServiceURL().indexOf("localhost") != -1) {
                throw new RemoteException("Localhost is not valid ip-address");
            }
            // test if the new service really exists by calling back
            fi.hip.gb.net.rpc.DiscoveryService discovery =
                new fi.hip.gb.net.rpc.DiscoveryService(packet.getServiceURL());
            DiscoveryPacket[] packets = discovery.callback(this.listServices());

            // now we know that the new service is valid and (re)add it to the list
            packet.resetResponseTime();
            addValidatedService(packet);

            // also add the sub-services known by our new service
            // NOTE(review): a null reply is treated as "no sub-services" - confirm
            // that the stub can hand back null for an empty array
            for (int i = 0; packets != null && i < packets.length; i++) {
                if (packets[i] == null) {
                    continue;
                }
                // don't override if local response time is lower
                DiscoveryPacket known = knownServices.get(packets[i].getServiceURL());
                if (known == null || known.getLastResponse() > packets[i].getLastResponse()) {
                    addValidatedService(packets[i]);
                }
            }
        } catch (RemoteException re) {
            log.error("Could not validate the service " + packet.getServiceURL() + "\n"
                    + re.getMessage(), re);
            throw re;
        } catch (Exception e) {
            // new service wasn't valid, skip it
            log.error("Could not validate the service " + packet.getServiceURL() + "\n"
                    + e.getMessage(), e);
            throw new RemoteException("Could not validate the service because of "
                    + e.getClass().getName() + " : " + e.getMessage(), e);
        }
    }
    
    /**
     * Stores a service that has already been validated, i.e. one that is known
     * to exist and to be reachable. An entry with the same service URL is
     * replaced.
     * @param packet a validated discovery packet
     */
    public void addValidatedService(DiscoveryPacket packet) {
        String serviceURL = packet.getServiceURL();
        DiscoveryPacket previous = knownServices.put(serviceURL, packet);
        String age = " last response " + packet.getLastResponse()/1000 + " s ago";

        if (previous != null) {
            log.debug("Updating an already known host " + serviceURL + age);
            return;
        }
        log.info("Accepting a new service to known hosts " + serviceURL + age);
    }

    /**
     * Removes a service from the list of known services. Asking to remove a
     * service that is not known (or passing <code>null</code>) is a no-op.
     *
     * @param serviceURL URL of the service to remove
     * @throws RemoteException declared by the interface; not thrown by this
     *         implementation
     * @see fi.hip.gb.net.Discovery#removeService(String)
     */
    public void removeService(String serviceURL) throws RemoteException {
        // Hashtable rejects null keys, so a null URL can never be stored
        DiscoveryPacket removed = (serviceURL == null) ? null : knownServices.remove(serviceURL);
        if (removed == null) {
            log.debug("Service " + serviceURL + " is not in the list of servers, nothing to remove");
            return;
        }
        log.info("Removing service " + serviceURL
                + " from the list of servers. Last response was  "
                + removed.getLastResponse()
                + " ms ago");
    }
    
    /**
     * Lists the known services after refreshing their response times with
     * {@link #validateServices()}.
     *
     * @return the currently known services as an array
     * @see fi.hip.gb.net.Discovery#listServices()
     */
    public DiscoveryPacket[] listServices() throws RemoteException {
        validateServices();
        DiscoveryPacket[] target = new DiscoveryPacket[knownServices.size()];
        return knownServices.values().toArray(target);
    }
    
    /**
     * Lists the agents offered by all known services, merging duplicates.
     * <p>
     * Agents are identified by service class plus service ID. The first server
     * found for an agent becomes its only server; every further server offering
     * the same agent is appended to the server list, except the local server,
     * which is inserted at the front so that it is offered as the first choice.
     * Note that the <code>AgentPacket</code> objects held by the service
     * packets are modified in place (their server list is set).
     *
     * @return the unique agents, each with the servers that offer it
     * @throws RemoteException if listing the services fails
     * @see fi.hip.gb.net.Discovery#listAgents()
     */
    public AgentPacket[] listAgents() throws RemoteException {
        Hashtable<String,AgentPacket> agents = new Hashtable<String,AgentPacket>();
        DiscoveryPacket[] servers = this.listServices();

        // the local URL is resolved lazily and at most once per call, because
        // resolving it scans the network interfaces
        String localURL = null;
        boolean localResolved = false;

        for (int i = 0; i < servers.length; i++) {
            AgentPacket[] localAgents = servers[i].getAgents();
            if (localAgents == null) {
                // NOTE(review): assumes a packet without agent information may
                // return null here - confirm against DiscoveryPacket
                continue;
            }
            for (int k = 0; k < localAgents.length; k++) {
                AgentPacket agent = localAgents[k];

                // create copy of the server with only one jar to the agent package
                DiscoveryPacket dp = new DiscoveryPacket(servers[i].getServiceURL());
                dp.setJars(new String[] {agent.getJarURL()});

                // check for duplicate services
                String key = agent.getServiceClass() + "_" + agent.getServiceID();
                AgentPacket packet = agents.get(key);
                if (packet == null) {
                    // not yet known agent service and/or version, append to the list
                    agent.setServers(new DiscoveryPacket[] {dp});
                    agents.put(key, agent);
                    continue;
                }

                // we have a new server for this service
                if (!localResolved) {
                    localURL = getLocalService();
                    localResolved = true;
                }
                // getLocalService() returns null when the address is unknown;
                // in that case no server is treated as the local one
                if (localURL != null && localURL.equals(dp.getServiceURL())) {
                    // make sure that the local server always offers itself as a first choice
                    packet.setServers((DiscoveryPacket[])ArrayUtils.insert(packet.getServers(), 0, dp));
                } else {
                    packet.setServers((DiscoveryPacket[])ArrayUtils.append(packet.getServers(), dp));
                }
                log.debug("duplicate version ("
                        + packet.getServiceID()
                        + ") of the agent "
                        + packet.getServiceClass()
                        + " found from " + dp.getServiceURL());
            }
        }
        return agents.values().toArray(new AgentPacket[agents.size()]);
    }

    /**
     * Answers a discovery callback: merges the services sent by the caller
     * into the known services and returns the services known here as they
     * were before the merge.
     * <p>
     * A received packet replaces an already known entry only when the known
     * entry has a higher last-response value than the received one.
     *
     * @param packets services known by the caller; <code>null</code> and
     *        <code>null</code> elements are ignored
     * @return the services known by this server before the merge
     * @throws RemoteException if listing the services fails
     * @see fi.hip.gb.net.Discovery#callback(DiscoveryPacket[])
     */
    public DiscoveryPacket[] callback(DiscoveryPacket[] packets) throws RemoteException {
        DiscoveryPacket[] ownPackets = this.listServices();
        if (packets == null) {
            // nothing to merge, still answer with our own list
            return ownPackets;
        }

        //log.debug("Our discovery request was valid, adding new and sending back own known services");
        for (int i = 0; i < packets.length; i++) {
            DiscoveryPacket received = packets[i];
            if (received == null) {
                continue;
            }
            // don't override if local response time is lower
            DiscoveryPacket oldPacket = knownServices.get(received.getServiceURL());
            if (oldPacket != null && oldPacket.getLastResponse() <= received.getLastResponse()) {
                continue;
            }
            if (knownServices.put(received.getServiceURL(), received) == null) {
                log.info("Received a new service from our discovery server " + received.getServiceURL()
                        + " last response " + received.getLastResponse()/1000 + " s ago");
            }
        }
        return ownPackets;
    }

    /**
     * Ages the known services: the time elapsed since the previous validation
     * is added to the last-response value of every service except the local
     * one, whose response time is reset because its existence is always
     * trusted.
     * <p>
     * Removal of services that have been silent for more than four times the
     * discovery rate is currently disabled (see the commented-out code in the
     * loop), so this method never removes anything.
     */
    public void validateServices() {
        // read the clock once so that every packet ages by the same amount and
        // no time is lost between the loop and the update of lastValidation
        long now = System.currentTimeMillis();
        if (lastValidation == -1) {
            lastValidation = now;
        }
        // never age backwards if the system clock was set back
        long elapsed = Math.max(0L, now - lastValidation);

        for (Enumeration<DiscoveryPacket> e = knownServices.elements(); e.hasMoreElements();) {
            DiscoveryPacket packet = e.nextElement();
            if (localContextURL != null && localContextURL.equals(packet.getServiceURL())) {
                // always trust the existence of localhost
                packet.resetResponseTime();
            } else {
                // add the elapsed time
                packet.setLastResponse(packet.getLastResponse() + elapsed);
                // check with 400% threshold (disabled)
                /*
                if (Config.getInstance().getDiscoveryRate()>0 && packet.getLastResponse() > 4.0f * Config.getInstance().getDiscoveryRate() * 1000.f) {
                    try {
                        removeService(packet.getServiceURL());
                    } catch (RemoteException re) {
                        log.error("Failed to remove service " + packet.getServiceURL(), re);
                    }
                }*/
            }
        }
        lastValidation = now;
    }
    
    /**
     * Lists the agent JAR files deployed on this server, i.e. the keys of
     * {@link #getDeployedJarTimings()}.
     * @return an array of URLs for agent jars accessible through http protocol
     */
    public URL[] getDeployedJars() {
        Hashtable<URL,Long> timings = getDeployedJarTimings();
        URL[] target = new URL[0];
        return timings.keySet().toArray(target);
    }

    /**
     * Gets a list of deployed agent JAR files on this server with last modified tag.
     * Files are found inside the plugin directories from the configuration; a
     * directory that is not absolute is resolved against the data directory.
     * <p>
     * The URL of a file is built from the local service URL and the plain file
     * name, so files with the same name in different plugin directories end up
     * under the same key.
     *
     * @return hashtable of agent jars, keys are URLs for files accessible through http protocol, and
     * the value is last modified timestamp of the file as <code>Long</code>; empty if the
     * local service URL is not known
     */
    public Hashtable<URL,Long> getDeployedJarTimings() {
        // list of found files with access time
        Hashtable<URL,Long> jars = new Hashtable<URL,Long>();

        String localService = null;
        try {
            localService = getLocalService();
        } catch (RemoteException re) {
            log.error("Error getting local service URL", re);
        }
        if (localService == null) {
            return jars;
        }

        String[] deployDirs = Config.getInstance().getPluginDirectories();
        for (int i = 0; i < deployDirs.length; i++) {
            File deployDir = new File(deployDirs[i]);
            // a leading "/" is still honoured as before; isAbsolute() additionally
            // recognises platform specific absolute paths such as "C:\plugins"
            if (!deployDirs[i].startsWith("/") && !deployDir.isAbsolute()) {
                deployDir = new File(Config.getInstance().getDataDirectory() + "/" + deployDirs[i]);
            }

            // listFiles() returns null when the path is not a readable directory
            File[] deployFiles = deployDir.listFiles();
            if (deployFiles == null) {
                continue;
            }
            for (int k = 0; k < deployFiles.length; k++) {
                try {
                    URL url = new URL(localService
                            + "/dl/"
                            + deployFiles[k].getName());
                    jars.put(url, Long.valueOf(deployFiles[k].lastModified()));
                } catch (MalformedURLException e) {
                    log.error("Could not create URL for jar " + deployFiles[k], e);
                }
            }
        }
        return jars;
    }
    
    /**
     * Gets the supported agents by the local server. This information is taken
     * from cache if timestamps of files has not changed.
     * <p>
     * JAR files that have disappeared are dropped from the cache together with
     * the agents loaded from them. JAR files that are new or whose timestamp
     * has changed are scanned again: an instance of every class reported by the
     * class loader helper is created, its <code>toString()</code> becomes the
     * service description and its <code>hashCode()</code> the service ID.
     * Agents cached from the previous version of a rescanned JAR are discarded
     * first, so that they do not linger under their old IDs.
     *
     * @return an array of unique agents, may contain multiple items for the same
     * agent class but with different versions.
     */
    public AgentPacket[] getSupportedAgents() {
        // current list of available JAR files
        Hashtable<URL,Long> deployJars = getDeployedJarTimings();

        // compare timestamps to cached values removing all identical files from the hashtable;
        // a snapshot of the keys is walked because entries are removed inside the loop
        URL[] cachedFiles = modifiedTimes.keySet().toArray(new URL[0]);
        for (int i = 0; i < cachedFiles.length; i++) {
            URL cachedFile = cachedFiles[i];
            Long cachedTime = modifiedTimes.get(cachedFile);
            Long currentTime = deployJars.get(cachedFile);
            if (currentTime == null) {
                // file has been removed
                log.debug("Agent JAR file " + cachedFile + " has been removed");
                modifiedTimes.remove(cachedFile);
                removeCachedAgents(cachedFile.toExternalForm());
            } else if (cachedTime != null && cachedTime.longValue() == currentTime.longValue()) {
                // file has not changed, no need to look inside it
                deployJars.remove(cachedFile);
            }
        }

        // now we have only modified JARs left, check services inside them and add them into the cache
        // to prevent the continuous reloading of JAR files
        for (Enumeration<URL> jarEnum = deployJars.keys(); jarEnum.hasMoreElements();) {
            URL deployFile = jarEnum.nextElement();
            try {
                // load list of agent classes
                URL[] jars =
                    ClassLoaderUtils.prepareJars(new URL[] {deployFile}, Config.getWorkingDir(null));
                ClassLoaderUtils loader = new ClassLoaderUtils(this, jars);
                String[] classNames = loader.getClassNames(Class.class);

                // the JAR is new or modified: forget what was cached from its old version
                removeCachedAgents(deployFile.toExternalForm());

                for (int k = 0; k < classNames.length; k++) {
                    Object agent = loader.loadInstance(classNames[k]);
                    AgentPacket packet = new AgentPacket(classNames[k]);
                    packet.setJarURL(deployFile.toExternalForm());
                    packet.setServiceDescription(agent.toString());
                    // the service ID (version) is the hash code of the agent instance
                    packet.setServiceID("" + agent.hashCode());

                    log.debug("found agent " + packet.getServiceClass()
                            + " from " + packet.getJarURL()
                            + " with version " + packet.getServiceID()
                            + " description " + packet.getServiceDescription()
                            + " file lastModified " + TextUtils.getDateFormat(deployJars.get(deployFile)));
                    DiscoveryService.knownAgents.put(packet.getServiceID(), packet);
                }
                // add the cache entry of the file
                DiscoveryService.modifiedTimes.put(deployFile, deployJars.get(deployFile));
            } catch (Exception e) {
                // report the JAR that failed; it is not cached, so it is retried on the next call
                log.error("Could not locate installed agents  in " + deployFile, e);
            }
        }
        return knownAgents.values().toArray(new AgentPacket[knownAgents.size()]);
    }

    /**
     * Removes from the agent cache every agent that was loaded from the given JAR.
     * @param jarURL external form of the JAR URL, as stored in the agent packets
     */
    private static void removeCachedAgents(String jarURL) {
        // walk a snapshot because the cache is modified inside the loop
        AgentPacket[] cached = knownAgents.values().toArray(new AgentPacket[0]);
        for (int i = 0; i < cached.length; i++) {
            // the packets store the JAR location as a string, so compare strings
            if (jarURL.equals(cached[i].getJarURL())) {
                log.debug("Agent " + cached[i].getServiceID() + " is unloaded");
                knownAgents.remove(cached[i].getServiceID());
            }
        }
    }
}




See more files for this project here

GridBlocks

GridBlocks builds a grid application framework via easy-to-use building blocks in a distributed environment. The framework offers components for Grid security, distributed storage, computing, and Portlet web interfaces.

Project homepage: http://sourceforge.net/projects/gridblocks
Programming language(s): Java,JSP,XML
License: other

  AgentPacket.java
  DiscoveryPacket.java
  DiscoveryService.java
  DiscoveryThread.java
  DiscoveryThreadMBean.java
  JAXR.java