Code Search for Developers
 
 
  

sympathy_events.c from EmStar at Krugle


Show sympathy_events.c syntax highlighted

/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
 *
 * Copyright (c) 2003 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

 /*
  *
  * Author: Nithya Ramanthan
  *
  */

#include "sympathy_app.h"

#include <string.h>

#include <tos-contrib/multihop/tos/lib/MultihopTypes.h>
#include <tos-contrib/include/protocols.h>

/* Local alias for the ring-buffer capacity constant RING_BUF_NUM_ELEMS,
 * which is defined outside this file. */
#define NUM_RB_ELEMS RING_BUF_NUM_ELEMS

/* Cached sizes of the sympathy packet header and of one ring-buffer
 * element.  NOTE(review): both live in a uint8_t, so this assumes
 * sizeof(Spkt_t) and sizeof(RBelem_t) are below 256 -- confirm. */
uint8_t ssize = sizeof(Spkt_t);
uint8_t rsize = sizeof(RBelem_t);
/* Sequence number put into the next event request; it is post-incremented
 * by send_event_request() and, being a uint8_t, wraps after 255. */
uint8_t seqno = 0;

/* Process-wide state.  Only the request period is given an explicit
 * initial value; every other field starts out zeroed. */
sevent_ctx_t ctx = {
    /* Request period in seconds, derived from the millisecond constant. */
    sevent_req_period_secs: EVENT_REQUEST_PERIOD_MSEC/1000,
};

/*** STATIC FUNCTIONS ***/

/*
 * Look up the slot of ctx.events[] whose address equals `src`.
 *
 * Only the first ctx.num_srcs slots are searched.  Returns the slot
 * index, or -1 when none of them carries that address.
 */
static int find_event_info(if_id_t src)
{
  int slot = 0;

  while (slot < ctx.num_srcs)
  {
    if (ctx.events[slot].addr == src)
    {
      return slot;
    }
    slot++;
  }
  return -1;
}

/*
 * Append a one-line, human-readable rendering of ring-buffer element `rb`
 * to `tbuf`.
 *
 * The line starts with the element's time, a short label for its event
 * type (empty for unrecognised types) and the event address.  The event
 * payload is then decoded according to the type:
 *   - neighbor added/dropped:        ingress, egress, neighbor count
 *   - ingress/egress change:         quality, change
 *   - next-hop changed/quality/not-neighbor: quality, sink
 * Any other type just terminates the line.
 */
static
void print_rb_elem(buf_t* tbuf, RBelem_t* rb)
{
  const char* label = "";

  /* Same precedence order as the type tests used for the payload below. */
  if (rb->Sevent.type == NEIGHBOR_ADDED) label = "add-neigh";
  else if (rb->Sevent.type == NEIGHBOR_DROPPED) label = "remv-neigh";
  else if (rb->Sevent.type == CHANGE_IN_INGRESS) label = "change-ingress";
  else if (rb->Sevent.type == CHANGE_IN_EGRESS) label = "change-egress";
  else if (rb->Sevent.type == NEXT_HOP_CHANGED) label = "nh-changed";
  else if (rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED) label = "nh-qual-changed";
  else if (rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR) label = "nh-not-neighbor";

  bufprintf(tbuf, "%d   %s   %d    ", rb->time, label, rb->Sevent.address);

  if (rb->Sevent.type == NEIGHBOR_ADDED
      || rb->Sevent.type == NEIGHBOR_DROPPED)
  {
    new_neighbor_t* neigh = (new_neighbor_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d\t %d\t %d\n", neigh->ingress, neigh->egress,
        neigh->num_current_neighbors);
    return;
  }

  if (rb->Sevent.type == CHANGE_IN_INGRESS
      || rb->Sevent.type == CHANGE_IN_EGRESS)
  {
    lq_node_t* lq = (lq_node_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d    %d\n", lq->quality, lq->change);
    return;
  }

  if (rb->Sevent.type == NEXT_HOP_CHANGED
      || rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED
      || rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR)
  {
    nh_node_t* nh = (nh_node_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d    %d\n", nh->quality, nh->sink);
    return;
  }

  /* Unknown type: no payload columns, just end the line. */
  bufprintf(tbuf, "\n");
}

/*
 * Return the number of whole minutes between `*time` and the current
 * wall-clock time.  Only the seconds fields are compared; the result is
 * truncated toward zero.
 */
static
int get_minutes_since_event(struct timeval* time)
{
  struct timeval now;
  int elapsed_secs;

  gettimeofday(&now, NULL);
  elapsed_secs = (int) (now.tv_sec - time->tv_sec);
  return elapsed_secs / 60;
}

/*
 * Append a textual report for one tracked node to `buf`: a summary
 * header (address, element count, packet counters, time since the last
 * packet) followed, when the node's ring buffer holds any elements, by
 * two column-heading lines and one line per stored element.
 */
static
void print_events(buf_t* buf, sevent_rb_t* stat)
{
  int idx;
  int mins_since_rx = get_minutes_since_event(&stat->event_time);

  bufprintf(buf, "\n**************\nNode %d, number-elems: %d\n",
      stat->addr, stat->rb_num_elems);
  bufprintf(buf, "Packets-rx from node: %d\n",
      stat->pkts_rx_from_node);
  bufprintf(buf, "Number requests tx: %d\n", ctx.num_sevent_reqs);
  bufprintf(buf, "Minutes since last pkt-rx: %d mins\n", mins_since_rx);
  bufprintf(buf, "\nTimestamp of last pkt rx: %d mins\n\n",
      stat->last_event_time_mins);

  /* Nothing stored yet: skip the table entirely. */
  if (stat->rb_num_elems <= 0)
  {
    return;
  }

  /* Two heading lines because the payload columns depend on event type. */
  bufprintf(buf, "Time\t Type\t\t Addr\t Quality\t Change/Sink\n");
  bufprintf(buf, "Time\t Type\t\t Addr\t Ingress\t Egress\t Num-neighbors\n");

  idx = 0;
  while (idx < stat->rb_num_elems)
  {
    print_rb_elem(buf, &stat->rb[idx]);
    idx++;
  }
}

/*
 * Status-device print callback: writes the report of every tracked node
 * (the first ctx.num_srcs entries of ctx.events[]) into `buf`.
 * `info` is not used.  Always returns STATUS_MSG_COMPLETE.
 */
int sympathy_print_events(status_context_t *info, buf_t *buf)
{
  int node = 0;

  while (node < ctx.num_srcs)
  {
    print_events(buf, &ctx.events[node]);
    node++;
  }
  return STATUS_MSG_COMPLETE;
}
/* Return the absolute difference |a - b| of two ints. */
static int abs_subtract(int a, int b)
{
  return (a > b) ? (a - b) : (b - a);
}

/*
 * Return a pointer to the most recently written ring-buffer element of
 * `stat`, or NULL when nothing has been written yet.
 *
 * rb_head is the index of the next slot to write, so the newest element
 * normally sits just before it.  A head of 0 is ambiguous: the buffer is
 * either still empty, or it has wrapped and the newest element is the
 * last slot; rb_full tells the two cases apart.
 */
static RBelem_t* get_last_element(sevent_rb_t* stat)
{
  if (stat->rb_head != 0)
  {
    return &stat->rb[stat->rb_head - 1];
  }
  return stat->rb_full ? &stat->rb[NUM_RB_ELEMS - 1] : NULL;
}

/**** Event bookkeeping, packet handling and program entry points ****/
/*
 * Start tracking `node` and return the index of the ctx.events[] slot
 * assigned to it, or -1 when `node` is 0 (address 0 is never tracked).
 *
 * While fewer than SMAX_SRCS nodes are tracked, the next unused slot is
 * taken.  Once the table is full the previous code wrote to index
 * SMAX_SRCS, i.e. past the slots the rest of this file ever looks at;
 * now the slot at ctx.node_ctr is recycled instead, and its old contents
 * (ring buffer, counters) are cleared so the new node does not inherit
 * another node's history.
 * NOTE(review): assumes ctx.node_ctr is the intended round-robin
 * replacement cursor and that ctx.events[] has SMAX_SRCS entries --
 * confirm against the sevent_ctx_t declaration.
 */
static
int add_event_info(if_id_t node)
{
  int sctr = ctx.num_srcs;

  if (node == 0) return -1;

  if (sctr >= SMAX_SRCS)
  {
    /* Table full: reuse a slot rather than index out of range. */
    sctr = (int) (ctx.node_ctr % SMAX_SRCS);
    elog(LOG_WARNING, "Node table full, node %d replaces node %d in slot %d\n",
        node, ctx.events[sctr].addr, sctr);
    memset(&ctx.events[sctr], 0, sizeof(ctx.events[sctr]));
  }

  ctx.events[sctr].addr = node;
  elog(LOG_DEBUG(1), "Inserting node %d in slot %d\n", node, sctr);
  inc_mod(&ctx.node_ctr, 1, SMAX_SRCS);
  ctx.num_srcs = min(ctx.num_srcs + 1, SMAX_SRCS);
  return sctr;
}

/*
 * Publish the current counters for one tracked node on ctx.update_lu.
 *
 * The message is a link_pkt_t header (broadcast destination, source set
 * to the node's address, SCOMP_STATS6 / SSINK_UPDATE types) followed by
 * a sympathy_status_info_t carrying the number of requests sent, the
 * number of packets received from the node and the time of the last one.
 * A send failure is logged and otherwise ignored.
 */
static void update_node(sevent_rb_t* stat)
{
  sympathy_status_info_t status = {
    .pkt_tx = ctx.num_sevent_reqs,
    .pkt_rx = stat->pkts_rx_from_node,
    /* One response is expected per request sent. */
    .pkt_expected_rx = ctx.num_sevent_reqs,
    .time_rx_last_pkt = stat->event_time,
  };
  link_pkt_t hdr = {
    .dst = { .id = LINK_BROADCAST },
    .src = { .id = stat->addr },
    .ext_type = SCOMP_STATS6,
    .type = SSINK_UPDATE,
  };
  buf_t* out = buf_new();
  int rc;

  bufcpy(out, &hdr, sizeof(link_pkt_t));
  bufcpy(out, &status, sizeof(sympathy_status_info_t));

  elog(LOG_DEBUG(1), "updating for node %d\n", stat->addr);

  /* lu_send() takes the payload length, i.e. without the link header. */
  rc = lu_send(ctx.update_lu, (link_pkt_t *)out->buf, out->len - sizeof(link_pkt_t));
  if (rc < 0)
  {
     elog(LOG_ERR, "Unable to send pkt: %m!\n");
  }
  buf_free(out);
}

/*
 * Merge the ring-buffer elements carried by an event packet from
 * `node_addr` into that node's local ring buffer.
 *
 * mpkt: packet whose data[] holds consecutive RBelem_t records
 * len:  length in bytes of the packet, header included
 *
 * The node is looked up (and added if unknown); its receive timestamp
 * and packet counter are updated even if no element is accepted.  An
 * element is skipped when its time is more than TIME_DIFF_THRESH ahead
 * of the newest element accepted so far (treated as garbage), and also
 * when its time is older than that newest element.  When at least one
 * element was stored, a status update for the node is sent.
 *
 * Returns the number of elements copied over (0 if the node cannot be
 * tracked).
 */
uint16_t handle_event_data(if_id_t node_addr, Spkt_t* mpkt, uint8_t len)
{
  uint8_t num_copied = 0;
  uint8_t start_index = 0; /* start copying from pkt's rb */
  uint8_t num_elem_in_pkt = 0;
  RBelem_t* last_elem, *tmpE = NULL;
  int diff, sctr, last_t = 0;
  sevent_rb_t* stat;

  elog(LOG_DEBUG(1), "from node %d, packet %dB\n", node_addr, len);

  /* A packet shorter than its own header carries no elements.  Without
   * this guard (len - ssize) goes negative and wraps to a large uint8_t
   * count, making the loop read far beyond the packet. */
  if (len > ssize) num_elem_in_pkt = (len - ssize) / rsize;

  if ((sctr = find_event_info(node_addr)) < 0)
  {
    sctr = add_event_info(node_addr);
  }
  /* add_event_info() refuses some addresses; never index with -1. */
  if (sctr < 0)
  {
    elog(LOG_ERR, "Unable to track node %d, dropping its event data\n", node_addr);
    return 0;
  }
  stat = &ctx.events[sctr];
  gettimeofday(&stat->event_time, NULL);
  stat->pkts_rx_from_node++;

  if ((last_elem = get_last_element(stat))) last_t = last_elem->time;
  while (start_index < num_elem_in_pkt)
  {
    tmpE = (RBelem_t *)&mpkt->data[start_index*rsize];

    /* Only take the difference if this is NOT a wrap-around case */
    if (last_t >= tmpE->time) diff = 0;
    else diff = abs_subtract(tmpE->time, last_t);

    if (diff > TIME_DIFF_THRESH)
    {
      elog(LOG_DEBUG(1), "WARN, skip elem w/time %d, its %d > than last-time (%d)\n",
         tmpE->time, diff, last_t);
    }

    /* Insert elements if ring-buffer is empty, or if the time
     * of the element is >= the last-time - after passing the
     * "garbage check" */
    else if (tmpE->time >= last_t)
    {
      memcpy(&stat->rb[stat->rb_head], &mpkt->data[rsize * start_index], rsize);
      inc_mod(&(stat->rb_head), 1, NUM_RB_ELEMS);
      /* Head back at 0 means the buffer has wrapped at least once. */
      if (stat->rb_head == 0) stat->rb_full = 1;
      last_t = tmpE->time;
      stat->last_event_time_mins = tmpE->time / 60;
      num_copied++;
    }
    start_index++;
  }

  elog(LOG_DEBUG(1), "copying %d elements\n", num_copied);
  if (num_copied > 0)
  {
    stat->rb_num_elems = min16(stat->rb_num_elems + num_copied, NUM_RB_ELEMS);
    update_node(stat);
  }
  return (num_copied);
}

/*
 * Broadcast a sympathy packet on the multihop link (ctx.mh_link).
 *
 * pkt:      sympathy header, copied right after the link header
 * data:     optional payload appended after `pkt` (may be NULL when
 *           data_len <= 0)
 * data_len: payload length in bytes
 *
 * Returns 0 on success, -1 when the link is not open or the send fails.
 */
static
int sevent_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
{
  buf_t* buf;
  int retval = 0;
  link_pkt_t hdr = {
    dst: {
      id: LINK_BROADCAST,
    },
    ext_type: MULTIHOP_SYMPATHY,
    type: MULTIHOP_SYMPATHY
  };

  /* main() keeps running without -U, in which case the link was never
   * opened; fail cleanly instead of handing a NULL link to lu_send(). */
  if (!ctx.mh_link)
  {
    elog(LOG_ERR, "No multihop link open, unable to send packet of type: %d!\n",
        pkt->type);
    return -1;
  }

  buf = buf_new();
  bufcpy(buf, &hdr, sizeof(hdr));
  bufcpy(buf, pkt, sizeof(Spkt_t));
  if (data_len > 0) bufcpy(buf, data, data_len);

  /* lu_send() takes the payload length, i.e. without the link header. */
  if ((lu_send(ctx.mh_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
  {
    elog(LOG_ERR, "Unable to send packet of type: %d!\n", pkt->type);
    retval = -1;
    goto done;
  }
  /* The length expression is a size_t; cast it to match %d. */
  elog(LOG_DEBUG(1), "will send packet with type: %d, len: %d\n", pkt->type,
    (int) (buf->len - sizeof(link_pkt_t)));
done:
  buf_free(buf);
  return retval;
}

/*
 * Timer callback: broadcast one SREQUEST_EVENTS packet carrying the next
 * sequence number (the global seqno is consumed whether or not the send
 * succeeds).  On success the request counter is bumped and a status
 * update is published for every tracked node.  The arguments are unused.
 * Always returns EVENT_RENEW.
 */
static int send_event_request(void* data, int interval, g_event_t* event)
{
  Spkt_t pkt = { .type = SREQUEST_EVENTS };
  Srequest_t req = { .seqno = seqno++ };
  int idx;

  elog(LOG_DEBUG(1), "num-srcs: %d, sending seqno: %d\n",
      ctx.num_srcs, req.seqno);

  if (sevent_send_pkt(&pkt, (void *)&req, sizeof(Srequest_t)) != 0)
  {
    elog(LOG_ERR, "Unable to send sympathy request!\n");
    return EVENT_RENEW;
  }

  ctx.num_sevent_reqs++;
  for (idx = 0; idx < ctx.num_srcs; idx++)
  {
    sevent_rb_t* node = &ctx.events[idx];

    elog(LOG_DEBUG(1), "%d: calling update for node %d\n", idx, node->addr);
    update_node(node);
  }
  elog(LOG_DEBUG(1),"Sending sympathy request!\n");
  return EVENT_RENEW;
}

/*
 * Set the period of the sympathy-request timer.
 *
 * period_msecs > 0: reschedule the existing timer, or create it if none
 *                   exists yet.
 * period_msecs <= 0: stop sending requests; an existing timer is
 *                   destroyed and no new one is created.
 *
 * Previously a period of 0 destroyed the timer and then fell through to
 * g_timer_resched() on the handle that had just been destroyed.
 */
void change_sreq_period(int period_msecs)
{
  elog(LOG_DEBUG(1), "New period_msecs = %d\n", period_msecs);

  if (period_msecs <= 0)
  {
    if (ctx.sreq_timer)
    {
      g_event_destroy(ctx.sreq_timer);
      /* Drop the handle so a later call creates a fresh timer. */
      ctx.sreq_timer = NULL;
    }
    return;
  }

  if (ctx.sreq_timer)
  {
    g_timer_resched(ctx.sreq_timer, period_msecs);
    return;
  }

  elog(LOG_DEBUG(1), "Adding new timer!\n");
  g_timer_add(period_msecs, send_event_request, NULL, NULL, &ctx.sreq_timer);
}

/*
 * Command-device handler.  Parses `cmd` as key/value pairs and applies
 * each "sreq_period" pair: the value (seconds) is stored in
 * ctx.sevent_req_period_secs and the request timer is adjusted to match.
 * Any other key is reported as an error.  `size` and `data` are unused.
 * Always returns EVENT_RENEW.
 */
static
int sevent_command(char* cmd, size_t size, void* data)
{
  parser_state_t *parser = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);

  elog(LOG_DEBUG(1), "command: %s\n", cmd);

  /* Walk every key/value pair in the command string. */
  while (misc_parse_next_kvp(parser) >= 0)
  {
    int new_period_secs;

    if (strcmp(parser->key, "sreq_period") != 0)
    {
      elog(LOG_ERR, "Unrecognized key: %s\n", parser->key);
      continue;
    }

    new_period_secs = atoi(parser->value);
    elog(LOG_DEBUG(1), "key: %s is sreq-period so new period is %d\n",
        parser->key, new_period_secs);
    ctx.sevent_req_period_secs = new_period_secs;
    /* The timer API works in milliseconds. */
    change_sreq_period(ctx.sevent_req_period_secs * 1000);
  }

  misc_parse_cleanup(parser);
  return EVENT_RENEW;
}

/*
 * Usage callback of the command device: builds a help text describing
 * the sreq_period command and showing its current value, and returns
 * the text buffer.  `data` is unused.
 * NOTE(review): the returned pointer is the storage filled by
 * bufprintf(); presumably the caller releases it -- confirm.
 */
char* sevent_usage(void *data)
{
  buf_t text = {};

  bufprintf(&text,
      "To change period of sympathy-requests write (pd=0 => stopsending sympathy requests):\n"
      "\t\t sreq_period=<int>[seconds] \n");
  bufprintf(&text, "Current value sreq_period=%d[secs]\n",
      ctx.sevent_req_period_secs);
  return text.buf;
}

/*
 * Receive callback for the watched link.
 *
 * hdr:      received link packet; its data starts with a multihop_hdr_t
 *           followed by the sympathy packet
 * data_len: number of bytes following the link header
 *
 * Packets too short to hold both headers, or whose sympathy part does
 * not fit the uint8_t length taken by handle_event_data(), are ignored.
 * SEVENT_RESPONSE packets of multihop type MULTIHOP_SYMPATHY are handed
 * to handle_event_data().  Always returns EVENT_RENEW.
 *
 * Fixes over the previous version: the hexdump used `pkt` before it was
 * assigned, lengths were never validated, and ssize_t values were
 * printed with %d.
 */
static
int sevent_rcv_pkt(lu_context_t *link, link_pkt_t *hdr, ssize_t data_len)
{
  multihop_hdr_t* mhdr = (multihop_hdr_t *)hdr->data;
  /* The sympathy packet starts right after the multihop header. */
  Spkt_t* pkt = (Spkt_t *)(mhdr + 1);
  ssize_t pkt_len = data_len - (ssize_t)sizeof(multihop_hdr_t);
  Saddr_t src;
  buf_t* dump;

  /* Do not touch either header unless the packet really contains them;
   * 255 is the largest length handle_event_data() can be given. */
  if (pkt_len < (ssize_t)sizeof(Spkt_t) || pkt_len > 255)
  {
    elog(LOG_DEBUG(1), "ignoring packet, ext_type: %d, hdr-type: %d len = %d\n",
        hdr->ext_type, hdr->type, (int)data_len);
    goto done;
  }
  src = mhdr->src;

  elog(LOG_DEBUG(1),"spkt-len: %d, ext_type: %d, hdr-type: %d len = %d, src = %d\n",
      (int)pkt_len, hdr->ext_type, hdr->type, (int)data_len, src);

  dump = buf_new();
  misc_hexdump_to_buf(dump, (char *)pkt, pkt_len, "NR");
  elog(LOG_DEBUG(1), "DATA SPKT: %s\n", dump->buf);
  buf_free(dump);

  /* NR get correct type info! (4 is the ext_type value this handler
   * was written against; no named constant is visible here) */
  if (hdr->ext_type == 4
      && mhdr->type == MULTIHOP_SYMPATHY
      && pkt->type == SEVENT_RESPONSE)
  {
    handle_event_data(src, pkt, (uint8_t)pkt_len);
  }

done:
  /* Release the packet, as status_receive() does for the same kind of
   * callback.  NOTE(review): assumes link receive callbacks own the
   * packet they are handed -- confirm against the link_user API. */
  g_free(hdr);
  return EVENT_RENEW;
}

/*
 * Print the command-line usage for program `name` and terminate the
 * process with exit status 1.
 */
void usage(char *name)
{
  misc_print_usage(name,
      "-U <link> -W<link>",
      "  --uses <link>: specify the link to send sympathy-requests\n"
      "  --watch <link>: specify the link to listen to packets on");
  exit(1);
}

/*
 * Shutdown callback registered with emrun: logs a notice and exits the
 * process with status 0.  `data` is unused.
 */
static
void sevent_shutdown(void *data)
{
  elog(LOG_NOTICE, "sympathy-sevent shutting down");
  exit(0);
}

/*
 * Receive callback for the status link: logs the source, type and
 * length of every packet, then frees it.  The packet contents are not
 * decoded yet.  Always returns EVENT_RENEW.
 */
static
int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  /* data_len is an ssize_t; cast it so it matches the %d conversion. */
  elog(LOG_DEBUG(1), "src %d, Pkt w type/comp: %d/%d, datalen: %d\n",
       pkt->src.id, pkt->type, pkt->ext_type, (int)data_len);
  /* NR todo eventually, need to decode packets that we receive of a
   * certain comp-stats type! should be specified on the command-line!*/

  g_free(pkt);
  return EVENT_RENEW;
}

int main(int argc, char** argv)
{
  char devname[100];
  emrun_opts_t emrun_opts = {
    shutdown: sevent_shutdown,
    silent: 1
  };

	status_dev_opts_t sopts = {
    device: {
		},
	};

  cmd_dev_opts_t copts = {
    device: {
		},
    command: sevent_command,
    usage: sevent_usage,
	};

  lu_opts_t opts = {
    opts: {
       pkt_type: MULTIHOP_SYMPATHY
    },
  };

  lu_opts_t opts_status = {
    opts: {
      name: SYMPATHY_STATS_DEVICE
    },
    receive: status_receive
  };

  misc_init(&argc, argv, CVSTAG);

  /* Open command device */
  sprintf(devname, "%s/%s", SSTATUS_APP_BASE, SEVENTS_CMD);
  copts.device.devname = sim_path(devname);
  if (g_command_dev(&copts, NULL) < 0) {
    elog(LOG_ERR, "Unable to open command-device: %s\n", copts.device.devname);
  }

  if (lu_open(&opts_status, &ctx.update_lu) < 0) {
    elog(LOG_DEBUG(1), "Unable to open link: %s:%m\n", 
        opts_status.opts.name);
  }

	/* Open status-devices */
  sprintf(devname, "%s/%s", SSTATUS_APP_BASE,SEVENTS_STATUS);
  sopts.device.devname = sim_path(devname);
	sopts.printable = sympathy_print_events;
  if (g_status_dev(&sopts, NULL) < 0) {
		elog(LOG_ERR, "Unable to open status device: %s\n", sopts.device.devname);
	}

  /* Open link-device */
  if (!(opts.opts.name = link_parse_uses(&argc, argv, NULL))) {
    elog(LOG_WARNING, "WARNING: WILL NOT be sending sympathy-requests because didn't specify -U!\n");
  }
  else {
     elog(LOG_DEBUG(1), "link-name U: %s\n", opts.opts.name);
     if (lu_open(&opts, &ctx.mh_link) < 0) {
       elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
      usage(argv[0]);
     }
  }

  opts.opts.pkt_type = PKT_TYPE_TOS;
  opts.receive = sevent_rcv_pkt;
  if (!(opts.opts.name = misc_parse_out_option(&argc, argv, "watch", 'W'))) {
    elog(LOG_CRIT, "Please specify --watch!");
    usage(argv[0]);
  } 
  else {
     elog(LOG_DEBUG(1), "link-name W: %s\n", opts.opts.name);
     if (lu_open(&opts, NULL) < 0) {
       elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
       exit(0);
     }
  }

  change_sreq_period(ctx.sevent_req_period_secs * 1000);
  if (NUM_RB_ELEMS == 0)
  {
	  elog(LOG_ERR, "num-rb-elems (%d) must be > 0!!\n", NUM_RB_ELEMS);
    exit(1);
  }

  emrun_init(&emrun_opts);
  g_main();
  return 0;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  bayes/
    lib/
      libnetica.a
    src/
      Netica.h
      NeticaEx.c
      NeticaEx.h
    bayes_classifier.c
    nodes.states
    training.prob
  include/
    sympathy.h
    sympathy_app.h
    sympathy_dev.h
    sympathy_routing.h
  libsympathy/
    sympathy_routing.c
  scripts/
    get_recent_status.pl
    get_throughput.pl
  testtabs/
    essjr.run
    essjr_sensor.run
    rr_mote.run
    rr_mote_ceiling.run
    sympathy.run
    sympathy_ceiling.sim
    sympathy_sim.sim
    sympathy_sim_small.sim
    sympathy_snoop.run
    sympathy_test.sim
  BUILD
  data.xml
  jr_loc_small
  sars.pl
  sympathy_analyze.c
  sympathy_battery.c
  sympathy_device.c
  sympathy_doc
  sympathy_emview.c
  sympathy_events.c
  sympathy_main.c
  sympathy_print_stats.c
  sympathy_status.c