Show event_detect.c syntax highlighted
/*
* Copyright (c) 2006 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* Acoustic Event Detection
*/
#include <libdev/sensor_client.h>
#include <libdev/status_dev.h>
#include <emrun/emrun.h>
#include <emrun/fault_logger.h>
#include "libmisc/misc.h"
#include "link/link.h"
#include <libnl/nl.h>
#include <libnl/decim.h>
#include <sensors/event_detector.h>
/* times passes through the loop */
#undef TIMING
/* disables skipping for testing purposes */
#undef NOCHEAT
/*
* Basic algorithm:
* Stream 1 channel in
* Decimate
* Hanning, 32 point FFT, sum bins, skip N
* EWMA Thresholding
*
*/
#define DECIMATION_FACTOR 2
/* 16 tap FIR filter: 8-12Khz Lowpass */
float lowpass[16] = {
0.0197555,
0.0348693,
0.0115786,
-0.0541005,
-0.0904721,
-0.0071750,
0.1879367,
0.3574557,
0.3574557,
0.1879367,
-0.0071750,
-0.0904721,
-0.0541005,
0.0115786,
0.0348693,
0.0197555
};
typedef struct estimator_state {
float thresh_value;
int trigger;
float smoothed_mean;
float smoothed_var;
uint64_t start;
float alpha;
float hi_thresh;
float trigger_value;
int startup;
int refract;
int refract_interval;
} est_t;
typedef struct evdet_state {
status_context_t *status;
sensor_client_ctx_t *sc;
int fftn;
int fft_points;
int skip_n;
int counter;
int detect_alg;
float *hanning;
float *decimation_state;
float *decimation_coeffs;
int tap_count;
uint64_t last_index;
active_fault_t *xrun;
active_fault_t *nosync;
char *flood_interface;
char *sensor;
char *sensor_clock;
lu_context_t *flood_event;
int clock_id;
int clock_scalar;
est_t e[4];
int active;
int fail_no_sync;
int fail_no_net;
int fail_xrun;
int fail_behind;
int fail_noise_lock;
int detection_count;
int dump_detector;
int dump_spectrum;
int rate;
struct timeval started_at;
uint64_t first_sample;
uint64_t check_after;
int64_t behind_by;
g_event_t *refract_timer;
} evdet_state_t;
void evdet_start(evdet_state_t *ed);
void evdet_notify(evdet_state_t *ed);
static
int schedule_restart(void *data, int period, g_event_t *event)
{
evdet_state_t *ed = (evdet_state_t *)data;
evdet_start(ed);
return EVENT_DONE;
}
static inline
float UpdateEWMA(float new_value, float *state, float alpha)
{
return *state = new_value * (1.0-alpha) + *state * alpha;
}
/* noise estimator */
void evdet_estimator(evdet_state_t *ed, est_t *est, float V, uint64_t start_sample)
{
int this_trigger = 0;
if (est->trigger) {
if ((start_sample - est->start) > MAX_RUN_LENGTH) {
elog(LOG_WARNING, "Detection length exceeded maximum of %d, "
"re-estimating noise", MAX_RUN_LENGTH);
ed->fail_noise_lock++;
/* restarting from the callback seems to be bad */
g_timer_add(0, schedule_restart, ed, NULL, NULL);
}
if (V > est->thresh_value) {
est->refract = est->refract_interval;
}
else if (est->refract > 0) {
est->refract--;
}
else {
est->trigger = 0;
if ((ed->dump_detector || ed->dump_spectrum)) {
elog(LOG_WARNING, "Detection at %lld, length %lld",
est->start, start_sample - est->start);
this_trigger = 1;
}
ed->detection_count++;
/* emit via flooding */
link_pkt_t hdr = {
dst: { id: LINK_BROADCAST },
type: PKT_TYPE_ACOUSTIC_EVENT
};
audio_trigger_pkt_t payload = {
run_length: (start_sample - est->start)
};
sync_id_t src = {
node: my_node_id,
comp: ed->clock_id
};
sync_id_t dest = {
node: my_node_id,
comp: GPS
};
struct timeval src_tv;
ll_to_tv(&src_tv, est->start*ed->clock_scalar);
if (sync_convert_tv(&src, &src_tv, &dest, &(payload.global_start_time)) < 0) {
elog(LOG_WARNING, "couldn't convert from %s to %s: %m\n",
print_sync_id(&src), print_sync_id(&dest));
char str[20];
snprintf(str, sizeof(str), "NoSync");
fault_raise_indicator(str, &(ed->nosync));
ed->fail_no_sync++;
evdet_notify(ed);
}
else {
fault_clear_indicator(ed->nosync);
buf_t *pkt_buf = buf_new();
bufcpy(pkt_buf, &hdr, sizeof(hdr));
bufcpy(pkt_buf, &payload, sizeof(payload));
if (lu_send(ed->flood_event, (link_pkt_t*)(pkt_buf->buf), sizeof(payload)) < 0) {
elog(LOG_WARNING, "Failed to send message to flood adaptor: %m");
ed->fail_no_net++;
evdet_notify(ed);
}
buf_free(pkt_buf);
}
}
}
else {
float thresh = est->hi_thresh*sqrtf(est->smoothed_var) + est->smoothed_mean;
if ((est->startup == 0) && V > thresh) {
est->trigger = 1;
est->refract = est->refract_interval;
est->thresh_value = thresh;
est->start = start_sample;
est->trigger_value = V;
}
else {
UpdateEWMA(V, &(est->smoothed_mean), est->alpha);
UpdateEWMA(sqrf(V-est->smoothed_mean), &(est->smoothed_var), est->alpha);
}
if (est->startup)
est->startup--;
}
if (ed->dump_detector) {
printf("%f %f %f %f ", V, est->smoothed_mean+sqrtf(est->smoothed_var),
est->smoothed_mean+est->hi_thresh*sqrtf(est->smoothed_var),
est->trigger ? V : 0);
if (this_trigger)
printf("\n%f %f %f DETECT",
est->start / 48000.0, start_sample / 48000.0,
est->trigger_value);
}
}
/* on data arrival */
int evdet_data_arrived(void* private_arg, sc_series_t* sample)
{
evdet_state_t *ed = (evdet_state_t *)private_arg;
int i;
int count = sample->num_samples;
float buf[count];
#ifdef TIMING
struct timeval entry;
struct timeval exiting;
gettimeofday(&entry, NULL);
#endif
/* check for xrun */
if (ed->last_index && ed->last_index != sample->start_sample) {
elog(LOG_WARNING, "XRUN!!!");
char str[20];
snprintf(str, sizeof(str), "XRun:%.2f", (sample->start_sample - ed->last_index)/48000.0);
fault_raise_indicator(str, &(ed->xrun));
ed->fail_xrun++;
evdet_notify(ed);
}
ed->last_index = sample->start_sample + sample->num_samples;
if (sample->num_samples != ed->fft_points * DECIMATION_FACTOR) {
elog(LOG_WARNING, "wrong number of samples: %d", sample->num_samples);
goto done;
}
/* check how far behind */
if (ed->last_index > ed->check_after) {
if (ed->first_sample == 0) ed->first_sample = sample->start_sample;
ed->check_after += 5*ed->rate;
ed->behind_by = msec_since(&(ed->started_at));
ed->behind_by *= (ed->rate / 1000);
ed->behind_by -= (ed->last_index - ed->first_sample);
if (ed->behind_by > ed->rate) {
elog(LOG_WARNING, "Over 1 sec behind.. restarting detector");
ed->fail_behind++;
evdet_notify(ed);
/* restarting from the callback seems to be bad */
g_timer_add(0, schedule_restart, ed, NULL, NULL);
return EVENT_RENEW;
}
}
/* inc counter */
if (ed->skip_n > 0) ed->counter = (ed->counter + 1)%(ed->skip_n + 1);
#ifndef NOCHEAT
/* if we're skipping, then just pre-load the state vector */
if (ed->counter) {
if (ed->counter == ed->skip_n)
for (i=0; i<ed->tap_count; i++) {
/* decimation state is ordered from newest to oldest */
ed->decimation_state[i] = ((int16_t*)(sample->data->buf))[sample->num_samples-1-i];
}
goto done;
}
#endif
/* copy to float working buffer */
for (i=0; i<count; i++)
buf[i] = ((int16_t*)(sample->data->buf))[i];
/* decimate in place */
decimf(DECIMATION_FACTOR, ed->tap_count, ed->decimation_coeffs,
ed->decimation_state, sample->num_samples, buf,
buf, &count);
#ifdef NOCHEAT
if (ed->counter) {
goto done;
}
#endif
/* now hanning and FFT */
nl_mult_into(buf, ed->hanning, ed->fft_points);
complex *fft = nl_fft_inplace(buf, ed->fft_points);
#ifdef NOCHEAT
float norms[16];
for (i=0; i<16; i++) norms[i] = Cnorm(fft[i]);
goto done;
#endif
/* OK, at this point buf[0..15] are 0-12 KHz.
* now we select our frequencies (bins 3-4) */
if (!(ed->dump_detector || ed->dump_spectrum)) {
switch (ed->detect_alg) {
case 1:
evdet_estimator(ed, &(ed->e[0]), Cnorm(Cadd(fft[3], fft[4])), ed->last_index);
break;
case 2:
evdet_estimator(ed, &(ed->e[0]), Cnorm(fft[3])+Cnorm(fft[4]), ed->last_index);
break;
case 3:
evdet_estimator(ed, &(ed->e[0]), Cnorm(fft[3])*Cnorm(fft[4]), ed->last_index);
break;
default:
break;
}
}
else {
if (ed->dump_spectrum) {
/* (destructive) magnitude */
for (i=0; i<(ed->fft_points/2); i++)
buf[i] = Cnorm(fft[i]);
for (i=0; i<16; i++) {
printf("%f %d %f\n", ed->last_index / 48000.0, i, buf[i]);
}
printf("\n");
}
else if (ed->dump_detector) {
float cs = Cnorm(Cadd(fft[4], Cadd(fft[5], Cadd(fft[6], fft[7]))));
#if 0
/* (destructive) magnitude */
for (i=0; i<(ed->fft_points/2); i++)
buf[i] = Cnorm(fft[i]);
/* OK, at this point buf[0..15] are 0-12 KHz.
* now we select our frequencies (bins 4-7) */
float sum = buf[4]+buf[5]+buf[6]+buf[7];
float m = buf[4]*buf[5]*buf[6]*buf[7];
#endif
printf("%f ", ed->last_index / 48000.0);
/* Now estimate noise and detect */
evdet_estimator(ed, &(ed->e[1]), cs, ed->last_index);
//evdet_estimator(ed, &(ed->e[2]), sum, ed->last_index);
//evdet_estimator(ed, &(ed->e[3]), m, ed->last_index);
printf("\n");
}
}
done:
#ifdef TIMING
gettimeofday(&exiting, NULL);
misc_tv_sub(&exiting, &entry);
static uint64_t global_usec = 0;
global_usec += exiting.tv_usec;
printf("%ld, total %lld\n", exiting.tv_usec, global_usec);
#endif
return EVENT_RENEW;
}
void evdet_start(evdet_state_t *ed)
{
elog(LOG_NOTICE, "Starting streaming detector...");
gettimeofday(&ed->started_at, NULL);
ed->first_sample = 0;
ed->check_after = 0;
/* set clock id */
ed->clock_id = sync_clock_no_create(sdev_path(ed->sensor_clock));
/* open stream */
if (ed->sc) sc_destroy(ed->sc);
if (sc_open_stream(ed->sensor, 0, 1, ed->fft_points * DECIMATION_FACTOR,
evdet_data_arrived, ed, &(ed->sc)) < 0) {
elog(LOG_WARNING, "Unable to open sensor device: %m");
return;
}
/* clear past fault */
fault_clear_indicator(ed->xrun);
/* init counter */
ed->counter = 0;
ed->last_index = 0;
/* init smoothing filter */
int i;
for (i=0; i<4; i++) {
ed->e[i].startup = 300;
ed->e[i].trigger = 0;
ed->e[i].smoothed_mean = 0;
ed->e[i].smoothed_var = 0;
ed->e[i].refract = 0;
}
/* pre-compute hanning function */
if (ed->hanning) free(ed->hanning);
ed->hanning = nl_hanning(ed->fft_points);
/* initialize decimation filter */
if (ed->decimation_state) free(ed->decimation_state);
ed->decimation_state = malloc(sizeof(float)*ed->tap_count);
memset(ed->decimation_state, 0, sizeof(float)*ed->tap_count);
}
void evdet_stop(evdet_state_t *ed)
{
elog(LOG_NOTICE, "Stopping streaming detector...");
/* close stream */
if (ed->sc) sc_destroy(ed->sc);
}
/* rate limit notifications */
int evdet_notify_aux(void *data, int period, g_event_t *event)
{
evdet_state_t *ed = (evdet_state_t *)data;
g_status_dev_notify(ed->status);
return EVENT_DONE;
}
void evdet_notify(evdet_state_t *ed)
{
if (ed->refract_timer) return;
g_timer_add(1000, evdet_notify_aux, ed, NULL, &(ed->refract_timer));
}
static
int evdet_bin_status(status_context_t *info, buf_t *buf)
{
evdet_state_t *ed = (evdet_state_t *)sd_data(info);
evdet_status_t s = {
detect_alg: ed->detect_alg,
skip_n: ed->skip_n,
fftn: ed->fftn,
active: ed->active,
hi_thresh: ed->e[0].hi_thresh,
refract_interval: ed->e[0].refract_interval,
alpha: ed->e[0].alpha,
fail_no_sync: ed->fail_no_sync,
fail_no_net: ed->fail_no_net,
fail_xrun: ed->fail_xrun,
fail_behind: ed->fail_behind,
behind_by: ed->behind_by / (float)(ed->rate)
};
bufcpy(buf, &s, sizeof(s));
return STATUS_MSG_COMPLETE;
}
static
int evdet_print_status(status_context_t *info, buf_t *buf)
{
evdet_state_t *ed = (evdet_state_t *)sd_data(info);
bufprintf(buf,
"Acoustic Event Detector (Node %d)\n"
"-----------------------\n\n"
"Parameters:\n"
" active: %d\n"
" alg: %d\n"
" fftn: %d\n"
" points: %d\n"
" skip_n: %d\n"
" thresh: %f\n"
" refract: %d\n"
" alpha: %f\n"
"\n"
"Faults:\n"
" Sync: %d\n"
" Xrun: %d\n"
" Net: %d\n"
" Lock: %d\n"
" Behind: %d\n"
"\n"
"%d detections\n"
"Behind by %f sec\n\n"
,
my_node_id,
ed->active,
ed->detect_alg,
ed->fftn,
ed->fft_points,
ed->skip_n,
ed->e[0].hi_thresh,
ed->e[0].refract_interval,
ed->e[0].alpha,
ed->fail_no_sync,
ed->fail_xrun,
ed->fail_no_net,
ed->fail_noise_lock,
ed->fail_behind,
ed->detection_count,
ed->behind_by / (float)(ed->rate)
);
return STATUS_MSG_COMPLETE;
}
static
int evdet_command(status_context_t *ctx, char *command, size_t buf_size)
{
evdet_state_t *ed = (evdet_state_t *)sd_data(ctx);
parser_state_t *ps = misc_parse_init(command, MISC_PARSE_COLON_SCHEME);
int retval = EVENT_RENEW;
while (misc_parse_next_kvp(ps) >= 0) {
if (strcmp(ps->key, "fftn") == 0) {
if (ps->value) {
ed->fftn = atoi(ps->value);
if (ed->fftn == 0) ed->fftn = 32;
ed->fft_points = 1 << ed->fftn;
}
}
else if (strcmp(ps->key, "alg") == 0) {
if (ps->value) {
ed->detect_alg = atoi(ps->value);
if (ed->detect_alg == 0) ed->detect_alg = 1;
}
}
else if (strcmp(ps->key, "skip_n") == 0) {
if (ps->value) {
ed->skip_n = atoi(ps->value);
}
}
/* NOTE alg must be set first!! */
else if (strcmp(ps->key, "thresh") == 0) {
if (ps->value) {
if (1 != sscanf(ps->value, "%f", &(ed->e[0].hi_thresh))) {
ed->e[0].hi_thresh = 5;
}
}
}
else if (strcmp(ps->key, "refract") == 0) {
if (ps->value) {
ed->e[0].refract_interval = atoi(ps->value);
if (ed->e[0].refract_interval == 0)
ed->e[0].refract_interval = 200;
}
}
else if (strcmp(ps->key, "alpha") == 0) {
if (ps->value) {
if (1 != sscanf(ps->value, "%f", &(ed->e[0].alpha))) {
ed->e[0].alpha = 0.999;
}
}
}
else if (strcmp(ps->key, "start") == 0) {
ed->active = 1;
}
else if (strcmp(ps->key, "stop") == 0) {
ed->active = 0;
}
else {
elog(LOG_WARNING, "Unrecognized command: %s", command);
}
}
int i;
for (i=1; i<3; i++) {
ed->e[i] = ed->e[0];
}
/* restarting */
evdet_stop(ed);
if (ed->active)
evdet_start(ed);
evdet_notify(ed);
return retval;
}
/*
* main and initialization
*/
void usage(char *name)
{
misc_print_usage
(name,
"--sensor <sensor-name> [--sensor_clock <sensor-name>] [--clock_scale <factor>] \n"
" [--dump_detect] [--dump_spect] ",
" --sensor <sensor-name>: the sensor to record\n"
" --sensor_clock <sensor-name>: the sensor timebase to use\n"
" --clock_scale <factor>: clock scale to usec\n"
" --dump_detect: dump detection profile info\n"
" --dump_spect: dump spectragram\n"
" \n"
);
exit(1);
}
void evdet_shutdown(void *data)
{
elog_g(LOG_NOTICE, "Event Detector shutting down");
exit(0);
}
int main(int argc, char **argv)
{
evdet_state_t ed = {
fft_points: 32,
skip_n: 3,
tap_count: 16,
decimation_coeffs: lowpass,
flood_interface: "flood",
detect_alg: 1,
clock_scalar: 1,
rate: 48000,
e: {
/* runtime */
{
alpha: 0.999,
startup: 300,
hi_thresh: 32,
refract_interval: 40
},
/* complex sum */
{
alpha: 0.999,
startup: 300,
hi_thresh: 32,
refract_interval: 40
},
/* sum of norms */
{
alpha: 0.999,
startup: 300,
hi_thresh: 32,
refract_interval: 40
},
/* product of norms */
{
alpha: 0.999,
startup: 300,
hi_thresh: 32,
refract_interval: 40
},
}
};
/* generic init */
misc_init(&argc, argv, CVSTAG);
if (misc_parse_out_switch(&argc, argv, "help", 'h'))
usage(argv[0]);
/* sensor */
ed.sensor = misc_parse_out_option(&argc, argv, "sensor", 0);
if (ed.sensor == NULL) {
elog(LOG_WARNING, "Missing required argument sensor <name>");
exit(1);
}
ed.sensor_clock = misc_parse_out_option(&argc, argv, "sensor_clock", 0);
if (ed.sensor_clock == NULL)
ed.sensor_clock = ed.sensor;
ed.dump_detector = misc_parse_out_switch(&argc, argv, "dump_detect", 0);
ed.dump_spectrum = misc_parse_out_switch(&argc, argv, "dump_spect", 0);
misc_parse_option_as_int(&argc, argv, "clock_scale", 0,
&(ed.clock_scalar));
if (misc_args_remain(&argc, argv))
usage(argv[0]);
/* open local flood adaptor */
lu_opts_t opts = {
opts: {
name: ed.flood_interface,
data: &ed
},
};
if (lu_open(&opts, &(ed.flood_event)) < 0) {
elog(LOG_CRIT, "Unable to open flood adapter: %m");
if (!(ed.dump_detector || ed.dump_spectrum))
;//exit(1);
}
/* trigger to start experiment */
status_dev_opts_t cmd_opts = {
device: {
devname: EVENT_DETECT_STATUS,
device_info: &ed
},
write: evdet_command,
printable: evdet_print_status,
binary: evdet_bin_status
};
if (g_status_dev(&cmd_opts, &(ed.status)) < 0) {
elog(LOG_CRIT, "can't create command dev: %m");
exit(1);
}
emrun_opts_t emrun_opts = {
shutdown: evdet_shutdown,
data: &ed
};
emrun_init(&emrun_opts);
/* run */
elog_g(LOG_INFO, "Event Detection daemon starting...");
g_main();
return 0;
}
See more files for this project here