Code Search for Developers
 
 
  

netrec.c from EmStar at Krugle


Show netrec.c syntax highlighted

/*
 * Copyright (c) 2006 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */


/*
 *  Acoustic ENS Box Network Control Module
 */

#include "devel/state/ssync.h"
#include "libmisc/misc.h"
#include "link/link.h"
#include <libdev/status_dev.h>
#include <libdev/sensor_client.h>
#include <libdev/option_dev.h>
#include <sys/vfs.h>
#include <math.h>
#include <sensors/vxp.h>
#include <emrun/emrun.h>
#include <libdev/g_signals.h>
#include <wait.h>
#include <devel/remote_storage/remote.h>

typedef struct netrec_state {
  char *prefix;
  ssync_sub_t *subref;  
  g_event_t *recent_master;
  status_context_t *status;
  netrec_table_t pub_table;
  netrec_table_t *table;
  int table_count;
  int load_counter;
  int requested_master_mode;
  uint requested_rate;

  /* recording state */
  int child_pid;
  int last_status;
  char *lastname;

} netrec_state_t;


void alarm_handler(int sig)
{
  elog(LOG_CRIT, "ACK!!! we got an alarm.  File system b0rked?");  
}


static 
void masterize(netrec_state_t *nr)
{
  if (!nr->pub_table.netrec.am_master) {
    nr->pub_table.netrec.am_master = 1;

    /* become gsync master */
    printf_to_file(sim_path("/dev/sync/gsync"), "root");

    /* become loc master */
    printf_to_file(sim_path("/dev/opt/loc/master_mode"), "1");
  }
}


static 
void demasterize(netrec_state_t *nr)
{
  if (nr->pub_table.netrec.am_master) {
    nr->pub_table.netrec.am_master = 0;
    nr->requested_master_mode = 0;
    
    /* unbecome gsync master */
    printf_to_file(sim_path("/dev/sync/gsync"), "unroot");

    /* unbecome loc master */
    printf_to_file(sim_path("/dev/opt/loc/master_mode"), "0");
  }
}

static int netrec_diskfree(char *path)
{
  int retval = 0;
  struct statfs s = {};
  alarm(2);
  int fd = open(path, O_RDONLY);
  if (fd < 0 && errno != EINTR) {
    alarm(2);
    fd = open(path, O_CREAT|O_RDWR);
  }
  if (fd < 0) 
    elog(LOG_WARNING, "Unable to open test file %s: %m", path);
  else {
    alarm(2);
    fstatfs(fd, &s); 
    elog(LOG_DEBUG(0), "got value of %ld,%d", s.f_bfree, s.f_bsize);
    retval = s.f_bfree * (s.f_bsize/1024) / 100;
    alarm(2);
    close(fd);
  }
  alarm(0);
  return retval;
}


/* check the health of the system */
static int netrec_health_check(void *data, int interval, g_event_t *ev)
{
  netrec_state_t *nr = (netrec_state_t *)data;

  /* remasterize .. grrr.  gsync is my nemisis */
  if (nr->pub_table.netrec.am_master) {
    /* become gsync master */
    printf_to_file(sim_path("/dev/sync/gsync"), "root");
  }

  /* RAM free?? $$$ */

  /* timesync check */
  sync_id_t src = {
    node: my_node_id,
    comp: CPU
  };
  sync_id_t dst2 = {
    node: my_node_id,
    comp: GPS
  };
  sync_id_t dst = {
    node: my_node_id,
    comp: sync_clock_no_create(sdev_path("vxp/all"))
  };
      
  struct timeval src_tv, dst_tv;
  int err;
  gettimeofday(&src_tv, NULL);
  nr->pub_table.netrec.have_sync = 1;
      
  if (sync_convert_tv_with_err(&src, &src_tv, &dst, &(dst_tv), &err) < 0) {
    elog(LOG_WARNING, "couldn't convert from %s to %s: %m\n",
	 print_sync_id(&src), print_sync_id(&dst));
    nr->pub_table.netrec.have_sync = 0;
  }

  if (sync_convert_tv_with_err(&src, &src_tv, &dst2, &(dst_tv), &err) < 0) {
    elog(LOG_WARNING, "couldn't convert from %s to %s: %m\n",
	 print_sync_id(&src), print_sync_id(&dst2));
    nr->pub_table.netrec.have_sync = 0;
    nr->pub_table.netrec.gps_seconds = 0;
  }
  else
    nr->pub_table.netrec.gps_seconds = dst_tv.tv_sec;
  
  /* disk free */
  nr->pub_table.netrec.external_flash_MB = netrec_diskfree("/mnt/sd_card/.testfile");
  nr->pub_table.netrec.onboard_flash_MB = netrec_diskfree("/usr/lib/.testfile");

  /* load average.  update only if it differs by at least one */
  char str[256];
  read_from_file("/proc/loadavg", str, sizeof(str));
  float load=0;
  sscanf(str, "%f %*f %*f", &load);
  if (load > 0) {
    nr->load_counter = (nr->load_counter+1)%10;
    if (nr->load_counter == 0 || 
	(fabs(load - nr->pub_table.netrec.load_avg / 100.0) > 1.0))
      nr->pub_table.netrec.load_avg = load*100;
  }
  else {
    elog(LOG_WARNING, "load is <= 0?? %f", load);
  }

  /* read back sample rates from codec */
  buf_t *buf = g_status_client_read_once(DEV_VXP_STATUS, STATUS_M_BINARY, 
					 sizeof(audiod_status_t));
  if (buf) {
    audiod_status_t *sensor_stat = (audiod_status_t *)buf->buf;
    nr->pub_table.netrec.nominal_sample_rate = (int)sensor_stat->nominal_sample_rate;
    nr->pub_table.netrec.sample_rate = (int)sensor_stat->sample_rate;
    int i;
    for (i=0; i<4; i++) {
      nr->pub_table.netrec.levels[i] = (100*abs(sensor_stat->peak[i]))/32768;
      if (abs(sensor_stat->peak[i]) > 10)
	nr->pub_table.netrec.levels[i]++;
      if (nr->pub_table.netrec.levels[i] >= 100)
	nr->pub_table.netrec.levels[i] = 99;
    }
    buf_free(buf);
  }
  else {
    nr->pub_table.netrec.nominal_sample_rate = 0xFFFF;
    nr->pub_table.netrec.sample_rate = 0xFFFF;
  }

  /* read remote storage state */
  buf = g_status_client_read_once(REMOTE_SAVE_CONTROL, STATUS_M_BINARY, 
				  sizeof(struct remote_status));
  if (buf) {
    memmove(&(nr->pub_table.netrec.remote_storage),
	    buf->buf, sizeof(struct remote_status));
    buf_free(buf);
  }
  else {
    memset(&(nr->pub_table.netrec.remote_storage),
	   0, sizeof(struct remote_status));
  }

  /* read back detector state */
  buf = g_status_client_read_once(EVENT_DETECT_STATUS, STATUS_M_BINARY, 
				  sizeof(evdet_status_t));
  if (buf) {
    memmove(&(nr->pub_table.netrec.detector_state),
	    buf->buf, sizeof(evdet_status_t));
    buf_free(buf);
  }
  else {
    memset(&(nr->pub_table.netrec.detector_state),
	   0, sizeof(evdet_status_t));
  }

  /* read back recorder state */
  buf = g_status_client_read_once(EVENT_DETECT_RECORDING, STATUS_M_BINARY, 
				  sizeof(evrec_status_t));
  if (buf) {
    memmove(&(nr->pub_table.netrec.recorder_state),
	    buf->buf, sizeof(evrec_status_t));
    buf_free(buf);
  }
  else {
    memset(&(nr->pub_table.netrec.recorder_state),
	   0, sizeof(evrec_status_t));
  }

  /* read back continuous recorder state */
  buf = g_status_client_read_once("/dev/netrec/recording_status", STATUS_M_BINARY, 
				  sizeof(crec_status_t));
  if (buf) {
    memmove(&(nr->pub_table.netrec.crec_state),
	    buf->buf, sizeof(crec_status_t));
    buf_free(buf);
  }
  else {
    memset(&(nr->pub_table.netrec.crec_state),
	   0, sizeof(crec_status_t));
  }

  /* fill in other stuff */
  nr->pub_table.netrec.last_status = nr->last_status;
  nr->pub_table.netrec.record_pid = nr->child_pid;

  /* republish */
  flow_id_t fid = {
    src: my_node_id,
    dst: LINK_BROADCAST,
    max_hops: 10
  };

  if (netrec_pub(nr->prefix, &(nr->pub_table), 1, &fid) < 0) {
    elog(LOG_WARNING, "Error publishing table: %m");
  }

  return TIMER_RENEW;
}


static
void netrec_process(netrec_state_t *nr)
{
  netrec_table_t *our_master = NULL;

  /* seek max seqno */
  int i;
  uint32_t max=0;
  for (i=0; i<nr->table_count; i++) {
    if (max < nr->table[i].netrec.seqno) 
      max = nr->table[i].netrec.seqno;
  }

  /* if we are a recent master, increase our seqno to max of all */
  if (nr->recent_master) {

    /* check for master that is not me with this seqno */
    for (i=0; i<nr->table_count; i++) {
      if (nr->table[i].netrec.my_master != my_node_id &&
	  nr->table[i].netrec.seqno >= max)
	max++;
    }

    /* set our seqno */
    nr->pub_table.netrec.seqno = max;
    nr->pub_table.netrec.my_master = my_node_id;

    /* now we are the master... */
    masterize(nr);

    our_master = &(nr->pub_table);
  }

  else {

    int need_new = 0;

    if (max == nr->pub_table.netrec.seqno) {
      /* if we are not a master, verify that our master is real */
      if (!nr->pub_table.netrec.am_master && nr->pub_table.netrec.my_master) {
	for (i=0; i<nr->table_count; i++) {
	  if (nr->table[i].netrec.my_master == nr->pub_table.netrec.my_master &&
	      nr->table[i].netrec.am_master) {
	    our_master = &(nr->table[i]);
	    goto done;
	  }
	}
	need_new = 1;
      }
    }

    if (max > nr->pub_table.netrec.seqno || need_new) {
      /* abdicate */
      demasterize(nr);

      /* check for master to join */
      for (i=0; i<nr->table_count; i++) {
	if (nr->table[i].netrec.my_master != my_node_id &&
	    nr->table[i].netrec.seqno >= max &&
	    nr->table[i].netrec.am_master) {
	  nr->pub_table.netrec.my_master = nr->table[i].netrec.my_master;
	  our_master = &(nr->table[i]);
	  goto done;
	}
      }

      /* fall back on hearsay */
      for (i=0; i<nr->table_count; i++) {
	if (nr->table[i].netrec.my_master != my_node_id &&
	    nr->table[i].netrec.seqno >= max) {
	  nr->pub_table.netrec.my_master = nr->table[i].netrec.my_master;
	  our_master = &(nr->table[i]);
	  goto done;
	}
      }
    }

    else {
      if (nr->pub_table.netrec.am_master)
	our_master = &(nr->pub_table);
    }
    
  done:
    ;
  }

  /* update behavior to reflect master state */
  if (our_master == NULL) {
    elog(LOG_WARNING, "No Master!!");
  }

  /* update health info */
  //netrec_health_check(nr, 0, NULL);

}


static
int netrec_table(ssync_sub_t *sub, netrec_table_t *table, 
		 int count, void *data)
{
  netrec_state_t *nr = (netrec_state_t *)data;

  if (nr->table) free(nr->table);
  nr->table = table;
  nr->table_count = count;
  
  netrec_process(nr);
  
  return EVENT_RENEW;
}


static
int netrec_status_print(status_context_t *info, buf_t *buf)
{
  netrec_state_t *nr = (netrec_state_t *)sd_data(info);
  ssync_type_t nrtype = {
    fixed_len: sizeof(aensbox_net_control_t)
  };
  strcat(nrtype.type, "netrec");
  ssync_unparse_cb_t cb = ssync_lookup_unparse(&nrtype);
  
  int i;
  bufprintf(buf, 
	    "Network State\n"
	    "-------------\n");
  cb(buf, NULL, 0, "                ");
  for (i=0; i<nr->table_count; i++) {
    bufprintf(buf, "%15s ", print_if_id(nr->table[i].header.flow_id.src));
    cb(buf, &(nr->table[i].netrec), 0, "");
  }
  bufprintf(buf, "\n");

  bufprintf(buf, 
	    "Detection Application State\n"
	    "---------------------------\n"
	    "Node            Act Detect Alg Skip FFTn Alpha  Thresh Rfrct NoSnc NoNet  Xrun Noise Behnd By\n"
	    "--------------- --- ------ --- ---- ---- ------ ------ ----- ----- ----- ----- ----- ----- ------\n"
	    );
  for (i=0; i<nr->table_count; i++) {
    bufprintf(buf, 
	      "%15s %3d %6d %3d %4d %4d %6.3f %6.3f %5d %5d %5d %5d %5d %5d %6.3f\n"
	      , 
	      print_if_id(nr->table[i].header.flow_id.src),
	      nr->table[i].netrec.detector_state.active,
	      0,  /* need to add this to the netrec structs!! $$$ */
	      nr->table[i].netrec.detector_state.detect_alg,
	      nr->table[i].netrec.detector_state.skip_n,
	      nr->table[i].netrec.detector_state.fftn,
	      nr->table[i].netrec.detector_state.alpha,
	      nr->table[i].netrec.detector_state.hi_thresh,
	      nr->table[i].netrec.detector_state.refract_interval,
	      nr->table[i].netrec.detector_state.fail_no_sync,
	      nr->table[i].netrec.detector_state.fail_no_net,
	      nr->table[i].netrec.detector_state.fail_xrun,
	      nr->table[i].netrec.detector_state.fail_noise_lock,
	      nr->table[i].netrec.detector_state.fail_behind,
	      nr->table[i].netrec.detector_state.behind_by);	      
  }
  bufprintf(buf, "\n");

  bufprintf(buf, 
	    "Recording Application State\n"
	    "---------------------------    (K)     (s)   (K)\n"
	    "Node            Prepad Post   BufMax Maxrun BufLen Qlen FRun FSyn FDat FBuf FWri Saved Active\n"
	    "--------------- ------ ------ ------ ------ ------ ---- ---- ---- ---- ---- ---- ----- ------\n"
	    );
  for (i=0; i<nr->table_count; i++) {
    bufprintf(buf, 
	      "%15s %6d %6d %6d %6.2f %6d %4d %4d %4d %4d %4d %4d %5d %5d\n"
	      , 
	      print_if_id(nr->table[i].header.flow_id.src),
	      nr->table[i].netrec.recorder_state.pre_pad,
	      nr->table[i].netrec.recorder_state.post_pad,
	      nr->table[i].netrec.recorder_state.buffer_max/1000,
	      nr->table[i].netrec.recorder_state.max_run_length/48000.0,
	      nr->table[i].netrec.recorder_state.buffer_size/1000,
	      nr->table[i].netrec.recorder_state.queue_length,
	      nr->table[i].netrec.recorder_state.fail_run_length,
	      nr->table[i].netrec.recorder_state.fail_no_sync,
	      nr->table[i].netrec.recorder_state.fail_no_data,
	      nr->table[i].netrec.recorder_state.fail_buffer_full,
	      nr->table[i].netrec.recorder_state.fail_write_error,
	      nr->table[i].netrec.recorder_state.success_count,
	      nr->table[i].netrec.recorder_state.active);	      
  }
  bufprintf(buf, "\n");

  bufprintf(buf, 
	    "Continuous Recording Application State\n"
	    "--------------------------------------\n"
	    "Node            BufSz    HwMark   Total      Restarts\n"
	    "--------------- -------- -------- ---------- --------\n"
	    );
  for (i=0; i<nr->table_count; i++) {
    bufprintf(buf, 
	      "%15s %8d %8d %10d %8d\n",
	      print_if_id(nr->table[i].header.flow_id.src),
	      nr->table[i].netrec.crec_state.buf_size,
	      nr->table[i].netrec.crec_state.hwmark,
	      nr->table[i].netrec.crec_state.total,
	      nr->table[i].netrec.crec_state.restart_count);
  }
  bufprintf(buf, "\n");

  bufprintf(buf, "Commands:\n"
	    "  set_rate=<rate>\n"
	    "  record_command=<recording command string>\n\n"
	    "Note: To set into master mode: \n"
	    "         echo 1 > /dev/opt/netrec/master_mode \n"
	    "      To set the global sample rate: \n"
	    "         echo rate > /dev/opt/netrec/sample_rate \n\n");

  return STATUS_MSG_COMPLETE;
}


static
int netrec_status_write(status_context_t *info, char *command, size_t buf_size)
{
  //netrec_state_t *nr = (netrec_state_t *)sd_data(info);

  /* 
   * handle following commands:
   *  - set sample rate by rbsh 
   */
  
  parser_state_t *ps = misc_parse_init(command, MISC_PARSE_COLON_SCHEME);
  int retval = EVENT_RENEW;

  while (misc_parse_next_kvp(ps) >= 0) {

    if (strcmp(ps->key, "set_rate") == 0) {
      if (ps->value) {
	char str[256];
	sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100"
		" \"echo rate=%s > /dev/sensors/vxp/status\"", 
		ps->value);
	system(str);
      }
      else {
	elog(LOG_WARNING, "set_rate with missing argument");
	retval = EVENT_ERROR(EINVAL);
      }
    }
    
    else if (strcmp(ps->key, "umount") == 0) {
      char str[256];
      sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100"
	      " \"echo umount > %s\"", 
	      REMOTE_SAVE_CONTROL);
      system(str);
    }
    
    else if (strcmp(ps->key, "mount") == 0) {
      char str[256];
      sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100"
	      " \"echo mount > %s\"", 
	      REMOTE_SAVE_CONTROL);
      system(str);
    }
    
    else if (strcmp(ps->key, "umountsd") == 0) {
      char str[256];
      sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100"
	      " \"sync && umount /mnt/sd_card\"");
      system(str);
    }
    
    else if (strcmp(ps->key, "mountsd") == 0) {
      char str[256];
      sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100"
	      " \"mount /mnt/sd_card\"");
      system(str);
    }
    
    else if (strcmp(ps->key, "record_command") == 0) {
      if (ps->value) {
	int remain;
	char *rest = misc_parse_get_raw_value(ps, &remain);
	char *rcpy = strdup(rest);
	char *ptr = strchr(rcpy, '\n');
	if (ptr) *ptr = 0;
	char str[256];
	struct timeval now;
	gettimeofday(&now, NULL);
	sprintf(str, "/home/emstar/bin/rbsh -U flood --timeout 100 "
		"\"echo %s:now=%ld.%06ld > /dev/netrec/recorder\"", 
		rcpy, now.tv_sec, now.tv_usec);
	free(rcpy);
	system(str);
      }
      break;
    }
    
    else {
      elog(LOG_WARNING, "Unrecognized command: %s", command);
    }
  }

  misc_parse_cleanup(ps);

  return retval;
}


static
int record_child(void *data, int sig, g_signal_context_t *ev)
{
  netrec_state_t *nr = (netrec_state_t *)data;
  int status;
  int pid = waitpid(nr->child_pid, &status, WNOHANG);
  if (pid != nr->child_pid) {
    elog(LOG_WARNING, "sigchld not our child: %m");
    return EVENT_RENEW;
  }
  elog(LOG_WARNING, "recording complete? status %x", status);
  nr->child_pid = 0;
  nr->last_status = status;
  return EVENT_DONE;
}

static
int netrec_recorder_print(status_context_t *info, buf_t *buf)
{
  netrec_state_t *nr = (netrec_state_t *)sd_data(info);

  bufprintf(buf, 
	    "netrec recording control (node %d)\n"
	    "%s (pid %d)\n\n"
	    "Commands:\n"
	    "  stop,kill: to stop or kill the recording process\n"
	    "  start: \n"
	    "    wavpack,single/all,dur,name,sync\n\n",
	    my_node_id, 
	    nr->child_pid ? "Recording" : "Idle",
	    nr->child_pid);

  return STATUS_MSG_COMPLETE;
}

static
int netrec_last_name_print(status_context_t *info, buf_t *buf)
{
  netrec_state_t *nr = (netrec_state_t *)sd_data(info);
  bufprintf(buf, "%s\n", nr->lastname);
  return STATUS_MSG_COMPLETE;
}

static
int netrec_recorder_write(status_context_t *info, char *command, size_t buf_size)
{
  netrec_state_t *nr = (netrec_state_t *)sd_data(info);
  int use_wavpack = 0;
  int single = 0;
  int remote = 0;
  int dur = 60;
  char name[80];
  static int counter = 1;

  sprintf(name, "rec%d", counter);

  /* 
   * handle start/stop recording
   */

  if (strncmp(command, "stop", 4) == 0) {
    if (nr->child_pid) 
      kill(nr->child_pid, 2);
    else {
      elog(LOG_WARNING, "recorder not running.");
    }
    goto done;
  }

  if (strncmp(command, "kill", 4) == 0) {
    if (nr->child_pid) 
      kill(nr->child_pid, 9);
    else {
      elog(LOG_WARNING, "recorder not running.");
    }
    goto done;
  }

  if (strncmp(command, "start", 5) == 0) {
    if (nr->child_pid) {
      elog(LOG_WARNING, "recorder already running.");
      goto done;
    }
  
    parser_state_t *ps = misc_parse_init(command, MISC_PARSE_COLON_SCHEME);
    int retval = EVENT_RENEW;
    int sync = 0;
    char *now = NULL;
    
    while (misc_parse_next_kvp(ps) >= 0) {
      
      if (strcmp(ps->key, "wavpack") == 0) {
	if (ps->value == NULL || 
	    ps->value[0] == '1') 
	  use_wavpack = 1;
      }

      else if (strcmp(ps->key, "dur") == 0) {
	if (ps->value)
	  dur = atoi(ps->value);
      }

      else if (strcmp(ps->key, "single") == 0) {
	if (ps->value == NULL || 
	    ps->value[0] == '1') 
	  single = 1;
      }

      else if (strcmp(ps->key, "remote") == 0) {
	if (ps->value == NULL || 
	    ps->value[0] == '1') 
	  remote = 1;
      }

      else if (strcmp(ps->key, "sync") == 0) {
	if (ps->value == NULL || 
	    ps->value[0] == '1') 
	  sync = 1;
      }

      else if (strcmp(ps->key, "now") == 0) {
	if (ps->value) {
	  now = strdup(ps->value);
	}
      }

      else if (strcmp(ps->key, "all") == 0) {
	if (ps->value == NULL || 
	    ps->value[0] == '1') 
	  single = 0;
      }

      else if (strcmp(ps->key, "name") == 0) {
	if (ps->value) {
	  strcpy(name, ps->value);
	  if (nr->lastname) free(nr->lastname);
	  nr->lastname = strdup(name);
	}
      }

      else if (strcmp(ps->key, "start") == 0) {
	counter++;
      }

      else {
	elog(LOG_WARNING, "set_rate with missing argument");
	retval = EVENT_ERROR(EINVAL);
      }
    }

    misc_parse_cleanup(ps);
    
    char sensor_name[80];
    sprintf(sensor_name, "vxp/%s", single ? "single" : "all");

    char file_path[80];
    if (remote)
      sprintf(file_path, "/remote/%s", name);
    else
      sprintf(file_path, "/mnt/sd/%s", name);

    char dur_str[10];
    sprintf(dur_str, "%d", dur);
    
    char *str[21] = {
      "/home/emstar/sensors/record_from",
      "--scale",
      "20",
      "--sensor_clock",
      "/dev/sensors/vxp/all",
      "--sensor",
      sensor_name,
      "--path",
      file_path,
      "--dur",
      dur_str,
      "--netrec",
      "--restart",
      "--max_buf",
      "4",
      "--start_clock",
      "gps"
    };

    int next = 17;
    if (use_wavpack)
      str[next++] = "--wavpack";
    if (sync) {
      str[next++] = "--start";
      str[next++] = now;
    }

    str[next++] = NULL;

    nr->child_pid = fork();
    if (nr->child_pid == 0) {
      execv(str[0], str);
      elog(LOG_CRIT, "failed to exec record_from: %m");
      exit(0);
    }

    if (now) free(now);

    if (nr->child_pid < 0) {
      elog(LOG_CRIT, "cant fork!: %m");
      exit(1);
    }

    g_signal_opts_t opts = {
      signo: SIGCHLD,
      callback: record_child,
      data: nr
    };

    g_signal_handler(&opts, NULL);	     
  }
  
 done:
  return EVENT_RENEW;
}


static
void netrec_master_mode_notify(char opt_char, void *new_value, void *device_info)
{
  netrec_state_t *nr = (netrec_state_t *)device_info;
  
  if (nr->requested_master_mode) {
    g_event_destroy(nr->recent_master);
    g_timer_add(60000, NULL, NULL, NULL, &(nr->recent_master));
  }

  netrec_process(nr);
}

static
void netrec_sample_rate_notify(char opt_char, void *new_value, void *device_info)
{
  netrec_state_t *nr = (netrec_state_t *)device_info;
  char str[80];
  sprintf(str, "set_rate=%d", nr->requested_rate);
  netrec_status_write(nr->status, str, strlen(str));
}


/*
 *  main and initialization
 */

void netrec_shutdown(void *data)
{
  elog_g(LOG_NOTICE, "AENSBox Network Contoller shutting down");
  exit(0);
}


void usage(const char *name)
{
  misc_print_usage
    (name, 
     "--prefix <ssync_device> ",
     "  --prefix: specify the statesync interface to push into\n"
     );
  exit(1);
}


int main(int argc, char *argv[])
{
  netrec_state_t nr = {
    requested_rate: 48000
  };
  
  /* generic init */
  misc_init(&argc, argv, CVSTAG);

  /* required arguments */

  if (misc_parse_out_switch(&argc, argv, "help", 'h')) 
    usage(argv[0]);

  nr.prefix = misc_parse_out_option(&argc, argv, "prefix", 0);
  if (nr.prefix == NULL) {
    elog(LOG_CRIT, "Missing required argument: mhsync prefix");
    usage(argv[0]);
  }
 
  /* ok, no more args */
  if (misc_args_remain(&argc, argv)) {
    elog(LOG_CRIT, "Extra args.. aborting");
    usage(argv[0]);
  }

  /* set up configurable options */
  option_dev_opts_t opt_opts = {
    root: "netrec",
    argc: &argc,
    argv: argv,
    data: &nr
  };

  opt_opts.notify = netrec_master_mode_notify;
  if (g_b_option_dev("master_mode", 0, &(nr.requested_master_mode),
		     &opt_opts, NULL) < 0) {
    elog(LOG_WARNING, "unable to create options device master_mode: %m");
    exit(1);
  }
  
  opt_opts.notify = netrec_sample_rate_notify;
  if (g_u_option_dev("sample_rate", 0, &(nr.requested_rate),
		     8000, 48000, &opt_opts, NULL) < 0) {
    elog(LOG_WARNING, "unable to create options device sample_rate: %m");
    exit(1);
  }
  
  status_dev_opts_t s_opts = {
    device: {
      devname: sim_path("/dev/netrec/control"),
      device_info: &nr
    },
    printable: netrec_status_print,
    write: netrec_status_write
  };

  if (g_status_dev(&s_opts, &(nr.status)) < 0) {
    elog(LOG_CRIT, "Unable to register status device %s: %m", 
	 s_opts.device.devname);
    exit(1);
  }

  status_dev_opts_t s_opts2 = {
    device: {
      devname: sim_path("/dev/netrec/recorder"),
      device_info: &nr
    },
    printable: netrec_recorder_print,
    write: netrec_recorder_write
  };

  if (g_status_dev(&s_opts2, NULL) < 0) {
    elog(LOG_CRIT, "Unable to register status device %s: %m", 
	 s_opts2.device.devname);
    exit(1);
  }

  status_dev_opts_t s_opts3 = {
    device: {
      devname: sim_path("/dev/netrec/last_name"),
      device_info: &nr
    },
    printable: netrec_last_name_print,
  };

  if (g_status_dev(&s_opts3, NULL) < 0) {
    elog(LOG_CRIT, "Unable to register status device %s: %m", 
	 s_opts3.device.devname);
    exit(1);
  }

  /* open sub channel */
  if (netrec_sub_open(nr.prefix, netrec_table, &nr, &(nr.subref)) < 0) {
    elog(LOG_CRIT, "Can't connect to mhsync to subscribe to netrec feed: %m");    
    exit(1);
  }
  ssync_sub_show_stale(nr.subref, 1);
     
  /* set up a timer to periodically check out the system */
  g_timer_add(10000, netrec_health_check, &nr, NULL, NULL);

  /* set alarm handler for fs check */
  signal(SIGALRM, alarm_handler);

  /* connect to emrun */
  emrun_opts_t emrun_opts = {
    shutdown: netrec_shutdown
  };
  emrun_init(&emrun_opts); /* this init should be done last */
    
  /* run */
  elog_g(LOG_INFO, "AENSBox Network Recording Application");
  
  g_main();
  return 0;
}





See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  netrec.c