Code Search for Developers
 
 
  

sinkgradient_with_LENT.c from EmStar at Krugle


Show sinkgradient_with_LENT.c syntax highlighted

/* CVS revision identifier string for this source file. */
char sinkgradient_c_cvsid[] = "$Id: sinkgradient_with_LENT.c,v 1.1 2006/03/24 23:48:56 adparker Exp $";

#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <glib.h>
#include "link/link.h"
#include "emrun/emrun.h"
#include "libmisc/misc.h"
#include "libmisc/misc_buf.h"
#include "devel/state/cluster_map.h"
#include "devel/state/ssync.h"
#include "libdev/status_dev.h"
#include "devel/block/rddhop.h"
 

/* Interval handed to g_timer_add() when the periodic timer is created in
   init_timer() (presumably milliseconds -- TODO confirm against g_timer_add). */
#define SINKGRADIENT_TIMER 1000

/* Cluster map handle; created in main() and consulted by convert_src_to_id(). */
cluster_map_t * cluster_map = NULL;
/* Non-zero when the "use_mhsync" command-line switch was given (set in main()). */
int use_mhsync = 0;
/* State-sync prefix used for subscribing and publishing; main() replaces it
   with SSYNC_MULTIHOP_PREFIX when use_mhsync is set. */
char * use_prefix = SSYNC_CLUSTER_PREFIX;

/** state sync stuff start **/


/*
static gint
rddhop_compare (gconstpointer a, gconstpointer b)
{
  const rddhop_t *aa = (const rddhop_t *) a;
  const rddhop_t *bb = (const rddhop_t *) b;
  return aa->hop_number - bb->hop_number;
}
*/
/* Instantiate the state-sync helpers for rddhop_t records under the name
   "rddhop".  NOTE(review): presumably these macros generate the rddhop_pub()
   and rddhop_sub_open_full() functions called later in this file -- confirm
   against devel/state/ssync.h. */
SSYNC_PUB_TYPESAFE_INLINES (rddhop, rddhop_t, "rddhop", sizeof (rddhop_t));

SSYNC_PUB_TYPESAFE_FUNCS (rddhop, rddhop_t);

/* Append a one-line textual rendering of a single hop record (hop number,
   host and ETX value) to buf. */
static inline void
rddhop_unparse (buf_t * LENT buf, rddhop_t * LENT rddhop)
{
  const char *host_text = print_if_id (rddhop->host);

  bufprintf (buf, "\thop_number: %d, host: %s, ETX*100: %d\n",
             rddhop->hop_number, host_text, rddhop->etx);
}

/* Append a textual dump of the first `count` entries of `table` to buf.
   For every entry this prints the flow source, the hop distance, the age of
   the entry (current time minus header.rcv_time) and the hop record itself. */
static inline void
rddhop_table_unparse (buf_t * LENT buf, rddhop_table_t * LENT table, int count)
{
  int idx = 0;

  elog(LOG_NOTICE," ");
  while (idx < count)
    {
      /* The age string is produced before print_if_id() is called, exactly
         as the entry is later printed, so the call sequence is unchanged. */
      int age_secs = (int) (time (0) - table[idx].header.rcv_time);
      const char *age_text = misc_secs_to_str (age_secs);

      bufprintf (buf, " \n\tSRC:%-15s",
                 print_if_id (table[idx].header.flow_id.src));
      bufprintf (buf, " HOPS: %d ", table[idx].header.hops_away);
      bufprintf (buf, " SECONDS_OLD: %s \n", age_text);
      rddhop_unparse (buf, &table[idx].rddhop);
      ++idx;
    }
}

/** state sync stuff end **/

/* Per-process state shared by the subscription handler, the periodic timer
   and the status-device callbacks. */
typedef struct sinkgradient_state_s
{
  /* Most recent table delivered to rddhop_sub_handler() and its length. */
  rddhop_table_t *all_rddhops;
  int all_rddhops_count;

  /* Subscription handle filled in by rddhop_sub(). */
  ssync_sub_t *rddhops_sub;
  /* Flow id passed along when publishing; fid.src is set in init_link(). */
  flow_id_t fid;
  /* 0 or 1; updated by status_handler().  Non-zero means "I am a sink". */
  uint8_t sinkstatus;
  /* Status client handle filled in by status_init(). */
  status_client_context_t * status_context;
  /* Status devices created in main() via status_dev_init(). */
  status_context_t * status_dev_context;
  status_context_t * status_dev_up_bin_context;
  status_context_t * status_dev_down_bin_context;
  status_context_t * status_dev_up_ascii_context;
  status_context_t * status_dev_down_ascii_context;
  /* Heap arrays rebuilt by update_etx_arrays(), with their lengths. */
  rddhop_t * etx_lower;
  int etx_lower_count;
  rddhop_t * etx_higher;
  int etx_higher_count;
  /* ETX of this node's own entry as last seen by update_etx_arrays(). */
  uint32_t myetx;
  /* Link handle opened by init_link(). */
  lu_context_t * link_context;
  
} sinkgradient_state_t;


/****************** stuff for rddhop_path_* **/
/* Key of the path tree: the flow source (header.flow_id.src) whose hop
   records are grouped together. */
typedef struct rddhop_path_key_s
{
  uint32_t src;
} rddhop_path_key_t;

/* GTree key destructor: releases a heap-allocated key. */
static void
key_destroy_notify (gpointer data)
{
  /* free() is a no-op for NULL, so no explicit guard is required. */
  free (data);
}

/* GTree comparison callback ordering keys by their `src` field.
   Returns a negative, zero or positive value when a sorts before, equal to
   or after b.  The result is computed with comparisons rather than by
   subtracting the unsigned 32-bit values: a subtraction converted to gint
   yields the wrong sign once the two sources differ by 2^31 or more, which
   would corrupt the tree ordering.  user_data is unused. */
static gint
key_comp (gconstpointer LENT a, gconstpointer LENT b, gpointer user_data)
{
  const rddhop_path_key_t *aa = (const rddhop_path_key_t *) a;
  const rddhop_path_key_t *bb = (const rddhop_path_key_t *) b;

  (void) user_data;
  return (aa->src > bb->src) - (aa->src < bb->src);
}

/* Value stored in the path tree: the hop records collected for one key. */
typedef struct rddhop_path_data_s
{
  GArray *array;                // of rddhop_t
} rddhop_path_data_t;


static rddhop_path_data_t *
data_new ()
{
  rddhop_path_data_t *data =
    (rddhop_path_data_t *) malloc (sizeof (rddhop_path_data_t));
  memset (data, 0, sizeof (rddhop_path_data_t));
  data->array = g_array_new (0, 1, sizeof (rddhop_t));
  return data;
}


/* Destroy a path-data object created by data_new(): frees the embedded
   GArray (including its element storage) and then the object itself.
   Previously only the GArray was released, so every object leaked its
   malloc()ed wrapper.  Accepts NULL. */
static void
data_destroy_notify (gpointer data)
{
  rddhop_path_data_t *d = (rddhop_path_data_t *) data;

  if (d == NULL)
    return;
  if (d->array)
    g_array_free (d->array, 1);
  free (d);
}

/********************** end rddhop_path_* **/

  

/* Printable callback of the status device registered in main(): writes a
   "RDDHOPS_TABLE" heading followed by a dump of the currently stored table.
   Always returns STATUS_MSG_COMPLETE. */
int
sinkgradient_printable(status_context_t * LENT info, buf_t * LENT buf)
{
  sinkgradient_state_t* LENT state;

  state = (sinkgradient_state_t*)sd_data(info);
  bufprintf(buf,"RDDHOPS_TABLE\n");
  rddhop_table_unparse(buf, state->all_rddhops, state->all_rddhops_count);

  return STATUS_MSG_COMPLETE;
}


/* For each of the first `count` table entries, look up the entry's flow
   source in the global cluster map; when a match is found, overwrite the
   source in place with the matching node_id.  Entries without a match are
   left untouched. */
static void
convert_src_to_id(rddhop_table_t * table, int count)
{
  rddhop_table_t *entry = table;
  int remaining;

  for (remaining = count; remaining > 0; --remaining, ++entry)
    {
      cl_id_t * match =
        cluster_map_lookup_by_address(cluster_map, entry->header.flow_id.src, 1, 0);

      if (!match)
        continue;
      entry->header.flow_id.src = match->node_id;
    }
}


/* Subscription callback registered by rddhop_sub().
   Replaces the stored table with the newly delivered one, optionally
   rewrites the sources to node ids, logs the table and notifies the main
   status device.  `data` is the sinkgradient_state_t passed at subscribe
   time.  Always returns EVENT_RENEW.
   NOTE(review): `table` is annotated LENT, yet the pointer is retained in
   the state and free()d on the next invocation -- confirm who owns the
   buffer handed to this callback. */
static int
rddhop_sub_handler (ssync_sub_t * sub, rddhop_table_t * LENT table,
                    int count, void *data)
{
  elog (LOG_NOTICE, " ");
  sinkgradient_state_t *state = (sinkgradient_state_t *) data;
  /* Drop the previously stored table before taking over the new one. */
  if (state->all_rddhops != NULL)
    free (state->all_rddhops);
  state->all_rddhops = table;
  state->all_rddhops_count = count;
  /* Address-to-node-id translation is only done when mhsync is not in use. */
  if (!use_mhsync)
    convert_src_to_id(table,count);
  
  /* Log a readable dump of the table that was just stored. */
  buf_t *buf = buf_new ();
  rddhop_table_unparse (buf, state->all_rddhops, state->all_rddhops_count);
  elog (LOG_NOTICE, "%s", buf->buf);
  buf_free (buf);

  g_status_dev_notify(state->status_dev_context);
  return EVENT_RENEW;
}


/* Build the initial program state: no table, no handles, not a sink, and a
   flow id addressed to LINK_BROADCAST limited to MHSYNC_ONE_HOP.  fid.src is
   left zero here and filled in later by init_link().  myetx starts at -1
   converted to uint32_t.  Members not listed are zero-initialised. */
static sinkgradient_state_t
init_state ()
{
  sinkgradient_state_t state = {
    .all_rddhops = NULL,
    .all_rddhops_count = 0,
    .rddhops_sub = NULL,
    .fid = {
      /* .src is assigned in init_link() */
      .dst = LINK_BROADCAST,
      .src_if = 1,
      .dst_if = 1,
      .max_hops = MHSYNC_ONE_HOP,
    },
    .sinkstatus = 0,
    .status_context = NULL,
    .status_dev_context = NULL,
    .status_dev_up_bin_context = NULL,
    .status_dev_up_ascii_context = NULL,
    .status_dev_down_bin_context = NULL,
    .status_dev_down_ascii_context = NULL,
    .etx_lower = NULL,
    .etx_lower_count = 0,
    .etx_higher = NULL,
    .etx_higher_count = 0,
    .myetx = -1,
  };

  return state;
}


/* Print the command-line usage text for program `name` and terminate the
   process with exit status 1.  Does not return. */
static void
usage (char * LENT name)
{
  misc_print_usage (name,
                    "-U <device>",
                    "  --uses <device>: Specify link device to use.");
  exit (1);
}

/* Open the link device named by `uses` and record this node's source id in
   state->fid.src.
   - A NULL `uses` is reported and ends the program through usage().
   - Failure to open the link or to query the interface id is logged and the
     process exits with status 1.
   - When mhsync is in use, the interface id is replaced by my_node_id. */
static void
init_link (char * LENT uses, char ** LENT argv, sinkgradient_state_t * LENT state)
{
  if (!uses)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t lu_opts = {
    .opts = {
      .name = uses,
      .data = state,
    },
  };

  if (lu_open (&lu_opts, &state->link_context) < 0)
    {
      elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
      exit (1);
    }

  if (lu_get_if_id(state->link_context, &state->fid.src) != 0)
    {
      elog (LOG_CRIT, "Error looking up my interface id.");
      exit(1);
    }

  if (use_mhsync)
    {
      state->fid.src = my_node_id;
    }
}


/* Subscribe to the rddhop channel under the current prefix, registering
   rddhop_sub_handler() with `state` as its callback data.  The subscription
   handle is stored in state->rddhops_sub.  A failure is logged and ends the
   process with status 1. */
static void
rddhop_sub (sinkgradient_state_t * LENT state)
{
  ssync_sub_opts_t opts = {
    .reread_period = REREAD_PERIOD,
    .read_refractory = REFRACTORY,
  };

  if (rddhop_sub_open_full (use_prefix, rddhop_sub_handler, state,
                            &opts, &state->rddhops_sub) < 0)
    {
      elog (LOG_ERR, "Unable to subscribe to rddhop channel: %m");
      exit (1);
    }
}

/* Release a key obtained from key_new(), key_dup() or key_init().
   NULL is accepted. */
static void
key_free (rddhop_path_key_t * key)
{
  /* Same effect as key_destroy_notify(): free() ignores NULL. */
  free (key);
}


static rddhop_path_key_t *
key_new ()
{
  rddhop_path_key_t *key =
    (rddhop_path_key_t *) malloc (sizeof (rddhop_path_key_t));
  memset (key, 0, sizeof (rddhop_path_key_t));
  return key;
}

/* Return a newly allocated copy of `key`; the caller owns the copy. */
static rddhop_path_key_t *
key_dup (const rddhop_path_key_t * LENT key)
{
  rddhop_path_key_t *copy = key_new ();

  /* Structure assignment copies the same bytes the memcpy() did. */
  *copy = *key;
  return copy;
}

/* Allocate a key whose `src` is the flow source of the given table entry.
   The caller owns the returned key. */
static rddhop_path_key_t *
key_init (const rddhop_table_t table)
{
  rddhop_path_key_t *fresh = key_new ();

  fresh->src = table.header.flow_id.src;

  return fresh;
}

/* static rddhop_path_key_t
key_copy (const rddhop_path_key_t * key)
{
  return *key;
} */

static rddhop_path_key_t
key_copy_src (uint32_t src)
{
  rddhop_path_key_t key = {
  src:src
  };
  return key;
}

// Look up the path data stored under `key`, creating and inserting an empty
// entry when the key is not yet present.
// Returns a pointer to memory owned by the tree.
// The caller remains responsible for `key`: on insertion the tree receives
// its own duplicate, which it frees through its key destructor.
// The key is duplicated only when an insertion actually happens; the former
// version allocated and freed a duplicate on every successful lookup.
rddhop_path_data_t *
key_get_data (GTree * LENT tree, const rddhop_path_key_t * LENT key)
{
  rddhop_path_data_t *data = g_tree_lookup (tree, key);

  if (data == NULL)
    {
      data = data_new ();
      // the tree takes ownership of both the duplicated key and the data
      g_tree_insert (tree, key_dup (key), data);
    }
  return data;
}

/* Append a copy of `hop` to the end of the path's hop array. */
static void
data_append_hop (rddhop_path_data_t * LENT data, rddhop_t hop)
{
  GArray *hops = data->array;

  g_array_append_val (hops, hop);
}

/* Group the hop records of a table by flow source.
   The returned tree maps rddhop_path_key_t (key.src taken from
   header.flow_id.src) to rddhop_path_data_t, whose array receives a copy of
   each entry's rddhop in table order.  The tree owns its keys and values;
   the caller releases everything with g_tree_destroy(). */
GTree *
rddhop_table_to_path_tree (const rddhop_table_t * LENT table, int count)
{
  GTree *paths =
    g_tree_new_full (key_comp, NULL, key_destroy_notify, data_destroy_notify);
  int row = 0;

  while (row < count)
    {
      /* temporary lookup key for this row; released below */
      rddhop_path_key_t *row_key = key_init (table[row]);
      rddhop_path_data_t *bucket = key_get_data (paths, row_key);

      data_append_hop (bucket, table[row].rddhop);
      key_free (row_key);
      ++row;
    }
  return paths;
}

/* Return a newly allocated table holding the first `count` entries of
   `table` followed by one zero-filled entry.  The caller owns the result
   and frees it with free().
   Fixes over the previous version: the allocation is checked (failure is
   logged and fatal), memcpy() is skipped when there is nothing to copy so it
   is never handed a NULL source, and a negative count is treated as an empty
   table instead of wrapping to a huge size. */
static inline rddhop_table_t *
rddhop_add_empty_block (rddhop_table_t * table, int count)
{
  size_t old_count = count > 0 ? (size_t) count : 0;
  rddhop_table_t *new_table = calloc (old_count + 1, sizeof *new_table);

  if (new_table == NULL)
    {
      elog (LOG_CRIT, "Out of memory growing rddhop table");
      exit (1);
    }
  if (table != NULL && old_count > 0)
    memcpy (new_table, table, old_count * sizeof *new_table);
  return new_table;
}


/* Publish `table` as-is via rddhop_pub() and return its result.
   The name is historical: appending an empty block before publishing
   (rddhop_add_empty_block) is disabled, so this is a plain pass-through. */
static inline int
rddhop_pub_add_empty (char * LENT prefix,
                      rddhop_table_t * LENT table, int count, flow_id_t * LENT fid)
{
  return rddhop_pub (prefix, table, count, fid);
}


/* Publish a table, routing empty tables (count == 0) through
   rddhop_pub_add_empty() and all others straight to rddhop_pub().
   Returns whatever the chosen publish call returns. */
static inline int
rddhop_pub_helper(char * LENT prefix,
                  rddhop_table_t * LENT table, int count, flow_id_t * LENT fid)
{
  if (count != 0)
    return rddhop_pub(prefix,table,count,fid);

  return rddhop_pub_add_empty(prefix, table, count, fid);
}


/* g_tree_foreach() callback: if any hop of the visited path has host == 0,
   append a copy of the path's key to the GArray passed as `data`.
   That array must have been created with element size
   sizeof(rddhop_path_key_t).  Returns FALSE so the traversal continues. */
gboolean
traverse_reject_nulls (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
  GArray *rejected = (GArray *) data;
  rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
  GArray *hops = ((rddhop_path_data_t *) value)->array;
  guint n;

  for (n = 0; n < hops->len; ++n)
    {
      if (g_array_index (hops, rddhop_t, n).host != 0)
        continue;
      /* one null host is enough to reject the whole path */
      g_array_append_val (rejected, *path_key);
      break;
    }
  return FALSE;
}

/* g_tree_foreach() callback: if any hop of the visited path has
   host == my_node_id, append a copy of the path's key to the GArray passed
   as `data`.  That array must have been created with element size
   sizeof(rddhop_path_key_t).  Returns FALSE so the traversal continues. */
gboolean
traverse_reject_containing_self (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
  GArray *rejected = (GArray *) data;
  rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
  GArray *hops = ((rddhop_path_data_t *) value)->array;
  guint n;

  for (n = 0; n < hops->len; ++n)
    {
      if (g_array_index (hops, rddhop_t, n).host != my_node_id)
        continue;
      /* the path already passes through this node: reject it */
      g_array_append_val (rejected, *path_key);
      break;
    }
  return FALSE;
}

/* Remove from `tree` every key listed in `array`, a GArray of
   rddhop_path_key_t.  Does nothing when either argument is NULL. */
static void
path_tree_remove_key_array (GTree * LENT tree, GArray * LENT array)
{
  guint pos;

  if (tree == NULL || array == NULL)
    return;

  for (pos = 0; pos < array->len; ++pos)
    {
      /* work on a local copy of the stored key */
      rddhop_path_key_t victim = g_array_index (array, rddhop_path_key_t, pos);

      g_tree_remove (tree, &victim);
    }
}

/* Add `etx` to the etx field of every hop stored in `data`, in place.
   `data` must not be NULL (asserted). */
static void
rddhop_path_data_update_etx (rddhop_path_data_t * LENT data, uint32_t etx)
{
  guint pos;

  assert (data);
  for (pos = 0; pos < data->array->len; ++pos)
    g_array_index (data->array, rddhop_t, pos).etx += etx;
}

/* g_tree_foreach() callback: increase the etx of every hop in the visited
   path by 1.  `key` and `data` are unused.  Returns FALSE so the traversal
   continues. */
gboolean
traverse_update_etx (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
  rddhop_path_data_update_etx ((rddhop_path_data_t *) value, 1);
  return FALSE;
}

 // do this by traversing tree. 
  // add (src, etx) to array
  //    find etx by searching array for src

/* g_tree_foreach() callback: find the first hop of the visited path whose
   host equals the path's key source and append a copy of it to the GArray of
   rddhop_t passed as `data`.  Paths without such a hop contribute nothing.
   Returns FALSE so the traversal continues. */
gboolean
traverse_find_src_hop (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
  GArray *collected = (GArray *) data;
  rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
  GArray *hops = ((rddhop_path_data_t *) value)->array;
  guint n;

  for (n = 0; n < hops->len; ++n)
    {
      rddhop_t candidate = g_array_index (hops, rddhop_t, n);

      if (candidate.host != path_key->src)
        continue;
      g_array_append_val (collected, candidate);
      break;
    }
  return FALSE;
}

/* ETX this node would have if it used `hop` as its next hop: the hop's own
   etx plus one. */
uint32_t
rddhop_etx_if_next_hop(rddhop_t hop)
{
  uint32_t via_hop = 1 + hop.etx;

  return via_hop;
}

// Return the element of a non-empty GArray of rddhop_t with the smallest
// rddhop_etx_if_next_hop() value; the earliest such element wins ties.
// The array must not be empty (asserted).
static rddhop_t
rddhop_array_min_hop (GArray * LENT array)
{
  assert (array->len > 0);

  rddhop_t best = g_array_index (array, rddhop_t, 0);
  guint pos;

  // element 0 is the initial candidate, so the scan starts at 1
  for (pos = 1; pos < array->len; ++pos)
    {
      rddhop_t current = g_array_index (array, rddhop_t, pos);

      if (rddhop_etx_if_next_hop(current) < rddhop_etx_if_next_hop(best))
        best = current;
    }
  return best;
}

/* Append an entry for this node to the end of a path: its hop_number is the
   current path length, its host is my_node_id and its etx is the etx this
   node would have going through `nexthop`.  `data` and its array must be
   non-NULL (asserted). */
static void
rddhop_path_data_append_self (rddhop_t nexthop, rddhop_path_data_t * LENT data)
{
  assert (data);
  assert (data->array);

  rddhop_t self_hop = {
    .hop_number = data->array->len,
    .host = my_node_id,
    .etx = rddhop_etx_if_next_hop(nexthop),
  };

  g_array_append_val (data->array, self_hop);
}


/* Return a newly allocated path holding copies of all hops of `data`, in
   the same order.  The caller owns the copy.  `data` must not be NULL
   (asserted). */
static rddhop_path_data_t *
rddhop_path_data_copy (rddhop_path_data_t * LENT data)
{
  assert(data);

  rddhop_path_data_t *clone = data_new ();
  GArray *source = data->array;
  guint pos = 0;

  while (pos < source->len)
    {
      rddhop_t element = g_array_index (source, rddhop_t, pos);

      g_array_append_val (clone->array, element);
      ++pos;
    }
  return clone;
}

/* GCompareFunc for g_array_sort(): orders rddhop_t elements by ascending
   etx.  Returns a negative, zero or positive value.  The result is derived
   from comparisons instead of a subtraction, which could overflow or wrap
   and report the wrong sign for widely separated etx values. */
static gint
rddhop_compare_etx_low_first(gconstpointer LENT a, gconstpointer LENT b)
{
  const rddhop_t * aa = (const rddhop_t *)a;
  const rddhop_t * bb = (const rddhop_t *)b;

  return (aa->etx > bb->etx) - (aa->etx < bb->etx);
}

/* GCompareFunc for g_array_sort(): orders rddhop_t elements by descending
   etx.  Returns a negative, zero or positive value.  The result is derived
   from comparisons instead of a subtraction, which could overflow or wrap
   and report the wrong sign for widely separated etx values. */
static gint
rddhop_compare_etx_high_first(gconstpointer LENT a, gconstpointer LENT b)
{
  const rddhop_t * aa = (const rddhop_t *)a;
  const rddhop_t * bb = (const rddhop_t *)b;

  return (bb->etx > aa->etx) - (bb->etx < aa->etx);
}



/* Return a new GArray of rddhop_t holding a copy of the rddhop member of
   each of the first `count` table entries, in table order.  The caller
   frees it with g_array_free(). */
GArray *
rddhop_table_to_GArray(rddhop_table_t * LENT table, int count)
{
  GArray * hops = g_array_new(0,1,sizeof(rddhop_t));
  int row = 0;

  while (row < count)
    {
      g_array_append_val(hops,table[row].rddhop);
      ++row;
    }
  return hops;
}


static void
update_etx_arrays(sinkgradient_state_t * LENT state)
{
  GTree * tree = rddhop_table_to_path_tree(state->all_rddhops,
					   state->all_rddhops_count);
  //// create an array of rddhop_t nexthops, with etx
  // reflecting etx to their sink.
  GArray * neighbor_array = g_array_new(0,1,sizeof(rddhop_t));
  // create reject_array
  GArray * reject_array =  g_array_new(0,1,sizeof(rddhop_t));
  g_tree_foreach(tree, traverse_reject_nulls, reject_array);
  // remove array from tree
  path_tree_remove_key_array(tree, reject_array);
  g_tree_foreach(tree, traverse_find_src_hop, neighbor_array);

  
  // Copy neighbor_array
  GArray * array = g_array_new(0,1,sizeof(rddhop_t));
  int i;
  for (i = 0; i < neighbor_array->len; ++i)
    g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );
  
  
  g_array_sort(array, rddhop_compare_etx_low_first);
  
  int myetx = 0;
  int etxfound = 0;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_t hop = g_array_index(array, rddhop_t, i);
      if (hop.host == my_node_id) {
	myetx = state->myetx = hop.etx;
	etxfound = 1;
      }
      if ((etxfound) && (hop.etx != myetx)) // prune the rest
	{
	  g_array_remove_range(array, i, array->len - i);
	  break;
	}
    }

  if (state->etx_lower) free(state->etx_lower);
  state->etx_lower_count = array->len;

  state->etx_lower = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_lower_count);
  memset(state->etx_lower,0,sizeof(rddhop_t) * state->etx_lower_count);
  for (i = 0; i < array->len; ++i)
    {
      memcpy(&state->etx_lower[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
    }

  g_array_free(array,1);
  ////////////////   copy array again
  array = g_array_new(0,1,sizeof(rddhop_t));
  for (i = 0; i < neighbor_array->len; ++i)
    g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );

  g_array_sort(array,rddhop_compare_etx_high_first);

  myetx = 0;
  etxfound = 0;
  for (i = 0; i < array->len; ++i)
    {
      rddhop_t hop = g_array_index(array, rddhop_t, i);
      
      if (hop.host == my_node_id) {
	myetx = state->myetx = hop.etx;
	etxfound = 1;
      }
      if ((etxfound) && (hop.etx != myetx)) // prune the rest
	{
	  g_array_remove_range(array, i, array->len - i);
	  break;
	}
    }
  if (state->etx_higher) free(state->etx_higher);
  state->etx_higher_count = array->len;
  state->etx_higher = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_higher_count);
  memset(state->etx_higher,0,sizeof(rddhop_t)*state->etx_higher_count);
  for (i = 0; i < array->len; ++i)
    {
      memcpy(&state->etx_higher[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
    }

  g_array_free(array, 1);
  g_array_free(neighbor_array,1);
  g_array_free(reject_array, 1);
  g_tree_destroy(tree);
}
  
  
/* Binary callback of the "up_bin" status device: refreshes the ETX lists
   and copies the raw etx_higher array into buf.  Always returns
   STATUS_MSG_COMPLETE. */
int
up_bin(status_context_t * LENT info, buf_t * LENT buf)
{
  sinkgradient_state_t * state;

  state = (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);

  bufcpy(buf, state->etx_higher, state->etx_higher_count * sizeof(rddhop_t));

  return STATUS_MSG_COMPLETE;
}

/* Binary callback of the "down_bin" status device: refreshes the ETX lists
   and copies the raw etx_lower array into buf.  Always returns
   STATUS_MSG_COMPLETE. */
int
down_bin(status_context_t * LENT info, buf_t * LENT buf)
{
  sinkgradient_state_t * state;

  state = (sinkgradient_state_t*)sd_data(info);
  update_etx_arrays(state);

  bufcpy(buf, state->etx_lower, state->etx_lower_count * sizeof(rddhop_t));

  return STATUS_MSG_COMPLETE;
}
  
/* Printable callback of the "up_ascii" status device: refreshes the ETX
   lists, then writes this node's ETX followed by one line per etx_higher
   entry.  Always returns STATUS_MSG_COMPLETE. */
int
up_ascii(status_context_t * LENT info, buf_t * LENT buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  int row = 0;

  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d: Those that are HIGHER...\n", state->myetx);

  while (row < state->etx_higher_count)
    {
      const rddhop_t *entry = &state->etx_higher[row];

      bufprintf(buf, "host: %s, etx %d\n", print_if_id(entry->host), entry->etx);
      ++row;
    }

  return STATUS_MSG_COMPLETE;
}

/* Printable callback of the "down_ascii" status device: refreshes the ETX
   lists, then writes this node's ETX followed by one line per etx_lower
   entry.  Always returns STATUS_MSG_COMPLETE. */
int
down_ascii(status_context_t * LENT info, buf_t * LENT buf)
{
  sinkgradient_state_t * state= (sinkgradient_state_t*)sd_data(info);
  int row = 0;

  update_etx_arrays(state);
  bufprintf(buf, "My ETX: %d: Those that are LOWER...\n", state->myetx);

  while (row < state->etx_lower_count)
    {
      const rddhop_t *entry = &state->etx_lower[row];

      bufprintf(buf, "host: %s, etx %d\n", print_if_id(entry->host), entry->etx);
      ++row;
    }

  return STATUS_MSG_COMPLETE;
}


// Choose the path this node should publish.
// - returns newly allocated memory owned by the caller
// - modifies the tree: paths containing a null host or this node's id are
//   removed (paths through this node are considered loops), and this node is
//   appended to the path that gets selected
// - assumes that this node is not the sink
// - returns a path of length zero if there is no usable path
static rddhop_path_data_t * 
path_tree_create_best_path (GTree * LENT tree)
{
  assert (tree);

  rddhop_path_data_t *bestpath = NULL;
  GArray *candidates = g_array_new (0, 1, sizeof(rddhop_t));
  // keys of the paths that have to be dropped from the tree
  GArray *doomed_keys = g_array_new (0, 1, sizeof(rddhop_path_key_t));

  g_tree_foreach (tree, traverse_reject_nulls, doomed_keys);
  g_tree_foreach (tree, traverse_reject_containing_self, doomed_keys);
  path_tree_remove_key_array (tree, doomed_keys);

  // collect, for every surviving path, the hop belonging to its source
  if (g_tree_nnodes(tree) > 0)
    g_tree_foreach (tree, traverse_find_src_hop, candidates);

  if (candidates->len > 0)
    {
      // cheapest next hop, then the path stored under that hop's host
      rddhop_t cheapest = rddhop_array_min_hop (candidates);
      rddhop_path_key_t lookup_key = key_copy_src (cheapest.host);
      rddhop_path_data_t *chosen = g_tree_lookup (tree, &lookup_key);

      rddhop_path_data_append_self (cheapest, chosen);
      // the tree keeps `chosen`; hand a private copy to the caller
      bestpath = rddhop_path_data_copy (chosen);
    }
  else
    {
      // empty path: no known way to reach the sink
      bestpath = data_new();
    }

  g_array_free (doomed_keys, 1);
  g_array_free (candidates, 1);
  return bestpath;
}


/* Convert a path into a freshly allocated table with one zero-initialised
   entry per hop, whose rddhop member is a copy of that hop.  The caller owns
   the table and frees it with free().  `data` and its array must be non-NULL
   (asserted).
   The allocation is now checked (failure is logged and fatal) and always
   covers at least one entry, so an empty path no longer depends on what
   malloc(0) returns before it is passed to memset(). */
static rddhop_table_t *
rddhop_path_data_to_table (const rddhop_path_data_t * LENT data)
{
  assert(data);
  GArray LENT * array = data->array;
  assert(array);
  elog(LOG_INFO," ");

  int count = array->len;
  rddhop_table_t * table = calloc(count > 0 ? (size_t) count : 1, sizeof *table);
  if (table == NULL)
    {
      elog(LOG_CRIT, "Out of memory allocating rddhop table");
      exit(1);
    }

  int i;
  for (i = 0; i < count; ++i)
    {
      table[i].rddhop = g_array_index(array,rddhop_t, i);
    }
  return table;
}


/* Release a path obtained from data_new(), sink_path(),
   rddhop_path_data_copy() or path_tree_create_best_path(), by delegating to
   the tree's value destructor. */
static void
rddhop_path_data_free(rddhop_path_data_t * data)
{
  gpointer victim = data;

  data_destroy_notify(victim);
}


/* Build the path published by a sink: a single hop with hop_number 0,
   host my_node_id and etx 0.  `state` is not used.  The caller owns the
   returned path. */
static rddhop_path_data_t *
sink_path(sinkgradient_state_t * LENT state)
{
  rddhop_t self_hop = {
    .hop_number = 0,
    .host = my_node_id,
    .etx = 0,
  };
  rddhop_path_data_t * path = data_new();

  data_append_hop(path, self_hop);
  return path;
}


/* Periodic timer callback (registered by init_timer()).
   Determines the path to publish -- the one-hop sink path when this node is
   a sink, otherwise the best path computed from the stored table --
   publishes and logs it, releases all temporaries and notifies every status
   device.  `data` is the sinkgradient_state_t.  The return value re-arms the
   timer with a delay derived from the published hop count via TIMER_DELAY(). */
static int
sinkgradient_timer (gpointer LENT data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");
  sinkgradient_state_t *state = (sinkgradient_state_t *) data;
  rddhop_path_data_t * best_path = NULL;
  /* Only allocated on the non-sink branch; destroyed at the end if set. */
  GTree * tree = NULL;
  
  if (state->sinkstatus)
    {
      // create a rddhop_path_data_t that just has me as sink
      elog(LOG_NOTICE,"I'm a sink, so pushing sink_path.");
      best_path = sink_path(state);
    }
  else
    {
      elog(LOG_NOTICE,"Not a sink. So I have to calculate...");
      // copy rddhop_table_t into a list of lists
      tree = rddhop_table_to_path_tree (state->all_rddhops,
					state->all_rddhops_count);
      // pass tree to something that returns an array we should publish
      best_path = path_tree_create_best_path (tree);
    }
  // convert path to table
  rddhop_table_t *table = rddhop_path_data_to_table (best_path);
  int count = best_path->array->len;
  // publish
  /* A non-zero result is only logged; the timer keeps running. */
  if (rddhop_pub_helper (use_prefix, table, count, &state->fid))
    {
      elog (LOG_CRIT, "Unable to publish rddhop_table");
    }
  buf_t *buf = buf_new ();
  bufprintf (buf, "Publishing the following RDDHOP_TABLE...");
  rddhop_table_unparse (buf, table, count);
  elog (LOG_NOTICE, "%s", buf->buf);
  buf_free (buf);

  /* best_path is a private copy, so it is released independently of tree. */
  free (table);
  rddhop_path_data_free(best_path);
  if (tree)
    g_tree_destroy(tree);


  g_status_dev_notify(state->status_dev_context);
  g_status_dev_notify(state->status_dev_up_bin_context);
  g_status_dev_notify(state->status_dev_down_bin_context);
  g_status_dev_notify(state->status_dev_up_ascii_context);
  g_status_dev_notify(state->status_dev_down_ascii_context);

  return TIMER_RENEW_MS(TIMER_DELAY(count));
}


/* Register sinkgradient_timer() as a timer callback with an initial
   interval of SINKGRADIENT_TIMER and `state` as its data.  A failure is
   logged and ends the process with status 1. */
static void
init_timer (sinkgradient_state_t * LENT state)
{
  if (g_timer_add (SINKGRADIENT_TIMER, sinkgradient_timer, state, NULL, NULL) >= 0)
    return;

  elog (LOG_CRIT, "Unable to create timer event.");
  exit (1);
}


/* Shutdown callback handed to emrun_init() in main(): logs a notice and
   exits with status 0.  `data` is unused. */
void
sinkgradient_shutdown (void * LENT data)
{
  (void) data;

  elog (LOG_NOTICE, "sinkgradient shutting down.");
  exit (0);
}


/* Status-client callback: interprets the received buffer as the new sink
   status (0 or 1) and stores it in the state passed as `data`.
   new_buf holds `size` bytes and is freed here on every path.  Always
   returns EVENT_RENEW.
   Fixes over the previous version:
   - the payload is copied into a bounded, NUL-terminated local buffer before
     parsing, because nothing guarantees that new_buf is NUL-terminated;
   - strtol() replaces atoi(), so input that contains no number at all is
     reported as unrecognized instead of silently being treated as status 0,
     and out-of-range numbers are rejected instead of being undefined. */
static int
status_handler(void*new_buf, size_t size, void * LENT data)
{
  sinkgradient_state_t * state = (sinkgradient_state_t *)data;
  char text[32];
  char *end = NULL;
  size_t len;
  long new_status;

  elog(LOG_INFO," ");
  if (size == 0)
    {
      elog(LOG_ERR,"Size is 0!");
      goto final;
    }
  if (!new_buf)
    {
      elog(LOG_ERR,"new_buf is NULL");
      goto final;
    }

  /* Only the leading number matters, so truncating long payloads is fine. */
  len = size < sizeof(text) - 1 ? size : sizeof(text) - 1;
  memcpy(text, new_buf, len);
  text[len] = '\0';

  new_status = strtol(text, &end, 10);
  if (end == text || (new_status != 0 && new_status != 1))
    {
      elog(LOG_ERR,"Unrecognized status");
      goto final;
    }

  if (new_status != state->sinkstatus)
    {
      elog(LOG_NOTICE,"Changing status to %d", (int) new_status);
      state->sinkstatus = (uint8_t) new_status;
    }
  else
    {
      elog(LOG_NOTICE,"Status is the same.");
    }

 final:
  free(new_buf);
  return EVENT_RENEW;
}


/* Open a status client on the "/dev/block/sinkstatus" device (path passed
   through sim_path()), with status_handler() as callback and `state` as its
   private data.  The client handle is stored in state->status_context.
   A failure is logged and ends the process with status 1. */
static void
status_init (sinkgradient_state_t * LENT state)
{
  status_client_opts_t opts = {
    .devname = sim_path("/dev/block/sinkstatus"),
    .handler = status_handler,
    .private_data = state,
  };

  if (g_status_client_full(&opts, &state->status_context) >= 0)
    return;

  elog (LOG_ERR,"Unable to open status client to %s", opts.devname);
  exit(1);
}

/* Create a status device named `dev_name` (passed through sim_path()) with
   the given printable and binary callbacks and `state` as device info.
   The new device handle is stored in *context and notified once.
   A creation failure is logged and ends the process with status 1. */
void
status_dev_init(sinkgradient_state_t * LENT state, char * LENT dev_name, status_response_cb_t printable_cb, status_response_cb_t binary_cb, status_context_t ** context)
{
  status_dev_opts_t status_opts = {
    .device = {
      .devname = sim_path (dev_name),
      .device_info = state,
    },
    .printable = printable_cb,
    .binary = binary_cb,
  };

  if (g_status_dev (&status_opts, context) < 0)
    {
      elog (LOG_CRIT, "Can't create status device %s: %m",
            status_opts.device.devname);
      exit (1);
    }

  g_status_dev_notify(*context);
}


/* Program entry point.
   Initialises the misc and emrun layers, parses the link to use and the
   optional "use_mhsync" switch, builds the program state, then sets up the
   subscription, the periodic timer, the status client, the five status
   devices, the link and the cluster map before entering the main loop.
   `state` lives on this function's stack and is handed to all callbacks. */
int
main (int argc, char ** LENT argv)
{
  misc_init (&argc, argv, CVSTAG);
  emrun_opts_t emrun_opts = {
  shutdown:sinkgradient_shutdown
  };
  emrun_init (&emrun_opts);
  char * uses = link_parse_uses(&argc, argv, NULL);
  /* NOTE(review): this argument-count check runs before the use_mhsync
     switch is parsed out below -- confirm that is intended. */
  if (argc > 2)
    usage (argv[0]);

  use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
  if (use_mhsync) {
    use_prefix = SSYNC_MULTIHOP_PREFIX;
    elog (LOG_CRIT,"Using mhsync");
  }
  
  
  sinkgradient_state_t state = init_state ();
  rddhop_sub (&state);
  init_timer (&state);
  status_init (&state);
  /* One printable table dump plus binary/ascii views of the ETX lists. */
  status_dev_init(&state, "/dev/block/sinkgradient", sinkgradient_printable,
		  NULL, &state.status_dev_context);
  status_dev_init(&state,"/dev/block/up_bin", NULL, up_bin,&state.status_dev_up_bin_context);
  status_dev_init(&state,"/dev/block/down_bin", NULL, down_bin,&state.status_dev_down_bin_context);
  status_dev_init(&state,"/dev/block/up_ascii", up_ascii,NULL, &state.status_dev_up_ascii_context);
  status_dev_init(&state, "/dev/block/down_ascii", down_ascii, NULL, &state.status_dev_down_ascii_context);
  /* Exits through usage() when no link was given on the command line. */
  init_link(uses, argv, &state);

   /* cluster map event */
   cluster_map_opts_t cl_opts = {
     private_data: &state
   };
   if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
     elog(LOG_CRIT, "Can't connect to cluster map: %m");
     exit(1);
   }
  
  g_status_dev_notify(state.status_dev_context);
  g_main ();
  /* Only reached if g_main() returns. */
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  sinkgradient.c
  sinkgradient_with_LENT.c