Code Search for Developers
 
 
  

wl_main.c from EmStar at Krugle


Show wl_main.c syntax highlighted

/*
 * Copyright (c) 2004 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "wl_i.h"
#include <sim/libsim.h>
#include <emrun/emrun.h>
#include <link/link.h>

/*
 * Simulator-configuration callback, invoked by the simulator framework
 * whenever a new configuration becomes active.
 *
 * Walks node ids 1..conf->num_nodes and registers every id that is not
 * yet present in wl->nodes, so that a simulated "motenic" exists for
 * each node after the node count grows.
 *
 *   conf - the newly active simulator configuration
 *   data - the wl_state_t supplied when the callback was registered
 */
static void wl_new_config(sim_config_t *conf, void *data)
{
  wl_state_t *wl = (wl_state_t *)data;
  int node_id = 1;

  elog(LOG_INFO, "got new simulator configuration");

  while (node_id <= conf->num_nodes) {
    /* only ids that are not known yet get registered */
    if (!wl_node_lookup(&(wl->nodes), node_id)) {
      wl_node_register(wl, node_id);
    }
    node_id++;
  }
}


/*
 * Append one row to the statistics log of `stats`.
 *
 * Columns, in order: node, time, the first six entries of stat->counts,
 * the first six entries of stat->bytes, then stats->total_pushed and
 * stats->total_pushed_bytes.
 *
 *   stats - supplies the output stream (stat_logfile) and push totals
 *   stat  - the counter set to print
 *   node  - value written in the first column
 *   time  - value written in the second column
 */
void wl_print_stats(wl_stats_t *stats, ssync_stats_t *stat, int node, float time)
{
  FILE *out = stats->stat_logfile;

  fprintf(out,
          "%d %f "                 /* node, time */
          "%d %d %d %d %d %d "     /* counts[0..5] */
          "%d %d %d %d %d %d "     /* bytes[0..5] */
          "%d %d\n",               /* push totals */
          node, time,
          stat->counts[0], stat->counts[1], stat->counts[2],
          stat->counts[3], stat->counts[4], stat->counts[5],
          stat->bytes[0], stat->bytes[1], stat->bytes[2],
          stat->bytes[3], stat->bytes[4], stat->bytes[5],
          stats->total_pushed, stats->total_pushed_bytes);
}


/*
 * Print the accumulated totals of `stats` twice: first as-is with node
 * id 0, then with node id -1 after turning bytes[1..] into a running
 * (cumulative) sum in place.  bytes[0] and the counts are left untouched.
 *
 *   n     - not used by this function
 *   stats - statistics set whose totals are printed and modified
 *   time  - timestamp column forwarded to wl_print_stats
 */
void wl_totalize_node_stats(wl_node_t *n, wl_stats_t *stats, float time)
{
  ssync_stats_t *totals = &(stats->totals);
  int running;
  int idx;

  wl_print_stats(stats, totals, 0, time);

  /* each entry from index 2 on absorbs the (already summed) previous one */
  running = totals->bytes[1];
  idx = 2;
  while (idx < SSYNC_STAT_FIELD_COUNT) {
    totals->bytes[idx] += running;
    running = totals->bytes[idx];
    idx++;
  }

  wl_print_stats(stats, totals, -1, time);
}

  
/*
 * Collect, accumulate and print the statistics of one node.
 *
 * When stats->prefix is set, the node's stats are read once from the
 * status device named by ssync_path(prefix, SSYNC_SUFFIX_STATS) while
 * "being" node n, and copied into stats->latest.  When prefix is NULL the
 * caller is expected to have filled stats->latest already.  In both cases
 * stats->latest is then added into stats->totals and printed with the
 * node's id.
 *
 * Returns 0 on success, -1 if the status read produced no buffer (in
 * which case nothing is accumulated or printed).
 */
int wl_do_node_stats(wl_node_t *n, wl_stats_t *stats, float time)
{
  int field;

  if (stats->prefix != NULL) {
    buf_t *reply;

    wl_node_become(n);
    reply = g_status_client_read_once(ssync_path(stats->prefix, SSYNC_SUFFIX_STATS),
                                      1, sizeof(ssync_stats_t));
    wl_node_unbecome();

    if (reply == NULL) {
      elog(LOG_DEBUG(0), "Unable to get stats for prefix %s, node %d",
           stats->prefix, n->id);
      return -1;
    }

    /* NOTE(review): reply is never released here -- confirm who owns the
     * buffer returned by g_status_client_read_once. */
    memmove(&(stats->latest), reply->buf, sizeof(ssync_stats_t));
  }

  /* fold the latest sample into the running totals */
  for (field = 0; field < SSYNC_STAT_FIELD_COUNT; field++) {
    stats->totals.bytes[field] += stats->latest.bytes[field];
    stats->totals.counts[field] += stats->latest.counts[field];
  }

  /* emit the per-node row */
  wl_print_stats(stats, &(stats->latest), n->id, time);
  return 0;
}

/*
 * Write one traffic row per node to wl->traf_logfile.
 *
 * For every node in wl->nodes the link status of wl->base_link is read
 * once while "being" that node; on success the elapsed time, node id, MTU
 * and rx/tx packet, byte and error counters are logged.  A failed read is
 * reported at debug level and the node is skipped.
 */
void wl_dump_traf(wl_state_t *wl)
{
  uint64_t uptime = misc_time_elapsed_sys();
  wl_node_t *node = wl_nodes_top(&(wl->nodes));

  while (node != NULL) {
    buf_t *reply;

    wl_node_become(node);
    reply = g_status_client_read_once(link_name_s(wl->base_link, LINK_STATUS_SUBDEV),
                                      1, sizeof(link_status_t));

    if (reply == NULL) {
      /* logged before wl_node_unbecome() so %m still reflects the read */
      elog(LOG_DEBUG(0), "Unable to get status for link %s, node %d %s: %m",
           wl->base_link, node->id, link_name_s(wl->base_link, LINK_STATUS_SUBDEV));
    }
    else {
      /* NOTE(review): reply is never released here -- confirm who owns the
       * buffer returned by g_status_client_read_once. */
      link_status_t *ls = (link_status_t *)reply->buf;

      fprintf(wl->traf_logfile, "%.6f %d  %d  %d  %d  %d  %d  %d  %d\n",
              uptime / MILLION_F,
              node->id, ls->MTU,
              ls->packets_rx, ls->bytes_rx,
              ls->packets_tx, ls->bytes_tx,
              ls->errors_tx, ls->errors_rx);
    }

    wl_node_unbecome();
    node = wl_nodes_next(node);
  }
}


/*
 * Timer callback: take one statistics snapshot across all nodes.
 *
 * Resets the totals of the main, csync and combined statistics sets, then
 * for each node records the main stats and, when wl->do_csync_stats is
 * set, the csync stats plus their field-wise sum (the "comb" set).  A
 * node whose main or csync read fails is skipped from that point on.
 * Afterwards the totals rows are printed and the traffic log is updated.
 *
 *   data     - the wl_state_t registered with the timer
 *   interval - not used
 *   ev       - not used
 *
 * Always returns EVENT_RENEW.
 */
int wl_do_stats(void *data, int interval, g_event_t *ev)
{
  wl_state_t *wl = (wl_state_t *)data;
  wl_node_t *node;
  float now = misc_time_elapsed_sys() / MILLION_F;

  memset(&(wl->main_stats.totals), 0, sizeof(wl->main_stats.totals));
  memset(&(wl->csync_stats.totals), 0, sizeof(wl->csync_stats.totals));
  memset(&(wl->comb_stats.totals), 0, sizeof(wl->comb_stats.totals));

  for (node = wl_nodes_top(&(wl->nodes)); node != NULL; node = wl_nodes_next(node)) {
    int field;

    /* main stats first; csync is only attempted when main succeeded and
     * csync recording is enabled; comb needs both */
    if (wl_do_node_stats(node, &(wl->main_stats), now) < 0
        || !wl->do_csync_stats
        || wl_do_node_stats(node, &(wl->csync_stats), now) < 0) {
      continue;
    }

    /* combined sample = main + csync, field by field */
    for (field = 0; field < SSYNC_STAT_FIELD_COUNT; field++) {
      wl->comb_stats.latest.bytes[field] =
        wl->main_stats.latest.bytes[field] + wl->csync_stats.latest.bytes[field];
      wl->comb_stats.latest.counts[field] =
        wl->main_stats.latest.counts[field] + wl->csync_stats.latest.counts[field];
    }

    wl->comb_stats.total_pushed = wl->main_stats.total_pushed;
    wl->comb_stats.total_pushed_bytes = wl->main_stats.total_pushed_bytes;

    /* comb_stats has no prefix, so this only accumulates and prints */
    wl_do_node_stats(node, &(wl->comb_stats), now);
  }

  /* node is NULL here; wl_totalize_node_stats does not use it */
  wl_totalize_node_stats(node, &(wl->main_stats), now);
  if (wl->do_csync_stats) {
    wl_totalize_node_stats(node, &(wl->csync_stats), now);
    wl_totalize_node_stats(node, &(wl->comb_stats), now);
  }

  wl_dump_traf(wl);

  return EVENT_RENEW;
}


/*
 * Initialise one statistics set: remember its ssync prefix, open its log
 * file "<log_name><name>-stat" for writing and emit the column header.
 *
 *   stats    - statistics set to initialise
 *   prefix   - stored in stats->prefix (may be NULL)
 *   log_name - base path of the log files
 *   name     - per-set suffix inserted before "-stat" (may be "")
 *
 * Terminates the process with exit(1) if the resulting file name does not
 * fit the local path buffer or the file cannot be opened.
 */
void wl_stats_init(wl_stats_t *stats, char *prefix, char *log_name, char *name)
{
  char path[256];
  int len;

  stats->prefix = prefix;

  /* log_name comes from the command line: bound the write and refuse a
   * silently truncated file name */
  len = snprintf(path, sizeof(path), "%s%s-stat", log_name, name);
  if (len < 0 || (size_t)len >= sizeof(path)) {
    elog(LOG_CRIT, "Log file name '%s%s-stat' is too long", log_name, name);
    exit(1);
  }

  stats->stat_logfile = fopen(path, "w");
  if (stats->stat_logfile == NULL) {
    elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", path);
    exit(1);
  }

  fprintf(stats->stat_logfile,
	  "#h id time "
	  "actc txc retxc nackc refrc packc "
	  "act tx retx nack refr pack "
	  "tpush tbytes\n");
}


/*
 * Shutdown handler: take a final statistics snapshot, close every log
 * file, optionally tell the simulator to halt, and exit the process.
 *
 *   data - the wl_state_t registered with the handler
 *
 * Does not return; the process ends with exit(0).
 */
void wl_shutdown(void *data)
{
  wl_state_t *wl = (wl_state_t *)data;

  elog(LOG_NOTICE, "Workload generator service closing logfiles");

  /* one last snapshot before the stat files go away */
  wl_do_stats(wl, 0, NULL);

  fclose(wl->main_stats.stat_logfile);
  if (wl->do_csync_stats) {
    fclose(wl->csync_stats.stat_logfile);
    fclose(wl->comb_stats.stat_logfile);
  }

  if (wl->traf_logfile != NULL) {
    fclose(wl->traf_logfile);
  }

  /* the workload logs only exist when a workload plugin was running */
  if (!wl->stats_only) {
    wl_scan_for_complete(wl, 1);
    fclose(wl->logfile);
    fclose(wl->cdf_logfile);
  }

  elog(LOG_NOTICE, "Workload generator service shutting down...");

  /* halt simulation */
  if (wl->halt_sim) {
    printf_to_file(EMSIM_EMRUN_COMMAND_DEVNAME, "halt workload complete!");
  }

  exit(0);
}


/*
 * Process-wide workload generator state, pre-loaded with the defaults
 * that the command-line options may override.  Fields not listed here
 * start out zeroed.
 */
static wl_state_t state = {
  .interval   = DEFAULT_INTERVAL,
  .max_keys   = DEFAULT_MAX_KEYS,
  .base_link  = "mote0",
  .prefix     = SSYNC_DEFAULT_PREFIX,
  .typename   = WL_TYPENAME,
  .hops       = DEFAULT_HOPS,
  .refresh    = DEFAULT_REFRESH,
  .entry_size = DEFAULT_VALUE_LEN,
};


void wl_force_shutdown()
{
  wl_shutdown(&state);
  exit(0);
}

/*
 *  control device
 */

/*
 * "printable" handler of the control device: writes a one-line usage
 * hint into buf.
 *
 *   info - not used
 *   buf  - output buffer the hint is appended to
 *
 * Returns STATUS_MSG_COMPLETE.
 */
static int
wl_control_usage(status_context_t *info, buf_t *buf)
{
  bufprintf(buf, "Workload control device.. echo halt to stop workload\n");

  return STATUS_MSG_COMPLETE;
}

/*
 * "write" handler of the control device.  A write that starts with
 * "halt" shuts the workload generator down; anything else is ignored.
 *
 *   ctx      - not used
 *   command  - the bytes written to the device
 *   buf_size - number of bytes in command
 *
 * Returns buf_size.
 */
static
int wl_control_cmd(status_context_t *ctx, char *command, size_t buf_size)
{
  static const char halt_cmd[] = "halt";
  const size_t halt_len = sizeof(halt_cmd) - 1;

  /* Only compare when the write is long enough to hold the keyword, so a
   * short, unterminated write is never read past its end. */
  if (buf_size >= halt_len && strncmp(command, halt_cmd, halt_len) == 0)
    wl_force_shutdown();
  return buf_size;
}


/*
 * Build "<log_name><suffix>" into dst (capacity dst_size).  Exits the
 * process with status 1 if the name does not fit, rather than opening a
 * silently truncated path.
 */
static void wl_main_log_path(char *dst, size_t dst_size,
                             const char *log_name, const char *suffix)
{
  int len = snprintf(dst, dst_size, "%s%s", log_name, suffix);

  if (len < 0 || (size_t)len >= dst_size) {
    elog(LOG_CRIT, "Log file name '%s%s' is too long", log_name, suffix);
    exit(1);
  }
}


/*
 * Entry point of the workload generator service.
 *
 * Parses the command line into the file-level state, initialises the
 * selected workload plugin (unless --stats-only), opens the log files
 * derived from --logfile, creates the control device, registers either
 * the local node (--local) or the simulator-configuration callback,
 * installs the periodic statistics timer and enters the event loop.
 *
 * Exits with status 1 on a missing --logfile, on leftover arguments, on a
 * log file that cannot be opened, or if the simulator configuration is
 * unavailable.  Returning from g_main() is treated as an error.
 */
int main(int argc, char **argv)
{
  char *log_name = NULL;
  char *load = NULL;
  char *base_link;
  char *prefix;
  char *typename;

  /* emrun will trigger this callback to run on shutdown */
  emrun_opts_t emrun_opts = {
    .shutdown = wl_shutdown,  /* this function implements our shutdown handler */
    .data = &state            /* this pointer will be passed to our shutdown handler */
  };

  /* generic init */
  misc_init(&argc, argv, CVSTAG);

  /* init state */
  state.usage_buf = buf_new();

  /* usage text, printed on argument errors */
  bufprintf(state.usage_buf,
            "  --logfile <path>:  [REQUIRED] specifies the path for the logfile\n"
            "  --interval <ms>:   average interval of new data generation, per node, default %d\n"
            "  --hops <count>:    number of hops to send, default %d\n"
            "  --max-keys <max>:  maximum number of keys pushed into system, default %d\n"
            "  --refresh <ms>:    refresh to use for items \n"
            "  --halt:            halt sim after workload completes \n"
            "  --link <link>:     base link to use for traffic counting (default '%s')\n"
            "  --prefix <prefix>: base prefix to use for ssync access (default '%s')\n"
            "  --delay <secs startup delay>\n"
            "  --csync-stats:     record clustersync stats as well\n"
            "  --stats-only:      no plugin.. just record usage stats\n"
            "  --local:           hook to a local device not a simulator\n"
            "  --load <plugin>:  [REQUIRED] specifies the workload module to use:\n",
            DEFAULT_INTERVAL, DEFAULT_HOPS, DEFAULT_MAX_KEYS, state.base_link, state.prefix);

  /* parse standard options */
  log_name = misc_parse_out_option(&argc, argv, "logfile", 0);
  misc_parse_option_as_int(&argc, argv, "interval", 0, &(state.interval));
  misc_parse_option_as_int(&argc, argv, "max-keys", 0, &(state.max_keys));
  misc_parse_option_as_int(&argc, argv, "entry-size", 0, &(state.entry_size));
  misc_parse_option_as_int(&argc, argv, "refresh", 0, &(state.refresh));
  misc_parse_option_as_int(&argc, argv, "delay", 0, &(state.delay));
  misc_parse_option_as_int(&argc, argv, "hops", 0, &(state.hops));
  misc_parse_option_as_int(&argc, argv, "iface", 0, &(state.iface));
  state.do_csync_stats = misc_parse_out_switch(&argc, argv, "csync-stats", 0);
  state.stats_only = misc_parse_out_switch(&argc, argv, "stats-only", 0);
  state.local = misc_parse_out_switch(&argc, argv, "local", 0);
  /* halting the simulator on shutdown is unconditionally enabled */
  state.halt_sim = 1;
  if ((base_link = misc_parse_out_option(&argc, argv, "link", 0)))
    state.base_link = base_link;
  if ((prefix = misc_parse_out_option(&argc, argv, "prefix", 0)))
    state.prefix = prefix;
  if ((typename = misc_parse_out_option(&argc, argv, "typename", 0)))
    state.typename = typename;

  if (!state.stats_only) {
    /* select the workload */
    load = misc_parse_out_option(&argc, argv, "load", 0);

    /* add new plugin inits here */
    wl_plugin_random_rate(&state, load, &argc, argv);
    wl_plugin_scenario_file(&state, load, &argc, argv);
  }

  if (log_name) {
    char path[256];

    if (!state.stats_only) {
      state.logfile = fopen(log_name, "w");
      if (state.logfile == NULL) {
        elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", log_name);
        exit(1);
      }

      wl_main_log_path(path, sizeof(path), log_name, "-cdf");
      state.cdf_logfile = fopen(path, "w");
      if (state.cdf_logfile == NULL) {
        elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", path);
        exit(1);
      }
    }

    wl_main_log_path(path, sizeof(path), log_name, "-traf");
    state.traf_logfile = fopen(path, "w");
    if (state.traf_logfile == NULL) {
      elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", path);
      exit(1);
    }
    fprintf(state.traf_logfile, "#h time ID MTU prx brx ptx btx etx erx\n");

    /* the "-stat" file names are built inside wl_stats_init */
    wl_stats_init(&(state.main_stats), state.prefix, log_name, "");
    if (state.do_csync_stats) {
      wl_stats_init(&(state.csync_stats), "csync", log_name, "-csync");
      wl_stats_init(&(state.comb_stats), NULL, log_name, "-comb");
    }
  }
  else {
    elog(LOG_CRIT, "--logfile argument required!");
    fprintf(stderr, "%s\n", state.usage_buf->buf);
    exit(1);
  }

  /* anything still left on the command line was not recognised */
  if (argc > 1) {
    int i;

    fprintf(stderr, "Extra arguments: ");
    for (i = 1; i < argc; i++) {
      fprintf(stderr, "\"%s\" ", argv[i]);
    }
    fprintf(stderr, "\n\n");

    fprintf(stderr, "%s\n", state.usage_buf->buf);
    exit(1);
  }

  status_dev_opts_t t_opts = {
    .device = {
      .devname = sim_path("/dev/workload_control"),
      .device_info = &state
    },
    .printable = wl_control_usage,
    .write = wl_control_cmd
  };

  /* a missing control device is not fatal; only warn */
  if (g_status_dev(&t_opts, NULL) < 0) {
    elog(LOG_WARNING, "Unable to create control device %s: %m",
         t_opts.device.devname);
  }

  if (state.local) {
    if (wl_node_lookup(&(state.nodes), my_node_id) == NULL)
      wl_node_register(&state, my_node_id);
  }
  else {
    /* Set up event to read the simulator configuration.  This calls the
     * callback immediately with the initial simulator configuration. */
    sim_opts_t sim_opts = {
      .new_config = wl_new_config,
      .data = &state
    };

    if (g_sim_config(&sim_opts) < 0) {
      elog(LOG_ALERT, "can't get simulator configuration: %m");
      exit(1);
    }
  }

  /* set stats timer */
  g_timer_add(WL_STATS_INTERVAL, wl_do_stats, &state, NULL, NULL);

  /* we're ready to serve */
  emrun_init(&emrun_opts);

  g_main();

  /* this should never be reached */
  elog(LOG_ALERT, "event system terminated abnormally");
  return 1;
}





See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs — large-scale, long-lived systems that need self-organization and adaptivity — a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C, Shell Script
License: other

  generate.c
  wl_i.h
  wl_main.c
  wl_plugin_random_rate.c
  wl_plugin_scenario_file.c
  wl_structs.c