// RemoteServiceConsumer.java from Texai at Krugle
// Show RemoteServiceConsumer.java syntax highlighted
/*
* RemoteServiceConsumer.java
*
* Created on April 24, 2007, 1:37 PM
*
* Description: Provides a remote service consumer.
*
* Copyright (C) April 24, 2007 Stephen L. Reed, ported to ApacheMQ and derived from
* substantially the same class written by Vitaly Tsaplin, 2007, http://jmsrmi.sourceforge.net/
*
* This program is free software; you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.texai.kb.ejb.jmsrmi;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.EventListener;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.texai.util.TexaiException;
/**
 * Provides a remote service consumer: an {@link InvocationHandler} that turns calls on a dynamic
 * proxy of the service interface into JMS object messages sent to the service destination, and a
 * {@link MessageListener} that routes the replies arriving on a temporary queue back to the
 * waiting caller or to a registered event listener.
 *
 * <p>Typical use: set the connection factory, service destination and service interface, call
 * {@link #init()}, obtain the proxy with {@link #createProxy()}, and call {@link #cleanup()} when
 * done.
 *
 * <p>NOTE(review): the single JMS session is used both by the threads that call the proxy and by
 * the JMS delivery thread that runs {@link #onMessage(Message)}; confirm that the JMS provider in
 * use tolerates this.
 *
 * @author vtsaplin
 */
public class RemoteServiceConsumer implements InvocationHandler, MessageListener {

    /** the logger */
    private static final Logger LOGGER = Logger.getLogger(RemoteServiceConsumer.class);
    /** the number of milliseconds in a second */
    private static final long MILLIS_PER_SECOND = 1000L;
    /** the number of nanoseconds in a millisecond */
    private static final long NANOS_PER_MILLI = 1000000L;
    /** the JMS connection factory */
    private ConnectionFactory connectionFactory;
    /** the JMS connection */
    private Connection connection;
    /** the JMS session */
    private Session session;
    /** the service destination */
    private Destination serviceDestination;
    /** the response destination */
    private Destination responseDestination;
    /** the message producer */
    private MessageProducer producer;
    /** the message consumer */
    private MessageConsumer consumer;
    /** the service interface class */
    private Class serviceInterface;
    /** the remote method invocation timeout seconds, in which zero means wait without limit */
    private int timeout;
    /** the recipients dictionary, id --> RemoteListenerInvoker, ResultGetter or RemoteMessageRecipient */
    private final Map<Long, Object> recipientsDictionary;
    /** the remote argument cache, argument --> id */
    private final Map<Object, Long> cache;

    /** Creates a new RemoteServiceConsumer instance having a default timeout of 30 seconds. */
    public RemoteServiceConsumer() {
        // The recipients must be strongly referenced: a map with weak keys would let the garbage
        // collector discard an entry whose boxed Long key is referenced from nowhere else, which
        // silently drops the reply or listener callback that is routed through it.
        recipientsDictionary = new ConcurrentHashMap<Long, Object>();
        cache = Collections.synchronizedMap(new WeakHashMap<Object, Long>());
        timeout = 30;
    }

    /** Gets the service interface class
     *
     * @return the service interface class
     */
    public Class getServiceInterface() {
        return serviceInterface;
    }

    /** Sets the service interface class
     *
     * @param serviceInterface the service interface class
     */
    public void setServiceInterface(final Class serviceInterface) {
        //Preconditions
        assert serviceInterface != null : "serviceInterface must not be null";
        this.serviceInterface = serviceInterface;
    }

    /** Gets the JMS connection factory
     *
     * @return the JMS connection factory
     */
    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /** Sets the JMS connection factory
     *
     * @param connectionFactory the JMS connection factory
     */
    public void setConnectionFactory(final ConnectionFactory connectionFactory) {
        //Preconditions
        assert connectionFactory != null : "connectionFactory must not be null";
        this.connectionFactory = connectionFactory;
    }

    /** Gets the service destination
     *
     * @return the service destination
     */
    public Destination getServiceDestination() {
        return serviceDestination;
    }

    /** Sets the service destination
     *
     * @param destination the service destination
     */
    public void setServiceDestination(final Destination destination) {
        //Preconditions
        assert destination != null : "destination must not be null";
        this.serviceDestination = destination;
    }

    /** Gets the remote method invocation timeout seconds
     *
     * @return the remote method invocation timeout seconds
     */
    public synchronized int getTimeout() {
        return timeout;
    }

    /** Sets the remote method invocation timeout seconds. The message time-to-live is derived from
     * this value only when {@link #init()} runs, whereas the wait for a result reads it on each invocation.
     *
     * @param timeout the remote method invocation timeout seconds, in which zero means wait without limit
     */
    public synchronized void setTimeout(final int timeout) {
        //Preconditions
        assert timeout >= 0 : "timeout must not be negative";
        this.timeout = timeout;
    }

    /** Initializes this remote service consumer by opening the JMS connection, session, producer and
     * the temporary response queue on which this object listens.
     *
     * @throws TexaiException when a JMS operation fails, in which case the JMS objects that were
     * already opened are closed again
     */
    public void init() {
        try {
            // create a connection
            connection = connectionFactory.createConnection();
            connection.start();
            // create a session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // create a MessageProducer
            producer = session.createProducer(serviceDestination);
            // the timeout is held in seconds, but JMS expects the time-to-live in milliseconds
            producer.setTimeToLive(getTimeout() * MILLIS_PER_SECOND);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // the back way channel
            responseDestination = session.createTemporaryQueue();
            consumer = session.createConsumer(responseDestination);
            consumer.setMessageListener(this);
        } catch (final JMSException ex) {
            // do not leak a half-opened connection, the original failure is the one reported
            closeResources();
            throw new TexaiException(ex);
        }
    }

    /** Cleans up this remote service consumer by closing the consumer, producer, session and connection,
     * and by forgetting the registered recipients. Every JMS object is closed even when closing an
     * earlier one fails, and it is safe to call this method when {@link #init()} was never called.
     *
     * @throws TexaiException wrapping the first JMS failure encountered while closing
     */
    public void cleanup() {
        final JMSException firstFailure = closeResources();
        // the response queue is gone, so nothing can be delivered to these recipients any more
        recipientsDictionary.clear();
        cache.clear();
        if (firstFailure != null) {
            throw new TexaiException(firstFailure);
        }
    }

    /** Closes whichever of the consumer, producer, session and connection were opened, continuing
     * past individual failures.
     *
     * @return the first JMS failure encountered, or null if every close succeeded
     */
    private JMSException closeResources() {
        JMSException firstFailure = null;
        try {
            if (consumer != null) {
                consumer.close();
            }
        } catch (final JMSException ex) {
            firstFailure = ex;
        }
        try {
            if (producer != null) {
                producer.close();
            }
        } catch (final JMSException ex) {
            if (firstFailure == null) {
                firstFailure = ex;
            }
        }
        try {
            if (session != null) {
                session.close();
            }
        } catch (final JMSException ex) {
            if (firstFailure == null) {
                firstFailure = ex;
            }
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (final JMSException ex) {
            if (firstFailure == null) {
                firstFailure = ex;
            }
        }
        return firstFailure;
    }

    /** Creates a remote proxy object that implements the service interface and delegates every
     * call to {@link #invoke(Object, Method, Object[])}.
     *
     * @return the remote proxy object
     */
    public Object createProxy() {
        return Proxy.newProxyInstance(
                getClass().getClassLoader(),
                new Class [] { serviceInterface },
                this);
    }

    /** Defines the interface for a remote message recipient
     * @author vtsaplin
     *
     */
    private interface RemoteMessageRecipient {

        /** Receives a JMS object message.
         *
         * @param objectMessage the JMS object message
         * @param obj the object of the message
         */
        void messageReceived(ObjectMessage objectMessage, Object obj);
    }

    /** Provides a result getter which is a remote message recipient.
     * @author vtsaplin
     *
     */
    private class ResultGetter implements RemoteMessageRecipient {

        /** the remote method invocation result */
        public Object result = null;
        /** the indicator that a reply arrived, needed because null is a legitimate result */
        private boolean received = false;

        /** Returns the remote method invocation result, waiting for it up to the timeout
         * configured on the enclosing consumer, or without limit when that timeout is zero.
         *
         * @return the remote method invocation result, or null if no reply arrived in time
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized Object getResult() throws InterruptedException {
            final long timeoutMillis = getTimeout() * MILLIS_PER_SECOND;
            if (timeoutMillis == 0L) {
                // loop on the flag so that a spurious wakeup does not end the wait
                while (!received) {
                    wait();
                }
            } else {
                final long startNanos = System.nanoTime();
                long remainingMillis = timeoutMillis;
                // never call wait(0) here, because that would wait without limit
                while (!received && remainingMillis > 0L) {
                    wait(remainingMillis);
                    remainingMillis = timeoutMillis - (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
                }
            }
            return result;
        }

        /** Returns whether a reply has been received.
         *
         * @return whether a reply has been received
         */
        public synchronized boolean isReceived() {
            return received;
        }

        /** Receives a JMS object message and wakes the thread waiting for the result.
         *
         * @param objectMessage the JMS object message (ignored)
         * @param obj the object of the message
         */
        public synchronized void messageReceived(final ObjectMessage objectMessage, final Object obj) {
            final RemoteInvocationResult remoteInvocationResult = (RemoteInvocationResult)obj;
            result = remoteInvocationResult.getResult();
            received = true;
            notifyAll();
        }
    }

    /** Provides a remote listener invoker.
     * @author vtsaplin
     *
     */
    private class RemoteListenerInvoker implements RemoteMessageRecipient {

        /** the event listener */
        private final EventListener listener;

        /** Creates a new RemoteListenerInvoker instance.
         *
         * @param listener the event listener
         */
        public RemoteListenerInvoker(EventListener listener) {
            //Preconditions
            assert listener != null : "listener must not be null";
            this.listener = listener;
        }

        /** Receives a JMS object message and invokes the first public method of the listener
         * whose name equals the name carried by the remote invocation.
         *
         * @param objectMessage the JMS object message (ignored)
         * @param obj the object of the message
         */
        public void messageReceived(final ObjectMessage objectMessage, final Object obj) {
            final RemoteInvocation remoteInvocation = (RemoteInvocation)obj;
            for (Method method : listener.getClass().getMethods()) {
                if (method.getName().equals(remoteInvocation.getName())) {
                    try {
                        method.invoke(listener, remoteInvocation.getArgs());
                    } catch (final IllegalArgumentException ex) {
                        throw new TexaiException(ex);
                    } catch (final IllegalAccessException ex) {
                        throw new TexaiException(ex);
                    } catch (final InvocationTargetException ex) {
                        throw new TexaiException(ex);
                    }
                    break;
                }
            }
        }
    }

    /** Processes a method invocation on a proxy instance and returns the result. Arguments that
     * are event listeners are replaced in the args array by remote invocation arguments that carry
     * the listener id, and the calling thread's id identifies the invocation.
     *
     * @param proxy the proxy instance that the method was invoked on
     * @param method the Method instance corresponding to the interface method invoked on the proxy instance
     * @param args an array of objects containing the values of the arguments passed in the method invocation
     * on the proxy instance, or null if interface method takes no arguments
     * @return the remote result, or null if the method is void or no reply arrived within the timeout
     * @throws Throwable the remote result itself when it is a throwable, or a local failure
     */
    public Object invoke(
            final Object proxy,
            final Method method,
            final Object [] args) throws Throwable {
        LOGGER.debug("invoking " + method.getName());
        if (args != null) {
            for (int i = 0; i < args.length; i++) {
                if (args [i] instanceof EventListener) {
                    final EventListener listener = (EventListener)args [i];
                    args [i] = new RemoteInvocationArgument(registerListener(listener), listener.getClass());
                }
            }
        }
        // get current thread ID
        final long id = Thread.currentThread().getId();
        // create a remote method
        final RemoteInvocation remoteInvocation = new RemoteInvocation(id, method, args);
        // create an object message
        final ObjectMessage objectMessage = session.createObjectMessage(remoteInvocation);
        objectMessage.setJMSReplyTo(responseDestination);
        if (Void.TYPE.equals(method.getReturnType())) {
            // if the method does not return a value
            // we don't expect a result
            producer.send(objectMessage);
            return null;
        }
        // create a result getter and start listening
        final Long recipientKey = Long.valueOf(id);
        final ResultGetter resultGetter = new ResultGetter();
        recipientsDictionary.put(recipientKey, resultGetter);
        final Object result;
        try {
            // send a message and wait for the result
            LOGGER.debug("sending " + objectMessage);
            producer.send(objectMessage);
            result = resultGetter.getResult();
        } finally {
            // stop listening, by key, and also when sending or waiting failed
            recipientsDictionary.remove(recipientKey);
        }
        if (!resultGetter.isReceived()) {
            LOGGER.warn("no reply received within " + getTimeout() + " seconds for " + method.getName());
        }
        // if we have an exception, throw it
        if (result instanceof Throwable) {
            throw (Throwable)result;
        }
        return result;
    }

    /** Returns the id under which the given listener is registered as a recipient, registering
     * it first if it is not yet known.
     *
     * @param listener the event listener passed as a remote method argument
     * @return the id of the listener
     */
    private long registerListener(final EventListener listener) {
        // the synchronized map locks on itself, so holding it makes check-then-register atomic
        synchronized (cache) {
            final Long knownId = cache.get(listener);
            if (knownId != null) {
                return knownId.longValue();
            }
            final long id = listener.hashCode() + System.currentTimeMillis();
            recipientsDictionary.put(Long.valueOf(id), new RemoteListenerInvoker(listener));
            cache.put(listener, Long.valueOf(id));
            return id;
        }
    }

    /** Passes a message to the recipient registered under the id of the remote transfer object
     * that the message carries. Other messages are ignored, and failures are logged rather than
     * propagated to the JMS provider.
     *
     * @param msg the JMS message
     */
    public void onMessage(Message msg) {
        LOGGER.info("onMessage " + msg);
        try {
            if (!(msg instanceof ObjectMessage)) {
                return;
            }
            final ObjectMessage objectMessage = (ObjectMessage)msg;
            final Object obj = objectMessage.getObject();
            if (!(obj instanceof RemoteTransferObject)) {
                return;
            }
            final RemoteTransferObject remoteTransferObject = (RemoteTransferObject)obj;
            final RemoteMessageRecipient remoteMessageRecipient =
                    (RemoteMessageRecipient)recipientsDictionary.get(remoteTransferObject.getID());
            if (remoteMessageRecipient == null) {
                // for example a reply that arrived after its caller stopped waiting
                LOGGER.warn("no recipient registered for id " + remoteTransferObject.getID() + ", dropping " + msg);
                return;
            }
            remoteMessageRecipient.messageReceived(objectMessage, obj);
        } catch (Exception ex) {
            LOGGER.error("failed to process " + msg, ex);
        }
    }

    /** Return an instance (possibly shared or independent) of the object managed by this factory.
     * Each call creates a new proxy by means of {@link #createProxy()}.
     *
     * @return an instance (possibly shared or independent) of the object managed by this factory
     */
    public Object getObject() throws Exception {
        return createProxy();
    }

    /** Return the type of object that this FactoryBean creates, or null if not known in advance.
     *
     * @return the type of object that this FactoryBean creates, or null if not known in advance
     */
    public Class getObjectType() {
        return serviceInterface;
    }

    /** Return whether the bean managed by this factory a singleton or a prototype.
     *
     * @return whether the bean managed by this factory a singleton or a prototype
     */
    public boolean isSingleton() {
        return true;
    }
}
// See more files for this project here