Code Search for Developers
 
 
  

sinkgradient.c from EmStar at Krugle


Show sinkgradient.c syntax highlighted

char sinkgradient_c_cvsid[] = "$Id: sinkgradient.c,v 1.6 2005/06/18 01:09:49 adparker Exp $";

#include <assert.h>
#include <stdint.h>
#include <glib.h>
#include <time.h>
#include "link/link.h"
#include "emrun/emrun.h"
#include "libmisc/misc.h"
#include "libmisc/misc_buf.h"
#include "devel/state/cluster_map.h"
#include "devel/state/ssync.h"
#include "libdev/status_dev.h"
#include "devel/block_tree/rddhop.h"
#include "devel/block_tree/parent_child.h"


#define SINKGRADIENT_TIMER 1000

typedef struct sinkgradient_state_s
{
  rddhop_table_t *all_rddhops;
  int all_rddhops_count;

  ssync_sub_t *rddhops_sub;
  flow_id_t fid;
  uint8_t sinkstatus;
  status_client_context_t * status_context;
  status_context_t * status_dev_context;
  status_context_t * status_dev_up_bin_context;
  status_context_t * status_dev_down_bin_context;
  status_context_t * status_dev_up_ascii_context;
  status_context_t * status_dev_down_ascii_context;
  rddhop_t * etx_lower;
  int etx_lower_count;
  rddhop_t * etx_higher;
  int etx_higher_count;
  uint32_t myetx;
  lu_context_t * link_context;

  parent_info_t * parent;
  int parent_count;
  child_info_t * children;
  int children_count;

  status_context_t * status_dev_parent_bin_context;
  status_context_t * status_dev_parent_ascii_context;
  status_context_t * status_dev_children_bin_context;
  status_context_t * status_dev_children_ascii_context;
  
} sinkgradient_state_t;


/* static inline int rddhop2_sub_open_full(char *prefix, rddhop_sub_cb_t cb, void *data, ssync_sub_opts_t *opts, ssync_sub_t **ref) */
/* { */
/*   ssync_sub_opts_t _opts = { */
/*     prefix_name: prefix, */
/*     type_name: "rddhop", */
/*     record_len: sizeof(rddhop_t), */
/*     key_len: sizeof(rddhop_t), */
/*     cb: rddhop_adaptor, */
/*     data: data, */
/*     data2: cb, */
/*     reread_period: opts->reread_period, */
/*     read_refractory: opts->read_refractory */
/*   }; */
/*   elog(LOG_DEBUG_0, "cb is %p = %p\n", _opts.cb, rddhop_adaptor); */
/*   return ssync_sub_open(&_opts, ref); */
/* } */


/****************** stuff for rddhop_path_* **/
typedef struct rddhop_path_key_s
{
  uint32_t src;
} rddhop_path_key_t;

typedef struct rddhop_path_data_s
{
  GArray *array;                // of rddhop_t
} rddhop_path_data_t;


cluster_map_t * cluster_map = NULL;
int use_mhsync = 0;
char * use_prefix = SSYNC_CLUSTER_PREFIX;


static inline void
rddhop_unparse (buf_t * buf, rddhop_t * rddhop);

static inline void
rddhop_table_unparse (buf_t * buf, rddhop_table_t * table, int count);

static void
key_destroy_notify (gpointer data);

static gint
key_comp (gconstpointer a, gconstpointer b, gpointer user_data);

static rddhop_path_data_t *
data_new ();

static void
data_destroy_notify (gpointer data);

int
sinkgradient_printable(status_context_t *info, buf_t *buf);

static void
convert_src_to_id(rddhop_table_t * table, int count);

static int
rddhop_sub_handler (ssync_sub_t * sub, rddhop_table_t * table,
                    int count, void *data);

static sinkgradient_state_t
init_state ();

static void
usage (char *name);

static void
init_link (char *uses, char **argv, sinkgradient_state_t * state);

static void
rddhop_sub (sinkgradient_state_t * state);

static void
key_free (rddhop_path_key_t * key);

static rddhop_path_key_t *
key_new ();

static rddhop_path_key_t *
key_dup (const rddhop_path_key_t * key);

static rddhop_path_key_t *
key_init (const rddhop_table_t table);

static rddhop_path_key_t
key_copy_src (uint32_t src);

rddhop_path_data_t *
key_get_data (GTree * tree, const rddhop_path_key_t * key);

static void
data_append_hop (rddhop_path_data_t * data, rddhop_t hop);

GTree *
rddhop_table_to_path_tree (const rddhop_table_t * table, int count);

static inline rddhop_table_t *
rddhop_add_empty_block (rddhop_table_t * table, int count);

static inline int
rddhop_pub_add_empty (char *prefix,
                      rddhop_table_t * table, int count, flow_id_t * fid);

static inline int
rddhop_pub_helper(char *prefix,
		  rddhop_table_t * table, int count, flow_id_t *fid);


gboolean
traverse_reject_nulls (gpointer key, gpointer value, gpointer data);

gboolean
traverse_reject_containing_self (gpointer key, gpointer value, gpointer data);

static void
path_tree_remove_key_array (GTree * tree, GArray * array);

static void
rddhop_path_data_update_etx (rddhop_path_data_t * data, uint32_t etx);

gboolean
traverse_update_etx (gpointer key, gpointer value, gpointer data);

gboolean
traverse_find_src_hop (gpointer key, gpointer value, gpointer data);

uint32_t
rddhop_etx_if_next_hop(rddhop_t hop);

static rddhop_t
rddhop_array_min_hop (GArray * array);

static void
rddhop_path_data_append_self (rddhop_t nexthop, rddhop_path_data_t * data);

static rddhop_path_data_t *
rddhop_path_data_copy (rddhop_path_data_t * data);

static gint
rddhop_compare_etx_low_first(gconstpointer a, gconstpointer b);

static gint
rddhop_compare_etx_high_first(gconstpointer a, gconstpointer b);

GArray *
rddhop_table_to_GArray(rddhop_table_t * table, int count);

static rddhop_path_data_t *
sink_path(sinkgradient_state_t * state);

static rddhop_path_data_t *
get_best_path(sinkgradient_state_t * state);

static void
update_parent_state(sinkgradient_state_t * state);

static void
update_children_state(sinkgradient_state_t * state);

static void
update_etx_arrays(sinkgradient_state_t * state);

int
up_bin(status_context_t * info, buf_t * buf);

int
down_bin(status_context_t * info, buf_t * buf);

int
up_ascii(status_context_t * info, buf_t * buf);

int
down_ascii(status_context_t * info, buf_t * buf);

int
parent_ascii (status_context_t * info, buf_t * buf);

int
parent_bin(status_context_t * info, buf_t * buf);

int
children_ascii (status_context_t * info, buf_t * buf);

int
children_bin(status_context_t * info, buf_t * buf);

static GArray * 
path_tree_create_children_array(GTree * tree);

static rddhop_path_data_t * 
path_tree_create_best_path (GTree * tree);

static rddhop_table_t *
rddhop_path_data_to_table (const rddhop_path_data_t * data);

static void
rddhop_path_data_free(rddhop_path_data_t * data);

static int
sinkgradient_timer (gpointer data, int interval, g_event_t * ev);

static void
init_timer (sinkgradient_state_t * state);

void
sinkgradient_shutdown (void *data);

static int
status_handler(void*new_buf, size_t size, void * data);

static void
status_init (sinkgradient_state_t * state);

void
status_dev_init(sinkgradient_state_t * state, char * dev_name, status_response_cb_t printable_cb,
		status_response_cb_t binary_cb, status_context_t ** context);

int
main (int argc, char **argv);


/** state sync stuff start **/


/*
static gint
rddhop_compare (gconstpointer a, gconstpointer b)
{
  const rddhop_t *aa = (const rddhop_t *) a;
  const rddhop_t *bb = (const rddhop_t *) b;
  return aa->hop_number - bb->hop_number;
}
*/


static inline void
rddhop_unparse (buf_t * buf, rddhop_t * rddhop)
{
  bufprintf (buf, "\thop_number: %d, host: %s, ETX*100: %d\n",
             rddhop->hop_number, print_if_id (rddhop->host), rddhop->etx);
}

static inline void
rddhop_table_unparse (buf_t * buf, rddhop_table_t * table, int count)
{
  elog(LOG_NOTICE," ");
  int i;
  for (i = 0; i < count; ++i)
    {
      const char *seconds =
        misc_secs_to_str ((int) (time (0) - table[i].header.rcv_time));
      bufprintf (buf, " \n\tSRC:%-15s",
                 print_if_id (table[i].header.flow_id.src));
      bufprintf (buf, " HOPS: %d ", table[i].header.hops_away);
      bufprintf (buf, " SECONDS_OLD: %s \n", seconds);
      rddhop_unparse (buf, &table[i].rddhop);
    }
}

/** state sync stuff end **/

static void
key_destroy_notify (gpointer data)
{
  if (data != NULL)
    free (data);
  return;
}

static gint
key_comp (gconstpointer a, gconstpointer b, gpointer user_data)
{
  rddhop_path_key_t *aa = (rddhop_path_key_t *) a;
  rddhop_path_key_t *bb = (rddhop_path_key_t *) b;
  return aa->src - bb->src;
}



static rddhop_path_data_t *
data_new ()
{
  rddhop_path_data_t *data =
    (rddhop_path_data_t *) malloc (sizeof (rddhop_path_data_t));
  memset (data, 0, sizeof (rddhop_path_data_t));
  data->array = g_array_new (0, 1, sizeof (rddhop_t));
  return data;
}


static void
data_destroy_notify (gpointer data)
{
  if (data)
    {
      rddhop_path_data_t *d = (rddhop_path_data_t *) data;
      if (d->array)
        g_array_free (d->array, 1);
    }
}

/********************** end rddhop_path_* **/

  

int
sinkgradient_printable(status_context_t *info, buf_t *buf)
{
  sinkgradient_state_t*state=(sinkgradient_state_t*)sd_data(info);
  bufprintf(buf,"RDDHOPS_TABLE\n");
  rddhop_table_unparse(buf, state->all_rddhops, state->all_rddhops_count);
  return STATUS_MSG_COMPLETE;
}


static void
convert_src_to_id(rddhop_table_t * table, int count)
{
  int i;
  for (i = 0; i < count; ++i)
    {
      uint32_t * src = &table[i].header.flow_id.src;
      cl_id_t * cl_id = cluster_map_lookup_by_address(cluster_map, *src, 1, 0);
      if(cl_id)
	{
	  *src = cl_id->node_id;
	}
    }
}


static int
rddhop_sub_handler (ssync_sub_t * sub, rddhop_table_t * table,
                    int count, void *data)
{
  elog (LOG_NOTICE, " ");
  sinkgradient_state_t *state = (sinkgradient_state_t *) data;
  if (state->all_rddhops != NULL)
    free (state->all_rddhops);
  state->all_rddhops = table;
  state->all_rddhops_count = count;
  if (!use_mhsync)
    convert_src_to_id(table,count);
  
  buf_t *buf = buf_new ();
  rddhop_table_unparse (buf, state->all_rddhops, state->all_rddhops_count);
  elog (LOG_NOTICE, "%s", buf->buf);
  buf_free (buf);

  g_status_dev_notify(state->status_dev_context);
  return EVENT_RENEW;
}


static sinkgradient_state_t
init_state ()
{
  sinkgradient_state_t state = {
  all_rddhops:NULL,
  all_rddhops_count:0,
  rddhops_sub:NULL,
  fid:{
    //    src:my_node_id, Do this later
    dst:LINK_BROADCAST,
    src_if: 1,
    dst_if: 1,
    max_hops:MHSYNC_ONE_HOP
  },
  sinkstatus:0,
  status_context:NULL,
  status_dev_context:NULL,
  status_dev_up_bin_context:NULL,
  status_dev_up_ascii_context:NULL,
  status_dev_down_bin_context:NULL,
  status_dev_down_ascii_context:NULL,
  
  etx_lower:NULL,
  etx_lower_count:0,
  etx_higher:NULL,
  etx_higher_count:0,
  myetx:-1,
  link_context:NULL,  
  parent:NULL,
  parent_count:0,
  children:NULL,
  children_count:0,
  status_dev_parent_bin_context:NULL,
  status_dev_parent_ascii_context:NULL,
  status_dev_children_bin_context:NULL,
  status_dev_children_ascii_context:NULL,


  };

  return state;
}


static void
usage (char *name)
{
  misc_print_usage
    (name, "-U <device>", "  --uses <device>: Specify link device to use.");
  exit (1);
}

static void
init_link (char *uses, char **argv, sinkgradient_state_t * state)
{
  if (uses == NULL)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t lu_opts = {
    opts:{
      name:uses,
      data:state
    },
  };
  if (lu_open (&lu_opts, &state->link_context) < 0)
    {
      elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
      exit (1);
    }
  
  if (lu_get_if_id(state->link_context, &state->fid.src) != 0)
    {
      elog (LOG_CRIT, "Error looking up my interface id.");
      exit(1);
    }

  if (use_mhsync) state->fid.src = my_node_id;
  
  return;
}


static void
rddhop_sub (sinkgradient_state_t * state)
{
  ssync_sub_opts_t opts = { reread_period:REREAD_PERIOD, read_refractory:REFRACTORY };
  if (rddhop_sub_open_full (use_prefix, rddhop_sub_handler, state, &opts, &state->rddhops_sub)
      < 0)
    {
      elog (LOG_ERR, "Unable to subscribe to rddhop channel: %m");
      exit (1);
    }
  return;
}

static void
key_free (rddhop_path_key_t * key)
{
  key_destroy_notify (key);
}


static rddhop_path_key_t *
key_new ()
{
  rddhop_path_key_t *key =
    (rddhop_path_key_t *) malloc (sizeof (rddhop_path_key_t));
  memset (key, 0, sizeof (rddhop_path_key_t));
  return key;
}

static rddhop_path_key_t *
key_dup (const rddhop_path_key_t * key)
{
  rddhop_path_key_t *newkey = key_new ();
  memcpy (newkey, key, sizeof (rddhop_path_key_t));
  return newkey;
}

static rddhop_path_key_t *
key_init (const rddhop_table_t table)
{
  rddhop_path_key_t *key = key_new ();
  key->src = table.header.flow_id.src;
  return key;
}

/* static rddhop_path_key_t
key_copy (const rddhop_path_key_t * key)
{
  return *key;
} */

static rddhop_path_key_t
key_copy_src (uint32_t src)
{
  rddhop_path_key_t key = {
  src:src
  };
  return key;
}

// returns pointer to memory owned by the tree
// caller still responsible for the key
rddhop_path_data_t *
key_get_data (GTree * tree, const rddhop_path_key_t * key)
{
  rddhop_path_key_t *mykey = key_dup (key);
  int free_key = 1;

  rddhop_path_data_t *data = g_tree_lookup (tree, key);
  if (!data)
    {
      data = data_new ();
      g_tree_insert (tree, mykey, data);
      // tree now owns key. don't have to free.
      // I know. It's weird.
      free_key = 0;
    }
  if (free_key)
    key_free (mykey);
  return data;
}
 
static void
data_append_hop (rddhop_path_data_t * data, rddhop_t hop)
{
  g_array_append_val (data->array, hop);
}

GTree *
rddhop_table_to_path_tree (const rddhop_table_t * table, int count)
{
  // GTree key is rddhop_path_key_t
  // key.src = rddhop_table_t[i].header.flow_id.src

  // GTree data is rddhop_path_data_t
  // g_array_append (data.array, rddhop_table_t[i].rddhop)

  GTree *tree =
    g_tree_new_full (key_comp, NULL, key_destroy_notify, data_destroy_notify);
  int i;
  for (i = 0; i < count; ++i)
    {
      // create a rddhop_path_key from rddhop_table_t[i]
      rddhop_path_key_t *key = key_init (table[i]);
      // look up that key.
      rddhop_path_data_t *data = key_get_data (tree, key);
      // append copy of rddhop_t to result;
      data_append_hop (data, table[i].rddhop);
      key_free (key);
    }
  return tree;
}

static inline rddhop_table_t *
rddhop_add_empty_block (rddhop_table_t * table, int count)
{
  size_t elem_size = sizeof (rddhop_table_t);
  size_t old_table_size = elem_size * count;
  size_t new_table_size = elem_size * (count + 1);
  rddhop_table_t *new_table = (rddhop_table_t *) malloc (new_table_size);
  memset (new_table, 0, new_table_size);
  memcpy (new_table, table, old_table_size);
  return new_table;
}


static inline int
rddhop_pub_add_empty (char *prefix,
                      rddhop_table_t * table, int count, flow_id_t * fid)
{
  int result = 0;
  //  rddhop_table_t *new_table = rddhop_add_empty_block (table, count);
   result = rddhop_pub (prefix, table, count , fid);
  //  free (new_table);
  return result;
}


static inline int
rddhop_pub_helper(char *prefix,
		      rddhop_table_t * table, int count, flow_id_t *fid)
{
  int result = 0;
  if (count == 0)
    {
     result = rddhop_pub_add_empty(prefix, table, count, fid);
    }
  else
    {
      result = rddhop_pub(prefix,table,count,fid);
    }
  return result;
}


gboolean
traverse_reject_nulls (gpointer key, gpointer value, gpointer data)
{
  GArray *reject_array = (GArray *) data;
  rddhop_path_key_t *k = (rddhop_path_key_t *) key;
  rddhop_path_data_t *d = (rddhop_path_data_t *) value;

  // reject if host is zero
  int i;
  for (i = 0; i < d->array->len; ++i)
    {
      rddhop_t hop = g_array_index (d->array, rddhop_t, i);
      if (hop.host == 0)
        {
          // reject this key
          g_array_append_val (reject_array, *k);
          break;
        }
    }
  return FALSE;
}

gboolean
traverse_reject_containing_self (gpointer key, gpointer value, gpointer data)
{
  GArray *reject_array = (GArray *) data;
  rddhop_path_key_t *k = (rddhop_path_key_t *) key;
  rddhop_path_data_t *d = (rddhop_path_data_t *) value;

  // reject if host is zero or equal to self
  int i;
  for (i = 0; i < d->array->len; ++i)
    {
      rddhop_t hop = g_array_index (d->array, rddhop_t, i);
      if ( hop.host == my_node_id)
        {
          // reject this key
          g_array_append_val (reject_array, *k);
          break;
        }
    }
  return FALSE;
}

static void
path_tree_remove_key_array (GTree * tree, GArray * array)
{
  if (!array || !tree)
    return;
  int i;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_path_key_t key = g_array_index (array, rddhop_path_key_t, i);
      g_tree_remove (tree, &key);
    }
}

static void
rddhop_path_data_update_etx (rddhop_path_data_t * data, uint32_t etx)
{
  assert (data);
  int i;
  for (i = 0; i < data->array->len; ++i)
    {
      rddhop_t *hop = &g_array_index (data->array, rddhop_t, i);
      hop->etx += etx;
    }
}

gboolean
traverse_update_etx (gpointer key, gpointer value, gpointer data)
{

  rddhop_path_data_t *d = (rddhop_path_data_t *) value;
  // update the etxs.
  rddhop_path_data_update_etx (d, 1);
  return FALSE;
}

 // do this by traversing tree. 
  // add (src, etx) to array
  //    find etx by searching array for src

gboolean
traverse_find_src_hop (gpointer key, gpointer value, gpointer data)
{
  GArray *nexthop_array = (GArray *) data;
  rddhop_path_key_t *k = (rddhop_path_key_t *) key;
  rddhop_path_data_t *d = (rddhop_path_data_t *) value;

  // find corresponding etx.
  int i;
  for (i = 0; i < d->array->len; ++i)
    {
      rddhop_t hop = g_array_index (d->array, rddhop_t, i);
      if (hop.host == k->src)
        {
          g_array_append_val (nexthop_array, hop);
          break;
        }
    }
  return FALSE;
}

uint32_t
rddhop_etx_if_next_hop(rddhop_t hop)
{
  return 1 + hop.etx;
}

// asserts array is not empty
// DON'T GIVE ME AN EMPTY ARRAY!
static rddhop_t
rddhop_array_min_hop (GArray * array)
{
  assert (array->len > 0);
  rddhop_t minhop = g_array_index (array, rddhop_t, 0);

  int i;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_t hop = g_array_index (array, rddhop_t, i);
      if (rddhop_etx_if_next_hop(hop)
	  < rddhop_etx_if_next_hop(minhop))
        {
          minhop = hop;
        }
    }
  return minhop;
}

static void
rddhop_path_data_append_self (rddhop_t nexthop, rddhop_path_data_t * data)
{
  assert (data); assert(data->array);
  rddhop_t hop = {
  hop_number:data->array->len,
  host:my_node_id,
  etx: rddhop_etx_if_next_hop(nexthop)
  };
  g_array_append_val (data->array, hop);
}


static rddhop_path_data_t *
rddhop_path_data_copy (rddhop_path_data_t * data)
{
  assert(data);
  rddhop_path_data_t *newdata = data_new ();
  int i;
  for (i = 0; i < data->array->len; ++i)
    {
      rddhop_t hop = g_array_index (data->array, rddhop_t, i);
      g_array_append_val (newdata->array, hop);
    }
  return newdata;
}

static gint
rddhop_compare_etx_low_first(gconstpointer a, gconstpointer b)
{
  // return a - b
  rddhop_t * aa = (rddhop_t *)a;
  rddhop_t * bb = (rddhop_t *)b;
  return (aa->etx - bb->etx);
}

static gint
rddhop_compare_etx_high_first(gconstpointer a, gconstpointer b)
{
  // return a - b
  rddhop_t * aa = (rddhop_t *)a;
  rddhop_t * bb = (rddhop_t *)b;
  return (bb->etx - aa->etx);
}



GArray *
rddhop_table_to_GArray(rddhop_table_t * table, int count)
{
  GArray * array = g_array_new(0,1,sizeof(rddhop_t));
  int i;
  for (i = 0; i < count; ++i)
    {
      g_array_append_val(array,table[i].rddhop);
    }
  return array;
			       
}

static rddhop_path_data_t *
sink_path(sinkgradient_state_t * state)
{
  rddhop_path_data_t * data = data_new();
  rddhop_t hop = { hop_number : 0, host : my_node_id, etx : 0 };
  data_append_hop(data, hop);
  return data;
}


// works even if we're the sink.
// returns allocated memory.
static rddhop_path_data_t *
get_best_path(sinkgradient_state_t * state)
{
  rddhop_path_data_t * best_path = NULL;
  GTree * tree = NULL;
  if (state->sinkstatus)
    {
      // best path just has me listed.
      best_path = sink_path(state);
    }
  else
    {
      // otherwise we have to actually calculate the best path.
      tree = rddhop_table_to_path_tree (state->all_rddhops,
					state->all_rddhops_count);
      best_path = path_tree_create_best_path (tree);
    }
  if (tree)
    g_tree_destroy(tree);
  return best_path;
}

static if_id_t
node_id_to_if_id(node_id_t node_id)
{
  // Note: yes, node_id may have multiple if_ids. But for this applications, I'm 
  // assuming that we want the one that matches interface 1.
  //
  cl_id_t * cl_id = cluster_map_lookup_by_node_and_iface(cluster_map, node_id, 1);
  if (!cl_id) return 0;
  return cl_id->if_id;
}

static void
update_parent_state(sinkgradient_state_t * state)
{
  rddhop_path_data_t * best_path = get_best_path(state);
  if (state->parent)
    {
      free(state->parent);
      state->parent = NULL;
      state->parent_count = 0;
    }
  if (best_path->array->len > 1)
    {
      state->parent_count = 1;
      state->parent = (parent_info_t*) malloc (sizeof (parent_info_t));
      memset (state->parent, 0, sizeof (parent_info_t));
      // the parent is the second-to-last entry in the path
      state->parent->rddhop = g_array_index (best_path->array, rddhop_t,
					     best_path->array->len - 2);
      state->parent->if_id = node_id_to_if_id(state->parent->rddhop.host);
    }
  rddhop_path_data_free(best_path);
}

static void
update_children_state(sinkgradient_state_t * state)
{
  if (state->children)
    {
      free(state->children);
      state->children = NULL;
      state->children_count = 0;
    }

  GTree * tree =  rddhop_table_to_path_tree(state->all_rddhops,
				    state->all_rddhops_count);
  // Array of rddhop_path_key_t
  GArray * children_array = path_tree_create_children_array(tree);
  state->children_count = children_array->len;
  ssize_t total_size = children_array->len * sizeof(child_info_t);  
  state->children = (child_info_t*)malloc(total_size);
  memset(state->children, 0, total_size);
  // I'm not sure if this memcpy works.
  // memcpy (state->children, children_array->data, total_size);

  int i;
  for (i = 0; i < children_array->len; ++i)
    {
      rddhop_path_key_t * child_key = &g_array_index(children_array, rddhop_path_key_t, i);
      rddhop_path_data_t * child_data = g_tree_lookup(tree, child_key);
      // The rddhop info for the child is the last one in the path.
      state->children[i].rddhop = g_array_index(child_data->array,
						rddhop_t,
						child_data->array->len - 1);
      state->children[i].if_id = node_id_to_if_id(state->children[i].rddhop.host);
    }
  g_array_free (children_array, 1);
  g_tree_destroy (tree);
}



static void
update_etx_arrays(sinkgradient_state_t * state)
{
  GTree * tree = rddhop_table_to_path_tree(state->all_rddhops,
					   state->all_rddhops_count);
  //// create an array of rddhop_t nexthops, with etx
  // reflecting etx to their sink.
  
  GArray * neighbor_array = g_array_new(0,1,sizeof(rddhop_t));
  
  // create reject_array
  GArray * reject_array =  g_array_new(0,1,sizeof(rddhop_t));
  g_tree_foreach(tree, traverse_reject_nulls, reject_array);

  // remove array from tree
  path_tree_remove_key_array(tree, reject_array);
  g_tree_foreach(tree, traverse_find_src_hop, neighbor_array);
  
  // Copy neighbor_array into 'array'
  GArray * array = g_array_new(0,1,sizeof(rddhop_t));
  int i;
  for (i = 0; i < neighbor_array->len; ++i)
    g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );
  
  
  /// Remove nodes with ETX values higher than me.
  g_array_sort(array, rddhop_compare_etx_low_first);
  
  int myetx = 0;
  int etxfound = 0;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_t hop = g_array_index(array, rddhop_t, i);
      if (hop.host == my_node_id) {
	myetx = state->myetx = hop.etx;
	etxfound = 1;
      }
      if ((etxfound) && (hop.etx != myetx)) // prune the rest
	{
	  int number_left = array->len - i;
	  while (number_left-- > 0)
	    g_array_remove_index(array, i);
	  break;
	}
    }

  // Copy lower etx nodes into the state variable and free temp variables.
  if (state->etx_lower) free(state->etx_lower);
  state->etx_lower_count = array->len;
  state->etx_lower = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_lower_count);
  memset(state->etx_lower,0,sizeof(rddhop_t) * state->etx_lower_count);
  for (i = 0; i < array->len; ++i)
    {
      memcpy(&state->etx_lower[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
    }
  g_array_free(array,1);


  ////////////////   copy array again, but this time do it for the 
  // nodes with higher etx values.
  array = g_array_new(0,1,sizeof(rddhop_t));
  for (i = 0; i < neighbor_array->len; ++i)
    g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );

  g_array_sort(array,rddhop_compare_etx_high_first);

  myetx = 0;
  etxfound = 0;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_t hop = g_array_index(array, rddhop_t, i);
      
      if (hop.host == my_node_id) {
	myetx = state->myetx = hop.etx;
	etxfound = 1;
      }
      if ((etxfound) && (hop.etx != myetx)) // prune the rest
	{
	  int number_left = array->len - i;
	  while (number_left-- > 0)
	    g_array_remove_index(array, i);
	  break;
	}
    }
  if (state->etx_higher) free(state->etx_higher);
  state->etx_higher_count = array->len;
  state->etx_higher = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_higher_count);
  memset(state->etx_higher,0,sizeof(rddhop_t)*state->etx_higher_count);
  for (i = 0; i < array->len; ++i)
    {
      memcpy(&state->etx_higher[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
    }

  g_array_free(array, 1);
  g_array_free(neighbor_array,1);
  g_array_free(reject_array, 1);
  g_tree_destroy(tree);
}
  
  
int
up_bin(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufcpy(buf, state->etx_higher, state->etx_higher_count * sizeof(rddhop_t));
  return STATUS_MSG_COMPLETE;
}

int
down_bin(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufcpy(buf, state->etx_lower, state->etx_lower_count * sizeof(rddhop_t));
  return STATUS_MSG_COMPLETE;
}
  
int
up_ascii(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d: Those that are HIGHER...\n", state->myetx);
  int i;
  for (i = 0; i < state->etx_higher_count; ++i)
    {
      bufprintf(buf, "host: (%d) %s, etx %d\n", state->etx_higher[i].host, print_if_id(state->etx_higher[i].host), state->etx_higher[i].etx);
    }
      
  return STATUS_MSG_COMPLETE;
}

int
down_ascii(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d: Those that are LOWER...\n", state->myetx);
  int i;
  for (i = 0; i < state->etx_lower_count; ++i)
    {
      bufprintf(buf, "host: (%d) %s, etx %d\n", state->etx_lower[i].host, print_if_id(state->etx_lower[i].host), state->etx_lower[i].etx);
    }

  return STATUS_MSG_COMPLETE;
}

int
parent_ascii (status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state=(sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d\n", state->myetx);
  if (state->parent_count)
    {
      bufprintf(buf, "My parent node_id: %d, if_id: %s, etx: %d\n", state->parent->rddhop.host,
		print_if_id(state->parent->if_id), state->parent->rddhop.etx);
    }
  else
    {
      bufprintf(buf, "I have no parent.");
    }
  return STATUS_MSG_COMPLETE;
}

int
parent_bin(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state = (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufcpy(buf, state->parent, state->parent_count * sizeof(parent_info_t));
  return STATUS_MSG_COMPLETE;
}

int
children_ascii (status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state=(sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d\n", state->myetx);
  int i;
  if (state->children_count != 0) {
    for (i = 0; i < state->children_count; ++i)
      {
	bufprintf(buf, "My children node_id: %d, if_id: %s, etx: %d\n",
		  state->children[i].rddhop.host, print_if_id(state->children[i].if_id),
		  state->children[i].rddhop.etx);
      }
  }
  else
    {
      bufprintf(buf, "I have no children.");
    }
  return STATUS_MSG_COMPLETE;
}

int
children_bin(status_context_t * info, buf_t * buf)
{
  sinkgradient_state_t * state = (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);
  bufcpy(buf, state->children, state->children_count * sizeof(child_info_t));
  return STATUS_MSG_COMPLETE;
}

// Returns allocated memory
// Assumes that I'm not the sink
// May return an array of length zero.
static GArray * 
path_tree_create_children_array(GTree * tree)
{
  assert (tree);
  GArray * children_array = g_array_new(0,1,sizeof(rddhop_path_key_t));
  // children_array may actually contain myself.
  g_tree_foreach (tree, traverse_reject_containing_self, children_array);
  // filter self out.
  int i;
  for (i = 0; i < children_array->len; ++i)
    {
      rddhop_path_key_t hop = g_array_index(children_array, rddhop_path_key_t, i);
      if (hop.src == my_node_id) {
	g_array_remove_index(children_array, i);
	break;
      }
    }
  return children_array;
}
  
  


// MODIFIES THE TREE
// returns allocated memory
// assumes that i'm not the sink
// throws alway all paths with my node id since they're considered loops
// returns path of length zero if there's no match.
static rddhop_path_data_t * 
path_tree_create_best_path (GTree * tree)
{
  assert (tree);
  rddhop_path_data_t *bestpath = NULL;
  GArray * nexthop_array = g_array_new (0, 1, sizeof(rddhop_t));
  // create reject_array (containing keys to remove);
  GArray *reject_array = g_array_new (0, 1, sizeof(rddhop_path_key_t));
  g_tree_foreach (tree, traverse_reject_nulls, reject_array);
  g_tree_foreach (tree, traverse_reject_containing_self, reject_array);
  // remove array from tree
  path_tree_remove_key_array (tree, reject_array);
  int found = 0;
  if (g_tree_nnodes(tree) > 0)
    {
      // update etx
      // g_tree_foreach (tree, traverse_update_etx, NULL);

      g_tree_foreach (tree, traverse_find_src_hop, nexthop_array);

      if (nexthop_array->len > 0) {
	found = 1;
	// find the min (src,etx)
	rddhop_t minhop = rddhop_array_min_hop (nexthop_array);
	// lookup src in tree and get array
	rddhop_path_key_t minkey = key_copy_src (minhop.host);
	rddhop_path_data_t *mindata = g_tree_lookup (tree, &minkey);
	rddhop_path_data_append_self (minhop, mindata);
	// copy this array;
	bestpath = rddhop_path_data_copy (mindata);
      }
    }
  if (found == 0)
    {
      // this is an empty path. I don't know how to reach the sink
      bestpath = data_new();
    }
  // free up memory
  g_array_free (reject_array, 1);
  g_array_free (nexthop_array, 1);
  return bestpath;
}


static rddhop_table_t *
rddhop_path_data_to_table (const rddhop_path_data_t * data)
{
  assert(data);
  GArray * array = data->array;
  assert(array);
  elog(LOG_INFO," ");
  int count = array->len;
  rddhop_table_t * table = (rddhop_table_t *)malloc(sizeof(rddhop_table_t)*count);
  memset(table,0,sizeof(rddhop_table_t)*count);
  int i;
  for (i = 0; i < count; ++i)
  {
    table[i].rddhop = g_array_index(array,rddhop_t, i);
  }
  return table;
}


static void
rddhop_path_data_free(rddhop_path_data_t * data)
{
  data_destroy_notify(data);
}



static int
sinkgradient_timer (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");
  sinkgradient_state_t *state = (sinkgradient_state_t *) data;
  rddhop_path_data_t * best_path = get_best_path(state);
  update_parent_state(state);
  update_children_state(state);
  
  // convert path to table
  rddhop_table_t *table = rddhop_path_data_to_table (best_path);
  int count = best_path->array->len;
  
// publish
  if (rddhop_pub_helper (use_prefix, table, count, &state->fid))
    {
      elog (LOG_CRIT, "Unable to publish rddhop_table");
    }
  buf_t *buf = buf_new ();
  bufprintf (buf, "Publishing the following RDDHOP_TABLE...");
  rddhop_table_unparse (buf, table, count);
  elog (LOG_NOTICE, "%s", buf->buf);
  buf_free (buf);

  free (table);
  rddhop_path_data_free(best_path);

  g_status_dev_notify(state->status_dev_context);
  g_status_dev_notify(state->status_dev_up_bin_context);
  g_status_dev_notify(state->status_dev_down_bin_context);
  g_status_dev_notify(state->status_dev_up_ascii_context);
  g_status_dev_notify(state->status_dev_down_ascii_context);
  g_status_dev_notify(state->status_dev_parent_ascii_context);
  g_status_dev_notify(state->status_dev_parent_bin_context);
  g_status_dev_notify(state->status_dev_children_ascii_context);
  g_status_dev_notify(state->status_dev_children_bin_context);

  return TIMER_RENEW_MS(TIMER_DELAY(count));
}


static void
init_timer (sinkgradient_state_t * state)
{
  if (g_timer_add (SINKGRADIENT_TIMER, sinkgradient_timer, state, NULL, NULL) < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }
}


void
sinkgradient_shutdown (void *data)
{
  elog (LOG_NOTICE, "sinkgradient shutting down.");
  exit (0);
}


static int
status_handler(void*new_buf, size_t size, void * data)
{
  elog(LOG_INFO," ");
  if (size == 0)
    {
      elog(LOG_ERR,"Size is 0!");
      goto final;
    }
  if (!new_buf)
    {
      elog(LOG_ERR,"new_buf is NULL");
      goto final;
    }
  sinkgradient_state_t * state = (sinkgradient_state_t *)data;
  int new_status = atoi((char*)new_buf);
  if ((new_status == 0)||(new_status == 1))
    {
      if (new_status!=state->sinkstatus)
	{
	  elog(LOG_NOTICE,"Changing status to %d", new_status);
	  state->sinkstatus = new_status;
	}
      else
	{
	  elog(LOG_NOTICE,"Status is the same.");
	}
    }
  else
    {
      elog(LOG_ERR,"Unrecognized status");
      goto final;
    }
 final:
  if(new_buf)
    free(new_buf);
  return EVENT_RENEW;
}


static void
status_init (sinkgradient_state_t * state)
{
  status_client_opts_t opts = {
    devname:sim_path("/dev/block/sinkstatus"),
    handler:status_handler,
    private_data:state
  };
  if (g_status_client_full(&opts, &state->status_context) < 0)
    {
      elog (LOG_ERR,"Unable to open status client to %s", opts.devname);
      exit(1);
    }
}

void
status_dev_init(sinkgradient_state_t * state, char * dev_name, status_response_cb_t printable_cb, status_response_cb_t binary_cb, status_context_t ** context)
{
  status_dev_opts_t status_opts = {
    device:{
      devname: sim_path (dev_name), device_info:state}
    , printable:printable_cb,
    binary: binary_cb
  };
  if (g_status_dev (&status_opts, context) < 0)
    {
      elog (LOG_CRIT, "Can't create status device %s: %m",
	    status_opts.device.devname);
      exit (1);
    }
  g_status_dev_notify(*context);
}

static int
one_shot_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_INFO, " ");
  sinkgradient_state_t * state = (sinkgradient_state_t *) data;
  g_status_dev_notify(state->status_dev_context);
  return TIMER_DONE;
}
      
int
main (int argc, char **argv)
{
  misc_init (&argc, argv, CVSTAG);
  emrun_opts_t emrun_opts = {
    shutdown:sinkgradient_shutdown
  };
  emrun_init (&emrun_opts);
  char * uses = link_parse_uses(&argc, argv, NULL);
  if (argc > 2)
    usage (argv[0]);

  use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
  if (use_mhsync) {
    use_prefix = SSYNC_MULTIHOP_PREFIX;
    elog (LOG_CRIT,"Using mhsync");
  }
  
  sinkgradient_state_t state = init_state ();
  rddhop_sub (&state);
  init_timer (&state);
  status_init (&state);
  status_dev_init(&state, "/dev/block/sinkgradient", sinkgradient_printable,
		  NULL, &state.status_dev_context);
  
  status_dev_init(&state, "/dev/block/up_bin", NULL, up_bin,&state.status_dev_up_bin_context);
  status_dev_init(&state, "/dev/block/down_bin", NULL, down_bin,&state.status_dev_down_bin_context);
  status_dev_init(&state, "/dev/block/up_ascii", up_ascii,NULL, &state.status_dev_up_ascii_context);
  status_dev_init(&state, "/dev/block/down_ascii", down_ascii, NULL, &state.status_dev_down_ascii_context);
  status_dev_init(&state, "/dev/block/parent_ascii", parent_ascii, NULL, &state.status_dev_parent_ascii_context);
  status_dev_init(&state, "/dev/block/parent_bin", NULL, parent_bin, &state.status_dev_parent_bin_context);
  status_dev_init(&state, "/dev/block/children_ascii", children_ascii, NULL, &state.status_dev_children_ascii_context);
  status_dev_init(&state, "/dev/block/children_bin", NULL, children_bin, &state.status_dev_children_bin_context);
  
  init_link(uses, argv, &state);

   /* cluster map event */
   cluster_map_opts_t cl_opts = {
     private_data: &state
   };
   if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
     elog(LOG_CRIT, "Can't connect to cluster map: %m");
     exit(1);
   }

   if (g_timer_add (1000, one_shot_timer_cb, &state, NULL, NULL) < 0)
     {
       elog (LOG_CRIT, "Unable to create timer event.");
       exit (1);
     }

  g_main ();
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  sinkgradient.c
  sinkgradient.h
  sinkgradient_type.c