Code Search for Developers
 
 
  

event_detect.c from EmStar at Krugle


Show event_detect.c syntax highlighted

/*
 * Copyright (c) 2006 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */


/*
 *  Acoustic Event Detection 
 */

#include <libdev/sensor_client.h>
#include <libdev/status_dev.h>
#include <emrun/emrun.h>
#include <emrun/fault_logger.h>
#include "libmisc/misc.h"
#include "link/link.h"
#include <libnl/nl.h>
#include <libnl/decim.h>
#include <sensors/event_detector.h>


/* times passes through the loop */
#undef TIMING

/* disables skipping for testing purposes */
#undef NOCHEAT

/*
 *  Basic algorithm:
 *    Stream 1 channel in
 *    Decimate
 *    Hanning, 32 point FFT, sum bins, skip N
 *    EWMA Thresholding
 *
 */

#define DECIMATION_FACTOR 2

/* 16 tap FIR filter: 8-12Khz Lowpass */
float lowpass[16] = {
  0.0197555,
  0.0348693,
  0.0115786,
  -0.0541005,
  -0.0904721,
  -0.0071750,
  0.1879367,
  0.3574557,  
  0.3574557,
  0.1879367,
  -0.0071750,
  -0.0904721,
  -0.0541005,
  0.0115786,
  0.0348693,
  0.0197555
};



typedef struct estimator_state {
  float thresh_value;
  int trigger;
  float smoothed_mean;
  float smoothed_var;
  uint64_t start;
  float alpha;
  float hi_thresh;
  float trigger_value;
  int startup;
  int refract;
  int refract_interval;
} est_t;


typedef struct evdet_state {
  status_context_t *status;
  sensor_client_ctx_t *sc;
  int fftn;
  int fft_points;
  int skip_n;
  int counter;
  int detect_alg;
  float *hanning;
  float *decimation_state;
  float *decimation_coeffs;
  int tap_count;
  uint64_t last_index;
  active_fault_t *xrun;
  active_fault_t *nosync;
  char *flood_interface;
  char *sensor;
  char *sensor_clock;
  lu_context_t *flood_event;
  int clock_id;
  int clock_scalar;

  est_t e[4];

  int active;
  int fail_no_sync;
  int fail_no_net;
  int fail_xrun;
  int fail_behind;
  int fail_noise_lock;
  int detection_count;

  int dump_detector;
  int dump_spectrum;

  int rate;
  struct timeval started_at;
  uint64_t first_sample;
  uint64_t check_after;
  int64_t behind_by;

  g_event_t *refract_timer;
} evdet_state_t;


void evdet_start(evdet_state_t *ed);
void evdet_notify(evdet_state_t *ed);

static
int schedule_restart(void *data, int period, g_event_t *event)
{
  evdet_state_t *ed = (evdet_state_t *)data;
  evdet_start(ed);
  return EVENT_DONE;
}

static inline
float UpdateEWMA(float new_value, float *state, float alpha)
{
  return *state = new_value * (1.0-alpha) + *state * alpha;
}


/* noise estimator */
void evdet_estimator(evdet_state_t *ed, est_t *est, float V, uint64_t start_sample)
{
  int this_trigger = 0;

  if (est->trigger) {
    if ((start_sample - est->start) > MAX_RUN_LENGTH) {
      elog(LOG_WARNING, "Detection length exceeded maximum of %d, "
	   "re-estimating noise", MAX_RUN_LENGTH);
      ed->fail_noise_lock++;

      /* restarting from the callback seems to be bad */
      g_timer_add(0, schedule_restart, ed, NULL, NULL);      
    }
    if (V > est->thresh_value) {
      est->refract = est->refract_interval;
    }
    else if (est->refract > 0) {
      est->refract--;
    }
    else {
      est->trigger = 0;
      if ((ed->dump_detector || ed->dump_spectrum)) {
	elog(LOG_WARNING, "Detection at %lld, length %lld", 
	     est->start, start_sample - est->start);
	this_trigger = 1;
      }

      ed->detection_count++;

      /* emit via flooding */
      link_pkt_t hdr = {
	dst: { id: LINK_BROADCAST },
	type: PKT_TYPE_ACOUSTIC_EVENT
      }; 

      audio_trigger_pkt_t payload = {
	run_length: (start_sample - est->start)
      };

      sync_id_t src = {
	node: my_node_id,
	comp: ed->clock_id
      };
      sync_id_t dest = {
	node: my_node_id,
	comp: GPS
      };
      
      struct timeval src_tv;
      ll_to_tv(&src_tv, est->start*ed->clock_scalar);
      
      if (sync_convert_tv(&src, &src_tv, &dest, &(payload.global_start_time)) < 0) {
	elog(LOG_WARNING, "couldn't convert from %s to %s: %m\n",
	     print_sync_id(&src), print_sync_id(&dest));
	char str[20];
	snprintf(str, sizeof(str), "NoSync");
	fault_raise_indicator(str, &(ed->nosync));
	ed->fail_no_sync++;
	evdet_notify(ed);
      }
      
      else {
	fault_clear_indicator(ed->nosync);
	buf_t *pkt_buf = buf_new();
	bufcpy(pkt_buf, &hdr, sizeof(hdr));
	bufcpy(pkt_buf, &payload, sizeof(payload));
	if (lu_send(ed->flood_event, (link_pkt_t*)(pkt_buf->buf), sizeof(payload)) < 0) {
	  elog(LOG_WARNING, "Failed to send message to flood adaptor: %m");
	  ed->fail_no_net++;
	  evdet_notify(ed);
	}
	buf_free(pkt_buf);
      }
    }
  }

  else {
    float thresh = est->hi_thresh*sqrtf(est->smoothed_var) + est->smoothed_mean;
    
    if ((est->startup == 0) && V > thresh) {
      est->trigger = 1;
      est->refract = est->refract_interval;
      est->thresh_value = thresh;
      est->start = start_sample;
      est->trigger_value = V;
    }
    else {
      UpdateEWMA(V, &(est->smoothed_mean), est->alpha);
      UpdateEWMA(sqrf(V-est->smoothed_mean), &(est->smoothed_var), est->alpha);
    }

    if (est->startup) 
      est->startup--;    
  }

  if (ed->dump_detector) {
    printf("%f %f %f %f ", V, est->smoothed_mean+sqrtf(est->smoothed_var), 
	   est->smoothed_mean+est->hi_thresh*sqrtf(est->smoothed_var),
	   est->trigger ? V : 0);
    if (this_trigger)
      printf("\n%f %f %f DETECT",
	     est->start / 48000.0, start_sample / 48000.0,
	     est->trigger_value);
  }
}


/* on data arrival */
int evdet_data_arrived(void* private_arg, sc_series_t* sample)
{
  evdet_state_t *ed = (evdet_state_t *)private_arg;
  int i;
  int count = sample->num_samples; 
  float buf[count];

#ifdef TIMING
  struct timeval entry;
  struct timeval exiting;
  gettimeofday(&entry, NULL);
#endif

  /* check for xrun */
  if (ed->last_index && ed->last_index != sample->start_sample) {
    elog(LOG_WARNING, "XRUN!!!");
    char str[20];
    snprintf(str, sizeof(str), "XRun:%.2f", (sample->start_sample - ed->last_index)/48000.0);
    fault_raise_indicator(str, &(ed->xrun));
    ed->fail_xrun++;
    evdet_notify(ed);
  }  
  ed->last_index = sample->start_sample + sample->num_samples;
  if (sample->num_samples != ed->fft_points * DECIMATION_FACTOR) {
    elog(LOG_WARNING, "wrong number of samples: %d", sample->num_samples);
    goto done;
  }

  /* check how far behind */
  if (ed->last_index > ed->check_after) {
    if (ed->first_sample == 0) ed->first_sample = sample->start_sample;
    ed->check_after += 5*ed->rate;
    ed->behind_by = msec_since(&(ed->started_at));
    ed->behind_by *= (ed->rate / 1000);
    ed->behind_by -= (ed->last_index - ed->first_sample);
    if (ed->behind_by > ed->rate) {
      elog(LOG_WARNING, "Over 1 sec behind.. restarting detector");
      ed->fail_behind++;
      evdet_notify(ed);
      /* restarting from the callback seems to be bad */
      g_timer_add(0, schedule_restart, ed, NULL, NULL);
      return EVENT_RENEW;
    }
  }

  /* inc counter */
  if (ed->skip_n > 0) ed->counter = (ed->counter + 1)%(ed->skip_n + 1);

#ifndef NOCHEAT
  /* if we're skipping, then just pre-load the state vector */
  if (ed->counter) {
    if (ed->counter == ed->skip_n) 
      for (i=0; i<ed->tap_count; i++) {
	/* decimation state is ordered from newest to oldest */
	ed->decimation_state[i] = ((int16_t*)(sample->data->buf))[sample->num_samples-1-i];
      }
    goto done;
  }
#endif
  
  /* copy to float working buffer */
  for (i=0; i<count; i++)
    buf[i] = ((int16_t*)(sample->data->buf))[i];
  
  /* decimate in place */
  decimf(DECIMATION_FACTOR, ed->tap_count, ed->decimation_coeffs, 
	 ed->decimation_state, sample->num_samples, buf,
	 buf, &count);

#ifdef NOCHEAT
  if (ed->counter) {
    goto done;
  }
#endif

  /* now hanning and FFT */
  nl_mult_into(buf, ed->hanning, ed->fft_points);
  complex *fft = nl_fft_inplace(buf, ed->fft_points);

#ifdef NOCHEAT
  float norms[16];
  for (i=0; i<16; i++) norms[i] = Cnorm(fft[i]);
  goto done;
#endif

  /* OK, at this point buf[0..15] are 0-12 KHz.
   * now we select our frequencies (bins 3-4) */

  if (!(ed->dump_detector || ed->dump_spectrum)) {
    switch (ed->detect_alg) {
    case 1:
      evdet_estimator(ed, &(ed->e[0]), Cnorm(Cadd(fft[3], fft[4])), ed->last_index); 
      break;
      
    case 2:
      evdet_estimator(ed, &(ed->e[0]), Cnorm(fft[3])+Cnorm(fft[4]), ed->last_index);
      break;
      
    case 3:
      evdet_estimator(ed, &(ed->e[0]), Cnorm(fft[3])*Cnorm(fft[4]), ed->last_index);
      break;
      
    default:
      break;
    }
  }

  else {
    if (ed->dump_spectrum) {
      /* (destructive) magnitude */
      for (i=0; i<(ed->fft_points/2); i++) 
	buf[i] = Cnorm(fft[i]);
      
      for (i=0; i<16; i++) {
	printf("%f %d %f\n", ed->last_index / 48000.0, i, buf[i]);
      }
      printf("\n");
    }
    
    else if (ed->dump_detector) {
      float cs = Cnorm(Cadd(fft[4], Cadd(fft[5], Cadd(fft[6], fft[7]))));
      
#if 0
      /* (destructive) magnitude */
      for (i=0; i<(ed->fft_points/2); i++) 
	buf[i] = Cnorm(fft[i]);
      
      /* OK, at this point buf[0..15] are 0-12 KHz.
       * now we select our frequencies (bins 4-7) */
      float sum = buf[4]+buf[5]+buf[6]+buf[7];    
      float m = buf[4]*buf[5]*buf[6]*buf[7];
#endif

      printf("%f ", ed->last_index / 48000.0);
      
      /* Now estimate noise and detect */
      evdet_estimator(ed, &(ed->e[1]), cs, ed->last_index);
      //evdet_estimator(ed, &(ed->e[2]), sum, ed->last_index);
      //evdet_estimator(ed, &(ed->e[3]), m, ed->last_index);
      printf("\n");
    }
  }

 done:
#ifdef TIMING
  gettimeofday(&exiting, NULL);
  
  misc_tv_sub(&exiting, &entry);
  static uint64_t global_usec = 0;
  global_usec += exiting.tv_usec;
  printf("%ld, total %lld\n", exiting.tv_usec, global_usec);
#endif
  
  return EVENT_RENEW;
}


void evdet_start(evdet_state_t *ed)
{
  elog(LOG_NOTICE, "Starting streaming detector...");
  gettimeofday(&ed->started_at, NULL);
  ed->first_sample = 0;
  ed->check_after = 0;

  /* set clock id */
  ed->clock_id = sync_clock_no_create(sdev_path(ed->sensor_clock));

  /* open stream */
  if (ed->sc) sc_destroy(ed->sc);
  if (sc_open_stream(ed->sensor, 0, 1, ed->fft_points * DECIMATION_FACTOR,
		     evdet_data_arrived, ed, &(ed->sc)) < 0) {
    elog(LOG_WARNING, "Unable to open sensor device: %m");
    return;
  }

  /* clear past fault */
  fault_clear_indicator(ed->xrun);

  /* init counter */
  ed->counter = 0;
  ed->last_index = 0;
  
  /* init smoothing filter */
  int i;
  for (i=0; i<4; i++) {
    ed->e[i].startup = 300;
    ed->e[i].trigger = 0;
    ed->e[i].smoothed_mean = 0;
    ed->e[i].smoothed_var = 0;
    ed->e[i].refract = 0;
  }

  /* pre-compute hanning function */
  if (ed->hanning) free(ed->hanning);
  ed->hanning = nl_hanning(ed->fft_points);
  
  /* initialize decimation filter */
  if (ed->decimation_state) free(ed->decimation_state);
  ed->decimation_state = malloc(sizeof(float)*ed->tap_count);
  memset(ed->decimation_state, 0, sizeof(float)*ed->tap_count);
}


void evdet_stop(evdet_state_t *ed)
{
  elog(LOG_NOTICE, "Stopping streaming detector...");

  /* close stream */
  if (ed->sc) sc_destroy(ed->sc);
}


/* rate limit notifications */
int evdet_notify_aux(void *data, int period, g_event_t *event)
{
  evdet_state_t *ed = (evdet_state_t *)data;
  g_status_dev_notify(ed->status);
  return EVENT_DONE;
}

void evdet_notify(evdet_state_t *ed)
{
  if (ed->refract_timer) return;
  g_timer_add(1000, evdet_notify_aux, ed, NULL, &(ed->refract_timer));
}


static
int evdet_bin_status(status_context_t *info, buf_t *buf) 
{
  evdet_state_t *ed = (evdet_state_t *)sd_data(info);

  evdet_status_t s = {
    detect_alg: ed->detect_alg,
    skip_n: ed->skip_n,
    fftn: ed->fftn,
    active: ed->active,
    hi_thresh: ed->e[0].hi_thresh,
    refract_interval: ed->e[0].refract_interval,
    alpha: ed->e[0].alpha,
    fail_no_sync: ed->fail_no_sync,
    fail_no_net: ed->fail_no_net,
    fail_xrun: ed->fail_xrun,
    fail_behind: ed->fail_behind,
    behind_by: ed->behind_by / (float)(ed->rate)
  };

  bufcpy(buf, &s, sizeof(s));
  
  return STATUS_MSG_COMPLETE;
}

static
int evdet_print_status(status_context_t *info, buf_t *buf) 
{
  evdet_state_t *ed = (evdet_state_t *)sd_data(info);

  bufprintf(buf,
	    "Acoustic Event Detector (Node %d)\n"
	    "-----------------------\n\n"
	    "Parameters:\n"
	    " active:  %d\n"
	    " alg:     %d\n"
	    " fftn:    %d\n"
	    " points:  %d\n"
	    " skip_n:  %d\n"
	    " thresh:  %f\n"
	    " refract: %d\n"
	    " alpha:   %f\n"
	    "\n"
	    "Faults:\n"
	    " Sync:    %d\n"
	    " Xrun:    %d\n"
	    " Net:     %d\n"
	    " Lock:    %d\n"
	    " Behind:  %d\n"
	    "\n"
	    "%d detections\n"
	    "Behind by %f sec\n\n"
	    ,
	    my_node_id,
	    ed->active,
	    ed->detect_alg,
	    ed->fftn,
	    ed->fft_points,
	    ed->skip_n,
	    ed->e[0].hi_thresh,
	    ed->e[0].refract_interval,
	    ed->e[0].alpha,
	    ed->fail_no_sync,
	    ed->fail_xrun,
	    ed->fail_no_net,
	    ed->fail_noise_lock,
	    ed->fail_behind,
	    ed->detection_count,
	    ed->behind_by / (float)(ed->rate)
	    );

  return STATUS_MSG_COMPLETE;
}


static
int evdet_command(status_context_t *ctx, char *command, size_t buf_size)
{
  evdet_state_t *ed = (evdet_state_t *)sd_data(ctx);

  parser_state_t *ps = misc_parse_init(command, MISC_PARSE_COLON_SCHEME);
  int retval = EVENT_RENEW;

  while (misc_parse_next_kvp(ps) >= 0) {

    if (strcmp(ps->key, "fftn") == 0) {
      if (ps->value) {
	ed->fftn = atoi(ps->value);
	if (ed->fftn == 0) ed->fftn = 32;
	ed->fft_points = 1 << ed->fftn;
      }
    }
    
    else if (strcmp(ps->key, "alg") == 0) {
      if (ps->value) {
	ed->detect_alg = atoi(ps->value);
	if (ed->detect_alg == 0) ed->detect_alg = 1;
      }
    }
    
    else if (strcmp(ps->key, "skip_n") == 0) {
      if (ps->value) {
	ed->skip_n = atoi(ps->value);
      }
    }
    
    /* NOTE alg must be set first!! */
    else if (strcmp(ps->key, "thresh") == 0) {
      if (ps->value) {
	if (1 != sscanf(ps->value, "%f", &(ed->e[0].hi_thresh))) {
	  ed->e[0].hi_thresh = 5;
	}
      }
    }
    
    else if (strcmp(ps->key, "refract") == 0) {
      if (ps->value) {
	ed->e[0].refract_interval = atoi(ps->value);
	if (ed->e[0].refract_interval == 0) 
	  ed->e[0].refract_interval = 200;
      }
    }

    else if (strcmp(ps->key, "alpha") == 0) {
      if (ps->value) {
	if (1 != sscanf(ps->value, "%f", &(ed->e[0].alpha))) {
	  ed->e[0].alpha = 0.999;
	}
      }
    }
    
    else if (strcmp(ps->key, "start") == 0) {
      ed->active = 1;
    }
    
    else if (strcmp(ps->key, "stop") == 0) {
      ed->active = 0;
    }
    
    else {
      elog(LOG_WARNING, "Unrecognized command: %s", command);
    }
  }

  int i;
  for (i=1; i<3; i++) {
    ed->e[i] = ed->e[0];
  }

  /* restarting */
  evdet_stop(ed);
  if (ed->active)
    evdet_start(ed);

  evdet_notify(ed);

  return retval;
}


/*
 *  main and initialization
 */


void usage(char *name)
{
  misc_print_usage
    (name, 
     "--sensor <sensor-name> [--sensor_clock <sensor-name>] [--clock_scale <factor>] \n"
     "  [--dump_detect] [--dump_spect] ",
     "  --sensor <sensor-name>: the sensor to record\n"
     "  --sensor_clock <sensor-name>: the sensor timebase to use\n"
     "  --clock_scale <factor>: clock scale to usec\n"
     "  --dump_detect: dump detection profile info\n"
     "  --dump_spect: dump spectragram\n"
     "  \n"
     );
  exit(1);

}


void evdet_shutdown(void *data)
{
  elog_g(LOG_NOTICE, "Event Detector shutting down");
  exit(0);
}


int main(int argc, char **argv)
{
  evdet_state_t ed = {
    fft_points: 32,
    skip_n: 3,
    tap_count: 16,
    decimation_coeffs: lowpass,
    flood_interface: "flood",
    detect_alg: 1,
    clock_scalar: 1,
    rate: 48000,

    e: {
      /* runtime */
      {
	alpha: 0.999,
	startup: 300,
	hi_thresh: 32,
	refract_interval: 40
      },
      
      /* complex sum */
      {
	alpha: 0.999,
	startup: 300,
	hi_thresh: 32,
	refract_interval: 40
      },
      
      /* sum of norms */
      {
	alpha: 0.999,
	startup: 300,
	hi_thresh: 32,
	refract_interval: 40
      },
      
      /* product of norms */
      {
	alpha: 0.999,
	startup: 300,
	hi_thresh: 32,
	refract_interval: 40
      },
    }
  };
  
  /* generic init */
  misc_init(&argc, argv, CVSTAG);

  if (misc_parse_out_switch(&argc, argv, "help", 'h'))
    usage(argv[0]);

  /* sensor */
  ed.sensor = misc_parse_out_option(&argc, argv, "sensor", 0);
  if (ed.sensor == NULL) {
    elog(LOG_WARNING, "Missing required argument sensor <name>");
    exit(1);
  }
  ed.sensor_clock = misc_parse_out_option(&argc, argv, "sensor_clock", 0);
  if (ed.sensor_clock == NULL)
    ed.sensor_clock = ed.sensor;

  ed.dump_detector = misc_parse_out_switch(&argc, argv, "dump_detect", 0);
  ed.dump_spectrum = misc_parse_out_switch(&argc, argv, "dump_spect", 0);

  misc_parse_option_as_int(&argc, argv, "clock_scale", 0,
			    &(ed.clock_scalar));
  
  if (misc_args_remain(&argc, argv))
    usage(argv[0]);

  /* open local flood adaptor */
  lu_opts_t opts = {
    opts: {
      name: ed.flood_interface,
      data: &ed
    },
  };
  
  if (lu_open(&opts, &(ed.flood_event)) < 0) {
    elog(LOG_CRIT, "Unable to open flood adapter: %m");
    if (!(ed.dump_detector || ed.dump_spectrum))
      ;//exit(1);
  }

  /* trigger to start experiment */
  status_dev_opts_t cmd_opts = {
    device: {
      devname: EVENT_DETECT_STATUS,
      device_info: &ed
    },
    write: evdet_command,
    printable: evdet_print_status,
    binary: evdet_bin_status
  };
  
  if (g_status_dev(&cmd_opts, &(ed.status)) < 0) {
    elog(LOG_CRIT, "can't create command dev: %m");
    exit(1);
  }

  emrun_opts_t emrun_opts = {
    shutdown: evdet_shutdown,
    data: &ed
  };
  emrun_init(&emrun_opts);
  
  /* run */
  elog_g(LOG_INFO, "Event Detection daemon starting...");
  
  g_main();
  return 0;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  event_detect.c
  event_recorder.c