Code Search for Developers
 
 
  

sympathy_main.c from EmStar at Krugle


Show sympathy_main.c syntax highlighted

/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
 *
 * Copyright (c) 2003 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

 /*
  *
  * Author: Nithya Ramanthan
  *
  */

/* NR TODO: don't always set check_for_failures in insert_neighbors - only
 * set it if the recorded neighbors/routes differ from those reported.
 * *** Don't just look for solid epochs; instead calculate the time since we
 * last heard from a node - set a timer every time we get a packet, and 
 * if the timer triggers, then we haven't heard from that node in x minutes,
 * so indicate it! */
#include "sympathy.h"
#include "sympathy_dev.h"
#include <tos-contrib/sympathy/tos/lib/Sympathy.h>

/* Link name used for the regular/errors links when no --uses option is
 * given on the command line */
#define DEFAULT_ERRORS_LINK "mote0"
/* Non-negative verdicts returned by check_rollover(), which returns -1 when
 * the packet should be treated as garbage.
 *  S_ROLLOVER_FINE:      the reported/recorded difference fits in the
 *                        minutes elapsed since the previous packet
 *  S_ROLLOVER_IS_REBOOT: the reported time itself fits in the elapsed
 *                        minutes, i.e. a restart from zero is possible */
enum {
  S_ROLLOVER_FINE = 0,
  S_ROLLOVER_IS_REBOOT,
};

/* Cached sizes of the stats structures; stats_size and app_stats_size are
 * the record sizes used by handle_sympathy_stats() */
int sn_size = sizeof(Sneighbor_info_t);
int stats_size = sizeof(Sympathy_stats_t);
int app_stats_size = sizeof(Sympathy_comp_stats_t);
/* Global state of this sympathy sink process */
sympathy_ctx_t sink = {};
/* Sequence number stamped on outgoing requests (see ping_node() and
 * change_auto_period()); wraps as a uint8_t */
uint8_t seqno = 0;
/* Incremented by expect_smetric() on each call until it reaches 3 */
uint8_t expect_ctr = 0;
/* Number of per-metric-period windows tracked per counter; can be overridden
 * with the -e/--epoch command-line option */
int TRACK_FAILURE_WINDOW_SIZE = TRACK_FAILURE_WINDOW_SIZE_DEFAULT;
int NUM_ITERATIONS_FOR_FAIL;

/* Defined further down; needed early by insert_neighbors() */
static sympathy_node_info_t* add_src(int src);

/*** Non-static functions ***/
/* Return a pointer to the status record kept for node `src`, or NULL when
 * no record exists. Failing to find the record for this node itself
 * (my_node_id) is fatal: the error is logged and the process exits. */
sympathy_node_info_t* find_status_ptr(if_id_t src)
{
  int idx = find_status(src);

  if (idx >= 0) return &sink.status_srcs[idx];

  if (src == my_node_id) {
    elog(LOG_ERR, "ERROR can't even find pointer for me: %d\n",
        my_node_id);
    exit(1);
  }
  return NULL;
}

/* Look up node `src` in sink.status_srcs.
 * Returns the index of the first matching record, or -1 when `src` is 0 or
 * none of the sink.num_srcs records has that address. */
int find_status(if_id_t src)
{
  int idx = 0;

  if (src == 0) return -1;

  while (idx < sink.num_srcs) {
    if (sink.status_srcs[idx].addr == src) return idx;
    idx++;
  }
  return -1;
}

/**** STATIC FUNCS ****/

/* Check `my_seqno` against the SNUM_SEQNO_STORE sequence numbers remembered
 * for `stat`.
 * Returns 1 if it is already stored. Otherwise it is written to the slot at
 * stat->seqno_ctr, the slot counter is advanced with inc_mod_int(), and 0 is
 * returned. */
static
int duplicate_seqno(sympathy_node_info_t* stat, int my_seqno) {
  int slot = 0;

  while (slot < SNUM_SEQNO_STORE) {
    if (stat->seqno[slot] == my_seqno) {
      elog(LOG_DEBUG(1), "found duplicate!\n");
      return 1;
    }
    slot++;
  }

  /* Not seen before: remember it in the next slot */
  stat->seqno[stat->seqno_ctr] = my_seqno;
  inc_mod_int(&stat->seqno_ctr, 1, SNUM_SEQNO_STORE);
  return 0;
}


/* Fold a newly reported absolute counter value into `recorded_val`.
 *
 * The increment since the previous reading (prev_val) is added to the running
 * total (ctr) and to the per-window aggregate (agg_prev_metric_pd). The very
 * first update stores new_val as-is.
 *
 *  recorded_val             counter state to update
 *  new_val                  latest absolute value of the counter
 *  treat_rollover_as_reboot if set and new_val < prev_val, the node is
 *                           assumed to have rebooted and new_val is taken as
 *                           an absolute value instead of a difference
 *  size_field               width in bytes of the reported field; the
 *                           difference is truncated to 1 or 2 bytes for those
 *                           widths so a wrapped counter still gives the right
 *                           increment. Any other width uses the full uint
 *                           difference.
 *  interpolate              see below
 *
 * To address the issue where: if an update to a variable came during period
 * x, and then the next update only comes at x + 10, then 
 * agg_prev_metric_pd[window] gets the whole difference between x and x+10, 
 * which causes a false inflation:
 *   so, if interpolate is set: if the last update was before the previous
 * period, interpolate between the previous value and current value
 * to estimate what value should be stored in each window.
 * ** Include everything when calculating the final total (in the ctr field)
 * but, just don't put whole total in one window - spread it out amongst past 
 * windows. 
 * NOTE: This assumes that packets are expected to be regularly transmitted
 * by the node. Interpolate is only set for stats_ctrs corresponding to stats
 * coming from the node
 */
static 
void stats_ctr_inc(stats_ctr_t* recorded_val, uint new_val, 
    int treat_rollover_as_reboot, uint8_t size_field, uint8_t interpolate)
{
  gettimeofday(&recorded_val->last_updated, NULL);

  /* Check if we have ever updated this counter before */
  if ((recorded_val->iter_updated > 0) || (recorded_val->ctr > 0))
  {
    /* If we detected a reboot- subtraction falsely inflates values, so instead
     * just treat the new value as an absolute */
    if ((new_val < recorded_val->prev_val) && (treat_rollover_as_reboot))
    {
      elog(LOG_DEBUG(1), "Rebooted!\n");
      recorded_val->agg_prev_metric_pd[sink.window] = new_val;
      recorded_val->ctr = new_val;
    }
    else
    {
      uint inc_val;

      /* Truncate the difference to the width of the reported field. The
       * default branch guarantees inc_val is always initialised, even for a
       * width other than 1, 2 or 4 bytes. */
      switch (size_field) {
        case 1:
          inc_val = (uint8_t) (new_val - recorded_val->prev_val);
          break;
        case 2:
          inc_val = (uint16_t) (new_val - recorded_val->prev_val);
          break;
        default:
          inc_val = (uint) (new_val - recorded_val->prev_val);
          break;
      }

      /* We see if the last update we got is before the previous period.
       * If it is, then - if desired - we will interpolate to fill in empty
       * windows */
      if ((interpolate) && (1 + recorded_val->iter_updated < sink.metric_pd)) {
        /* num_pds is at least 2 here, so the division below is safe */
        uint num_pds = sink.metric_pd - recorded_val->iter_updated;
        uint start_ind = sink.window;
        uint max_pds = min(num_pds, TRACK_FAILURE_WINDOW_SIZE);

        /* Calculate the value that should be in each window */
        uint interpolate_val = inc_val / num_pds;
        elog(LOG_DEBUG(1), "num_pds %d, start-index %d, max_pds %d, interpolated val %d\n", 
            num_pds, start_ind, max_pds, interpolate_val);

        /* Walk backwards from the current window, filling empty windows */
        while (max_pds > 0) {
          if ((recorded_val->agg_prev_metric_pd[start_ind] > 0) 
              || (recorded_val->agg_prev_metric_pd_updated[start_ind])) {
            elog(LOG_ERR, "ERROR max_pds=%d, start_index=%d, counter val is %d, should be 0!!\n", 
                max_pds, start_ind,
                recorded_val->agg_prev_metric_pd[start_ind]);
          }
          else {
            recorded_val->agg_prev_metric_pd[start_ind] = interpolate_val;
            recorded_val->agg_prev_metric_pd_updated[start_ind] = 1;
          }
          max_pds--;
          start_ind = dec_mod(start_ind, TRACK_FAILURE_WINDOW_SIZE);
          elog(LOG_DEBUG(1), "%d:start_ind %d\n", max_pds, start_ind);
        }
      }
      else {
        recorded_val->agg_prev_metric_pd[sink.window] += inc_val;
      }
      recorded_val->ctr += inc_val;
    }
  }
  else 
  {
    /* First update ever: take the reported value as-is */
    recorded_val->ctr = new_val;
    recorded_val->agg_prev_metric_pd[sink.window] = new_val; 
  }
  recorded_val->agg_prev_metric_pd_updated[sink.window] = 1;
  recorded_val->iter_updated = sink.metric_pd;
  recorded_val->prev_val = new_val;
}

/* Types above SCOMP_LAST are mapped down by subtracting SCOMP_LAST; any
 * other type is returned unchanged. */
static
int non_generic(int type)
{
  return (type > SCOMP_LAST) ? (type - SCOMP_LAST) : type;
}

/* Send a sympathy packet to the routing component, which will send it out
 * on the correct layer.
 * The outgoing buffer is a link header (broadcast destination, this node as
 * source, type STO_NETWORK), followed by the Spkt_t header and, when
 * data_len > 0, data_len bytes of `data`. A failed send is only logged; the
 * function always returns 0. */
static
int sympathy_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
{
  link_pkt_t lhdr = {
    dst: {
      id: LINK_BROADCAST,
    },
    src: {
      id: my_node_id,
    },
    type: STO_NETWORK,
  };
  buf_t* out = buf_new();

  /* Assemble: link header | sympathy header | optional payload */
  bufcpy(out, &lhdr, sizeof(link_pkt_t));
  bufcpy(out, pkt, sizeof(Spkt_t));
  if (data_len > 0) {
    bufcpy(out, data, data_len);
  }

  elog(LOG_DEBUG(1), "will send pkt of size %d\n", out->len - sizeof(link_pkt_t));

  /* The length handed to lu_send excludes the link header */
  if ((lu_send(sink.sympathy_link, (link_pkt_t*)out->buf, out->len - sizeof(link_pkt_t))) < 0)
  {
    elog(LOG_ERR, "ERROR Unable to send packet on sympathy link: %m!\n");
  }

  buf_free(out);
  return 0;
}

/* Send a SREQUEST_PING_NODE request whose value is `node`, stamped with the
 * next request sequence number. */
static
void ping_node(int node)
{
  Spkt_t hdr = {
    type: SREQUEST_PING_NODE,
  };
  Srequest_t req = {
    seqno: seqno++,
    val: node,
  };

  sympathy_send_pkt(&hdr, (void *)&req, sizeof(Srequest_t));
}

/* Change the period of automatic metric reports.
 * Reschedules sink.auto_pd_timer with period_secs * 1000 and sends a
 * SREQUEST_CHANGE_AUTO_PD request whose value is period_secs / 60, stamped
 * with the next request sequence number. */
static
void change_auto_period(uint16_t period_secs)
{
  Spkt_t hdr = {
    type: SREQUEST_CHANGE_AUTO_PD,
  };
  Srequest_t req = {
    seqno: seqno++,
    val: (period_secs / 60),
  };

  g_timer_resched(sink.auto_pd_timer, period_secs * 1000);
  sympathy_send_pkt(&hdr, (void *)&req, sizeof(Srequest_t));
}

/* Timer callback: run track_lost_nodes() if new information was flagged
 * through sink.check_for_failures since the last run, then clear the flag.
 * Always returns EVENT_RENEW. */
static
int check_for_failures(void* data, int interval, g_event_t* event)
{
  if (sink.check_for_failures != 0) {
    track_lost_nodes();
  }

  /* Cleared only after the scan, as before */
  sink.check_for_failures = 0;
  return EVENT_RENEW;
}

/* Timer callback: bump expect_ctr (which saturates at 3) and add one to the
 * sink's count of expected sympathy metrics. Always returns EVENT_RENEW. */
static
int expect_smetric(void* data, int interval, g_event_t* event)
{
  if (expect_ctr < 3) {
    expect_ctr++;
  }

  stats_ctr_inc(&sink.expected_num_sympathy_metrics,
      sink.expected_num_sympathy_metrics.ctr + 1,
      0,
      sizeof(sink.expected_num_sympathy_metrics.ctr),
      0);

  return EVENT_RENEW;
}

/* Return the index of the first entry in stat's neighbor table whose
 * node_id equals `neighbor`, or -1 if there is none. */
int find_neighbor(sympathy_node_info_t* stat, Saddr_t neighbor)
{
  int idx = 0;

  while (idx < stat->num_neighbors) {
    if (stat->neighbors[idx].node_id == neighbor) return idx;
    idx++;
  }
  return -1;
}

/* Replace stat's neighbor table with the reported `neighbor_table`.
 *
 *  neighbor_table  reported neighbor entries; entries with addr 0 are skipped
 *  num_neighbors   number of entries in neighbor_table; at most MAX_NEIGHBORS
 *                  are recorded (a warning is logged if more are offered)
 *  stat            status record of the node that reported the table
 *
 * A snapshot of the previous table is taken first so that per-neighbor
 * history (time_added, num_samples, conn_to) can be carried over for
 * neighbors that were already known. All lookups of previous values go
 * through that snapshot, because stat's arrays are overwritten in place while
 * the new table is built.
 *
 * Any reported neighbor that has no status record yet gets one via add_src().
 * Finally the table is pushed to emview and failure diagnosis is requested.
 */
static void insert_neighbors(Sneighbor_info_t* neighbor_table, 
    int num_neighbors, sympathy_node_info_t* stat)
{
  int t, i;
  /* NOTE(review): my_egress is taken from the first previously-known neighbor
   * and then reused for every following entry - confirm this is intended */
  Squality_t my_egress = 0;
  sympathy_node_info_t prev;
  struct timeval now;

  memcpy(&prev, stat, sizeof(sympathy_node_info_t));
  gettimeofday(&now, NULL);

  if (num_neighbors > MAX_NEIGHBORS) {
    elog(LOG_CRIT, "WARNING, Cannot add %d neighbors, max is: %d!!\n", 
        num_neighbors, MAX_NEIGHBORS);
  }

  stat->num_neighbors = 0;
  elog(LOG_DEBUG(1), "node %d, num-neighbors: %d\n", 
      stat->addr, num_neighbors);

  for (i = 0; i < num_neighbors; i++) {
    sneighbor_info_t tmp_neighbor = {
      time_added: now,
      time_max_change: now,
    };

    /* Table is full: stop recording */
    if (stat->num_neighbors >= MAX_NEIGHBORS) {
      stat->num_neighbors = MAX_NEIGHBORS;
      break;
    }

    if (neighbor_table[i].addr > 0) {

      /* If there is a previous entry for this node:
       *  - get old egress value if current info is not providing it
       *  - don't update the time_added, but update other info
       * The old values are read from the snapshot: slot t of stat may already
       * have been overwritten by an entry added earlier in this loop. */
      if ((t = find_neighbor(&prev, neighbor_table[i].addr)) >= 0) {

        if (my_egress == 0) my_egress = prev.neighbors[t].conn_to;

        tmp_neighbor.num_samples = prev.neighbor_info[t].num_samples + 1;
        tmp_neighbor.time_added = prev.neighbor_info[t].time_added;
      }

      stat->neighbors[stat->num_neighbors].node_id = neighbor_table[i].addr;
      memcpy(&stat->neighbor_info[stat->num_neighbors], &tmp_neighbor,
          sizeof(sneighbor_info_t));
      stat->neighbors[stat->num_neighbors].if_id = neighbor_table[i].addr;
      stat->neighbors[stat->num_neighbors].conn_to = my_egress;
      stat->neighbors[stat->num_neighbors].state = ACTIVE;
      stat->num_neighbors++;

      /* Sympathy should be tracking all nodes that are on all neighbor
       * lists. So, if a node is on a neighbor list, but we haven't 
       * heard it directly, still include it */
      if (!find_status_ptr(neighbor_table[i].addr)) {
        add_src(neighbor_table[i].addr);
      }
    }
  }

  /* Stamp the slot just past the last recorded neighbor, as before, but only
   * when that slot exists. This assumes neighbors[] holds MAX_NEIGHBORS
   * entries (TODO confirm against the struct definition). */
  if (stat->num_neighbors < MAX_NEIGHBORS) {
    gettimeofday(&stat->neighbors[stat->num_neighbors].last_heard, NULL);
  }
  elog(LOG_DEBUG(1), "CHECK node %d, recorded num-neighbors: %d\n", 
      stat->addr, stat->num_neighbors);

  /* Push neighbor information to emview */
  if (stat->num_neighbors > 0) sympathy_emview_neighbors(stat);

  /* Call failure diagnosis algorithm because we have new info now */
  sink.check_for_failures = 1;
}

/* Record the route reported by a node. Called when sympathy receives a
 * PATH_ADVERT packet, and for the route carried in a metrics update.
 * A reported next hop of 0 leaves the stored next hop untouched; a next hop
 * that differs from the stored one is counted as a route change and requests
 * a failure check. Sink and quality are always overwritten, and the route is
 * pushed to emview. */
static
void handle_route_info(sympathy_node_info_t* stat, Snext_hop_t* route)
{
  int have_next_hop = (route->next_hop > 0);

  if (have_next_hop && (stat->next_hop.next_hop != route->next_hop)) {
    gettimeofday(&stat->next_hop_info.time_changed, NULL);
    stat->next_hop_info.num_changes++;

    /* Only trigger failure detection if route has changed */
    sink.check_for_failures = 1;
  }

  if (have_next_hop) {
    stat->next_hop.next_hop = route->next_hop;
  }

  stat->next_hop.sink = route->sink;
  stat->next_hop.quality = route->quality;
  sympathy_emview_route(stat);
}

/* Decide whether a reported time that went backwards is plausible, given how
 * many minutes have passed since `time_prev_pkt`.
 * Returns S_ROLLOVER_FINE when the (unsigned) difference between reported and
 * recorded time fits in the elapsed minutes, S_ROLLOVER_IS_REBOOT when the
 * reported time itself fits in the elapsed minutes, and -1 otherwise, in
 * which case the packet is garbage. */
static
int check_rollover(struct timeval *time_prev_pkt, int reported_time, int recorded_time)
{
  struct timeval now;
  unsigned long elapsed_usec;
  int elapsed_mins;
  uint delta = reported_time - recorded_time;
  int verdict = -1;

  gettimeofday(&now, NULL);
  elapsed_usec = misc_tv_offset(&now, time_prev_pkt);
  elapsed_mins = elapsed_usec / (MILLION_I * 60);

  if (delta <= elapsed_mins) {
    /* Difference between reported time and our recorded time is small
     * enough for a rollover to be possible */
    verdict = S_ROLLOVER_FINE;
  }
  else if (reported_time <= elapsed_mins) {
    /* Difference between reported time and 0 is small enough for a reboot
     * to be possible */
    verdict = S_ROLLOVER_IS_REBOOT;
  }
  return verdict;
}

/* Maintain the per-epoch aggregate of a counter.
 *
 * clear != 0: reset the window at sink.window (value and updated flag). The
 *   window index has already been advanced, so this is the "next" window.
 * clear == 0: recompute agg_prev_epoch as the sum of all windows that were
 *   updated. If interpolate is set and at least one window was updated, every
 *   window that was not updated contributes the average of the updated ones.
 *   We would only use this for expected values. */
void stats_ctr_update(stats_ctr_t* ctr, uint8_t clear, uint8_t interpolate)
{
  int w;
  int avg;
  int num_updated = 0;

  if (clear) {
    ctr->agg_prev_metric_pd[sink.window] = 0;
    ctr->agg_prev_metric_pd_updated[sink.window] = 0;
    return;
  }

  /* Sum up the windows that actually received a value */
  ctr->agg_prev_epoch = 0;
  for (w = 0; w < TRACK_FAILURE_WINDOW_SIZE; w++) {
    if (!ctr->agg_prev_metric_pd_updated[w]) continue;
    num_updated++;
    ctr->agg_prev_epoch += ctr->agg_prev_metric_pd[w];
  }

  if (!(interpolate) || !(num_updated > 0)) return;

  avg = ctr->agg_prev_epoch/num_updated;
  elog(LOG_DEBUG(1), "avg %d\n", avg);

  /* If any slot hasn't been updated, then we just add the average to
   * the total aggregate for this epoch! */
  for (w = 0; w < TRACK_FAILURE_WINDOW_SIZE; w++) {
    if (ctr->agg_prev_metric_pd_updated[w]) continue;
    elog(LOG_DEBUG(1), "adding avg to window %d\n", w);
    ctr->agg_prev_epoch += avg;
  }
}

/* Return the index in sink.app_type for application type `type`, registering
 * it when it is not yet known.
 * The type is first normalised with non_generic(). Returns -1 when `type` is
 * SCOMP_STATS_SYMPATHY (logged as an error) or when the table already holds
 * SCOMP_LAST - 1 entries. */
static
int find_app_type(int type)
{
  int slot;
  int munged_type = non_generic(type);

  if (type == SCOMP_STATS_SYMPATHY) {
    elog(LOG_ERR, "ERROR Something is wrong - why are you sending Sympathy stats type %d!\n", type);
    return -1;
  }

  /* Already registered? */
  for (slot = 0; slot < sink.num_apps_registered; slot++) {
    if (sink.app_type[slot] == munged_type) return slot;
  }

  /* No room left for a new registration */
  if (!(sink.num_apps_registered < SCOMP_LAST - 1)) return -1;

  sink.app_type[sink.num_apps_registered] = munged_type;
  sink.num_apps_registered++;
  return sink.num_apps_registered - 1;
}

/* Record one application/component stats record reported by a node.
 *
 *  stat    status record of the reporting node
 *  stats   the reported component counters
 *  type    stats type, used to pick the per-application slot
 *  reboot  passed to stats_ctr_inc() as treat_rollover_as_reboot
 *
 * Counts the record as received, then folds each reported counter into the
 * matching node_* counter with interpolation enabled. Logs and returns if no
 * slot can be found for the type. */
static
void record_app_stats(sympathy_node_info_t* stat, Sympathy_comp_stats_t* stats,
    stats_pkt_type_t type, int reboot)
{
  sympathy_node_app_info_t* app;
  int app_idx = find_app_type(type);

  if (app_idx < 0)
  {
    elog(LOG_ERR, "Can't fit app: %d\n", type);
    return;
  }
  app = &stat->node_app_info[app_idx];

  /* One more stats record received from this node for this app */
  stats_ctr_inc(&app->app_stats_rx_from_node, 
      app->app_stats_rx_from_node.ctr+1, 0,
      sizeof(app->app_stats_rx_from_node.ctr), 0);

/* Fold reported field `f` into the node_<f> counter */
#define RECORD_APP_CTR(f) \
  stats_ctr_inc(&app->node_##f, (uint) stats->f, reboot, sizeof(stats->f), 1)

  RECORD_APP_CTR(num_pkts_rx);
  RECORD_APP_CTR(send_failures);
  RECORD_APP_CTR(max_queue_occupancy);
  RECORD_APP_CTR(num_pkts_dropped);
  RECORD_APP_CTR(num_pkts_tx);

#undef RECORD_APP_CTR
}

/* Record the sympathy stats reported by a node.
 *
 * If the reported time awake went backwards, check_rollover() decides whether
 * that is plausible; an implausible packet is dropped as garbage and -1 is
 * returned. A verdict of S_ROLLOVER_IS_REBOOT marks the node as rebooted.
 * All reported counters are then folded into stat with interpolation enabled,
 * and the packet is counted in sympathy_stats_rx.
 *
 * Returns the rollover verdict (S_ROLLOVER_FINE or S_ROLLOVER_IS_REBOOT), or
 * -1 for a garbage packet. */
static
int record_sympathy_stats(sympathy_node_info_t* stat, Sympathy_stats_t* stats)
{
  int verdict = S_ROLLOVER_FINE;

  if (stats->time_awake_mins < stat->time_awake_mins.prev_val)
  {
    /* Check if the packet is garbage by detecting false roll-over warnings 
     * by looking at its time awake */
    verdict = check_rollover(&stat->sympathy_stats_rx.last_updated, 
        stats->time_awake_mins, stat->time_awake_mins.ctr);
    if (verdict < S_ROLLOVER_FINE) 
    {
      elog(LOG_WARNING, "WARNING: Packet is garbage!\n");
      return -1;
    }
  }

  if (verdict == S_ROLLOVER_IS_REBOOT) {
    stat->rebooted = 1;
  }

/* Fold reported field `f` into the counter of the same name in stat */
#define RECORD_NODE_CTR(f) \
  stats_ctr_inc(&stat->f, stats->f, stat->rebooted, sizeof(stats->f), 1)

  RECORD_NODE_CTR(time_awake_mins);
  RECORD_NODE_CTR(num_metrics_tx);
  RECORD_NODE_CTR(num_stats_tx);
  RECORD_NODE_CTR(num_pkts_tx_succeeded);
  RECORD_NODE_CTR(num_pkts_rx);
  RECORD_NODE_CTR(num_pkts_dropped);
  RECORD_NODE_CTR(num_pkts_tx_failed);
  RECORD_NODE_CTR(num_pkts_crc_error);

#undef RECORD_NODE_CTR

  /* One more sympathy-stats packet received from this node */
  stats_ctr_inc(&stat->sympathy_stats_rx, stat->sympathy_stats_rx.ctr + 1, 
      0, sizeof(stat->sympathy_stats_rx.ctr), 0);
  return verdict;
}

/* Forward component-specific stats on the sympathy link.
 * The packet is broadcast with source id `addr`, type SCOMP_STATS and
 * ext_type `type`, carrying `len` bytes copied from `stats`. A failed send is
 * only logged. */
static
void send_stats_pkt(int addr, void* stats, int type, int len)
{
  buf_t* out = buf_new();
  link_pkt_t hdr = {
    dst: {
      id: LINK_BROADCAST
    },
    src: {
      id: addr
    },
    ext_type: type,
    type: SCOMP_STATS,
  };

  /* Assemble: link header | stats payload */
  bufcpy(out, &hdr, sizeof(link_pkt_t));
  bufcpy(out, stats, len);

  elog(LOG_DEBUG(1), "buflen: %d\n", out->len);

  /* The length handed to lu_send excludes the link header */
  if (lu_send(sink.sympathy_link, (link_pkt_t *)out->buf, out->len - sizeof(link_pkt_t)) < 0)
  {
    elog(LOG_ERR, "Unable to send pkt: %m!\n");
  }

  buf_free(out);
}

/* Parse a stats packet received from a node.
 *
 *  stat  status record of the reporting node
 *  data  packet payload: a sequence of {type byte, record} entries
 *  len   number of bytes in data
 *
 * Record handling by type:
 *  - SCOMP_STATS_SYMPATHY: a Sympathy_stats_t, recorded with
 *    record_sympathy_stats(); its result is used as the reboot flag for the
 *    application records that follow in the same packet.
 *  - SCOMP_STATS2..6: a Sympathy_comp_stats_t, recorded with
 *    record_app_stats().
 *  - SCOMP_STATS2_GEN..6_GEN: a generic_comp_stats_t whose data is forwarded
 *    with send_stats_pkt(). Its length comes from the packet itself, so it is
 *    validated against the remaining bytes before anything is read or
 *    forwarded; a record that does not fit ends parsing.
 *
 * NOTE(review): record_sympathy_stats() returns -1 for a garbage packet, and
 * that value is then passed on as a (true) reboot flag - confirm intended.
 */
static
void handle_sympathy_stats(sympathy_node_info_t* stat, uint8_t* data, ssize_t len)
{
  stats_pkt_type_t type;
  int offset = 0;
  Sympathy_stats_t* s_stats;
  Sympathy_comp_stats_t* a_stats;
  int reboot = 0;
  generic_comp_stats_t* gen_pkt;
  ssize_t gen_data_off;

  /* This is a stats-packet of the format: {type, data} */
  if (len == 0) return;

  while (offset < len-1)
  {
    type = data[offset++];
    elog(LOG_DEBUG(1), "offset %d, pkt len: %d, first type: %d\n", offset,
        len, type);

    switch (type)
    {
      case (SCOMP_STATS_SYMPATHY):
        s_stats = (Sympathy_stats_t *)&data[offset];
        if (len - offset >= stats_size)
        {
          reboot = record_sympathy_stats(stat, s_stats);
          offset += stats_size;
        }
        else elog(LOG_ERR, "Type is %d, but remaining size: %d\n", type,
            len - offset);
        break;
      case (SCOMP_STATS2):
      case (SCOMP_STATS3):
      case (SCOMP_STATS4):
      case (SCOMP_STATS5):
      case (SCOMP_STATS6):
        a_stats = (Sympathy_comp_stats_t *)&data[offset];
        if (len - offset >= app_stats_size)
        {
          record_app_stats(stat, a_stats, type, reboot);
          offset += app_stats_size;
        }
        else elog(LOG_ERR, "Type is %d, but remaining size: %d\n", type,
            len - offset);
        break;
      case (SCOMP_STATS2_GEN):
      case (SCOMP_STATS3_GEN):
      case (SCOMP_STATS4_GEN):
      case (SCOMP_STATS5_GEN):
      case (SCOMP_STATS6_GEN):
        gen_pkt = (generic_comp_stats_t *) &data[offset];

        /* Offset of the record's data bytes within the packet. Only the
         * address is computed here; nothing is read yet. */
        gen_data_off = (uint8_t *) gen_pkt->data - data;

        /* The record header must lie inside the packet before its len field
         * may be read */
        if (gen_data_off > len) {
          elog(LOG_ERR, "Type is %d, but remaining size: %d\n", type,
              len - offset);
          return;
        }

        /* The advertised data length must fit in what is left */
        if (gen_pkt->len > len - gen_data_off) {
          elog(LOG_ERR, "comp-specific stats len %d exceeds remaining size: %d\n",
              gen_pkt->len, (int) (len - gen_data_off));
          return;
        }

        elog(LOG_DEBUG(1), "comp-specific stats with len: %d!\n", 
            gen_pkt->len);
        send_stats_pkt(stat->addr, gen_pkt->data, type, gen_pkt->len);
        offset += gen_pkt->len;
        break;
      default:
        elog(LOG_ERR, "Unidintifiable stats-type: %d\n", type);
        break;
    }
  }
}

/* Handle an automatic metrics update from a node.
 *
 *  stat      status record of the reporting node
 *  metrics   the received Smetrics_t (route followed by the neighbor list)
 *  data_len  number of bytes available at `metrics`
 *
 * The update is always counted in metrics_rx. If automatic metrics are
 * supposed to be off (period 0), a fault is noted and another cancel request
 * is sent. The route is recorded once the fixed-size part is known to be
 * present, and the neighbor list once the whole list is known to fit. */
static
void handle_sympathy_metrics(sympathy_node_info_t* stat, Smetrics_t *metrics, 
    ssize_t data_len)
{
  stats_ctr_inc(&stat->metrics_rx, stat->metrics_rx.ctr + 1, 0,
      sizeof(stat->metrics_rx.ctr), 0);

  /* First check if we are even supposed to be getting these! */
  if (sink.sympathy_auto_metric_period== 0)
  {
    elog(LOG_ERR, "auto-period = %d! Shouldn't get auto-metric pkts Send another cancel!\n",
        sink.sympathy_auto_metric_period);
    bufprintf(stat->fault_buf, 
        "Node %d BAD: auto-period = %d! Shouldn't get auto-metric pkts. Send another cancel!\n",
        stat->addr, sink.sympathy_auto_metric_period);
    change_auto_period(0);
  }

  /* Next-hop info: the fixed-size part of the packet must be present */
  if (data_len < sizeof(Smetrics_t)) {
    elog(LOG_ERR, "Pkt-size: %d < Smetrics_t size (%d)\n", 
        data_len, sizeof(Smetrics_t));
    return;
  }
  handle_route_info(stat, &metrics->route);

  /* Neighbor-list info */
  elog(LOG_DEBUG(1), "node %d, data-len %d, metrics num-neighbors: %d\n", 
      stat->addr, data_len, metrics->num_neighbors);

  /* The advertised number of neighbor entries must fit in the packet */
  if (data_len < sizeof(Smetrics_t)
      + (sizeof(Sneighbor_info_t) * metrics->num_neighbors)) {
    elog(LOG_ERR, "Pkt-size: %d < Smetrics_t + neighbors size (%d)\n", data_len,
        sizeof(Smetrics_t) + sizeof(Sneighbor_info_t) * metrics->num_neighbors);
    return;
  }
  insert_neighbors(metrics->neighbors, metrics->num_neighbors, stat);
}

/* Dispatch a sympathy packet received from a node.
 *
 *  stat     status record of the reporting node
 *  spkt     the received packet (Spkt_t header followed by its data)
 *  pkt_len  total number of bytes at `spkt`, header included
 *
 * A packet too short to hold the Spkt_t header is logged and dropped before
 * the header is read. The payload length is kept in an ssize_t so it can
 * neither wrap around for short packets nor be truncated for payloads larger
 * than 255 bytes. */
void handle_sympathy_data(sympathy_node_info_t* stat, 
    Spkt_t* spkt, ssize_t pkt_len)
{
  ssize_t spkt_len;

  if (pkt_len < (ssize_t) sizeof(Spkt_t)) {
    elog(LOG_ERR, "Pkt-size: %d < Spkt_t size (%d)\n",
        (int) pkt_len, (int) sizeof(Spkt_t));
    return;
  }
  spkt_len = pkt_len - (ssize_t) sizeof(Spkt_t);

  switch (spkt->type)
  {
    case(SMETRICS_UPDATE):
      handle_sympathy_metrics(stat, (Smetrics_t *)spkt->data, spkt_len);
      break;
    case (SYMPATHY_STATS):
      handle_sympathy_stats(stat, spkt->data, spkt_len);
      break;
    default:
      elog(LOG_WARNING, "Got unexpected packet-type: %d\n", spkt->type);
      break;
  }
}

static sympathy_node_info_t* add_src(int src)
{
  int sctr = sink.status_ctr;
  if (src == 0) {
    elog(LOG_ERR, "ERROR: Unable to add nodes with address %d!!\n", src);
    return NULL;
  }

  sink.status_srcs[sctr].addr = src;
  inc_mod(&sink.status_ctr, 1, SMAX_SRCS);
	sink.num_srcs = min(sink.num_srcs + 1, SMAX_SRCS);
  sympathy_update_emview(src);
  return &sink.status_srcs[sctr];
}

/* Receive callback for the errors device: count a packet that was received
 * with errors. Frees `pkt` and always returns EVENT_RENEW.
 * NR Move this to routing layer! */
static
int handle_error_packets(void *pkt, ssize_t len, 
    pd_client_context_t* pdc)
{
  link_pkt_t *hdr = (link_pkt_t *)pkt;
  sympathy_node_info_t* mystat = find_status_ptr(my_node_id);
  sympathy_node_info_t* stat;

  elog(LOG_DEBUG(1), "error from: %d\n", hdr->src.id);

  /* The error is attributed to the last-hop source, not to the source
   * recorded in the multihop header. If we don't have an entry for that
   * node, record it in the general category for the sink! */
  stat = find_status_ptr(hdr->src.id);
  if (stat == NULL) {
    stat = mystat;
  }
  stats_ctr_inc(&stat->errs_rx, stat->errs_rx.ctr + 1, 0, 
      sizeof(stat->errs_rx.ctr), 0);

  /* Increment counter to check for congestion at the sink */
  stats_ctr_inc(&mystat->num_pkts_crc_error, 
      mystat->num_pkts_crc_error.ctr + 1, 0, 
      sizeof(mystat->num_pkts_crc_error.ctr), 0);

  g_free(pkt);
  return EVENT_RENEW;
}

/* Print the command-line usage for program `name` and exit with status 1 */
static
void usage(char *name)
{
  buf_t* help = buf_new();

  bufprintf(help, "--uses <link> to monitor packets received with errors [default=%s]\n \t-e number metric-pds per epoch [default=%d]\n", 
      DEFAULT_ERRORS_LINK, TRACK_FAILURE_WINDOW_SIZE_DEFAULT);
  misc_print_usage(name, "[-U <link> -e <failure-window>]\n", help->buf);

  buf_free(help);
  exit(1);
}

/* Shutdown handler registered through emrun_opts in main(): log a notice and
 * exit with status 0. `data` is unused. */
static
void sympathy_shutdown(void *data)
{
  (void) data;

  elog(LOG_NOTICE, "sympathy-sink shutting down");
  exit(0);
}

/* Command-device handler: parse `cmd` as key/value pairs and act on each.
 *   ping=<node-id>          ping that node
 *   auto_period=<seconds>   store the new period and announce it
 * An unrecognised key stops parsing and yields EVENT_ERROR(EINVAL); otherwise
 * EVENT_RENEW is returned. */
static
int sympathy_command(char* cmd, size_t size, void* data)
{
  parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
  int retval = EVENT_RENEW;

  elog(LOG_DEBUG(1), "command: %s\n", cmd);

  /* parse message */
  while (misc_parse_next_kvp(ps) >= 0)
  {
    if (strcmp(ps->key, "ping") == 0) {
      int node = atoi(ps->value);

      elog(LOG_DEBUG(1), "key: %s is ping %d\n", 
          ps->key, node);
      ping_node(node);
    }
    else if (strcmp(ps->key, "auto_period") == 0) {
      int requested = atoi(ps->value);

      elog(LOG_DEBUG(1), "key: %s is auto_period %d\n", 
          ps->key, requested);
      sink.sympathy_auto_metric_period = (uint16_t) requested;
      change_auto_period(sink.sympathy_auto_metric_period);
    }
    else {
      elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
      retval = EVENT_ERROR(EINVAL);
      break;
    }
  }

  misc_parse_cleanup(ps);
  return retval;
}

/* Build the help text of the command device: the accepted commands, the
 * current settings, and the non-empty neighbor buffer of every tracked node.
 * Returns the text accumulated in a local buf_t (presumably the caller takes
 * over the returned string - verify against the command-device API). */
static char* sympathy_usage(void *data)
{
  buf_t text = {};
  int idx = 0;

  bufprintf(&text, "Sympathy command interface:\n");
  bufprintf(&text, "  ping=<node-id>             Gather info about a node\n");
  bufprintf(&text, "  auto_period=<period-in-seconds> Change period of automatic metric floods\n");
  bufprintf(&text, "                              (pd=0 => stop metric floods)\n");
  bufprintf(&text, "\nCurrent values:\nnum-apps-registered=%d\tauto_period=%d[secs]\n", sink.num_apps_registered, sink.sympathy_auto_metric_period);
  bufprintf(&text, "\n**** Data Returned From Pings:\n");

  while (idx < sink.num_srcs) {
    if (sink.status_srcs[idx].neighbor_buf->len > 0) {
      bufprintf(&text, "%s", sink.status_srcs[idx].neighbor_buf->buf);
    }
    idx++;
  }
  return text.buf;
}

/* Fold a status update from a component on the sink into the per-application
 * record of node `stat`.
 *
 *  stat                status record to update
 *  info                the status update
 *  pkt_type            application type, used to pick the per-app slot
 *  update_only_pkt_tx  when set, only the transmit counter is updated
 *
 * Otherwise the receive and expected-receive counters, the time of the last
 * received packet, the component name and (unless locked) the reception
 * percentage are updated as well.
 * NOTE(review): a name of exactly SMAX_COMP_NAME_SIZE characters is copied
 * with sprintf - confirm comp_name has room for the terminator. */
static
void update_sympathy_sink(sympathy_node_info_t* stat, 
    sympathy_status_info_t* info, int pkt_type, int update_only_pkt_tx)
{
  sympathy_node_app_info_t* app;
  int app_idx = find_app_type(pkt_type);

  if (app_idx < 0)
  {
    elog(LOG_ERR, "Can't find struct for app_type: %d!\n", 
        pkt_type);
    return;
  }

  app = &stat->node_app_info[app_idx];
  stats_ctr_inc(&app->sink_pkt_tx, info->pkt_tx, 0, sizeof(info->pkt_tx), 0);

  if (update_only_pkt_tx) return;

  stats_ctr_inc(&app->sink_pkt_rx, info->pkt_rx, 0, sizeof(info->pkt_rx), 0);
  /* Overrides the timestamp stats_ctr_inc just set with the reported one */
  app->sink_pkt_rx.last_updated = info->time_rx_last_pkt;
  stats_ctr_inc(&app->sink_pkt_expected_rx, info->pkt_expected_rx, 0,  
      sizeof(info->pkt_expected_rx), 0);

  /* Copy the component name, truncating one that is too long */
  if (strlen(info->component_name) > 0) {
    if (strlen(info->component_name) > SMAX_COMP_NAME_SIZE) {
      elog(LOG_WARNING,"WARN len of component-name: %s(%d) is > max (%d)\n",
          info->component_name, strlen(info->component_name), 
          SMAX_COMP_NAME_SIZE);
      snprintf(app->comp_name, SMAX_COMP_NAME_SIZE, "%s", 
          info->component_name);
    }
    else {
      sprintf(app->comp_name, "%s", info->component_name);
    }
  }

  if ((info->pkt_reception_percent > 0) && (!app->pkt_reception_locked)) {
    app->pkt_reception_percent = info->pkt_reception_percent;
  }
}
/* Receive callback for the regular link: count the packet against this
 * node's own record, free it, and return EVENT_RENEW. */
static
int handle_good_packets(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  sympathy_node_info_t* me = find_status_ptr(my_node_id);

  /* Increment counter to check for congestion at the sink */
  stats_ctr_inc(&me->num_pkts_rx,
      me->num_pkts_rx.ctr + 1,
      0,
      sizeof(me->num_pkts_rx.ctr),
      0);

  g_free(pkt);
  return EVENT_RENEW;
}

/* Receive a packet from the sympathy link status device.
 *
 *  lu        link-user context the packet arrived on
 *  pkt       the received packet; always freed before returning
 *  data_len  length of the packet's data
 *
 * Packets with a broadcast source are only accepted for SSINK_UPDATE, in
 * which case the update is applied to every tracked node. For all other
 * packets the source node's status record is looked up, and created if
 * missing. Handling by packet type:
 *  - SCOMP_ASCII_STATS: copy the data into the app's generic stats buffer
 *  - SSINK_UPDATE:      fold the status update into the source node's record
 *  - SNETWORK_PKT:      count the packet, then dispatch on ext_type
 *                       (neighbor advert, path advert, sympathy packet)
 * Always returns EVENT_RENEW. */
static
int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  sympathy_status_info_t* info = (sympathy_status_info_t *)pkt->data;
  sympathy_node_info_t* stat = NULL;
  sympathy_node_app_info_t* sninfo;
  int ictr;
  int msg_len = data_len;

  elog(LOG_DEBUG(1), "src %d, Pkt type/comp: %d/%d, datalen: %d, msg_len %d\n",
          pkt->src.id, pkt->type, pkt->ext_type, data_len, msg_len);

  if (pkt->src.id == LINK_BROADCAST) {
    if (pkt->type != SSINK_UPDATE) {
      elog(LOG_ERR, "ERROR, Need valid source addr for all pkt-type but %d\n",
        SSINK_UPDATE);
      goto done;
    }
  }

  else {
    /* Find index for original source of packet */
    if (!(stat = find_status_ptr(pkt->src.id))) {
      if (!(stat = add_src(pkt->src.id))) {
        elog(LOG_ERR, "Can't find struct for node %d\n", pkt->src.id);
        goto done;
      }
    }
  }

  /* From here on stat is non-NULL, except for a broadcast SSINK_UPDATE,
   * which does not use it */
  switch(pkt->type) {
    case (SCOMP_ASCII_STATS):
      elog(LOG_DEBUG(2), "pkt_type: %d\n", pkt->type);
      if ((ictr = find_app_type(pkt->type)) < 0)
      {
        elog(LOG_ERR, "Can't find info for app-type %d for src %d!\n", 
            pkt->type, pkt->src.id);
        goto done;
      }
      sninfo = &stat->node_app_info[ictr];
      bufcpy(sninfo->generic_stats_buf, pkt->data, msg_len);
      break;

    case (SSINK_UPDATE):

      /* Treat this case differently - update all nodes */
      if (pkt->src.id == LINK_BROADCAST) {
        int i;
        for (i = 0; i < sink.num_srcs; i++) {
          update_sympathy_sink(&sink.status_srcs[i], info, 
              pkt->ext_type, 1);
        }
        goto done;
      }

      else {
        update_sympathy_sink(stat, info, pkt->ext_type, 0);
      }
      break;

    case (SNETWORK_PKT):
      elog(LOG_DEBUG(2), "have network packet!\n");
      /* First check for duplicate sequence number */
      if ((((pkt->ext_type >= SNON_ROUTING_PKT) 
          && (!duplicate_seqno(stat, pkt->seqno))) 
          || pkt->ext_type < SNON_ROUTING_PKT) 

        /* Last case is a special case - when we get a neighbor-advert
         * from my_node_id with a seqno of -1 (255), then this is not a
         * packet the sink is sending; instead it is a neighbor-update
         * received from a status device on the sink */
          && (!((stat->addr == my_node_id) 
              && (pkt->seqno == 255)))) {

          stats_ctr_inc(&stat->packet, stat->packet.ctr+1, 0, 
            sizeof(stat->packet.ctr), 0);
          stats_ctr_inc(&stat->tos_packets[pkt->ext_type], 
            stat->tos_packets[pkt->ext_type].ctr+1, 0, 
            sizeof(stat->tos_packets[pkt->ext_type].ctr), 0);
      }

      /* We don't need a separate case for beacon and non-sympathy 
       * app packets because we just need a count of those */
      switch (pkt->ext_type)
      {
        case(SNEIGHBOR_ADVERT): /* neighbor data */
          {
            sympathy_neighbor_t* n = (sympathy_neighbor_t *) pkt->data;
            insert_neighbors(n->neighbor, n->num_neighbors, stat);
          }
          break;
        case(SPATH_ADVERT): /* route data */
          handle_route_info(stat, (Snext_hop_t *) (pkt->data));
          break;
        case(SYMPATHY_PKT): /* Multihop app layer */
          handle_sympathy_data(stat, (Spkt_t*)pkt->data, data_len);
          break;
        default:
          elog(LOG_DEBUG(3), "Ignoring packet w ext-type: %d\n", pkt->ext_type);
          break;
      }
      /* Network packets are fully handled above: do not fall through into
       * the "ignoring" default case */
      break;

    default:
      elog(LOG_DEBUG(3), "Ignoring pkt type %d\n", pkt->type);
      break;
  }

done:
  g_free(pkt);
  return EVENT_RENEW;
}

/*
 * Entry point of the sympathy sink process.
 *
 * Start-up sequence, in order:
 *   1. open the command device (a failure is logged and start-up continues);
 *   2. run misc_init() and record the start time in start_time;
 *   3. call initialize_devices();
 *   4. open the status link-user, the regular link named by --uses
 *      (DEFAULT_ERRORS_LINK when --uses is absent) and the matching
 *      "errors" packet-device client;
 *   5. parse --help/-h, --epoch/-e and, under USE_BAYES, --root/-r;
 *   6. register the periodic timers, initialize emview and allocate the
 *      per-source buffers;
 *   7. add this node to the source list, call emrun_init() and enter the
 *      g_main() event loop.
 *
 * argc/argv are passed by reference to the misc_* / link_* option parsers
 * so that arguments left over afterwards can be reported by
 * misc_args_remain().
 *
 * Returns 0; that statement is reached only if g_main() returns.
 */
int main(int argc, char **argv)
{
  char *epoch; 
#ifdef USE_BAYES
  char *emstar_root;
#endif
  emrun_opts_t emrun_opts = {
    shutdown: sympathy_shutdown,
    silent: 1
  };

  cmd_dev_opts_t copts = {
    device: {
    },
    command: sympathy_command,
    usage: sympathy_usage,
  };

  /* Open command device; failure is reported but does not stop start-up */
  copts.device.devname = sympathy_device(SCMD_DEVICE);
  if (g_command_dev(&copts, &sink.cmd) < 0) {
    elog(LOG_ERR, "Unable to open command-device %s: %m\n", copts.device.devname);
  }
  
  /* Period is kept in seconds here; it is scaled back to msec when the
   * expect_smetric timer is registered below */
  sink.sympathy_auto_metric_period = METRICS_PERIOD_MSEC/1000;

  /* generic initialization common to all programs */
  misc_init(&argc, argv, CVSTAG);
  gettimeofday(&start_time, NULL);

  initialize_devices();

  /* Link-user to pass status between components on the sink */
  {
    lu_opts_t lopts = {
      opts: {
        name: SYMPATHY_STATS_DEVICE,
        pkt_type: PKT_TYPE_ALL
      },
      receive: status_receive
    };
    if (lu_open(&lopts, &sink.sympathy_link) < 0)
    {
      elog(LOG_ERR, "UNable to open status device: %s: %m!\n",
        lopts.opts.name);
      usage(argv[0]);
    }
  }

  /* Data link: good packets arrive through the link-user, error packets
   * through the packet-device client opened on the link's "errors" name */
  {
    lu_opts_t lopts = {
      opts: {
        pkt_type: PKT_TYPE_ALL
      },
      receive: handle_good_packets,
    };

    pd_client_opts_t pd_opts = {
      data: &sink,
      receive: handle_error_packets,
    };

    /* Get the --uses name */
    if (!(lopts.opts.name = link_parse_uses(&argc, argv, NULL))) {
      lopts.opts.name = DEFAULT_ERRORS_LINK;
    }

    /* Open regular link */
    if (lu_open(&lopts, NULL) < 0)
    {
      elog(LOG_ERR, "UNable to open link device %s: %m!\n",
        lopts.opts.name);
      usage(argv[0]);
    }

    /* Open errors link; a failure only produces a warning */
    pd_opts.devname = link_name_s(lopts.opts.name, "errors");
    if (pd_client_open(&pd_opts, &sink.errors_pc) < 0) {
      elog(LOG_WARNING, "Unable to open pd_errors dev %s: %m", pd_opts.devname);
    }
  }

  if (misc_parse_out_option(&argc, argv, "help", 'h')) usage(argv[0]);

  if ((epoch = misc_parse_out_option(&argc, argv, "epoch", 'e'))) {
    /* strtol rather than atoi: atoi cannot distinguish "0" from garbage
     * and is undefined for values that do not fit in an int.  Only a
     * complete decimal integer that fits in an int is accepted; anything
     * else leaves TRACK_FAILURE_WINDOW_SIZE at its default. */
    char *end = NULL;
    long window = strtol(epoch, &end, 10);

    if (end == epoch || *end != '\0' || window != (long)(int)window) {
      elog(LOG_CRIT, "Invalid --epoch value '%s': expected an integer\n",
        epoch);
      usage(argv[0]);
    }
    else {
      TRACK_FAILURE_WINDOW_SIZE = (int)window;
    }
  }

#ifdef USE_BAYES
  if ((emstar_root = misc_parse_out_option(&argc, argv, "root", 'r'))) {
    elog(LOG_DEBUG(1), "emstar root : %s \n", emstar_root);
    /* bayes engine initialization */
    initialize_bayes(emstar_root);
  }
#endif

  if (misc_args_remain(&argc, argv)) {
    elog(LOG_CRIT, "Additional unparsed arguments!");
    usage(argv[0]);
  }

  /* Periodic work; only the expect_smetric timer keeps a handle
   * (sink.auto_pd_timer), the other two are registered without one */
  g_timer_add(sink.sympathy_auto_metric_period * 1000, expect_smetric,
    NULL, NULL, &sink.auto_pd_timer);
  g_timer_add(METRICS_PERIOD_MSEC, call_track_lost_nodes, 
    NULL, NULL, NULL);
  g_timer_add(10 * 1000, check_for_failures, 
    NULL, NULL, NULL);

  /* Emview initialization */
  sympathy_init_emview();
  
  /* Initialize our structs: every source slot gets its own fault and
   * neighbor buffers, and every component of every slot gets fault and
   * generic-stats buffers plus the default reception threshold */
  {
    sympathy_node_info_t* stat;
    sympathy_node_app_info_t* snode;
    int i, j;
    for (i = 0; i < SMAX_SRCS; i++) {
      stat = &sink.status_srcs[i];
      stat->fault_buf = buf_new();
      stat->neighbor_buf = buf_new();
      for (j = 0; j < SCOMP_LAST; j++) {
        snode = &stat->node_app_info[j];
        snode->fault_buf = buf_new();
        snode->generic_stats_buf = buf_new();
        snode->pkt_reception_percent = SMSG_RECEPTION_THRESH_DEFAULT;
      }
    }
  }

  /* Add the sink to the srcs list! */
  add_src(my_node_id);

  emrun_init(&emrun_opts); 
  g_main();
  /* Reached only if the event loop returns */
  elog_g(LOG_CRIT, "event loop terminated abnormally!");
  return 0;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs — large-scale, long-lived systems that need self-organization and adaptivity — a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C, Shell Script
License: other

  bayes/
    lib/
      libnetica.a
    src/
      Netica.h
      NeticaEx.c
      NeticaEx.h
    bayes_classifier.c
    nodes.states
    training.prob
  include/
    sympathy.h
    sympathy_app.h
    sympathy_dev.h
    sympathy_routing.h
  libsympathy/
    sympathy_routing.c
  scripts/
    get_recent_status.pl
    get_throughput.pl
  testtabs/
    essjr.run
    essjr_sensor.run
    rr_mote.run
    rr_mote_ceiling.run
    sympathy.run
    sympathy_ceiling.sim
    sympathy_sim.sim
    sympathy_sim_small.sim
    sympathy_snoop.run
    sympathy_test.sim
  BUILD
  data.xml
  jr_loc_small
  sars.pl
  sympathy_analyze.c
  sympathy_battery.c
  sympathy_device.c
  sympathy_doc
  sympathy_emview.c
  sympathy_events.c
  sympathy_main.c
  sympathy_print_stats.c
  sympathy_status.c