/*
 * cs_self_cluster.c -- from the EmStar sources, as captured from the Krugle
 * "Code Search for Developers" site ("Show cs_self_cluster.c syntax
 * highlighted" view).
 */

/*
 *
 * Copyright (c) 2004 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
 

#include "cs_i.h"
#include "link/neighbor.h"
#include "topo_ctl/cluster.h"
#include <math.h>


/* Handlers with the timer-callback signature used with g_timer_add()
 * (cluster_refresh_expected is armed via g_timer_add further below;
 * cluster_elect_timeout is presumably armed by the election-timer code,
 * which is not visible here). */
static int cluster_elect_timeout(void *data, int interval, g_event_t *ev);
static int cluster_refresh_expected(void *data, int interval, g_event_t *ev);

/* Per-link clustering mode transitions (definitions not in this view). */
static void cluster_search_mode(cs_per_link_t *csp);
static void cluster_head_mode(cs_per_link_t *csp);
static void cluster_isolated_mode(cs_per_link_t *csp);

/* Timer callback for the link's own periodic refresh. */
static int cluster_refresh_timeout(void *data, int interval, g_event_t *ev);

/* Election timer control; 'reason' is a human-readable string
 * (presumably for logging -- definitions not visible here). */
void cluster_set_election_timer(cs_per_link_t *csp, int elect, char *reason);
void cluster_kill_election_timer(cs_per_link_t *csp);

/*
 * Interface index that a source belongs to: the src_if field of its
 * flow id.
 */
int cs_source_get_link_index(source_t *s)
{
  const int link_index = s->flow_id.src_if;
  return link_index;
}


/*
 * Resolve a source to its lu_multi link.  The global cs_state_t (attached
 * to the source's sync context) holds the link reference; the source's
 * flow_id interface index selects the link within it.
 */
lu_multi_link_t *cs_source_get_link(source_t *s)
{
  cs_state_t *state = (cs_state_t *)ssync_source_get_global_data(s);
  int link_index = cs_source_get_link_index(s);

  return lu_multi_index_link(state->link_ref, link_index);
}


/*
 * Per-link clustering state for the link a source belongs to, or NULL
 * when the source's interface index does not resolve to a link.
 */
cs_per_link_t *cs_source_get_per_link(source_t *s)
{
  lu_multi_link_t *link = cs_source_get_link(s);

  return (link != NULL) ? (cs_per_link_t *)lu_multi_link_get_data(link)
                        : NULL;
}


/*
 * Random timer value between (avg - var) and (avg + var), each bound scaled
 * by 1000 (seconds to milliseconds, judging by the other "* 1000" timer
 * arguments in this file).  Bound inclusivity is whatever random_range()
 * implements.
 */
int cs_timer(int avg, int var)
{
  int low_ms = (avg - var) * 1000;
  int high_ms = (avg + var) * 1000;

  return random_range(low_ms, high_ms);
}


/*
 * Refresh interval for a given backoff exponent (callers multiply the
 * result by 1000 before arming a timer):
 *
 *   bits == 0 :  CLUSTER_FAST_REF_MIN + jitter
 *   bits >= 1 :  CLUSTER_REFRESH_BASE_SECS * 2^(bits-1) + jitter
 *
 * 'bits' may come straight from a received cluster header
 * (cluster_process_msg passes hdr->refresh_backoff), so it is not trusted.
 * A negative value is treated like 0, and the shift count is clamped to 31
 * so the shift itself is always defined. Previously, bits < 1 (other than 0)
 * or bits > 32 shifted by an invalid amount, which is undefined behavior.
 * Valid inputs produce exactly the same result as before.
 *
 * 'mode' is currently unused.
 */
int cs_refresh_timer(cluster_mode_t mode, int bits, int jitter)
{
  /* largest shift that is defined for a 32-bit unsigned value */
  enum { CS_REFRESH_MAX_SHIFT = 31 };

  /* $$$$ add mode dependencies.. CH mode, etc */
  (void)mode;

  if (bits <= 0)
    return CLUSTER_FAST_REF_MIN + jitter;

  int shift = bits - 1;
  if (shift > CS_REFRESH_MAX_SHIFT)
    shift = CS_REFRESH_MAX_SHIFT;

  uint32_t mult = (uint32_t)1 << shift;
  return (CLUSTER_REFRESH_BASE_SECS * mult) + jitter;
}
    

/*
 * Find the source for flow id 'fid'.  If none exists yet, create it in a
 * hidden state -- unless 'fid' names our own (if_id, iface) pair, in which
 * case a warning is logged and NULL is returned.
 */
source_t *cluster_lookup(cs_per_link_t *csp, flow_id_t *fid)
{
  source_t *found = ssync_source_lookup(csp->parent->ref, fid);

  if (found != NULL)
    return found;

  /* never auto-create a source describing ourselves */
  if (fid->src == csp->if_id && fid->src_if == csp->iface) {
    elog(LOG_WARNING, "NOT creating our own source!!");
    return NULL;
  }

  found = ssync_source_lookup_create(csp->parent->ref, fid, 0, 0);
  ssync_source_set_hidden(found, 1);
  return found;
}


/*
 * 1 if 'ptr' occupies one of this link's cluster-head slots, else 0.
 * Note: because empty slots hold NULL, a NULL 'ptr' matches whenever any
 * slot is free.
 */
int cluster_source_is_parent(cs_per_link_t *csp, source_t *ptr)
{
  int slot = 0;

  while (slot < CS_CLUSTER_MAX) {
    if (csp->heads[slot] == ptr)
      return 1;
    slot++;
  }
  return 0;
}


/*
 * 1 if 'src' is clustered with us on this link, else 0.  That is the case
 * when it is our own source, when we are a head and it holds a non-zero
 * member (non-head) index, or when it is one of our heads.
 * Per-flow state is initialized for 'src' up front in every case.
 */
int cluster_source_is_connected(cs_per_link_t *csp, source_t *src)
{
  cs_per_flow_t *flow = cs_maybe_init_source(src);

  if (src == csp->our_source)
    return 1;

  int is_our_member = csp->our_clustering_state.bits.cluster_head &&
                      flow->cluster_index.byte != 0 &&
                      flow->cluster_index.ci.head == 0;
  if (is_our_member)
    return 1;

  return cluster_source_is_parent(csp, src) ? 1 : 0;
}


/*
 * Cancel this link's election and refresh timers, plus the per-source
 * expect-refresh, join-complete and rejection-refractory timers of every
 * source on this link that is not one of our cluster heads.
 */
void cluster_kill_timers(cs_per_link_t *csp)
{
  source_t *src;

  /* link-wide timers */
  g_event_destroy(csp->elect_timer);
  g_event_destroy(csp->refresh_timer);

  /* per-source timers */
  for (src = ssync_sources_top(&(csp->parent->ref->sources)); src != NULL;
       src = ssync_sources_next(src)) {
    if (cs_source_get_link_index(src) != csp->iface)
      continue;
    if (cluster_source_is_parent(csp, src))
      continue;

    cs_per_flow_t *flow = cs_maybe_init_source(src);
    g_event_destroy(flow->expect_refresh_timer);
    g_event_destroy(flow->join_complete_timer);
    g_event_destroy(flow->rejection_refractory);
  }
}

/*
 * For every source on this link that is not one of our heads:
 * hide it (except our own source), and zero its outgoing clustering
 * state, cluster index, reject flag and warn flag.
 */
void cluster_reset_sources(cs_per_link_t *csp)
{
  source_t *src = ssync_sources_top(&(csp->parent->ref->sources));

  while (src != NULL) {
    int on_this_link = (cs_source_get_link_index(src) == csp->iface);

    if (on_this_link && !cluster_source_is_parent(csp, src)) {
      cs_per_flow_t *flow = cs_maybe_init_source(src);

      if (src != csp->our_source)
        ssync_source_set_hidden(src, 1);

      flow->outgoing_clustering_state.byte = 0;
      flow->cluster_index.byte = 0;
      flow->reject = 0;
      flow->warn = 0;
    }

    src = ssync_sources_next(src);
  }
}


/*
 * Recompute the link-wide 'joining' bit.  It is set if our own source, or
 * any occupied head slot, has its outgoing joining bit set.  Every occupied
 * head is visited, so cs_maybe_init_source() runs on all of them.
 */
void cluster_update_join_bit(cs_per_link_t *csp)
{
  cs_per_flow_t *flow;
  int any_joining = 0;
  int slot;

  for (slot = 0; slot < CS_CLUSTER_MAX; slot++) {
    if (csp->heads[slot] == NULL)
      continue;
    flow = cs_maybe_init_source(csp->heads[slot]);
    if (flow->outgoing_clustering_state.bits.joining)
      any_joining = 1;
  }

  flow = cs_maybe_init_source(csp->our_source);
  if (flow->outgoing_clustering_state.bits.joining)
    any_joining = 1;

  csp->our_clustering_state.bits.joining = any_joining;
}


/*
 * Set our own source's outgoing joining bit to 'join', then recompute the
 * link-wide joining bit.
 */
void cluster_set_join_bit(cs_per_link_t *csp, int join)
{
  cs_per_flow_t *own = cs_maybe_init_source(csp->our_source);

  own->outgoing_clustering_state.bits.joining = join;
  cluster_update_join_bit(csp);
}


/*
 * Set the outgoing joining bit of a specific flow (e.g. a head we are
 * joining) to 'join', then recompute the link-wide joining bit.
 */
void cluster_set_member_join_bit(cs_per_link_t *csp, cs_per_flow_t *pf, int join)
{
  pf->outgoing_clustering_state.bits.joining = join;

  /* the link bit is the OR over our source and all heads */
  cluster_update_join_bit(csp);
}



/*
 *  INDEX ASSIGNMENT
 */

/*
 * Pick a cluster-head index for our own source on this link.
 *
 * Marks our source's cluster_index as a head index, then tries the values
 * 1..126 in circular order starting from a random point. A candidate is
 * skipped if its full index byte (head bit + index) already appears in a
 * latest_table record received on this interface.
 *
 * Returns 0 on success.  Returns -1 if every candidate collides; the index
 * is then cleared to 0.
 *
 * Fix: the previous loop incremented before the first test and stopped
 * when it wrapped back to the random start. The starting value itself was
 * therefore never considered, and only 125 of the 126 indices were tried.
 */
int cluster_assign_cluster_index(cs_per_link_t *csp)
{
  /* valid head indices are 1 .. CS_HEAD_INDEX_COUNT; 0 means "none" */
  enum { CS_HEAD_INDEX_COUNT = 126 };

  cs_state_t *cs = csp->parent;
  cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
  pf->cluster_index.ci.head = 1;

  /* choose random starting point in 1..CS_HEAD_INDEX_COUNT */
  int start = (int)(random() % CS_HEAD_INDEX_COUNT) + 1;
  int tries;

  for (tries = 0; tries < CS_HEAD_INDEX_COUNT; tries++) {
    /* walk start, start+1, ..., 126, 1, ..., start-1 */
    pf->cluster_index.ci.index = ((start - 1 + tries) % CS_HEAD_INDEX_COUNT) + 1;

    int collision = 0;
    if (cs->latest_table && cs->latest_table_count > 0) {
      int i;
      for (i = 0; i < cs->latest_table_count; i++) {
        if (cs->latest_table[i].header.flow_id.src_if == csp->iface &&
            cs->latest_table[i].cluster.index.byte == pf->cluster_index.byte) {
          collision = 1;
          break;
        }
      }
    }

    if (!collision) {
      /* $$$ check to see if this is the same as other clusters of this node
       * $$$ on other interfaces... */
      return 0;
    }
  }

  elog(LOG_CRIT, "ran out of cluster indices");
  pf->cluster_index.ci.index = 0;
  return -1;
}


/*
 * Assign a member (non-head) cluster index to 'pf' on this link.
 *
 * Advances the per-link cursor csp->last_member_index through 1..126
 * (wrapping). Each candidate is tested against the cluster_index byte of
 * every other source on this interface, and the first unused one is taken.
 *
 * Returns 0 on success.  Returns -1 when all 126 indices are in use; in
 * that case pf's index is cleared to 0.
 *
 * Fix: previously a failure left pf holding the last, colliding index.
 * Callers ignore the return value, so two members ended up sharing an
 * index. Clearing it lets cluster_reeval() retry the assignment on a
 * later pass (it reassigns whenever the index byte is 0).
 */
int cluster_assign_member_index(cs_per_link_t *csp, cs_per_flow_t *pf)
{
  /* number of distinct member indices (1..126); one full cycle tries all */
  enum { CS_MEMBER_INDEX_COUNT = 126 };
  int attempt;

  for (attempt = 0; attempt < CS_MEMBER_INDEX_COUNT; attempt++) {
    /* advance the cursor, wrapping back to 1 */
    csp->last_member_index++;
    if ((csp->last_member_index == 0) || (csp->last_member_index >= 127))
      csp->last_member_index = 1;

    pf->cluster_index.ci.head = 0;
    pf->cluster_index.ci.index = csp->last_member_index;

    /* check the candidate against every other source on this interface */
    int in_use = 0;
    source_t *ptr;
    for (ptr = ssync_sources_top(&(csp->parent->ref->sources));
         ptr && !in_use; ptr = ssync_sources_next(ptr)) {
      if (ptr->flow_id.src_if == csp->iface) {
        cs_per_flow_t *pf2 = cs_maybe_init_source(ptr);
        if (pf == pf2) continue;
        if (pf->cluster_index.byte == pf2->cluster_index.byte)
          in_use = 1;
      }
    }

    if (!in_use)
      return 0;
  }

  /* don't leave a duplicate index behind */
  elog(LOG_CRIT, "ran out of member indices");
  pf->cluster_index.byte = 0;
  return -1;
}


/*
 *  REJECT
 */


/*
 * Reject (drop) the peer behind 'pf' on its link. In order, it:
 *   - records 'reason' and clears the warn flag,
 *   - (re)starts the rejection refractory timer; while it is pending,
 *     cluster_reeval() will not try to join this peer,
 *   - clears the peer's cluster index,
 *   - kills its expect-refresh and join timers,
 *   - hides the source,
 *   - removes it from our head table if it was one of our heads.
 * When 'report' is set, the reject flag is raised and a fast refresh is
 * scheduled (presumably so the rejection is advertised promptly). Finally
 * the link is re-evaluated.
 *
 * Fixes:
 *   - cs_source_get_per_link() can return NULL; bail out instead of
 *     dereferencing it.
 *   - any existing refractory timer is destroyed before a new one is
 *     added. This matches the destroy-then-add pattern used for
 *     expect_refresh_timer and join_complete_timer, so a repeated
 *     rejection does not leave the previous timer orphaned.
 */
void cluster_reject(cs_per_flow_t *pf, int report, char *reason)
{
  cs_per_link_t *csp = cs_source_get_per_link(pf->parent);

  if (csp == NULL) {
    elog(LOG_WARNING, "can't reject %s because %s: no link state", 
         print_if_id(pf->parent->flow_id.src), reason);
    return;
  }

  if (csp->our_source == pf->parent) {
    elog(LOG_CRIT, "rejecting ourselves??");
    return;
  }
  elog(LOG_WARNING, "rejecting %s because %s", 
       print_if_id(pf->parent->flow_id.src), reason);
  
  /* remember why, and drop any pending warning */
  pf->why_rejected = reason;
  pf->warn = 0;

  /* (re)start the refractory period; its expiry has no callback */
  g_event_destroy(pf->rejection_refractory);
  g_timer_add(CLUSTER_REJECT_REFRACT * 1000, NULL, NULL, NULL,
              &(pf->rejection_refractory));

  /* clear cluster index */
  pf->cluster_index.byte = 0;

  /* kill expect and join timers */
  g_event_destroy(pf->expect_refresh_timer);
  g_event_destroy(pf->join_complete_timer);

  /* set hidden */
  ssync_source_set_hidden(pf->parent, 1);
  
  /* clear this head if we're a cluster member */
  int i;
  for (i=0; i<CS_CLUSTER_MAX; i++)
    if (csp->heads[i] == pf->parent)
      csp->heads[i] = NULL;

  /* report this rejection? */
  if (report) {
    pf->reject = 1;
    cluster_refresh_reset(csp, 0, 1, "rejecting node", NULL);
  }

  /* reeval (a no-op if we are already inside cluster_reeval) */
  cluster_reeval(csp);
}


/*
 *  CLUSTER JOIN
 */

/*
 * Join-complete timer callback: the join to/from the flow in 'data' did not
 * complete in time, so reject it (reported).  One-shot.
 */
int cluster_join_timeout(void *data, int interval, g_event_t *ev)
{
  cs_per_flow_t *timed_out = data;

  cluster_reject(timed_out, 1, "join timed out");
  return TIMER_DONE;
}


/*
 * Arm the join-complete timer (CLUSTER_JOIN_COMPLETE * 1000, firing
 * cluster_join_timeout) for 'pf', unless one is already pending.
 */
void cluster_maybe_set_join_timer(cs_per_flow_t *pf)
{
  if (pf->join_complete_timer != NULL)
    return;

  g_timer_add(CLUSTER_JOIN_COMPLETE * 1000, cluster_join_timeout, pf,
              NULL, &(pf->join_complete_timer));
}


/*
 * Start joining cluster head 'src'. It claims the first free head slot;
 * if no slot is free it logs and does nothing. It then:
 *   - resets that flow's outgoing state and sets its joining bit,
 *   - un-hides the source,
 *   - restarts the join-complete timer,
 *   - resets the refresh backoff with a fast refresh.
 */
void cluster_join(cs_per_link_t *csp, source_t *src)
{
  cs_per_flow_t *pf = cs_maybe_init_source(src);
  int slot = 0;

  /* find the first free head slot */
  while (slot < CS_CLUSTER_MAX && csp->heads[slot] != NULL)
    slot++;

  if (slot == CS_CLUSTER_MAX) {
    elog(LOG_CRIT, "can't join cluster.. no room in table");
    return;
  }
  csp->heads[slot] = src;

  /* fresh outgoing state, flagged as joining */
  pf->outgoing_clustering_state.byte = 0;
  cluster_set_member_join_bit(csp, pf, 1);

  ssync_source_set_hidden(src, 0);

  /* restart (not extend) the join-complete timer */
  g_event_destroy(pf->join_complete_timer);
  cluster_maybe_set_join_timer(pf);

  cluster_refresh_reset(csp, 1, 1, "Trying to join", NULL);

  elog(LOG_NOTICE, "trying to join cluster head %s", 
       print_if_id(src->flow_id.src));
}


/*
 *   REEVALUATE
 */

/*
 * Re-evaluate this link's clustering state. Called from cluster_reject(),
 * cluster_process_msg() and cluster_update_cluster_msg(). In order:
 *   1. refresh per-source connectivity, prospect counts and competitors;
 *   2. walk the latest cluster table: head-index collisions, cluster info,
 *      member index assignment, then acked joins;
 *   3. reject members/heads that have dropped out of the table;
 *   4. re-assess our heads (and our members, if we are a head);
 *   5. possibly join a better or needed cluster head;
 *   6. drive the election timer and isolated/search modes;
 *   7. publish the member count, schedule a table republish and update
 *      status.
 *
 * Nested calls (e.g. via cluster_reject()) return immediately because of
 * the static in_reeval guard. The guard is an unsynchronized static int,
 * so this presumably relies on a single-threaded event loop.
 *
 * Fix: heads rejected for bad connectivity in step 4 used to be counted in
 * head_count even though cluster_reject() clears their slot. If every head
 * was rejected while head_count still reached the maximum, min_conn_index
 * stayed -1, and csp->heads[min_conn_index] read heads[-1] (out of bounds).
 * Now only retained heads are counted, and the index is checked before use.
 */
void cluster_reeval(cs_per_link_t *csp)
{
  lu_multi_link_t *link = lu_multi_index_link(csp->parent->link_ref, csp->iface);
  cs_state_t *cs = csp->parent;
  cs_per_flow_t *our_pf = cs_maybe_init_source(csp->our_source);
  
  /* recursion guard: cluster_reject() etc. call back into us */
  static int in_reeval = 0;
  if (in_reeval) return;
  in_reeval = 1;

  /* 
   * Preprocess neighbors: 
   *   + clear mark bits, 
   *   + update conn values,
   *   + count number of prospects for a new cluster
   */

  source_t *ptr;
  csp->prospect_count = 0;
  csp->connect_count = 0;
  csp->max_reported_prospects = 0;
  csp->competitors = 0;
  for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
       ptr=ssync_sources_next(ptr)) {
    cs_per_flow_t *pf = cs_maybe_init_source(ptr);

    /* clear mark */
    pf->mark = 0;

    /* update connectivity unless we think this guy is dead (with use_rssi,
     * last_conn is instead written from received rssi) */
    if (!pf->dead) {
      if (!cs->use_rssi)
        pf->last_conn = lu_multi_link_get_conn(link, pf->parent->flow_id.src);
    }
    else
      pf->last_conn = 0;

    /* if source is on our interface and not ourselves... */
    if (ptr->flow_id.src_if == csp->iface && !ssync_source_is_local(ptr)) {

      /*
       * if this source is a potential neighbor:
       *    + this node has the min required connectivity to join
       *    + if it is not isolated
       */

      if (pf->last_conn >= CLUSTER_MIN_JOIN_CONN &&
          !pf->reported_clustering_state.bits.isolated) {

        /* find maximum reported prospect count among non-CHs, and
         * count competitors */
        if (pf->reported_clustering_state.bits.cluster_head == 0) {
          if (pf->reported_clustering_state.bits.member_count > 
              csp->max_reported_prospects) {
            csp->max_reported_prospects = pf->reported_clustering_state.bits.member_count;
            csp->competitors = 1;
            csp->for_instance = ptr->flow_id.src;
          }
          else if (pf->reported_clustering_state.bits.member_count == 
                   csp->max_reported_prospects) {
            csp->competitors++;
          }
        }

        /*
         *  count as a prospect if 
         *    + if we are a CH, this source is also a CH
         *    + it does not appear in our tables
         */

        if (csp->our_clustering_state.bits.cluster_head == 0 ||
            (csp->our_clustering_state.bits.cluster_head == 1 &&
             pf->reported_clustering_state.bits.cluster_head == 1)) {

          /* search the table for this node */
          int i;
          for (i=0; i<cs->latest_table_count; i++) {
            /* if this is on this interface and the if ids match */
            if (cs->latest_table[i].header.flow_id.src_if == csp->iface &&
                cs->latest_table[i].cluster.if_id == ptr->flow_id.src) {
              /* count this as a connected node */
              csp->connect_count++;
              goto found;
            }
          }

          /* dear mr. prospect, */
          csp->prospect_count++;
          pf->is_prospect = 1;
          goto next_source_1;
        }

      found:
        pf->is_prospect = 0;
      }

      /* no prospect if bad connectivity */
      else
        pf->is_prospect = 0;
    }

  next_source_1:
    continue;
  }

#if 0
  /* metric weights prospects by total number of potential prospects */
  /* $$$ and sqrt to give more dynamic range at the low end? */
  csp->connect_metric = 16 * 
    (float)csp->prospect_count / 
    (float)(csp->connect_count + csp->prospect_count);
#else
  /* prospects, discounted by half the already-connected count, floored at 0 */
  csp->connect_metric = csp->prospect_count - (csp->connect_count / 2);
  if (csp->connect_metric < 0)
    csp->connect_metric = 0;
#endif
  
  /*
   * Pass 1: we run thru the table and
   *   + if we are a CH, check for index collisions with other cluster heads 
   *   + save cluster info from possible members
   *   + if we are a CH, assign indices to new members 
   */

  int i;
  for (i=0; i<cs->latest_table_count; i++) {
    
    /* lookup the source of this record */
    source_t *ptr = ssync_source_lookup(csp->parent->ref, &(cs->latest_table[i].header.flow_id));
    if (ptr == NULL) {
      elog(LOG_WARNING, "no source for this record??");
      continue;
    }
    
    cs_per_flow_t *pf = cs_maybe_init_source(ptr);
    
    /* if we are a cluster head, check for an index collision; the node
     * with the larger node id keeps its index, we pick a new one */
    if (csp->our_clustering_state.bits.cluster_head) {
      if (cs->latest_table[i].cluster.node_id != my_node_id &&
          cs->latest_table[i].cluster.index.ci.head &&
          cs->latest_table[i].cluster.index.byte &&
          cs->latest_table[i].cluster.index.byte == our_pf->cluster_index.byte &&
          cs->latest_table[i].cluster.node_id < my_node_id) {
        cluster_assign_cluster_index(csp);
      }
    }

    /* if this record is ABOUT the source, and not our source */
    if (ptr->flow_id.src == cs->latest_table[i].cluster.if_id &&
        !ssync_source_is_local(ptr)) {
      
      /* copy over the cluster info */
      memmove(&(pf->cluster_info), &(cs->latest_table[i].cluster), sizeof(pf->cluster_info));

      /* is this our parent? */
      int is_parent = cluster_source_is_parent(csp, ptr);

      /* if we are a cluster head, and
       *   if this node is reporting as a cluster member, 
       *   or if it's a head and not one of our parents, 
       *   assign a new index to it if needed */
      if (csp->our_clustering_state.bits.cluster_head) {
        if ((pf->reported_clustering_state.bits.cluster_head == 0) || !is_parent) {
          if (pf->cluster_index.byte == 0 || pf->cluster_index.ci.head)
            cluster_assign_member_index(csp, pf);
        }
      }

      /* if we are not a cluster head, or it's our parent, then
       *  if it reported being a clusterhead then copy, else zero */
      if (csp->our_clustering_state.bits.cluster_head == 0 || is_parent) {
        if (pf->reported_clustering_state.bits.cluster_head &&
            pf->cluster_info.index.byte != 0 && pf->cluster_info.index.ci.head)
          pf->cluster_index.byte = pf->cluster_info.index.byte;
        else
          pf->cluster_index.byte = 0;
      }
    }
  }

  /* 
   * Pass 2: check for acked JOINs
   *    + if a cluster-head, when we receive an entry that describes US, and we
   *      have already assigned a cluster index to the sender, we finalize
   *      the join.
   *    + if a cluster-member, when we receive an entry that describes US,
   *      we know that we've been accepted to the cluster and assigned an
   *      address
   */

  for (i=0; i<cs->latest_table_count; i++) {
    
    /* lookup the source of this record */
    source_t *ptr = ssync_source_lookup(csp->parent->ref, &(cs->latest_table[i].header.flow_id));
    if (ptr == NULL) {
      elog(LOG_WARNING, "no source for this record??");
      continue;
    }

    cs_per_flow_t *pf = cs_maybe_init_source(ptr);
    
    elog(LOG_DEBUG(0), "got record data.. %s: %s %s", 
         ssync_flowid_to_str(&(cs->latest_table[i].header.flow_id)),
         print_if_id(cs->latest_table[i].cluster.node_id), 
         print_if_id(cs->latest_table[i].cluster.if_id));
      
    /* if this record is about US, but not FROM us,
     * we mark the source it's FROM -- acked join */
    if (csp->our_source != ptr && 
        cs->latest_table[i].cluster.node_id == my_node_id &&
        cs->latest_table[i].cluster.if_id == csp->if_id) {

      /* if we're a clusterhead, but we haven't assigned this member
       * an index already, skip it... */
      if (csp->our_clustering_state.bits.cluster_head && 
          pf->cluster_index.byte == 0)
        goto next_entry;

      /* otherwise mark, unset the join bit and kill the join timer */
      pf->mark = 1;
      if (pf->join_complete_timer) {
        elog(LOG_NOTICE, "join complete: %s->%d", 
             print_if_id(ptr->flow_id.src), my_node_id);
        g_event_destroy(pf->join_complete_timer);
      }
      cluster_set_member_join_bit(csp, pf, 0);
    }

  next_entry:
    continue;
  }

  /* 
   * Now, check for members/ch's that are unmarked, and reject them
   */

  /* if cluster head, check through sources */
  if (csp->our_clustering_state.bits.cluster_head) {
    source_t *ptr;
    for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
         ptr=ssync_sources_next(ptr)) {
      cs_per_flow_t *pf = cs_maybe_init_source(ptr);

      /* if unmarked, and iface matches, and it's not ourselves, 
       * and it's not still joining, and we've assigned an index,
       * and it's not one of our parents */
      if (pf->mark == 0 && 
          ptr->flow_id.src_if == csp->iface &&
          ptr != csp->our_source &&
          pf->join_complete_timer == NULL &&
          pf->cluster_index.byte &&
          !cluster_source_is_parent(csp, ptr)) {
        
        /* drop it */
        elog(LOG_NOTICE, "dropping member %s", print_if_id(ptr->flow_id.src));
        cluster_reject(pf, 1, "not joining but missing from table");
      }
    }
  }

  /* if cluster member, just check the clusters array */
  for (i=0; i<CS_CLUSTER_MAX; i++) {
    if (csp->heads[i]) {
      cs_per_flow_t *pf = cs_maybe_init_source(csp->heads[i]);
      
      /* it's not marked and our join completed.. */
      if (pf->mark == 0 &&
          pf->join_complete_timer == NULL) {
        cluster_reject(pf, 1, "join completed and not present in table");
      }
    }
  }

  /*
   *  NOW, see if we can join anything... or should drop anything...
   */

  /* 
   *  Assess our position as a cluster MEMBER
   *    + count retained heads and min connectivity value 
   *    + reject bad cluster heads (not counted: rejection clears the slot)
   */

  int head_count=0;
  int min_conn_value=0;
  int min_conn_index=-1;
  for (i=0; i<CS_CLUSTER_MAX; i++) {
    if (csp->heads[i]) {
      cs_per_flow_t *pf = cs_maybe_init_source(csp->heads[i]);
      
      /* find min connectivity of current heads */
      int conn = pf->last_conn;
      
#ifdef NEEDY_CLUSTERS
      /* bump up the connectivity rating for "needy clusters" */
      if (pf->reported_clustering_state.bits.member_count < CLUSTER_JOIN_MEM) 
        conn += CLUSTER_NEEDY_CONN_BUMP;
#endif

      /* reject this parent if it's bad */
      if (conn < CLUSTER_DROP_CONN) {
        cluster_reject(pf, 1, "bad connectivity to cluster head");
      }

      /* otherwise count it and track the min parent */
      else {
        head_count++;
        if (min_conn_index < 0 || min_conn_value > conn) {
          min_conn_index = i;
          min_conn_value = conn;
        }
      }
    }
  }
  csp->head_count = head_count;


  /* if we are a cluster HEAD... */
  if (csp->our_clustering_state.bits.cluster_head) {

    /* update members count, drop if poor connectivity */  
  repeat:
    csp->member_count = 0;
    source_t *ptr;
    source_t *cluster_min_conn = NULL;
    int cluster_min_conn_value = -1;
    for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
         ptr=ssync_sources_next(ptr)) {
      if (cs_source_get_link_index(ptr) == csp->iface &&
          csp->our_source != ptr &&
          !cluster_source_is_parent(csp, ptr)) {
        cs_per_flow_t *pf = cs_maybe_init_source(ptr);
        
        /* if this is in our cluster.. */
        if (pf->cluster_index.byte) {
          int conn = pf->last_conn;

          if (pf->cluster_index.ci.head) 
            elog(LOG_CRIT, "cluster head should be cluster member");

          /* reject now if it's crap */
          if (conn < CLUSTER_DROP_CONN) 
            cluster_reject(pf, 1, "bad connectivity to cluster head");

          /* else count it and record lowest acceptable conn value
           * (only members reporting more than one member are candidates
           * for dropping) */
          else {
            csp->member_count++;
            if ((cluster_min_conn == NULL || cluster_min_conn_value > conn) &&
                (pf->reported_clustering_state.bits.member_count > 1)) {
              cluster_min_conn_value = conn;
              cluster_min_conn = ptr;
            }
          }
        }
      }
    }

    /* drop lowest if over max, then recount */
    if (csp->member_count > CLUSTER_MAX_MEM) {
      if (cluster_min_conn) {
        cluster_reject(cs_maybe_init_source(cluster_min_conn), 
                       1, "over max cluster size");
        goto repeat;
      }
    }
    
    /* if we are joining:
     *   if above min, stabilize now */
    if (csp->our_clustering_state.bits.searching) {
      if (csp->member_count >= CLUSTER_JOIN_MEM) {
        /* stabilize now */
        cluster_set_join_bit(csp, 0);
        cluster_kill_election_timer(csp);
      }
    }

    /* if we are stable but we drop below min,
     * transition to joining */
    else {
      if (csp->member_count < CLUSTER_DROP_MEM) {
        /* destabilize now */
        cluster_set_join_bit(csp, 1);
        cluster_set_election_timer(csp, 0, "Head's member count below drop min");
      }
    }
  }

  /*
   *  Choose to join another cluster?
   */
    
  /* find the best new clusterhead */
  source_t *max_conn = NULL;
  int max_conn_value = -1;
  int max_conn_members = -1;
  for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
       ptr=ssync_sources_next(ptr)) {
    
    cs_per_flow_t *pf = cs_maybe_init_source(ptr);
    
    /* try to join if 
     *   + it's on our link and not ourselves 
     *   + it's a cluster head, and it's a prospect
     *   + we're not already joining 
     *   + it's not recently rejected
     */

    if (cs_source_get_link_index(ptr) == csp->iface &&
        ptr != csp->our_source && 
        pf->reported_clustering_state.bits.cluster_head &&
        pf->is_prospect &&
        !pf->join_complete_timer &&
        !pf->rejection_refractory) {
      
      /* skip if already selected? */
      if (cluster_source_is_parent(csp, ptr))
        goto next_source_2;
        
      /* check connectivity */
      int conn = pf->last_conn;
      
      /* record highest conn value */
      if ((conn > CLUSTER_MIN_JOIN_CONN) && 
          ((max_conn == NULL || conn > max_conn_value))) {
        max_conn_value = conn;
        max_conn_members = pf->reported_clustering_state.bits.member_count;
        max_conn = ptr;
      }
    }
      
  next_source_2:
    continue;
  }

  /* if we found an acceptable CH */
  if (max_conn) {
    
    elog(LOG_DEBUG(0), "found possible CH %s to join: %d",
         print_if_id(max_conn->flow_id.src), max_conn_value);
    
    /* if the new one is better, or if we need cluster heads, 
     * or if this CH needs us */
    
    if ((((max_conn_value - min_conn_value) > CLUSTER_CHANGE_CONN) && 
         min_conn_value < CLUSTER_MIN_JOIN_CONN) ||
        head_count < (csp->parent->mobile_node ? 1 : CLUSTER_MIN_HEADS) ||
        max_conn_members < CLUSTER_JOIN_MEM) {
      
      /* if we're maxed out, reject the crappiest one; min_conn_index is
       * only valid when at least one retained head was seen */
      if (head_count >= (csp->parent->mobile_node ? 1 : CS_CLUSTER_MAX) &&
          min_conn_index >= 0) {
        cluster_reject(cs_maybe_init_source(csp->heads[min_conn_index]),
                       1, "rejecting to make room");
        csp->head_count--;
      }
        
      /* now join the new one */
      cluster_join(csp, max_conn);
      csp->head_count++;
    }
  }
  
  /* if we are NOT a cluster head, update searching mode */
  if (!csp->our_clustering_state.bits.cluster_head) {

    /* maybe (re)start election timer if we have the most prospects */
    if (csp->connect_metric > CLUSTER_MIN_OK_METRIC &&
        csp->connect_metric >= csp->max_reported_prospects) {
      
      /* if we were not searching */
      if (!csp->our_clustering_state.bits.searching) {
        cluster_set_election_timer(csp, 1, "we have max prospects, not searching");
      }

      /* or if we see an increase in competitors */
      else if (csp->competitors > (csp->prev_competitors + CLUSTER_COMPETE_CHANGE))
        cluster_set_election_timer(csp, 1, "competitor count change");
    }
    
    /* otherwise, stop election timer */
    else {
      cluster_kill_election_timer(csp);
    }    
  }

  /* 
   * check for isolation (no neighbors with valid connectivity)
   *   if we are isolated, keep resetting the election timer
   *   $$$ might make sense to give up after a while
   */

  int isolated = 0;
  if (csp->member_count == 0 && csp->head_count == 0) {
    source_t *ptr;
    isolated = 1;
    for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
         ptr=ssync_sources_next(ptr)) {
      cs_per_flow_t *pf = cs_maybe_init_source(ptr);
      if (pf->last_conn > CLUSTER_MIN_JOIN_CONN) {
        isolated = 0;
        break;
      }
    }
  }
   
  /* if we're isolated, no election timer.. */
  if (isolated) {
    cluster_isolated_mode(csp);
  }
  else {
    /* if we had been isolated enter search mode now */
    if (csp->our_clustering_state.bits.isolated) {
      cluster_search_mode(csp);
    }
  }

  /* update clustering state */

  /* $$$ ugly:
   *       setting bits all over the place
   *       two copies, one in csp one in our_source
   */

  /* advertise member count (as a head) or connect metric (otherwise),
   * saturated at 15 -- presumably a 4-bit field */
  int membership_value = 
    csp->our_clustering_state.bits.cluster_head ? 
    csp->member_count : csp->connect_metric;
  csp->our_clustering_state.bits.member_count = 
      (membership_value < 16) ? membership_value : 15;

  /* update the table */
  cs_table_schedule_repub(csp->parent);
  
  /* update the status device */
  cluster_status_update(csp);

  in_reeval = 0;
}


/*
 *  PROCESS CLUSTER MSG
 */

/*
 * Handle a cluster message received from 'src' on this link.
 *
 * Layout as parsed here: a cluster_hdr_t, followed by zero or more records.
 * Each record is one control byte plus an if_id_t. Records addressed to our
 * if_id are acted on (JOIN / WARN / REJECT); the rest are skipped. After the
 * records are processed:
 *   - the expected-refresh timer for 'src' is re-armed from the header's
 *     backoff/jitter, if either is non-zero;
 *   - the link is re-evaluated.
 * 'rssi' is stored as last_conn only when the link uses rssi for link
 * estimation.
 *
 * msg/length arrive from the network and are treated as untrusted:
 *   - A negative length is rejected. Previously the signed/unsigned
 *     comparison with sizeof() converted it to a huge value and let it
 *     through.
 *   - The header is copied out instead of being accessed through a cast
 *     pointer, since msg carries no alignment guarantee.
 *   - Control bytes are read as unsigned. This matches the uint8_t written
 *     by cluster_update_cluster_msg(), so plain (possibly signed) char
 *     cannot sign-extend them.
 */
void cluster_process_msg(cs_per_link_t *csp, source_t *src, 
                         char *msg, int length, int rssi)
{
  cs_per_flow_t *pf = cs_maybe_init_source(src);

  if (length < 0 || (size_t)length < sizeof(cluster_hdr_t)) {
    elog(LOG_WARNING, "cluster message too short");
    return;
  }

  cluster_hdr_t hdr_copy;
  memmove(&hdr_copy, msg, sizeof(hdr_copy));
  cluster_hdr_t *hdr = &hdr_copy;

  /* store the new state */
  pf->reported_clustering_state = hdr->mode;

  /* reset warn counter */
  if (pf->warn_count > 0) 
    elog(LOG_DEBUG(0), "Resetting warn counter for %d", src->flow_id.src);
  pf->warn_count = 0;
  pf->warn = 0;

  /* undead.. */
  pf->dead = 0;

  /* save rssi if we're using it for link est */
  if (csp->parent->use_rssi) {
    pf->last_conn = rssi;
  }

  /* process joins directed at us */
  int remain = length - (int)sizeof(cluster_hdr_t);
  const char *buf = msg + sizeof(cluster_hdr_t);
  while (remain > 0) {
    int ctrl_byte = (uint8_t)buf[0];

    buf++;
    remain--;

    /* a trailing partial record is skipped byte by byte */
    if (remain >= (int)sizeof(if_id_t)) {
      if_id_t id;
      
      memmove(&id, buf, sizeof(id));
      buf += sizeof(id);
      remain -= (int)sizeof(id);

      /* is it us? */
      if (id == csp->if_id) {
        switch (ctrl_byte) {
        case CLUSTER_MSG_JOIN:
          /* process only if we're a clusterhead */
          if (csp->our_clustering_state.bits.cluster_head) {
            
            elog(LOG_WARNING, "processing JOIN request from %s, total members %d",
                 print_if_id(src->flow_id.src), csp->member_count);
            
            /* reject on join if below min conn level */
            int conn = pf->last_conn;
            if (conn < CLUSTER_MIN_JOIN_CONN) 
              cluster_reject(pf, 1, "bad connectivity to cluster member");
            
            else {
              /* unhide this source */
              ssync_source_set_hidden(src, 0);
              cluster_maybe_set_join_timer(pf);
            }
          }
          else {
            elog(LOG_WARNING, "JOIN request, but we're not a clusterhead");
          }
          break;

        case CLUSTER_MSG_WARN:
          elog(LOG_WARNING, "processing WARN request from %s",
               print_if_id(src->flow_id.src));
          cluster_refresh_reset(csp, 0, 1, "replying warning", NULL);
          break;

        case CLUSTER_MSG_REJECT:
          elog(LOG_WARNING, "processing REJECT request from %s",
               print_if_id(src->flow_id.src));
          cluster_reject(pf, 0, "rejected by peer");
          break;

        default:
          elog(LOG_WARNING, "processing UNKNOWN request from %s",
               print_if_id(src->flow_id.src));
          break;
        }
      }
    }
  }

  /* clear/reset expected refresh timer (2 s of slack over the sender's
   * advertised interval) */
  if (hdr->refresh_backoff != 0 || hdr->refresh_jitter != 0) {
    int next_refresh = 
      cs_refresh_timer(hdr->mode, hdr->refresh_backoff, hdr->refresh_jitter) + 2;
    elog(LOG_DEBUG(0), "resetting src %d refresh timer for %d", 
         src->flow_id.src, next_refresh);
    g_event_destroy(pf->expect_refresh_timer);
    g_timer_add(next_refresh * 1000, 
                cluster_refresh_expected, pf, NULL, &(pf->expect_refresh_timer));
  }

  else {
    elog(LOG_DEBUG(0), "no new refresh time defined");
  }

  cluster_reeval(csp);
}


/*
 *  REFRESH TIMER
 */


static
int cluster_refresh_expected(void *data, int interval, g_event_t *ev)
{
  /*
   * Expect-refresh timer callback: a peer missed its expected refresh.
   * Each miss raises warn_count, and sets the warn flag if the peer is
   * clustered with us. Below CLUSTER_MAX_WARNINGS the timer renews, and
   * clustered peers get a fast refresh carrying the warning. At the limit
   * the peer is hidden and marked dead, rejected if clustered, and the
   * timer stops.
   */
  cs_per_flow_t *peer = data;
  cs_per_link_t *csp = cs_source_get_per_link(peer->parent);
  int clustered = cluster_source_is_connected(csp, peer->parent);

  /* only log for peers that hold a cluster index */
  if (peer->cluster_index.byte)
    elog(LOG_WARNING, "expected a refresh for %s but didn't get one", 
         print_if_id(peer->parent->flow_id.src));

  if (clustered)
    peer->warn = 1;
  peer->warn_count++;

  if (peer->warn_count < CLUSTER_MAX_WARNINGS) {
    if (clustered)
      cluster_refresh_reset(csp, 0, 1, "issuing warning", NULL);
    return TIMER_RENEW;
  }

  /* out of patience */
  ssync_source_set_hidden(peer->parent, 1);
  peer->dead = 1;

  if (clustered)
    cluster_reject(peer, 1, "too many warnings");

  return TIMER_DONE;
}


/*
 * (Re)schedule this link's refresh timer (cluster_refresh_timeout).
 *
 *   reset_back  - zero the backoff exponent first (logged with 'reason')
 *   fast        - refresh immediately (delay 0)
 *   set_header  - if non-NULL, receives the backoff/jitter to advertise;
 *                 both are 0 when nothing changes or the refresh is fast
 *
 * When not fast, the delay depends on the link's state:
 *   - steady (not joining/isolated/searching): exponential backoff via
 *     cs_refresh_timer(), and the exponent then grows up to a per-role
 *     maximum;
 *   - isolated: CLUSTER_ISOLATED_REF plus up to 1000 of jitter;
 *   - otherwise: CLUSTER_FAST_REF_MIN plus jitter.
 * An already-armed timer is never pushed later. In that case the header
 * fields are zeroed ("same as before") and the timer is left alone.
 * Always returns 0.
 */
int cluster_refresh_reset(cs_per_link_t *csp, int reset_back, 
                          int fast, char *reason, cluster_hdr_t *set_header)
{
  int delay = 0;

  if (reset_back) {
    elog(LOG_NOTICE, "Resetting backoff, because %s", reason);
    csp->refresh_bits = 0;
  }

  if (set_header) {
    set_header->refresh_backoff = 0;
    set_header->refresh_jitter = 0;
  }

  if (fast) {
    elog(LOG_NOTICE, "Fast refresh!");
  } else {
    int base_delay;
    int jitter;
    int steady = !csp->our_clustering_state.bits.joining &&
                 !csp->our_clustering_state.bits.isolated &&
                 !csp->our_clustering_state.bits.searching;

    if (!steady) {
      if (csp->our_clustering_state.bits.isolated) {
        /* ultra-fast beacons while isolated */
        base_delay = CLUSTER_ISOLATED_REF;
        jitter = random_range(0, 1000);
      } else {
        base_delay = CLUSTER_FAST_REF_MIN;
        jitter = random_range(0, CLUSTER_FAST_REF_VAR * 1000);
      }
    } else {
      int refresh_max = csp->our_clustering_state.bits.cluster_head
                          ? CLUSTER_REFRESH_MAX_HEAD_EXP
                          : CLUSTER_REFRESH_MAX_EXP;

      if (set_header)
        set_header->refresh_backoff = csp->refresh_bits;

      base_delay = 
        cs_refresh_timer(csp->our_clustering_state, csp->refresh_bits, 0);
      jitter = random_range(0, CLUSTER_FAST_REF_VAR * 1000);

      /* grow the backoff exponent, saturating at refresh_max */
      csp->refresh_bits = (csp->refresh_bits < refresh_max)
                            ? csp->refresh_bits + 1
                            : refresh_max;
    }

    if (set_header)
      set_header->refresh_jitter = (jitter / 1000) + 1;

    delay = base_delay * 1000 + jitter;
  }

  if (csp->refresh_timer) {
    int remaining = g_timer_time_remaining(csp->refresh_timer);

    if (delay > remaining) {
      /* blank header entry means "same as before" */
      if (set_header) {
        set_header->refresh_backoff = 0;
        set_header->refresh_jitter = 0;
      }
      return 0;
    }
  }

  g_event_destroy(csp->refresh_timer);
  g_timer_add(delay, cluster_refresh_timeout, csp, NULL, &(csp->refresh_timer));
  return 0;
}


static
int cluster_refresh_timeout(void *data, int interval, g_event_t *ev)
{
  /* Refresh timer callback: flag our own source as needing a refresh.
   * One-shot; cluster_refresh_reset() re-arms it. */
  cs_per_link_t *link_state = data;

  link_state->our_source->log.refresh_pending = 1;
  return TIMER_DONE;
}


/*
 * Drop the pending outgoing clustering message and retire the per-source
 * WARN/REJECT flags.
 *
 * For every source whose warn flag was set, the flag is cleared and its
 * expect-refresh timer is re-armed for CLUSTER_REF_POST_WARN * 1000 ms
 * (cluster_refresh_expected fires on expiry).  Every reject flag is cleared.
 *
 * NOTE(review): unlike the status handlers, this walks every source in the
 * sync table without filtering on cs_source_get_link_index(); confirm this
 * is intended when the node has several interfaces.
 */
void cluster_clear_cluster_msg(cs_per_link_t *csp)
{
  source_t *src;

  if (csp->outgoing_cluster_msg != NULL) {
    buf_free(csp->outgoing_cluster_msg);
  }
  csp->outgoing_cluster_msg = NULL;

  for (src = ssync_sources_top(&(csp->parent->ref->sources));
       src != NULL;
       src = ssync_sources_next(src)) {
    cs_per_flow_t *flow = cs_maybe_init_source(src);

    if (flow->warn) {
      flow->warn = 0;
      g_event_destroy(flow->expect_refresh_timer);
      g_timer_add(CLUSTER_REF_POST_WARN * 1000, cluster_refresh_expected,
                  flow, NULL, &(flow->expect_refresh_timer));
    }

    /* reject flags never survive a clear */
    flow->reject = 0;
  }
}


void cluster_update_cluster_msg(cs_per_link_t *csp)
{
  /* check latest */
  cluster_reeval(csp);
  
  /* construct a clustering message */
  if (csp->outgoing_cluster_msg) 
    buf_free(csp->outgoing_cluster_msg);
  csp->outgoing_cluster_msg = buf_new();
  
  /* construct cluster header */
  cluster_hdr_t hdr = {
    mode: csp->our_clustering_state,
  };
  
  /* reset the refresh timer */
  cluster_refresh_reset(csp, 0, 0, "", &hdr);
  
  /* push the clustering message header */
  bufcpy(csp->outgoing_cluster_msg, &hdr, sizeof(hdr));
  elog(LOG_INFO, "sending clustering message: %d,%d", 
       hdr.refresh_backoff, hdr.refresh_jitter);
  
  /* push any join addresses */
  int i;
  cs_per_flow_t *pf;
  for (i=0; i<CS_CLUSTER_MAX; i++) {
    if (csp->heads[i]) {
      pf = cs_maybe_init_source(csp->heads[i]);
      if (pf->outgoing_clustering_state.bits.joining) {
	elog(LOG_WARNING, "emitting JOIN request");
	uint8_t tmp = CLUSTER_MSG_JOIN;
	bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
	bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src), 
	       sizeof(if_id_t));
      }
    }
  }
  
  /* send rejections, warnings and reset expected timers */
  source_t *ptr;
  for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
       ptr=ssync_sources_next(ptr)) {
    pf = cs_maybe_init_source(ptr);
    
    /* warning needed? */
    if (pf->warn) {
      elog(LOG_WARNING, "emitting WARNING to %s", 
	   print_if_id(pf->parent->flow_id.src));
      uint8_t tmp = CLUSTER_MSG_WARN;
      bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
      bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src), 
	     sizeof(if_id_t));
    }

    /* rejection needed? */
    if (pf->reject) {
      elog(LOG_WARNING, "emitting REJECT to %s", 
	   print_if_id(pf->parent->flow_id.src));
      uint8_t tmp = CLUSTER_MSG_REJECT;
      bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
      bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src), 
	     sizeof(if_id_t));      
    }
  }
}


/*
 *  ELECTION TIMER
 */

/* when election timer expires, switch between searching and cluster head */
static
int cluster_elect_timeout(void *data, int interval, g_event_t *ev)
{
  cs_per_link_t *link_state = data;

  /* the timer only matters while we are searching */
  if (!link_state->our_clustering_state.bits.searching) {
    elog(LOG_NOTICE, "Election timer expired: ignoring -- NOT SEARCHING");
    return TIMER_DONE;
  }

  /* heads and mobile nodes treat expiry as "un-elect": search again;
   * everyone else claims cluster headship */
  if (link_state->our_clustering_state.bits.cluster_head ||
      link_state->parent->mobile_node) {
    elog(LOG_NOTICE, "Un-elect timer expired: going back to searching");
    cluster_search_mode(link_state);
  } else {
    elog(LOG_NOTICE, "Election timer expired: becoming cluster head");
    cluster_head_mode(link_state);
  }

  return TIMER_DONE;
}


/*
 * Cancel the election timer and clear the searching bit.
 *
 * If we were searching, a fast refresh is requested first, while the
 * searching bit is still set.
 */
void cluster_kill_election_timer(cs_per_link_t *csp)
{
  int was_searching = csp->our_clustering_state.bits.searching;

  if (was_searching)
    cluster_refresh_reset(csp, 0, 1, "killed election timer", NULL);

  elog(LOG_DEBUG(0), "Killing election timer");
  csp->our_clustering_state.bits.searching = 0;
  g_event_destroy(csp->elect_timer);
}


/*
 * (Re)arm the election timer and put the link into the searching state.
 *
 * elect  - non-zero: the delay comes from cs_timer(), with both arguments
 *          scaled by csp->competitors; zero: a fixed delay of
 *          CLUSTER_UNELECT * 1000 ms.
 * reason - free-form text, used only in the log message.
 *
 * A fast refresh is forced when we were not already searching.  The
 * current competitor count is copied into csp->prev_competitors.
 */
void cluster_set_election_timer(cs_per_link_t *csp, int elect, char *reason)
{
  int delay_ms;

  if (elect) {
    /* leave headroom of at least one PER_COMP slot for messages to arrive;
     * $$$ TODO: should be an exponential distribution */
    delay_ms = cs_timer((csp->competitors + 3) * CLUSTER_ELECT_PER_COMP,
                        (csp->competitors + 1) * CLUSTER_ELECT_PER_COMP);
  } else {
    delay_ms = CLUSTER_UNELECT * 1000;
  }

  elog(LOG_NOTICE, "Resetting election timer for %.1f sec, because %s",
       (float)delay_ms / 1000.0, reason);

  /* entering the searching state triggers an immediate refresh */
  if (csp->our_clustering_state.bits.searching == 0)
    cluster_refresh_reset(csp, 0, 1, "set election timer", NULL);

  csp->our_clustering_state.bits.searching = 1;
  csp->prev_competitors = csp->competitors;

  g_event_destroy(csp->elect_timer);
  g_timer_add(delay_ms, cluster_elect_timeout, csp, NULL, &(csp->elect_timer));
}


/*
 *   CLUSTER HEAD MODE
 */


/*
 * Switch this link into cluster-head mode.
 *
 * Clears every mode bit except cluster_head, sets the join bit, kills
 * timers, resets sources and picks a cluster index.  It then arms the
 * un-elect timer (elect == 0).  Because the mode byte was just cleared,
 * that call also requests a fast refresh.  An explicit fast refresh
 * follows before the status devices are notified.
 */
static
void cluster_head_mode(cs_per_link_t *csp)
{
  csp->our_clustering_state.byte = 0;
  csp->our_clustering_state.bits.cluster_head = 1;
  cluster_set_join_bit(csp, 1);

  cluster_kill_timers(csp);
  cluster_reset_sources(csp);

  cluster_assign_cluster_index(csp);

  cluster_set_election_timer(csp, 0, "Entering cluster head mode");
  cluster_refresh_reset(csp, 1, 1, "Entered cluster head mode", NULL);

  cluster_status_update(csp);
}


/*
 *   ISOLATED MODE
 */


/*
 * Switch this link into isolated mode (isolated + searching bits only).
 *
 * This is a no-op if the link is already isolated.  Otherwise timers are
 * killed, sources reset, the refresh timer re-armed (non-fast) and the
 * status devices notified.
 */
static
void cluster_isolated_mode(cs_per_link_t *csp)
{
  if (csp->our_clustering_state.bits.isolated)
    return;

  csp->our_clustering_state.byte = 0;
  csp->our_clustering_state.bits.isolated = 1;
  csp->our_clustering_state.bits.searching = 1;

  cluster_kill_timers(csp);
  cluster_reset_sources(csp);

  cluster_refresh_reset(csp, 1, 0, "Entered isolated mode", NULL);

  cluster_status_update(csp);
}


/*
 *   SEARCH MODE
 */


/*
 * Return this link to plain search mode: every mode bit cleared, timers
 * killed, sources reset, refresh timer re-armed (non-fast), status
 * devices notified.
 */
static
void cluster_search_mode(cs_per_link_t *csp)
{
  csp->our_clustering_state.byte = 0;

  cluster_kill_timers(csp);
  cluster_reset_sources(csp);

  cluster_refresh_reset(csp, 1, 0, "Entered search mode", NULL);

  cluster_status_update(csp);
}


/*
 *  cluster status device
 */


/* Size of each slot in cluster_mode_to_str()'s static buffer ring.  The
 * longest flag combination ("Member,Isolated,Searching,Joining:") is already
 * 34 characters before the "(count)" suffix, so the former 32 overflowed. */
#define CLUSTER_MODE_STR_LEN 48

/*
 * Render a cluster_mode_t as text, e.g. "Head,Searching(3)", or "None"
 * when no bits are set.
 *
 * Returns a slot from DECLARE_STATIC_BUF_RING (presumably a ring of 10
 * static buffers), so the result is overwritten by later calls.  Writes
 * are bounded with snprintf because the mode may come from a neighbor's
 * report (e.g. reported_clustering_state) and must not overrun the slot.
 */
char *cluster_mode_to_str(cluster_mode_t mode)
{
  DECLARE_STATIC_BUF_RING(buf, 10, CLUSTER_MODE_STR_LEN);

  if (mode.byte == 0) {
    snprintf(buf, CLUSTER_MODE_STR_LEN, "None");
  }
  else {
    snprintf(buf, CLUSTER_MODE_STR_LEN, "%s%s%s%s(%d)",
             mode.bits.cluster_head ? "Head" : "Member",
             mode.bits.isolated ? ",Isolated" : "",
             mode.bits.searching ? ",Searching" : "",
             mode.bits.joining ? ",Joining:" : "",
             mode.bits.member_count);
  }
  return buf;
}


/*
 * Printable handler for the "cluster_mode" status device: one line
 * summarising this link's clustering mode.
 *
 *   I(t)       isolated; t = seconds left on the refresh timer
 *   H[idx]     cluster head with its cluster index, or
 *   M:p/c      member with prospect_count / connect_count
 *   (n)        member_count, followed by "J" if the joining bit is set
 *   S(t)       searching; t = seconds left on the election timer
 *   S***(t)    not searching, but an election timer is still armed
 *   W(id)      non-head with connect_metric > CLUSTER_MIN_OK_METRIC and
 *              competitors > 0; prints csp->for_instance
 */
static
int cluster_mode_status_print(status_context_t *info, buf_t *buf)
{
  cs_per_link_t *csp = (cs_per_link_t *) sd_data(info);
  cs_per_flow_t *self_flow = cs_maybe_init_source(csp->our_source);
  cluster_mode_t mode = { .byte = csp->our_clustering_state.byte };

  if (mode.bits.isolated) {
    bufprintf(buf, "I(%.1f)",
              g_timer_time_remaining(csp->refresh_timer) / 1000.0);
  }
  else {
    /* role */
    if (mode.bits.cluster_head)
      bufprintf(buf, "H[%s]", cluster_map_index_to_str(self_flow->cluster_index));
    else
      bufprintf(buf, "M:%d/%d", csp->prospect_count, csp->connect_count);

    /* membership count and joining flag */
    bufprintf(buf, "(%d)%s", mode.bits.member_count,
              mode.bits.joining ? "J" : "");

    /* election / waiting state */
    if (mode.bits.searching)
      bufprintf(buf, "S(%.1f)",
                g_timer_time_remaining(csp->elect_timer) / 1000.0);
    else if (csp->elect_timer)
      bufprintf(buf, "S***(%.1f)",
                g_timer_time_remaining(csp->elect_timer) / 1000.0);
    else if (mode.bits.cluster_head == 0 &&
             csp->connect_metric > CLUSTER_MIN_OK_METRIC &&
             csp->competitors > 0)
      bufprintf(buf, "W(%s)", print_if_id(csp->for_instance));
  }

  bufprintf(buf, "\n");
  return STATUS_MSG_COMPLETE;
}


/*
 * Printable handler for the "cluster" status device.
 *
 * Prints a node header, then for every link in cs->link_ref: the link's
 * mode, our cluster index and any armed elect/refresh timers, followed by
 * one entry per source on that link.  Non-hidden sources are printed in
 * the first pass and hidden ones in the second.
 */
static
int cluster_status_print(status_context_t *info, buf_t *buf)
{
  /* This device is registered per link, but the report covers every link
   * of the parent state.  The pointer is named distinctly so the per-link
   * 'csp' in the loop below no longer shadows it. */
  cs_per_link_t *dev_csp = (cs_per_link_t *) sd_data(info);
  cs_state_t *cs = dev_csp->parent;

  bufprintf(buf, "Cluster Sync Clustering Status, node %s\n", 
	    print_if_id(my_node_id));
  bufprintf(buf, "[if_class=%s, neighbors=%s]\n",
	    cs->if_class, cs->neighbors);

  lu_multi_link_t *l;
  for (l=lu_multi_links_top(cs->link_ref); l; l=lu_multi_links_next(l)) {
    cs_per_link_t *csp = (cs_per_link_t *)lu_multi_link_get_data(l);
    bufprintf(buf, "iface=%d, if_id=%s\n", csp->iface, print_if_id(csp->if_id));
    bufprintf(buf, "=========================\n");
    cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
    bufprintf(buf, "Mode: %s, index %s",
	      cluster_mode_to_str(csp->our_clustering_state),
	      cluster_map_index_to_str(pf->cluster_index));

    if (csp->elect_timer) 
      bufprintf(buf, ", elect in %.3fs", 
		g_timer_time_remaining(csp->elect_timer) / 1000.0);

    if (csp->refresh_timer) 
      bufprintf(buf, ", refresh in %.3fs", 
		g_timer_time_remaining(csp->refresh_timer) / 1000.0);
    
    bufprintf(buf, "\n\n");

    /* pass 0: visible sources, pass 1: hidden sources */
    int pass;
    for (pass=0; pass<2; pass++) {
      source_t *ptr;
      for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
	   ptr=ssync_sources_next(ptr)) {

	if (pass == 0 && ptr->hidden) continue;
	if (pass == 1 && !ptr->hidden) continue;

	/* only sources that belong to this link */
	if (cs_source_get_link_index(ptr) == csp->iface) {
	  pf = cs_maybe_init_source(ptr);

	  char *type = "Neighbor";
	  
	  if (ptr == csp->our_source) {
	    type = "SELF";
	  }
	  else {
	    if (cluster_source_is_parent(csp, ptr)) {
	      type = "Parent";
	    }
	  }

	  bufprintf(buf, "  %s %s/%d[%s]: Conn=%d, Mode=%s, Reported=%s%s,\n",
		    type,
		    print_if_id(ptr->flow_id.src), ptr->flow_id.src_if,
		    cluster_map_index_to_str(pf->cluster_index),
		    pf->last_conn,
		    cluster_mode_to_str(pf->outgoing_clustering_state),
		    cluster_mode_to_str(pf->reported_clustering_state),
		    ptr->hidden ? ", Hidden" : "");
	  bufprintf(buf, "        %s%s%swcount=%d%s%s, info: %s/%s",
		    pf->is_prospect ? "PROSPECT, " : "",
		    pf->dead ? "DEAD, " : "",
		    pf->warn ? "warn, " : "",
		    pf->warn_count,
		    pf->reject ? ", reject" : "",
		    pf->rejection_refractory ? ", pr" : "",
		    print_if_id(pf->cluster_info.node_id), print_if_id(pf->cluster_info.if_id)
		    );
	  
	  if (pf->expect_refresh_timer)
	    bufprintf(buf, ", expect refresh in %.3fs", 
		      g_timer_time_remaining(pf->expect_refresh_timer) / 1000.0);
	  
	  if (pf->join_complete_timer)
	    bufprintf(buf, ", join in %.3fs", 
		      g_timer_time_remaining(pf->join_complete_timer) / 1000.0);
	  
	  bufprintf(buf, "\n");
	  
	  if (pf->why_rejected)
	    bufprintf(buf, "        Rejected because %s\n",
		      pf->why_rejected);

	}
      }

      bufprintf(buf, "\n");
    }
  }

  return STATUS_MSG_COMPLETE;
}


/*
 * Binary handler for the "cluster" status device.
 *
 * For each source on this link that cluster_source_is_connected() accepts,
 * emits one neighbor_t record (the link/neighbor format).  While the
 * source's join_complete_timer is armed, the record is ASYMMETRIC and
 * conn_from holds the remaining time / 1000; otherwise it is ACTIVE with
 * conn_from = last_conn.  The list ends with an all-zero neighbor_t.
 */
static
int cluster_status_bin(status_context_t *info, buf_t *buf)
{
  cs_per_link_t *csp = (cs_per_link_t *) sd_data(info);
  source_t *src;

  for (src = ssync_sources_top(&(csp->parent->ref->sources)); src != NULL;
       src = ssync_sources_next(src)) {
    cs_per_flow_t *flow;
    int join_pending;

    if (cs_source_get_link_index(src) != csp->iface)
      continue;

    flow = cs_maybe_init_source(src);
    if (!cluster_source_is_connected(csp, flow->parent))
      continue;

    join_pending = (flow->join_complete_timer != NULL);

    /* $$$$ TODO: a dedicated EmView module instead of reusing neighbor_t */
    neighbor_t entry = {
      .node_id = LINK_BROADCAST,
      .if_id = src->flow_id.src,
      .interface = src->flow_id.src_if,
      .state = join_pending ? ASYMMETRIC : ACTIVE,
      .conn_from = join_pending ?
        g_timer_time_remaining(flow->join_complete_timer) / 1000 :
        flow->last_conn,
      .conn_to = 100
    };
    bufcpy(buf, (char *)&entry, sizeof(entry));
  }

  /* zeroed record terminates the list */
  neighbor_t terminator = {};
  bufcpy(buf, (char *)&terminator, sizeof(terminator));

  return STATUS_MSG_COMPLETE;
}


/*
 * Call g_status_dev_notify() on both of this link's status devices
 * ("cluster" and "cluster_mode").  Always returns 0.
 */
int cluster_status_update(cs_per_link_t *csp)
{
  g_status_dev_notify(csp->cluster_stat);
  g_status_dev_notify(csp->cluster_mode_stat);

  return 0;
}


/*
 *  INITIALIZATION
 */ 

/*
 * Per-link initialization of the clustering layer.
 *
 *  - creates the "cluster" status device (printable + binary handlers)
 *    and the "cluster_mode" status device (printable only); failing to
 *    create either one logs LOG_CRIT and calls exit(1);
 *  - looks up or creates our outbound source: broadcast destination,
 *    this interface, max_hops 1;
 *  - records our node id and interface id in that source's cluster_info;
 *  - starts the link in isolated mode.
 */
void cluster_link_init(lu_multi_link_t *link)
{
  cs_per_link_t *csp = lu_multi_link_get_data(link);
  
  status_dev_opts_t t_opts = {
    device: {
      devname: link_name_s(lu_multi_link_get_name(link), "cluster"),
      device_info: csp
    },
    printable: cluster_status_print,
    binary: cluster_status_bin
  };
  
  if (g_status_dev(&t_opts, &(csp->cluster_stat)) < 0) {
    elog(LOG_CRIT, "Unable to create status device %s: %m",
	 t_opts.device.devname);
    exit(1);
  }
  
  status_dev_opts_t t_opts2 = {
    device: {
      devname: link_name_s(lu_multi_link_get_name(link), "cluster_mode"),
      device_info: csp
    },
    printable: cluster_mode_status_print,
  };
  
  if (g_status_dev(&t_opts2, &(csp->cluster_mode_stat)) < 0) {
    /* report the device that actually failed (previously printed t_opts) */
    elog(LOG_CRIT, "Unable to create status device %s: %m",
	 t_opts2.device.devname);
    exit(1);
  }
  
  /* create our outbound one-hop broadcast stream */
  flow_id_t fid = {
    src: csp->if_id,
    dst: LINK_BROADCAST,
    dst_if: csp->iface,
    src_if: csp->iface,
    max_hops: 1
  };
  csp->our_source = ssync_source_lookup_create(csp->parent->ref, &fid, 0, 1);

  /* identify ourselves in our own source */
  cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
  pf->cluster_info.node_id = my_node_id;
  pf->cluster_info.if_id = csp->if_id;

  /* start in isolated mode */
  cluster_isolated_mode(csp);
}







See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs — large-scale, long-lived systems that need self-organization and adaptivity — a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C, Shell Script
License: other

  TODO
  cs_i.h
  cs_main.c
  cs_net.c
  cs_self_cluster.c
  cs_table.c