Code Search for Developers
 
 
  

RemoteServiceConsumer.java from Texai at Krugle


Show RemoteServiceConsumer.java syntax highlighted

/*
 * RemoteServiceConsumer.java
 *
 * Created on April 24, 2007, 1:37 PM
 *
 * Description: Provides a remote service consumer.
 *
 * Copyright (C) April 24, 2007 Stephen L. Reed, ported to ApacheMQ and derived from
 * substantially the same class written by Vitaly Tsaplin, 2007, http://jmsrmi.sourceforge.net/
 *
 * This program is free software; you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with this program;
 * if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

package org.texai.kb.ejb.jmsrmi;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.texai.util.TexaiException;


/**
 * Provides a remote service consumer: a dynamic-proxy invocation handler that turns calls on a
 * service interface into JMS object messages sent to the service destination, and a JMS message
 * listener that routes the replies (invocation results and remote listener callbacks) arriving on
 * a temporary response queue back to the waiting caller or to the registered event listener.
 *
 * <p>Typical use: set the connection factory, service destination and service interface, call
 * {@link #init()}, obtain the proxy from {@link #createProxy()}, and call {@link #cleanup()} when
 * finished.
 *
 * <p>NOTE(review): a single JMS session is used both by the threads calling the proxy and by the
 * provider's listener delivery thread. The JMS specification describes sessions as single-threaded
 * contexts - confirm that the provider in use tolerates this.
 *
 * @author vtsaplin
 */
public class RemoteServiceConsumer implements InvocationHandler, MessageListener {
  
  /** the logger */
  private static final Logger LOGGER = Logger.getLogger(RemoteServiceConsumer.class);
  
  /** the number of milliseconds in a second, used to convert the timeout for the JMS and wait APIs */
  private static final long MILLIS_PER_SECOND = 1000L;
  
  /** the number of nanoseconds in a millisecond */
  private static final long NANOS_PER_MILLI = 1000000L;
  
  /** the JMS connection factory */
  private ConnectionFactory connectionFactory;
  
  /** the JMS connection */
  private Connection connection;
  
  /** the JMS session */
  private Session session;
  
  /** the service destination */
  private Destination serviceDestination;
  
  /** the response destination */
  private Destination responseDestination;
  
  /** the message producer */
  private MessageProducer producer;
  
  /** the message consumer */
  private MessageConsumer consumer;
  
  /** the service interface class */
  private Class serviceInterface;
  
  /** the remote method invocation timeout seconds, zero means wait indefinitely */
  private int timeout;
  
  /** the recipients dictionary, id --> RemoteListenerInvoker or ResultGetter (both are RemoteMessageRecipients) */
  private final Map<Long, Object> recipientsDictionary;
  
  /** the remote argument cache, argument --> id */
  private final Map<Object, Long> cache;
  
  /** Creates a new RemoteServiceConsumer instance with a default timeout of 30 seconds. */
  public RemoteServiceConsumer() {
    // This map must hold its keys strongly.  The keys are autoboxed Long objects that nothing else
    // references, so a WeakHashMap would be free to drop an entry at any garbage collection and the
    // corresponding reply or listener callback would be lost.  The trade-off is that registered
    // listeners stay reachable until cleanup() is called.
    recipientsDictionary = Collections.synchronizedMap(new HashMap<Long, Object>());
    cache = Collections.synchronizedMap(new WeakHashMap<Object, Long>());
    timeout = 30;
  }
  
  /** Gets the service interface class.
   *
   * @return the service interface class
   */
  public Class getServiceInterface() {
    return serviceInterface;
  }
  
  /** Sets the service interface class.
   *
   * @param serviceInterface the service interface class
   */
  public void setServiceInterface(final Class serviceInterface) {
    //Preconditions
    assert serviceInterface != null : "serviceInterface must not be null";
    
    this.serviceInterface = serviceInterface;
  }
  
  /** Gets the JMS connection factory.
   *
   * @return the JMS connection factory
   */
  public ConnectionFactory getConnectionFactory() {
    return connectionFactory;
  }
  
  /** Sets the JMS connection factory.
   *
   * @param connectionFactory the JMS connection factory
   */
  public void setConnectionFactory(final ConnectionFactory connectionFactory) {
    //Preconditions
    assert connectionFactory != null : "connectionFactory must not be null";
    
    this.connectionFactory = connectionFactory;
  }
  
  /** Gets the service destination.
   *
   * @return the service destination
   */
  public Destination getServiceDestination() {
    return serviceDestination;
  }
  
  /** Sets the service destination.
   *
   * @param destination the service destination
   */
  public void setServiceDestination(final Destination destination) {
    //Preconditions
    assert destination != null : "destination must not be null";
    
    this.serviceDestination = destination;
  }
  
  /** Gets the remote method invocation timeout seconds.
   *
   * @return the remote method invocation timeout seconds
   */
  public synchronized int getTimeout() {
    return timeout;
  }
  
  /** Sets the remote method invocation timeout seconds.  The message time-to-live is taken from
   * this value when {@link #init()} runs, so a later change only affects how long callers wait.
   *
   * @param timeout the remote method invocation timeout seconds, zero means wait indefinitely
   */
  public synchronized void setTimeout(final int timeout) {
    //Preconditions
    assert timeout >= 0 : "timeout must not be negative";
    
    this.timeout = timeout;
  }
  
  /** Initializes this remote service consumer by opening the JMS connection, session, request
   * producer and the temporary response queue with this object as its listener.
   *
   * @throws TexaiException if any JMS operation fails
   */
  public void init() {
    //Preconditions
    assert connectionFactory != null : "connectionFactory must not be null";
    assert serviceDestination != null : "serviceDestination must not be null";
    
    try {
      // create a connection
      connection = connectionFactory.createConnection();
      
      // create a session
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
      // create a MessageProducer, the time-to-live is given to JMS in milliseconds
      producer = session.createProducer(serviceDestination);
      producer.setTimeToLive(getTimeout() * MILLIS_PER_SECOND);
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
      
      // the back way channel
      responseDestination = session.createTemporaryQueue();
      consumer = session.createConsumer(responseDestination);
      consumer.setMessageListener(this);
      
      // start delivery only after the listener is in place
      connection.start();
    } catch (final JMSException ex) {
      // do not leak a half-initialized connection
      closeConnectionQuietly();
      throw new TexaiException(ex);
    }
  }
  
  /** Cleans up this remote service consumer by closing the JMS resources and forgetting all
   * registered recipients.  It is safe to call when {@link #init()} was never called or failed.
   *
   * @throws TexaiException if closing a JMS resource fails
   */
  public void cleanup() {
    try {
      try {
        if (consumer != null) {
          consumer.close();
        }
        if (producer != null) {
          producer.close();
        }
        if (session != null) {
          session.close();
        }
      } finally {
        // always attempt to close the connection, even when an earlier close failed
        if (connection != null) {
          connection.close();
        }
      }
    } catch (final JMSException ex) {
      throw new TexaiException(ex);
    } finally {
      recipientsDictionary.clear();
      cache.clear();
    }
  }
  
  /** Closes the JMS connection after a failed initialization, logging rather than throwing so that
   * the original failure is the one reported to the caller.
   */
  private void closeConnectionQuietly() {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (final JMSException closeEx) {
      LOGGER.warn("unable to close the JMS connection after a failed initialization", closeEx);
    }
  }
  
  /** Creates a remote proxy object that implements the service interface and forwards every call
   * to {@link #invoke(Object, Method, Object[])}.
   *
   * @return the remote proxy object
   */
  public Object createProxy() {
    //Preconditions
    assert serviceInterface != null : "serviceInterface must not be null";
    
    return Proxy.newProxyInstance(
            getClass().getClassLoader(),
            new Class [] { serviceInterface },
            this);
  }
  
  /** Defines the interface for a remote message recipient.
   * @author vtsaplin
   *
   */
  private interface RemoteMessageRecipient {
    
    /** Receives a JMS object message.
     *
     * @param objectMessage the JMS object message
     * @param obj the object of the message
     */
    void messageReceived(ObjectMessage objectMessage, Object obj);
  }
  
  /** Provides a result getter, which is a remote message recipient that lets the invoking thread
   * block until the reply to its remote invocation arrives or the timeout elapses.
   * @author vtsaplin
   *
   */
  private class ResultGetter implements RemoteMessageRecipient {
    
    /** the remote method invocation result, which may legitimately be null */
    private Object result = null;
    
    /** the indicator that a reply has arrived, needed because null is a valid result */
    private boolean isResultReceived = false;
    
    /** Returns the remote method invocation result, waiting up to the configured timeout for it.
     * A timeout of zero waits indefinitely.
     *
     * @return the remote method invocation result, or null if no reply arrived in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized Object getResult() throws InterruptedException {
      final long timeoutMillis = getTimeout() * MILLIS_PER_SECOND;
      if (timeoutMillis == 0) {
        while (!isResultReceived) {
          wait();
        }
      } else {
        // loop so that a spurious wakeup does not end the wait early
        final long startNanos = System.nanoTime();
        long remainingMillis = timeoutMillis;
        while (!isResultReceived && remainingMillis > 0) {
          wait(remainingMillis);
          remainingMillis = timeoutMillis - (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
        }
      }
      if (!isResultReceived) {
        LOGGER.warn("timed out after " + timeoutMillis + " ms waiting for a remote invocation result");
      }
      return result;
    }
    
    /** Receives a JMS object message carrying the invocation result and wakes the waiting caller.
     *
     * @param objectMessage the JMS object message (ignored)
     * @param obj the object of the message, which must be a RemoteInvocationResult
     */
    public synchronized void messageReceived(final ObjectMessage objectMessage, final Object obj) {
      final RemoteInvocationResult remoteInvocationResult = (RemoteInvocationResult)obj;
      result = remoteInvocationResult.getResult();
      isResultReceived = true;
      notifyAll();
    }
  }
  
  /** Provides a remote listener invoker, which relays a callback received from the remote service
   * to a local event listener.
   * @author vtsaplin
   *
   */
  private static final class RemoteListenerInvoker implements RemoteMessageRecipient {
    
    /** the event listener */
    private final EventListener listener;
    
    /** Creates a new RemoteListenerInvoker instance.
     *
     * @param listener the event listener
     */
    public RemoteListenerInvoker(final EventListener listener) {
      //Preconditions
      assert listener != null : "listener must not be null";
      
      this.listener = listener;
    }
    
    /** Receives a JMS object message and invokes the first public listener method whose name
     * matches the received invocation.  Only the name is compared, not the parameter types.
     *
     * @param objectMessage the JMS object message (ignored)
     * @param obj the object of the message, which must be a RemoteInvocation
     * @throws TexaiException if the reflective call fails
     */
    public void messageReceived(final ObjectMessage objectMessage, final Object obj) {
      final RemoteInvocation remoteInvocation = (RemoteInvocation)obj;
      for (final Method method : listener.getClass().getMethods()) {
        if (method.getName().equals(remoteInvocation.getName())) {
          try {
            method.invoke(listener, remoteInvocation.getArgs());
          } catch (final IllegalArgumentException ex) {
            throw new TexaiException(ex);
          } catch (final IllegalAccessException ex) {
            throw new TexaiException(ex);
          } catch (final InvocationTargetException ex) {
            throw new TexaiException(ex);
          }
          break;
        }
      }
    }
  }
  
  /** Processes a method invocation on a proxy instance and returns the result.  Arguments that are
   * event listeners are replaced in the args array by remote invocation arguments that carry the
   * id under which the listener is registered here.  Methods returning void are sent without
   * waiting for a reply.
   *
   * @param proxy the proxy instance that the method was invoked on
   * @param method the Method instance corresponding to the interface method invoked on the proxy instance
   * @param args an array of objects containing the values of the arguments passed in the method invocation
   * on the proxy instance, or null if interface method takes no arguments
   * @return the remote result, or null for a void method or when no reply arrived within the timeout
   * @throws Throwable the throwable received as the remote result, or any failure raised while sending or waiting
   */
  public Object invoke(
          final Object proxy,
          final Method method,
          final Object [] args) throws Throwable {
    LOGGER.debug("invoking " + method.getName());
    
    if (args != null) {
      for (int i = 0; i < args.length; i++) {
        if (args [i] instanceof EventListener) {
          final EventListener listener = (EventListener)args [i];
          args [i] = new RemoteInvocationArgument(registerListener(listener), listener.getClass());
        }
      }
    }
    
    // the current thread ID identifies the reply, a thread has at most one call outstanding
    final long id = Thread.currentThread().getId();
    
    // create a remote method
    final RemoteInvocation remoteInvocation = new RemoteInvocation(id, method, args);
    
    // create an object message
    final ObjectMessage objectMessage = session.createObjectMessage(remoteInvocation);
    objectMessage.setJMSReplyTo(responseDestination);
    
    if (Void.TYPE.equals(method.getReturnType())) {
      // if the method does not return a value
      // we don't expect a result
      producer.send(objectMessage);
      return null;
    }
    
    // create a result getter and start listening
    final ResultGetter resultGetter = new ResultGetter();
    recipientsDictionary.put(id, resultGetter);
    final Object result;
    try {
      // send a message and wait for the result
      LOGGER.debug("sending " + objectMessage);
      producer.send(objectMessage);
      result = resultGetter.getResult();
    } finally {
      // stop listening, removing by the key under which the getter was registered
      recipientsDictionary.remove(id);
    }
    
    // if we have an exception, throw it
    if (result instanceof Throwable) {
      throw (Throwable)result;
    }
    
    return result;
  }
  
  /** Returns the id under which the given event listener is registered as a message recipient,
   * registering it first when it has not been seen before.
   *
   * @param listener the event listener passed as a remote method argument
   * @return the id of the listener's recipient entry
   */
  private long registerListener(final EventListener listener) {
    // the synchronized map locks on itself, so this makes the lookup and the insertion atomic
    synchronized (cache) {
      final Long cachedId = cache.get(listener);
      if (cachedId != null) {
        return cachedId;
      }
      final long id = listener.hashCode() + System.currentTimeMillis();
      recipientsDictionary.put(id, new RemoteListenerInvoker(listener));
      cache.put(listener, id);
      return id;
    }
  }
  
  /** Passes a received message to the recipient registered under the id of the remote transfer
   * object it carries.  Messages of other kinds are ignored, and failures are logged instead of
   * being propagated.
   *
   * @param msg the JMS message
   */
  public void onMessage(Message msg) {
    LOGGER.info("onMessage " + msg);
    try {
      if (!(msg instanceof ObjectMessage)) {
        return;
      }
      final ObjectMessage objectMessage = (ObjectMessage)msg;
      final Object obj = objectMessage.getObject();
      if (!(obj instanceof RemoteTransferObject)) {
        return;
      }
      final RemoteTransferObject remoteTransferObject = (RemoteTransferObject)obj;
      final Object recipient = recipientsDictionary.get(remoteTransferObject.getID());
      if (recipient == null) {
        // for example a reply that arrived after its caller timed out
        LOGGER.warn("no recipient registered for id " + remoteTransferObject.getID() + ", message dropped");
        return;
      }
      ((RemoteMessageRecipient)recipient).messageReceived(objectMessage, obj);
    } catch (final Exception ex) {
      LOGGER.error("failed to process message " + msg, ex);
    }
  }
  
  /** Returns a newly created remote proxy.  This method and the two after it follow the naming of
   * a factory-bean style contract, although this class does not declare such an interface.
   *
   * @return a remote proxy implementing the service interface
   * @throws Exception declared for compatibility with existing callers
   */
  public Object getObject() throws Exception {
    return createProxy();
  }
  
  /** Returns the type of object that this factory creates.
   *
   * @return the service interface class, or null if it has not been set
   */
  public Class getObjectType() {
    return serviceInterface;
  }
  
  /** Returns whether the object managed by this factory is a singleton or a prototype.
   *
   * @return true, indicating a singleton
   */
  public boolean isSingleton() {
    return true;
  }
}




See more files for this project here

Texai

Texai is a chatbot that intelligently seeks to acquire knowledge and friendly behaviors.

Project homepage: http://sourceforge.net/projects/texai
Programming language(s): Java,Shell Script,XML
License: other

  RemoteInvocation.java
  RemoteInvocationArgument.java
  RemoteInvocationResult.java
  RemoteServiceConsumer.java
  RemoteServiceProvider.java
  RemoteTransferObject.java