Code Search for Developers
 
 
  

PartitionManager.java from SmartFrog at Krugle


Show PartitionManager.java syntax highlighted

/** (C) Copyright 1998-2005 Hewlett-Packard Development Company, LP

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

For more information: www.smartfrog.org

*/
package org.smartfrog.services.anubis.partition;


import java.net.InetAddress;
import java.rmi.RemoteException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.smartfrog.services.anubis.Anubis;
import org.smartfrog.services.anubis.locator.util.ActiveTimeQueue;
import org.smartfrog.services.anubis.locator.util.TimeQueueElement;
import org.smartfrog.services.anubis.partition.comms.MessageConnection;
import org.smartfrog.services.anubis.partition.protocols.partitionmanager.PartitionProtocol;
import org.smartfrog.services.anubis.partition.test.node.TestMgr;
import org.smartfrog.services.anubis.partition.util.Config;
import org.smartfrog.services.anubis.partition.util.Identity;
import org.smartfrog.services.anubis.partition.views.BitView;
import org.smartfrog.services.anubis.partition.views.View;
import org.smartfrog.sfcore.common.SmartFrogException;
import org.smartfrog.sfcore.compound.Compound;
import org.smartfrog.sfcore.compound.CompoundImpl;
import org.smartfrog.sfcore.logging.LogImplAsyncWrapper;
import org.smartfrog.sfcore.logging.LogSF;
import org.smartfrog.sfcore.prim.TerminationRecord;

public class PartitionManager
        extends CompoundImpl
        implements Compound, Partition {

    static final int   UNDEFINED_LEADER  = -1;

    PartitionProtocol  partitionProtocol = null;
    Identity           me                = null;
    Set                notificationSet   = new HashSet();
    View               notifiedView      = null;
    int                notifiedLeader    = UNDEFINED_LEADER;
    boolean            testable          = false;
    TestMgr            testManager       = null;
    LogSF              log               = null;
    ActiveTimeQueue    timer = null;
    boolean            terminated        = false;

    public PartitionManager() throws RemoteException {
        super();
    }

    public void sfDeploy() throws SmartFrogException, RemoteException  {
        try {
            super.sfDeploy();
            log               = new LogImplAsyncWrapper(this.sfGetApplicationLog());
            timer             = new ActiveTimeQueue();
            me                = Config.getIdentity(this, "identity");
            partitionProtocol = (PartitionProtocol)sfResolve("partitionProtocol");
        }
        catch (Exception ex) {
            throw (SmartFrogException)SmartFrogException.forward(ex);
        }
    }

    public void sfStart() throws SmartFrogException, RemoteException  {
        try {
            super.sfStart();
            timer.start();

            if( log.isInfoEnabled() )
                log.info("Started partition manager at " + me + " " + Anubis.version);
        }
        catch (Exception ex) {
            throw (SmartFrogException)SmartFrogException.forward(ex);
        }
    }

    public void sfTerminateWith(TerminationRecord status) {
        if( log.isInfoEnabled() )
            log.info("Terminating partition manager at " + me);
        timer.terminate();
        terminated = true;
        super.sfTerminateWith(status);
    }

    public synchronized void notify(View view, int leader) {

        if( view.isStable() &&
            leader == me.id &&
            notifiedLeader != leader &&
            notifiedLeader != UNDEFINED_LEADER &&
            log.isErrorEnabled() )
            log.error("Leader changed to me on stabalization, old leader = " + notifiedLeader +
                      ", new leader = " + leader + ", view = " + view);

        notifiedView = new BitView(view);
        notifiedLeader = leader;
        Iterator iter = ((Set)((HashSet)notificationSet).clone()).iterator();
        while( iter.hasNext() )
            safePartitionNotification((PartitionNotification)iter.next(),
                                      notifiedView,
                                      notifiedLeader);
    }


    /**
     * This method will invoke user code in the listener. It is timed, logs
     * timeliness errors and catches Throwables.
     *
     * @param listener
     */
    private void safePartitionNotification(PartitionNotification pn, View view, int leader) {
        long         timein  = System.currentTimeMillis();
        long         timeout = 0;
        class TimeoutErrorLogger extends TimeQueueElement {
            View view;
            int  leader;
            TimeoutErrorLogger(View v, int l) {
                view   = v;
                leader = l;
            }
            public void expired() {
                if( log.isErrorEnabled() )
                    log.error("User API Upcall took >200ms in " +
                              "partitionNotification(view, leader) where view=" +
                               view + ", leader=" + leader);
            }
        }
        TimeoutErrorLogger timeoutErrorLogger = new TimeoutErrorLogger(view, leader);


        timer.add(timeoutErrorLogger, (timein+200) );
        try {
            pn.partitionNotification(view, leader);
        } catch (Throwable ex) {
            if( log.isFatalEnabled() )
                log.fatal("User API Upcall threw Throwable in " +
                              "partitionNotification(view, leader) where view=" +
                               view + ", leader=" + leader, ex);
        }
        timeout = System.currentTimeMillis();
        timer.remove(timeoutErrorLogger);
        if( log.isTraceEnabled() )
            log.trace("User API Upcall took " + (timeout - timein) +
                      "ms in partitionNotification(view, leader) where view=" +
                      view + ", leader=" + leader);
    }




    public synchronized void receiveObject(Object obj, int sender, long time) {
        if( terminated )
            return;
        Iterator iter = ((Set)((HashSet)notificationSet).clone()).iterator();
        while( iter.hasNext() )
            safeObjectNotification((PartitionNotification)iter.next(), obj, sender, time);
    }

    /**
     * This method will invoke user code in the listener. It is timed, logs
     * timeliness errors and catches Throwables.
     *
     * @param listener
     */
    private void safeObjectNotification(PartitionNotification pn, Object obj, int sender, long time) {
        long         timein  = System.currentTimeMillis();
        long         timeout = 0;
        class TimeoutErrorLogger extends TimeQueueElement {
            Object obj;
            int    sender;
            long   time;
            TimeoutErrorLogger(Object o, int s, long t) {
                obj    = o;
                sender = s;
                time   = t;
            }
            public void expired() {
                if( log.isErrorEnabled() )
                    log.error("User API Upcall took >200ms in " +
                              "objectNotification(obj, sender, time) where obj=" +
                              obj + ", sender=" + sender + ", time=" +time );
            }
        }
        TimeoutErrorLogger timeoutErrorLogger = new TimeoutErrorLogger(obj, sender, time);


        timer.add(timeoutErrorLogger, (timein+200) );
        try {
            pn.objectNotification(obj, sender, time);
        } catch (Throwable ex) {
            if( log.isFatalEnabled() )
                log.fatal("User API Upcall threw Throwable in " +
                          "objectNotification(obj, sender, time) where obj=" +
                          obj + ", sender=" + sender + ", time=" +time, ex);
        }
        timeout = System.currentTimeMillis();
        timer.remove(timeoutErrorLogger);
        if( log.isTraceEnabled() )
            log.trace("User API Upcall took " + (timeout - timein) +
                      "ms in objectNotification(obj, sender, time) where obj=" +
                      obj + ", sender=" + sender + ", time=" +time);
    }



    public synchronized Status getStatus() {
        return new Status(notifiedView, notifiedLeader);
    }

    public synchronized void register(PartitionNotification pn) {
        notificationSet.add(pn);
    }

    public synchronized void deregister(PartitionNotification pn) {
        notificationSet.remove(pn);
    }

    public MessageConnection connect(int node) {
        return partitionProtocol.connect(node);
    }

    public InetAddress getNodeAddress(int node) {
        return partitionProtocol.getNodeAddress(node);
    }

    public int getId() {
        return me.id;
    }

}




See more files for this project here

SmartFrog

SmartFrog (Smart Framework for Object Groups) is a framework for configuring and automatically activating distributed applications. \r\nThe SmartFrog framework is released under LGPL license.\r\nMore info at: www.smartfrog.org

Project homepage: http://sourceforge.net/projects/smartfrog
Programming language(s): Java,XML
License: other

  comms/
    blocking/
      BlockingConnectionInitiator.java
      MessageConnectionImpl.java
      MessageConnectionServer.java
      MessageConnectionServerFactory.java
    multicast/
      HeartbeatComms.java
      HeartbeatCommsFactory.java
      HeartbeatCommsIntf.java
      HeartbeatConnection.java
      MessageHandler.java
    nonblocking/
      ConnectWorker.java
      MessageNioHandler.java
      MessageNioServer.java
      MessageNioServerFactory.java
      NonBlockingConnectionInitiator.java
      RxJob.java
      RxQueue.java
      RxQueueWorker.java
      SendingListener.java
    Connection.java
    IOConnection.java
    IOConnectionServer.java
    IOConnectionServerFactory.java
    MessageConnection.java
    SelfConnection.java
    components.sf
  protocols/
    heartbeat/
      ping/
        PingProtocolFactory.java
        PingProtocolImpl.java
      timed/
        TimedProtocolFactory.java
        TimedProtocolImpl.java
      HeartbeatProtocol.java
      HeartbeatProtocolAdapter.java
      HeartbeatProtocolFactory.java
      HeartbeatReceiver.java
      components.sf
    leader/
      Candidate.java
      CandidateImpl.java
      LeaderMgr.java
      LeaderProtocolFactory.java
      components.sf
    partitionmanager/
      ConnectionSet.java
      IntervalExec.java
      PartitionProtocol.java
    Sender.java
    Timed.java
  test/
    colors/
      ColorAllocator.java
      ColorMap.java
    mainconsole/
      AsymetryReportFrame.java
      Controller.java
      MainConsoleFrame.java
      NodeButton.java
      NodeData.java
      NodeFrame.java
      Snoop.java
      SnugTable.java
      TestConnection.java
    msg/
      GetStatsMsg.java
      GetThreadsMsg.java
      IgnoringMsg.java
      PartitionMsg.java
      SetIgnoringMsg.java
      SetTimingMsg.java
      StatsMsg.java
      TestConnectionMsg.java
      ThreadsMsg.java
      TimingMsg.java
    node/
      CommsTestIntf.java
      TestConnection.java
      TestMgr.java
      TestServer.java
    stats/
      AveCalculator.java
      Heartbeats.java
      Scheduling.java
      StatsManager.java
      TimedAveCalculator.java
  util/
    Config.java
    ConfigException.java
    Epoch.java
    Identity.java
    NodeIdSet.java
    Test.java
    components.sf
  views/
    BitView.java
    NumberedView.java
    View.java
    ViewListener.java
  wire/
    msg/
    Test.java
    Wire.java
    WireFormException.java
    WireMsg.java
    WireSizes.java
  PMTest.java
  Partition.java
  PartitionManager.java
  PartitionNotification.java
  Status.java
  components.sf