Show DiscoveryService.java syntax highlighted
/*
* Copyright (c) 2005
* Helsinki Institute of Physics
* see LICENSE file for details
*
* DiscoveryService.java
* Created on Aug 5, 2004
*/
package fi.hip.gb.net.discovery;
import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.URL;
import java.net.UnknownHostException;
import java.rmi.RemoteException;
import java.util.Enumeration;
import java.util.Hashtable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import fi.hip.gb.core.Config;
import fi.hip.gb.net.Discovery;
import fi.hip.gb.utils.ArrayUtils;
import fi.hip.gb.utils.ClassLoaderUtils;
import fi.hip.gb.utils.TextUtils;
/**
* An default implementation of the {@link Discovery} interface
* used for discovering GBAgent servers and services. This class
* is accessible from outside using {@link fi.hip.gb.net.rpc.DiscoveryService}
* SOAP stub.
* <p>
* The service doesn't know its own address until this service is queried once
* through AXIS stack. The reason is we cannot discover the URL of the local
* service to be published for others using Servlet API.
*
* @author Juho Karppinen
*/
public class DiscoveryService implements Discovery {
/** known servers, key is the service URL and value is corresponding <code>DiscoveryPacket</code> */
private static Hashtable<String,DiscoveryPacket> knownServices = new Hashtable<String,DiscoveryPacket>();
/** caches the list of known agents found from JAR files, key is the agent ID
* and value is corresponding <code>AgentPacket</code> */
private static Hashtable<String,AgentPacket> knownAgents = new Hashtable<String,AgentPacket>();
/** modified times of JAR files, file URL is the key and value is timestamp */
private static Hashtable<URL,Long> modifiedTimes = new Hashtable<URL,Long>();
/** local context URL is the URL published to others */
private static String localContextURL = null;
/** at what time services were validated */
private static long lastValidation = -1;
/** static instance of the service */
private static DiscoveryService instance;
private static Log log = LogFactory.getLog(DiscoveryService.class);
/**
* New discovery service is created, initialize some stuff from
* Axis messagecontext.
*/
public DiscoveryService() {
/**
* Lets add the localhost to the list of known servers
* <p>
* When the first contact comes, discover the local service context URL.
* The reason for this is:
* - we cannot discover the URL before first contact
* - Because localhost isn't validated with positive callback as remote services are,
* we dont want to rely the port number in config file.
* <p>
* The published URL can be overriden with DISCOVERY_PROXY option from config file.
* Then calls from outside goes first to the proxy (such as frontend server of the cluster),
* who knows how to redirect messages to the real server.
*/
/*
if(localContextURL == null) {
MessageContext context = MessageContext.getCurrentContext();
String proxyURL = Config.getInstance().getDiscoveryProxy();
if(proxyURL != null) {
// trust the config file
localContextURL = proxyURL;
log.info("Setting local context to proxy URL " + proxyURL);
} else if (context != null) {
// get the url and remove services/discovery out
String url = context.getStrProp(MessageContext.TRANS_URL);
if(url.toLowerCase().indexOf("localhost") == -1) {
// don't use localhost
int index = url.indexOf("/services/discovery");
localContextURL = url.substring(0, index);
log.info("Localhost context URL replaced with " + localContextURL);
} else {
log.info("Skipping localhost context URL " + url);
}
}
if(localContextURL != null) {
DiscoveryPacket dp = new DiscoveryPacket(localContextURL);
dp.setAgents(getSupportedAgents());
dp.jars(DiscoveryService.getInstance().getDeployedJars());
knownServices.put(localContextURL, dp);
}
}*/
}
/**
* Gets the static instance of the discovery service.
* @return instance for the service
*/
public static DiscoveryService getInstance() {
if(instance == null) {
instance = new DiscoveryService();
}
return instance;
}
/**
* Gets local context URL. It is either automatically discovered or
* defined in config file.
* @see fi.hip.gb.net.Discovery#getLocalService()
*/
public String getLocalService() throws RemoteException {
if(localContextURL != null)
return localContextURL;
try {
String proxyURL = null;//Config.getInstance().getDiscoveryProxy();
if(proxyURL == null) {
String globala = ""; //store the address here
Enumeration<NetworkInterface> e1 =
(Enumeration<NetworkInterface>)NetworkInterface.getNetworkInterfaces();
while(e1.hasMoreElements()) {
NetworkInterface ni = e1.nextElement();
Enumeration<InetAddress> e2 = ni.getInetAddresses();
//get the "right" IP4 address, omit IP6's and localhost
while(e2.hasMoreElements()) {
InetAddress ia = e2.nextElement();
String ias = ia.toString();
if (ias.indexOf('.') > -1) {
if (ias.indexOf("127.0.0.1") == -1) {
//This is not localhost
globala = ias;
}
}
}
}
if ("".equals(globala))
throw new UnknownHostException("No global IP in interfaces");
proxyURL = "http:/" + globala + ":"
//+ Config.getInstance().getDiscoveryPort() + "/"
+ Config.getInstance().getContextPath();
log.debug("getLocalService returns "+proxyURL+" globala is "+globala);
}
return proxyURL;
} catch (Exception uhe) {
log.error("Could not find local IP");
}
//throw new RemoteException("Local context URL cannot be found");
return null;
}
/*
* @see fi.hip.gb.server.Discovery#addService(DiscoveryPacket)
*/
public void addService(DiscoveryPacket packet) throws RemoteException {
try {
// test if the new service really exists by calling back
if(packet.getServiceURL().indexOf("localhost") != -1) {
throw new RemoteException("Localhost is not valid ip-address");
}
fi.hip.gb.net.rpc.DiscoveryService discovery =
new fi.hip.gb.net.rpc.DiscoveryService(packet.getServiceURL());
DiscoveryPacket[] packets = discovery.callback(this.listServices());
// now we know that the new service is valid and (re)add it to the list
packet.resetResponseTime();
addValidatedService(packet);
// also add the sub-services known by our new service
for(int i=0; i < packets.length; i++) {
// don't override if local response time is lower
DiscoveryPacket oldPacket = (DiscoveryPacket) knownServices.get(packets[i].getServiceURL());
if(oldPacket == null || oldPacket.getLastResponse() > packets[i].getLastResponse()) {
addValidatedService(packets[i]);
}
}
} catch( RemoteException re) {
log.error("Could not validate the service " + packet.getServiceURL() + "\n"
+ re.getMessage());
throw re;
} catch (Exception e) {
// new service wasn't valid, skip it
log.error("Could not validate the service " + packet.getServiceURL() + "\n"
+ e.getMessage());
throw new RemoteException("Could not validate the service because of "
+ e.getClass().getName() + " : " + e.getMessage());
}
}
/**
* Adds already validated service to the database. Validation means that
* the server exists and is reachable.
* @param packet a validated discovery packet
*/
public void addValidatedService(DiscoveryPacket packet) {
if (knownServices.put(packet.getServiceURL(), packet) == null) {
log.info("Accepting a new service to known hosts " + packet.getServiceURL()
+ " last response " + packet.getLastResponse()/1000 + " s ago");
} else {
log.debug("Updating an already known host " + packet.getServiceURL()
+ " last response " + packet.getLastResponse()/1000 + " s ago");
}
}
/*
* @see fi.hip.gb.server.Discovery#removeService(String)
*/
public void removeService(String serviceURL) throws RemoteException {
DiscoveryPacket dp = knownServices.get(serviceURL);
log.info("Removing service " + serviceURL
+ " from the list of servers. Last response was "
+ dp.getLastResponse()
+ " ms ago");
knownServices.remove(serviceURL);
}
/*
* @see fi.hip.gb.server.Discovery#listServices()
*/
public DiscoveryPacket[] listServices() throws RemoteException {
validateServices();
DiscoveryPacket[] packets =
(DiscoveryPacket[]) knownServices.values().toArray(new DiscoveryPacket[knownServices.size()]);
for (int i = 0; i < packets.length; i++) {
//log.info("listing " + packets[i].getServiceURL()
// + " last response " + packets[i].getLastResponse()/1000 + " s ago");
}
return packets;
}
/*
* @see fi.hip.gb.server.Discovery#listAgents()
*/
public AgentPacket[] listAgents() throws RemoteException {
Hashtable<String,AgentPacket> agents = new Hashtable<String,AgentPacket>();
DiscoveryPacket[] servers = this.listServices();
for (int i = 0; i < servers.length; i++) {
AgentPacket[] localAgents = servers[i].getAgents();
for (int k = 0; k < localAgents.length; k++) {
// create copy of the server with only one jar to the agent package
DiscoveryPacket dp = new DiscoveryPacket(servers[i].getServiceURL());
dp.setJars(new String[] {localAgents[k].getJarURL()});
// check for duplicate services
AgentPacket packet = (AgentPacket) agents.get(localAgents[k].getServiceClass() + "_" + localAgents[k].getServiceID());
if(packet != null) {
// we have a new server for this service
String localURL = getLocalService();
if(localURL.equals(dp.getServiceURL())) {
// make sure that the local server always offers itself as a first choice
packet.setServers((DiscoveryPacket[])ArrayUtils.insert(packet.getServers(), 0, dp));
} else {
packet.setServers((DiscoveryPacket[])ArrayUtils.append(packet.getServers(), dp));
}
log.debug("duplicate version ("
+ packet.getServiceID()
+ ") of the agent "
+ packet.getServiceClass()
+ " found from " + dp.getServiceURL());
} else {
// not yet known agent service and/or version, append to the list
localAgents[k].setServers(new DiscoveryPacket[]{dp});
agents.put(localAgents[k].getServiceClass() + "_" + localAgents[k].getServiceID(), localAgents[k]);
}
}
/* log.inform("listing " + packets[i].getServiceURL()
+ " last response " + packets[i].getLastResponse()/1000 + " s ago"); */
}
return (AgentPacket[]) agents.values().toArray(new AgentPacket[agents.size()]);
}
/*
* @see fi.hip.gb.server.Discovery#callback(DiscoveryPacket[])
*/
public DiscoveryPacket[] callback(DiscoveryPacket[] packets) throws RemoteException {
DiscoveryPacket[] ownPackets = this.listServices();
//log.debug("Our discovery request was valid, adding new and sending back own known services");
for (int i = 0; i < packets.length; i++) {
// don't override if local response time is lower
DiscoveryPacket oldPacket = (DiscoveryPacket) knownServices.get(packets[i].getServiceURL());
if(oldPacket == null || oldPacket.getLastResponse() > packets[i].getLastResponse()) {
if(knownServices.put(packets[i].getServiceURL(), packets[i]) == null) {
log.info("Received a new service from our discovery server " + packets[i].getServiceURL()
+ " last response " + packets[i].getLastResponse()/1000 + " s ago");
} else {
//log.trace("Received old callback " + packets[i].getServiceURL()
// + " last response " + packets[i].getLastResponse()/1000 + " s ago");
}
}
}
return ownPackets;
}
/**
* If last information from server was made discovery rate + 400% ago,
* it will be considered as missing and removed from the list of services.
*/
public void validateServices() {
if(lastValidation == -1) {
lastValidation = System.currentTimeMillis();
}
for (Enumeration e = knownServices.elements(); e.hasMoreElements();) {
DiscoveryPacket packet = (DiscoveryPacket) e.nextElement();
if (localContextURL != null
&& packet.getServiceURL().toString().equals(localContextURL.toString())) {
// always trust the existence of localhost
packet.resetResponseTime();
} else {
// add the elapsed time
packet.setLastResponse(packet.getLastResponse() + (System.currentTimeMillis() - lastValidation));
/*
log.debug("validating " + packet.getServiceURL()
+ " last update was "
+ packet.getLastResponse()/1000 + " s ago");
*/
// check with 400% treshold
/*
if (Config.getInstance().getDiscoveryRate()>0 && packet.getLastResponse() > 4.0f * Config.getInstance().getDiscoveryRate() * 1000.f) {
try {
removeService(packet.getServiceURL());
} catch (RemoteException re) {
log.error("Failed to remove service " + packet.getServiceURL(), re);
}
}*/
}
}
lastValidation = System.currentTimeMillis();
}
/**
* Gets a list of deployed agent JAR files on this server.
* @return an array of URLs for agent jars accessible through http protocol
*/
public URL[] getDeployedJars() {
return getDeployedJarTimings().keySet().toArray(new URL[0]);
}
/**
* Gets a list of deployed agent JAR files on this server with last modified tag.
* Files are found inside plugin directories.
* @return hashtable of agent jars, keys are URLs for files accessible through http protocol, and
* the value is last modified timestamp of the file as <code>Long</code>
*/
public Hashtable<URL,Long> getDeployedJarTimings() {
// list of found files with access time
Hashtable<URL,Long> jars = new Hashtable<URL,Long>();
String localService = null;
try {
localService = getLocalService();
} catch (RemoteException re) {
log.error("Error getting local service URL", re);
}
if(localService == null)
return jars;
String[] deployDirs = Config.getInstance().getPluginDirectories();
for(int i=0; i < deployDirs.length; i++) {
File[] deployFiles = null;
if(deployDirs[i].startsWith("/"))
deployFiles = new File(deployDirs[i]).listFiles();
else
deployFiles = new File(Config.getInstance().getDataDirectory() + "/" + deployDirs[i]).listFiles();
for(int k=0; deployFiles != null && k < deployFiles.length; k++) {
try {
URL url = new URL(localService
+ "/dl/"
+ deployFiles[k].getName());
jars.put(url, new Long(deployFiles[k].lastModified()));
} catch (MalformedURLException e) {
log.error("Could not create URL for jar " + deployFiles[k]);
}
}
}
return jars;
}
/**
* Gets the supported agents by the local server. This information is taken
* from cache if timestamps of files has not changed
*
* @return an array of unique agents, may contain multiple items for the same
* agent class but with different versions.
*/
public AgentPacket[] getSupportedAgents() {
// current list of available JAR files
Hashtable<URL,Long> deployJars = getDeployedJarTimings();
// compare timestamps to cached values removing all identical files from the hashtable
for(Enumeration<URL> e = modifiedTimes.keys(); e.hasMoreElements();) {
URL cachedFile = e.nextElement();
Long cachedTime = modifiedTimes.get(cachedFile);
Long currentTime = deployJars.get(cachedFile);
if(currentTime == null) {
// file has been removed
log.debug("Agent JAR file " + cachedFile + " has been removed");
modifiedTimes.remove(cachedFile);
// remove the cache entries of agents
AgentPacket[] aps = (AgentPacket[])knownAgents.values().toArray(new AgentPacket[0]);
for(int i=0; i < aps.length; i++) {
if(aps[i].getJarURL().equals(cachedFile)) {
log.debug("Agent " + aps[i].getServiceID() + " is unloaded");
knownAgents.remove(aps[i].getServiceID());
}
}
} else if(cachedTime != null && cachedTime.longValue() == currentTime.longValue()){
// file has not changed, no need to look inside it
deployJars.remove(cachedFile);
}
}
// now we have only modified JARs left, check services inside them and add them into the cache
// to prevent the continuous reloading of JAR files
for(Enumeration<URL> jarEnum = deployJars.keys(); jarEnum.hasMoreElements();) {
URL deployFile = jarEnum.nextElement();
try {
// load list of agent classes
URL[] jars =
ClassLoaderUtils.prepareJars(new URL[] {deployFile}, Config.getWorkingDir(null));
ClassLoaderUtils loader = new ClassLoaderUtils(this, jars);
String[] classNames = loader.getClassNames(Class.class);
for (int k = 0; k < classNames.length; k++) {
Object agent = (Object)loader.loadInstance(classNames[k]);
AgentPacket packet = new AgentPacket(classNames[k]);
packet.setJarURL(deployFile.toExternalForm());
packet.setServiceDescription(agent.toString());
int id = agent.hashCode();
// if version is taken from CVS id, take of the ID: part
/*
if(id != null) {
int start = id.indexOf("Id: ");
if(start != -1) {
int end = id.indexOf("Exp $");
packet.setServiceID(id.substring(start + 4, end));
}
} else {*/
packet.setServiceID("" + id);
//}
log.debug("found agent " + packet.getServiceClass()
+ " from " + packet.getJarURL()
+ " with version " + packet.getServiceID()
+ " description " + packet.getServiceDescription()
+ " file lastModified " + TextUtils.getDateFormat(deployJars.get(deployFile)));
DiscoveryService.knownAgents.put(packet.getServiceID(), packet);
}
// add the cache entry of the file
DiscoveryService.modifiedTimes.put(deployFile, deployJars.get(deployFile));
} catch(Exception e) {
log.error("Could not locate installed agents in " + localContextURL, e);
}
}
return knownAgents.values().toArray(new AgentPacket[knownAgents.size()]);
}
}
See more files for this project here