/* quality_main.c */
/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
*
* Author: Nithya Ramanthan
*
*/
#include <emrun/emrun.h>
#include <libdev/status_dev.h>
#include <libmisc/misc.h>
#include <./confidence_quality.h>
#include <../include/confidence.h>
#include <link/link.h>
/* Number of entries in the global action table declared below. */
#define MAX_ACTION_TS 30
/* Name of the link that main() opens to receive packets. */
#define DEFAULT_SINK_MAC_LINK "mote0"
/*
 * One entry of the action table: an action string together with a
 * counter. find_add_action() treats total == 0 as a free slot and
 * increments total every time the entry is looked up or created.
 */
typedef struct action {
int total;
char action[MAX_ACTION_STRING];
} action_t;
/* Action table; the per-sensor action arrays store indices into it. */
action_t actions[MAX_ACTION_TS];
/* Packet-device contexts filled in by g_packet_dev() in main(): pd for
 * CONFIDENCE_DATA_STREAM, pd_network for CONFIDENCE_NETWORK_STREAM. */
pd_context_t* pd = NULL;
pd_context_t* pd_network = NULL;
/*
 * Translate a network_information_t value into the column label used
 * by the print routines. Any value without a label maps to
 * "Unrecognized".
 */
static
char* network_information_ascii(network_information_t att)
{
  char* label = "Unrecognized";

  if (att == HEARTBEAT_I)            label = "Heartbeat";
  else if (att == SENSOR_DATA_PKT_I) label = "Sensor Data";
  else if (att == ROUTE_I)           label = "Route";
  else if (att == NEIGHBORS)         label = "Neighbors";
  else if (att == INVERSE_NEIGHBORS) label = "Inverse Neighbors";
  else if (att == NUM_PKTS_RX)       label = "Num Pkts Rx";
  else if (att == NUM_PKTS_RX_CRC)   label = "Num CRC Rx";
  else if (att == ANY_PKT)           label = "Anything";

  return label;
}
/*
 * Copy a parameter set from src into dst.
 *
 * When dst already holds parameters a warning is logged. In that case
 * a src flagged with an error is not copied, so the existing
 * parameters survive. node_id and sensor are only used in the log
 * message.
 */
static
void copy_params(sensor_params_t* src, sensor_params_t* dst,
    int node_id, int sensor)
{
  if (dst->set) {
    elog(LOG_WARNING, "WARNING: Overwriting params for node %d Sensor %d\n",
        node_id, sensor);
  }

  /* Never replace good parameters with a set that failed validation. */
  if (dst->set && src->error) {
    elog(LOG_DEBUG(1), "Not copying params dest is already set and src has error\n");
    return;
  }

  memcpy(dst, src, sizeof(*dst));
}
/* Return the smaller of two ints (b when they are equal). */
static
int min(int a, int b)
{
  return (a < b) ? a : b;
}
/*
 * Validate that first_var_int >= second_var_int.
 *
 * On violation the whole params struct is zeroed, the failure is
 * logged using the two variable names, and params->error is set so
 * that later code can tell the entry was rejected.
 */
static
void sensor_check_gte(char* first_var, int first_var_int,
    char* second_var, int second_var_int, sensor_params_t* params)
{
  if (first_var_int >= second_var_int) {
    return;
  }

  memset(params, 0, sizeof(*params));
  elog(LOG_ERR, "CLEARNING STRUCT%s (%d) should be >= %s (%d)\n",
      first_var, first_var_int, second_var, second_var_int);
  params->error = 1;
}
/*
 * Format into str (capacity size) the number of whole minutes since
 * ctr was last updated.
 *
 *   - ctr == NULL             -> "NONE"
 *   - last_updated.tv_sec == 0 -> "Never"
 *   - otherwise               -> decimal minute count
 *
 * The two special cases return immediately; previously they fell
 * through, which dereferenced a NULL ctr and always overwrote "Never"
 * with a number.
 */
static
void string_mins_since(confidence_ctr_t* ctr, char* str, int size)
{
  int my_time;

  if (!ctr) {
    snprintf(str, size, "NONE");
    return;
  }
  if (ctr->last_updated.tv_sec == 0) {
    snprintf(str, size, "Never");
    return;
  }

  /* msec_since() yields milliseconds; 60000 ms per minute. */
  my_time = (int) msec_since(&ctr->last_updated)/60000;
  snprintf(str, size, "%d", my_time);
}
/*
 * Shutdown callback registered with emrun in main(). Nothing needs to
 * be released, so this is intentionally a no-op.
 */
static
void confidence_shutdown(void *data)
{
  (void) data;
}
/*
 * Log the command-line synopsis (using argv[0] as the program name)
 * and terminate the process with exit status 1. Does not return.
 */
static
void usage(char* argv[])
{
  char* program = argv[0];

  elog(LOG_ERR, "\n\tUsage: %s -W <watch-link> [-c <config-file> -d <data-cluster-init> -n <network-cluster-init>] \n",
      program);
  elog(LOG_ERR, "\t\t data/network-cluster-init contain data to initiailze data/network clusters\n");

  exit(1);
}
/*
 * Append to buf the actions already taken for a sensor and, when the
 * sensor is flagged broken, the actions still to take.
 *
 * buf         - output buffer
 * sensor      - sensor record to describe (may be a local copy)
 * clear_array - when non-zero, the printed_actions_taken array of the
 *               matching global sensor record is cleared afterwards
 *
 * Fixes over the previous version: action indices are range-checked
 * before indexing the global actions table, a NULL returned by
 * get_action() is no longer handed to "%s", and the node is looked up
 * once instead of twice.
 */
static
void print_actions_info(buf_t* buf, sensor_quality_t* sensor,
    int clear_array)
{
  const int num_actions = (int) (sizeof(actions) / sizeof(actions[0]));
  int j, ctr = 0;
  node_quality_t* node = find_add_node(sensor->node_id, 0);

  // Print actions taken
  for (j = 0; j < MAX_ACTIONS_STORED; j++) {
    int taken = sensor->printed_actions_taken[j];

    if (taken < 0 || taken >= num_actions) continue;
    if (actions[taken].total > 0) {
      bufprintf(buf, "\n\tTOOK %s,", actions[taken].action);
    }
  }

  // Don't change the local copy - change the global copy
  if (clear_array) {
    sensor_quality_t* my_sensor = find_add_sensor(node, sensor->sensor, 0);

    if (my_sensor) {
      clear_actions_array(my_sensor->printed_actions_taken);
    }
  }

  // Check if we should even print actions for this sensor
  if (!sensor->broken) return;

  if (sensor->no_action_needed) {
    bufprintf(buf, "\t NO ACTION NEEDED");
    return;
  }

  if (node && sensor->sensor == NETWORK_SENSOR &&
      node->node_localized > 0) {
    bufprintf(buf, "\t LOCALIZED TO %d\n", node->node_localized);
    return;
  }

  // Print actions to take
  for (j = 0; j < MAX_ACTIONS_STORED; j++) {
    if (sensor->actions_to_take[j] > -1) {
      char* name = get_action(sensor->actions_to_take[j]);

      ctr++;
      bufprintf(buf, "\n\t => %s", name ? name : "(unknown action)");
    }
  }

  // If the sensor is broken but no actions are available, then
  // notify the user of that as well.
  // Can't just check if slot 0 == -1 because as actions are taken,
  // they are removed from the array.
  if (ctr == 0) {
    bufprintf(buf, "\t NO ACTIONS TO TAKE!");
  }
}
/*
 * Append one "<value> (<minutes> mins) " field to buf for every
 * network-information counter of nodeQ, in index order.
 */
static
void print_network_sensor(buf_t* buf, node_quality_t* nodeQ)
{
  char age[100];
  int idx = 0;

  while (idx < MAX_NETWORK_INFORMATION) {
    string_mins_since(&nodeQ->node_information[idx], age, sizeof(age));
    bufprintf(buf, "%d (%s mins) ",
        nodeQ->node_information[idx].value, age);
    idx++;
  }
}
/*
 * Append the data-attribute values of a sensor to buf as " (<value>)"
 * fields. Nothing is printed for a NULL sensor or for the battery
 * sensor. If the sensor has no parameters set, " NO PARAMS" is emitted
 * once, ahead of the first attribute.
 */
static
void print_data_sensor(buf_t* buf, sensor_quality_t* sensor)
{
  int attr;

  if (!sensor || sensor->sensor == BATTERY) {
    return;
  }

  for (attr = 0; attr < MAX_DATA_ATTRIBUTES; attr++) {
    /* The notice goes out only with the first attribute. */
    if (attr == 0 && !sensor->params.set) {
      bufprintf(buf, " NO PARAMS");
    }
    bufprintf(buf, " (%d)", sensor->attributes_values[attr]);
  }
}
/*
 * Unparse callback for the network packet device: renders one
 * sensor_quality_t sample as a text line in response.
 *
 * pd        - packet-device context (unused here)
 * response  - output buffer
 * data      - sample; must be exactly sizeof(sensor_quality_t) bytes
 * packetlen - length of data in bytes
 *
 * A column header is emitted before every 30th printed sample.
 * Samples of the wrong size, for nodes that cannot be found/added, or
 * for suppressed nodes produce no output.
 *
 * Fix: sizeof() yields size_t, which must not be passed to "%d"; it is
 * now cast to int for the log message.
 */
static
void confidence_print_network (pd_context_t *pd, buf_t *response,
    const void *data, int packetlen)
{
  sensor_quality_t* sensor = (sensor_quality_t *)data;
  static int ctr = 0;
  node_quality_t* nodeQ = NULL;

  if (packetlen != sizeof(sensor_quality_t)) {
    elog(LOG_ERR, "ERROR: Sample size %d != sensor_quality_t (%d)\n",
        packetlen, (int) sizeof(sensor_quality_t));
    return;
  }

  if (!(nodeQ = find_add_node(sensor->node_id, 1))) {
    elog(LOG_ERR, "ERROR unable to find/add node %d\n",
        sensor->node_id);
    return;
  }

  if (nodeQ->suppressed) return;

  if (ctr % 30 == 0) {
    int i;

    bufprintf(response, "\n# Node, ");
    for (i = 0; i < MAX_NETWORK_INFORMATION; i++) {
      bufprintf(response, "%s, ", network_information_ascii(i));
    }
    bufprintf(response, "Seconds since heard... \n\tActions to take ...\n");
  }
  ctr++;

  bufprintf(response, "\n%d, ", sensor->node_id);
  print_network_sensor(response, nodeQ);
  print_actions_info(response, sensor, 1);
}
/*
 * Unparse callback for the data packet device: renders one
 * sensor_quality_t sample as a text line in response.
 *
 * pd        - packet-device context (unused here)
 * response  - output buffer
 * data      - sample; must be exactly sizeof(sensor_quality_t) bytes
 * packetlen - length of data in bytes
 *
 * A column header is emitted before every 30th printed sample.
 *
 * Fixes: the packet size is now validated before anything is printed
 * or counted (matching confidence_print_network), so a malformed
 * sample can no longer consume a header slot; and sizeof() is cast to
 * int before being passed to "%d".
 */
void confidence_print_data (pd_context_t *pd, buf_t *response,
    const void *data, int packetlen)
{
  sensor_quality_t* sensor = (sensor_quality_t *)data;
  static int ctr = 0;

  if (packetlen != sizeof(sensor_quality_t)) {
    elog(LOG_ERR, "ERROR: Sample size %d != sensor_quality_t (%d)\n",
        packetlen, (int) sizeof(sensor_quality_t));
    return;
  }

  if (ctr % 30 == 0) {
    int i;

    bufprintf(response, "\n# Node, Sensor, TimeStamp, Data-Value, ");
    for (i = 0; i < MAX_DATA_ATTRIBUTES; i++) {
      bufprintf(response, "%s, ", data_attributes_ascii(i));
    }
    bufprintf(response, "\n\tActions to take ...\n");
  }
  ctr++;

  bufprintf(response, "\n%d, %s, %d, %d, ",
      sensor->node_id, sensor_ascii(sensor->sensor),
      // Print the most recent time-val
      (int) sensor->data.last_updated.tv_sec,
      sensor->data.value);
  print_data_sensor(response, sensor);
  print_actions_info(response, sensor, 1);
}
/*
 * Parse a sensor name into its confidence_sensor_t value.
 *
 * Leading spaces are skipped and the comparison is a case-insensitive
 * prefix match, so trailing characters after the name are tolerated.
 * Returns -1 (converted to the enum type) for NULL or unknown names.
 *
 * Fix: "data_period" used to be compared over strlen("network")
 * characters only, so any text starting with "data_pe" matched; each
 * name is now compared over its own length.
 */
static
confidence_sensor_t ascii_sensor(char* key)
{
  static const struct {
    const char* name;
    confidence_sensor_t id;
  } known[] = {
    { "temperature", TEMPERATURE },
    { "moisture",    MOISTURE },
    { "ammonium",    AMMONIUM },
    { "nitrate",     NITRATE },
    { "calcium",     CALCIUM },
    { "battery",     BATTERY },
    { "network",     NETWORK_SENSOR },
    { "data_period", DATA_PERIOD },
  };
  size_t i;

  if (!key) return -1;

  // Ignore leading space
  while (*key == ' ') key++;

  for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
    if (!strncasecmp(key, known[i].name, strlen(known[i].name))) {
      return known[i].id;
    }
  }
  return -1;
}
/*
 * Write handler of the node status device. Parses a command made of
 * key=value pairs and applies it to a node or one of its sensors.
 *
 * Recognised keys: node, sensor (default: network), status, action,
 * low, high, time. Three kinds of command result:
 *   - status >= 0            : set the node's suppressed flag to !status
 *   - time > 0 with low/high : record a validated data range
 *   - action                 : record an action taken on the sensor
 *
 * ctx     - status-device context (unused here)
 * command - raw command bytes; not assumed to be NUL-terminated
 * size    - number of bytes in command
 *
 * Always returns EVENT_RENEW.
 *
 * Fixes over the previous version:
 *   - removed comparisons of pointers against the literal "" (they
 *     never tested for an empty string);
 *   - the action text is copied with guaranteed NUL termination;
 *   - an unrecognised sensor name or a failed action lookup is
 *     reported instead of being used as an index/slot;
 *   - command is logged with an explicit length since it may lack a
 *     terminator.
 */
static
int write_node_info(status_context_t *ctx, char* command, size_t size)
{
  sensor_quality_t* sensorQ = NULL;
  node_quality_t* nodeQ = NULL;
  char tmp_packet[size + 1];
  parser_state_t ps = {
    .input = (void *) tmp_packet,
    .input_len = size + 1
  };
  char my_action[MAX_ACTION_STRING];
  int status = -1, node_id = -1;
  int sensor = NETWORK_SENSOR;
  int low = -1, high = -1, time_hours = -1;

  memset(my_action, 0, MAX_ACTION_STRING);

  /* Hack because misc_parse_next_kvp needs the last byte to be NULL */
  strncpy(tmp_packet, command, size);
  tmp_packet[size] = 0;

  while (misc_parse_next_kvp(&ps) >= 0) {
    // Error Checking
    if (!ps.key || *ps.key == 0) continue;
    if (!ps.value || *ps.value == 0) {
      elog(LOG_ERR, "ERROR Key %s Must have an assigned value! See usage by catting device\n",
          ps.key);
      goto done;
    }

    // Parse the arguments
    if (strcmp(ps.key, "node") == 0) {
      node_id = atoi(ps.value);
    }
    else if (strcmp(ps.key, "sensor") == 0) {
      sensor = ascii_sensor(ps.value);
      if (sensor == -1) {
        elog(LOG_ERR, "ERROR Unrecognized sensor: %s\n", ps.value);
        goto done;
      }
    }
    else if (strcmp(ps.key, "status") == 0) {
      status = atoi(ps.value);
    }
    else if (strcmp(ps.key, "action") == 0) {
      /* snprintf truncates if needed and always terminates. */
      snprintf(my_action, sizeof(my_action), "%s", ps.value);
    }
    else if (strcmp(ps.key, "low") == 0) {
      low = atoi(ps.value);
      if (high == -1) high = low;
    }
    else if (strcmp(ps.key, "high") == 0) {
      high = atoi(ps.value);
      if (low == -1) low = high;
    }
    else if (strcmp(ps.key, "time") == 0) {
      time_hours = atoi(ps.value);
    }
    else {
      elog(LOG_ERR, "ERROR Unrecognized key: %s\n", ps.key);
    }
  }

  if (!(nodeQ = find_add_node(node_id, 0))) goto done;

  if (status >= 0) {
    nodeQ->suppressed = !status;
    goto done;
  }

  sensorQ = find_add_sensor(nodeQ, sensor, 0);
  if (!sensorQ) {
    elog(LOG_ERR, "ERROR: Unable to find sensor %d for node %d\n",
        sensor, node_id);
    goto done;
  }

  // Check if this is a data validation action
  if (time_hours > 0) {
    if (low > -1 && high > -1) {
      int i, set = 0;

      if (! (low <= high)) {
        elog(LOG_ERR, "ERROR: low (%d) must be <= high (%d)\n", low, high);
        goto done;
      }

      /* Store the range in the first unused slot. */
      for (i = 0; i < MAX_RANGES; i++) {
        if (sensorQ->time_valid_hours[i] == 0) {
          sensorQ->time_valid_hours[i] = time_hours;
          set_bit(&sensorQ->low[i], low);
          set_bit(&sensorQ->high[i], high);
          set = 1;
          break;
        }
      }
      if (!set) elog(LOG_ERR, "Try increasing MAX_RANGES. Could not add new range!\n");
    }
    else elog(LOG_ERR, "Invalid command: %.*s!\n At least either low or high should be set!\n",
        (int) size, command);
  }
  else if (strlen(my_action) > 0) {
    int action_index =
      find_add_action(my_action, strlen(my_action));

    if (action_index < 0) {
      elog(LOG_ERR, "Invalid command: %.*s!\n", (int) size, command);
      goto done;
    }

    add_action(sensorQ->actions_taken, action_index);
    add_action(sensorQ->printed_actions_taken, action_index);

    // If this is a network sensor, then checking for actions for
    // this sensor should be delayed until epoch seconds after the
    // action was taken to see if there is any benefit.
    if (sensor == NETWORK_SENSOR) {
      set_timer(nodeQ);
    }
  }
  else elog(LOG_ERR, "Invalid command: %.*s!\n", (int) size, command);

done:
  return EVENT_RENEW;
}
/*
 * Read handler of the node status device. Writes to buf:
 *   1. the network-information and data-attribute column labels,
 *   2. one section per known node (node_id > 0): its network counters
 *      and actions, followed by each connected data sensor, unless the
 *      node is suppressed,
 *   3. a help text listing the commands the device accepts, including
 *      the currently stored actions.
 *
 * Always returns EVENT_RENEW.
 */
static
int print_node_info(status_context_t *ctx, buf_t *buf)
{
  int n, s, a;
  int listed = 0;
  sensor_quality_t* sensor;
  char age[100];

  /* Column labels. */
  bufprintf(buf, "\t");
  for (n = 0; n < MAX_NETWORK_INFORMATION; n++) {
    bufprintf(buf, "%s, ", network_information_ascii(n));
  }
  bufprintf(buf, "\n\t\t");
  for (n = 0; n < MAX_DATA_ATTRIBUTES; n++) {
    bufprintf(buf, "%s, ", data_attributes_ascii(n));
  }
  bufprintf(buf, "\n");

  /* One section per node in use. */
  for (n = 0; n < MAX_NODES; n++) {
    if (nodesQ[n].node_id > 0) {
      bufprintf(buf, "\nNode %d ", nodesQ[n].node_id);

      if (nodesQ[n].suppressed) {
        bufprintf(buf, " SUPPRESSED!");
      }
      else {
        bufprintf(buf, "\n\t");
        print_network_sensor(buf, &nodesQ[n]);
        print_actions_info(buf, &nodesQ[n].network_sensor, 0);

        /* Data sensors are listed for every node except this one. */
        if ((nodesQ[n].node_id != my_node_id) && data_sensors_connected(&nodesQ[n])) {
          for (s = 0; s < MAX_SENSORS; s++) {
            sensor = &nodesQ[n].data_sensors[s];
            if (sensor->sensor <= 0) continue;

            string_mins_since(&sensor->data, age, sizeof(age));
            bufprintf(buf, "\n\t\t %s (%s mins) %d",
                sensor_ascii(sensor->sensor), age,
                sensor->data.value);
            print_data_sensor(buf, sensor);
            print_actions_info(buf, sensor, 0);
          }
        }
      }
    }
  }

  /* Help text. */
  bufprintf(buf, "\n\n*** COMMANDS THIS DEVICE ACCEPTS ***\n");
  bufprintf(buf, "\nTo set the print status of a node:\n node=<int>:status=1[enable]/0[disable]\n");
  bufprintf(buf, "\nTo update an action:\n node=<int>:sensor=<int>:action=<index/name>\n");
  bufprintf(buf, " * sensor => Default is 'network'\n"
      " * 'action' is either generic string < %d characters"
      "\n\tor index associated with previously entered actions: \n",
      MAX_ACTION_STRING);

  /* Stored actions, two per line: even indices end with a tab, odd
   * indices end the line. */
  for (a = 0; a < MAX_ACTION_TS; a++) {
    if (!actions[a].action[0]) continue;

    if (a % 2 == 0) {
      bufprintf(buf, " %d:%s (%d times)\t",
          a, actions[a].action, actions[a].total);
    }
    else {
      bufprintf(buf, " %d:%s (%d times)\n",
          a, actions[a].action, actions[a].total);
    }
    listed++;
  }
  if (listed == 0) bufprintf(buf, "\t\tNONE ");
  bufprintf(buf, "\n");

  bufprintf(buf, "\nValidate a range of data:\n node=<int>:sensor=<int>:low/high=<int [mV]>:time=<Duration valid [hours]>\n");
  return EVENT_RENEW;
}
/*
 * Read handler of the parameter status device. For every known node
 * other than this one it lists each data sensor with its detect-range
 * parameters (or a note that none were entered / entry failed) and
 * any validated ranges with the hours left on them, then prints
 * instructions for updating parameters through the device.
 *
 * Always returns EVENT_RENEW.
 */
static
int print_sensor_params(status_context_t *ctx, buf_t *buf)
{
  int n, s, r;
  sensor_quality_t* sensor;

  bufprintf(buf, "Node ID\t Sensor\t LDR \t NLDR\n");
  bufprintf(buf, " N/LDR (Non/linear detect range): Upper/Lower [mV]\n");

  for (n = 0; n < MAX_NODES; n++) {
    /* Skip unused slots and this node itself. */
    if (nodesQ[n].node_id <= 0) continue;
    if (nodesQ[n].node_id == my_node_id) continue;

    bufprintf(buf, "%d", nodesQ[n].node_id);

    for (s = 0; s < MAX_SENSORS; s++) {
      sensor = &nodesQ[n].data_sensors[s];
      if (sensor->sensor > 0) {
        bufprintf(buf, "\t %s", sensor_ascii(sensor->sensor));

        if (sensor->params.error) {
          bufprintf(buf, "\t Error on Entry (See log)\n");
        }
        else if (!sensor->params.set) {
          bufprintf(buf, " No Params Entered\n");
        }
        else {
          bufprintf(buf, "\t %d/%d %d/%d\n",
              sensor->params.lower2.LDR_mv,
              sensor->params.upper1.LDR_mv,
              sensor->params.lower1.NLDR_mv,
              sensor->params.upper2.NLDR_mv);
        }

        /* Validated ranges: show the hours remaining (3600000 ms/hour). */
        for (r = 0; r < MAX_RANGES; r++) {
          int elapsed_hours;

          if (sensor->time_valid_hours[r] <= 0) continue;

          elapsed_hours = msec_since(&sensor->low[r].last_updated) / 3600000;
          bufprintf(buf, "\t\t %d - %d mV (%d hours)\n", sensor->low[r].value,
              sensor->high[r].value, sensor->time_valid_hours[r] - elapsed_hours);
        }
      }
    }
  }

  bufprintf(buf, "\n\nTo Update Parameters: echo <param-line> > %s\n",
      sd_device_name(ctx));
  bufprintf(buf, " <param-line>:\n");
  bufprintf(buf, "\n\t\tNode-id,Sensor,LDR Lower, Upper, NLDR Lower,"
      "Upper\n \nSee config_file_sample.txt for explanation/sample\n");
  return EVENT_RENEW;
}
/*
 * Write handler of the parameter status device; also used by
 * init_config() for each config-file line.
 *
 * Parses one comma-separated parameter line:
 *   node-id, sensor, LDR lower, LDR upper, NLDR lower, NLDR upper
 * A node-id of '*' applies the line to every slot of the node table.
 * When the sensor is DATA_PERIOD only the third field is used, and it
 * is stored as the node's data_period.
 *
 * ctx      - status-device context (unused here; NULL from init_config)
 * command  - NUL-terminated parameter line
 * buf_size - length supplied by the caller (unused here)
 *
 * Always returns EVENT_RENEW.
 *
 * Fixes over the previous version:
 *   - restored "&params" where the text had been corrupted to "¶ms";
 *   - empty fields are skipped with a real emptiness test instead of a
 *     pointer comparison against "";
 *   - the "too many inputs" warning reports the required field count;
 *   - an unrecognised sensor name is rejected instead of being stored;
 *   - params is zeroed with memset rather than the non-standard "= {}".
 */
static
int write_sensor_params(status_context_t *ctx, char* command, size_t buf_size)
{
  enum { NUM_PARAM_FIELDS = 6 };
  parser_state_t* ps = misc_parse_init(command, MISC_PARSE_COMMA_SCHEME);
  int ctr = 0;
  sensor_quality_t* sensorQ = NULL;
  node_quality_t* nodeQ = NULL;
  /* If a node id == * in the config file then this parameter is
   * added for past, current, and future nodes.
   * If so, store the information in the params struct. */
  int add_param_to_all_nodes = 0;
  sensor_params_t params;
  int sensor = -1;
  int node_id = -1;
  int i;

  memset(&params, 0, sizeof(params));

  if (!ps) {
    elog(LOG_ERR, "Could not assign parser_state!\n");
    return EVENT_RENEW;
  }

  while (misc_parse_next_kvp(ps) >= 0) {
    if (!ps->key || (*ps->key == '\0') || (*ps->key == '\n')) {
      continue;
    }

    switch (ctr) {
      /*** The first field is the node ID ***/
      case 0:
        node_id = atoi(ps->key);
        /* If its a '*' then this line applies to all nodes */
        if (*ps->key == '*') {
          add_param_to_all_nodes = 1;
        }
        else if (!(nodeQ = find_add_node(node_id, 1))) {
          goto done;
        }
        break;

      /*** The second field is the sensor ***/
      case 1:
        sensor = ascii_sensor(ps->key);
        break;

      /* The next four fields are the: LDR/NLDR Lower/Upper values */
      case 2:
        params.lower2.LDR_mv = atoi(ps->key);
        break;
      case 3:
        params.upper1.LDR_mv = atoi(ps->key);
        break;
      case 4:
        params.lower1.NLDR_mv = atoi(ps->key);
        break;
      case 5:
        params.upper2.NLDR_mv = atoi(ps->key);
        break;

      default:
        elog(LOG_WARNING, "%s is the %d th input! We only need %d\n",
            ps->key, ctr + 1, NUM_PARAM_FIELDS);
        break;
    }
    ctr++;
  }

  if (sensor == DATA_PERIOD) {
    if (add_param_to_all_nodes) {
      for (i = 0; i < MAX_NODES; i++) {
        nodesQ[i].data_period = params.lower2.LDR_mv;
      }
    }
    else if (nodeQ) nodeQ->data_period = params.lower2.LDR_mv;
    else elog(LOG_ERR, "Unable to find struct for node %d!\n",
        node_id);
    goto done;
  }

  if (ctr < NUM_PARAM_FIELDS) {
    elog(LOG_DEBUG(2), "Nothing more to do for line: %s\n", command);
    goto done;
  }

  if (sensor == -1) {
    elog(LOG_ERR, "Unrecognized sensor in line: %s\n", command);
    goto done;
  }

  // Check if params are correct before saving them off.
  params.set = 1;
  sensor_check_gte("NLDR_upper", params.upper2.NLDR_mv,
      "LDR_upper", params.upper1.LDR_mv, &params);
  sensor_check_gte("LDR_upper", params.upper1.LDR_mv,
      "LDR_lower", params.lower2.LDR_mv, &params);
  sensor_check_gte("LDR_lower", params.lower2.LDR_mv,
      "NLDR_lower", params.lower1.NLDR_mv, &params);

  if (add_param_to_all_nodes) {
    sensor_quality_t* mySensor;

    // Copy params to all nodes. So even future nodes that are
    // added will have access to these params.
    for (i = 0; i < MAX_NODES; i++) {
      if ((mySensor = find_add_sensor(&nodesQ[i], sensor, 1))) {
        copy_params(&params, &mySensor->params, node_id, sensor);
      }
    }
  }
  else {
    if (!(sensorQ = find_add_sensor(nodeQ, sensor, 1))) {
      goto done;
    }
    copy_params(&params, &sensorQ->params, node_id, sensor);

    // Set the node_id for any sensors that may have had params set
    // already using the * functionality
    for (i = 0; i < MAX_SENSORS; i++) {
      if (nodeQ->data_sensors[i].sensor > 0) {
        nodeQ->data_sensors[i].node_id = nodeQ->node_id;
      }
    }
  }

done:
  misc_parse_cleanup(ps);
  return EVENT_RENEW;
}
/*
 * Read the config file line by line and hand every line that has
 * content to write_sensor_params(). Everything from a '#' to the end
 * of the line is treated as a comment and dropped; lines that are
 * blank or consist only of spaces and/or a comment are skipped.
 *
 * config_file - path of the file to read
 *
 * Fixes over the previous version:
 *   - the file is closed when reading is done;
 *   - the scan stops at the string terminator, so a final line with
 *     no newline (or a line longer than the buffer) no longer reads
 *     past the end of the data;
 *   - the comment text is actually cut off before the line is parsed,
 *     and the length passed along is the real string length.
 */
static
void init_config(char* config_file)
{
  FILE *input_file_pointer;
  char line[500];

  if (!(input_file_pointer = fopen(config_file, "r"))) {
    elog(LOG_ERR, "Unable to open config file: %s\n", config_file);
    return;
  }

  while (fgets(line, sizeof(line), input_file_pointer) != NULL) {
    int ctr = 0, good_ctr = 0;

    /* Count the non-space characters that precede any comment. */
    while ((line[ctr] != '\0') && (line[ctr] != '\n') && (line[ctr] != '#')) {
      if (line[ctr] != ' ') good_ctr++;
      ctr++;
    }
    if (good_ctr == 0) continue;

    // Ignore all characters after the pound
    if (line[ctr] == '#') line[ctr] = '\0';

    write_sensor_params(NULL, line, strlen(line));
  }

  fclose(input_file_pointer);
}
/*
 * Create the two status devices: first the parameter device
 * (CONFIDENCE_PARAM_STATUS), then the node device
 * (CONFIDENCE_NODE_STATUS). The same options struct is reused, with
 * its name and handlers swapped for the second device.
 *
 * Exits the process with status 1 if either device cannot be created;
 * otherwise returns 0.
 */
static
int init_ui()
{
  status_dev_opts_t opts = {
    .device = {
      .devname = CONFIDENCE_PARAM_STATUS
    },
    .printable = print_sensor_params,
    .write = write_sensor_params
  };
  int pass;

  /* Setup both status-devices */
  for (pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      opts.device.devname = CONFIDENCE_NODE_STATUS;
      opts.write = write_node_info;
      opts.printable = print_node_info;
    }

    if (g_status_dev(&opts, NULL) < 0) {
      elog(LOG_ERR, "Unable to create status-device %s: %m\n",
          opts.device.devname);
      exit(1);
    }
  }

  return 0;
}
/*
 * Receive callback of the link opened in main(). Increments the
 * NUM_PKTS_RX counter of this node's own entry and frees the packet.
 *
 * Always returns EVENT_RENEW.
 *
 * Fix: find_add_node() can return NULL (for example when my_node_id is
 * 0); the counter update is skipped in that case instead of
 * dereferencing NULL. The packet is freed either way.
 */
static
int handle_good_packets(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  node_quality_t* sink_node = find_add_node(my_node_id, 1);

  if (sink_node) {
    set_bit(&sink_node->node_information[NUM_PKTS_RX],
        sink_node->node_information[NUM_PKTS_RX].value+1);
  }

  g_free(pkt);
  return EVENT_RENEW;
}
/*
 * Receive callback of the "errors" packet device opened in main().
 * Increments the NUM_PKTS_RX_CRC counter of this node's own entry and
 * frees the packet.
 *
 * Always returns EVENT_RENEW.
 *
 * Fix: find_add_node() can return NULL (for example when my_node_id is
 * 0); the counter update is skipped in that case instead of
 * dereferencing NULL. The packet is freed either way.
 */
static
int handle_error_packets(void *pkt, ssize_t len,
    pd_client_context_t* pdc)
{
  node_quality_t* sink_node = find_add_node(my_node_id, 1);

  if (sink_node) {
    set_bit(&sink_node->node_information[NUM_PKTS_RX_CRC],
        sink_node->node_information[NUM_PKTS_RX_CRC].value+1);
  }

  g_free(pkt);
  return EVENT_RENEW;
}
// ************** API ********************
/*
 * Translate a data_attributes_t value into its display label.
 * Any value without a label maps to "Unrecognized".
 */
char* data_attributes_ascii(data_attributes_t att)
{
  char* label = "Unrecognized";

  if (att == STD_DEVIATION)            label = "Standard Deviation";
  else if (att == GRADIENT)            label = "Gradient";
  else if (att == NLDR)                label = "Non-Linear Detect";
  else if (att == DISTANCE_FROM_LIMIT) label = "Distance from Limit";

  return label;
}
/*
 * Record action_index in all_actions (an array of MAX_ACTIONS_STORED
 * entries in which -1 marks a free slot), unless it is already there.
 * The index goes into the first free slot.
 *
 * Fix: when the array had no free slot the old code wrote to
 * all_actions[-1]. The action is now dropped in that case.
 */
void add_action(int all_actions[], int action_index)
{
  int i, empty_index = -1;

  if (action_present(all_actions, action_index, NULL)) return;

  // If the action does not exist in actions array, then add it
  for (i = 0; i < MAX_ACTIONS_STORED; i++) {
    if (all_actions[i] == action_index) {
      return;
    }
    if ((empty_index == -1) && (all_actions[i] == -1)) {
      empty_index = i;
    }
  }

  /* Array full: nothing can be stored. */
  if (empty_index < 0) return;

  all_actions[empty_index] = action_index;
}
/*
 * Resolve an action given either as a numeric index or as free text,
 * and bump its usage counter.
 *
 * str   - action text, or the decimal index of an existing action
 * ssize - number of characters of str to store for a new action
 *
 * If str parses as an integer it is treated as an index and must
 * refer to an action that is already in use. Otherwise the table is
 * searched for an identical string, and failing that the text is
 * stored in the first free entry (total == 0).
 *
 * Returns the index of the action, or -1 if str is NULL or empty, the
 * index is invalid, or the table is full.
 *
 * Fixes over the previous version:
 *   - str is checked for NULL before atoi() touches it;
 *   - a numeric index is range-checked before indexing the table;
 *   - the stored text is clamped to the entry size and always
 *     NUL-terminated.
 */
int find_add_action(char* str, int ssize)
{
  int i, index;

  if (!str || *str == '\0') return -1;

  // If str is an integer (either if atoi returns something
  // other than 0, or if it is 0 then check that the actual
  // str was actually "0"), then it must be the index of an
  // action that is in use.
  index = atoi(str);
  if ((index != 0) || (!strcmp(str, "0"))) {
    if (index < 0 || index >= MAX_ACTION_TS) return -1;
    if (!actions[index].total) return -1;
    actions[index].total++;
    return index;
  }

  /* Look for the text itself, remembering the first free entry. */
  index = -1;
  for (i = 0; i < MAX_ACTION_TS; i++) {
    if (!strcmp(actions[i].action, str)) {
      actions[i].total++;
      return i;
    }
    if ((index == -1) && (actions[i].total == 0)) {
      index = i;
    }
  }

  // If action does not exist, then see if there is an empty location
  // and copy it over
  if (index < 0) {
    elog(LOG_ERR,"ERROR actions array full! Cant save action:%s\n", str);
    return -1;
  }

  if (ssize < 0) ssize = 0;
  if (ssize > (int) sizeof(actions[index].action) - 1) {
    ssize = (int) sizeof(actions[index].action) - 1;
  }
  strncpy(actions[index].action, str, ssize);
  actions[index].action[ssize] = '\0';

  actions[index].total++;
  return index;
}
/*
 * Translate a confidence_sensor_t value into its lower-case name.
 * Values without a name map to the empty string.
 */
char* sensor_ascii(confidence_sensor_t sensor)
{
  char* name = "";

  switch (sensor) {
    case NETWORK_SENSOR: name = "network";     break;
    case AMMONIUM:       name = "ammonium";    break;
    case BATTERY:        name = "battery";     break;
    case NITRATE:        name = "nitrate";     break;
    case CALCIUM:        name = "calcium";     break;
    case TEMPERATURE:    name = "temperature"; break;
    case MOISTURE:       name = "moisture";    break;
    default:                                   break;
  }

  return name;
}
/*
 * Look up a node by id in the global node table, optionally creating
 * it.
 *
 * node_id  - id to look for; 0 is never a valid id
 * add_node - when non-zero and the node is not present, claim the
 *            first free slot (node_id == 0) for it
 *
 * Returns the node's entry, or NULL when node_id is 0, when the node
 * is absent and add_node is 0, or when the table is full.
 *
 * Fix: the old code always returned &nodesQ[saved_slot]. That is
 * &nodesQ[-1] when the table is full, and an unclaimed free slot when
 * add_node is 0, so callers testing the result against NULL never saw
 * a miss.
 */
node_quality_t* find_add_node(int node_id, int add_node)
{
  int i, saved_slot = -1;

  if (node_id == 0) return NULL;

  for (i=0; i< MAX_NODES; i++) {
    if (nodesQ[i].node_id == node_id) {
      return &nodesQ[i];
    }
    if ((nodesQ[i].node_id == 0) && (saved_slot == -1)) {
      saved_slot = i;
    }
  }

  /* Not present: only hand out a slot when asked to add one. */
  if (!add_node || saved_slot < 0) return NULL;

  nodesQ[saved_slot].node_id = node_id;
  nodesQ[saved_slot].network_sensor.node_id = node_id;
  return &nodesQ[saved_slot];
}
/*
 * Look up a sensor on a node, optionally creating it.
 *
 * node       - node to search; NULL yields NULL
 * sensor     - sensor type; NETWORK_SENSOR always resolves to the
 *              node's dedicated network_sensor entry
 * add_sensor - when non-zero, an absent sensor is created in the first
 *              free data-sensor slot (sensor == 0), and the returned
 *              entry's sensor and node_id fields are (re)written
 *
 * Returns the sensor entry, or NULL when the sensor is absent and
 * either add_sensor is 0 or no free slot remains.
 *
 * Fix: with add_sensor == 0 the old code returned the first free slot
 * when the sensor was absent, so callers wrote data into an entry that
 * belongs to no sensor. A miss now returns NULL.
 */
sensor_quality_t* find_add_sensor(node_quality_t* node,
    confidence_sensor_t sensor, int add_sensor)
{
  int i;
  sensor_quality_t* found = NULL;
  sensor_quality_t* free_slot = NULL;

  if (!node) return NULL;
  if (sensor == NETWORK_SENSOR) return &node->network_sensor;

  for (i=0; i< MAX_SENSORS; i++) {
    if (node->data_sensors[i].sensor == sensor) {
      found = &node->data_sensors[i];
      break;
    }
    if ((node->data_sensors[i].sensor == 0) && (!free_slot)) {
      free_slot = &node->data_sensors[i];
    }
  }

  if (!add_sensor) return found;

  if (!found) found = free_slot;

  // This step is often redundant, but it is needed if found
  // is pointing to a new index.
  if (found) {
    found->sensor = sensor;
    found->node_id = node->node_id;
  }
  return found;
}
/*
 * Reset an action array of MAX_ACTIONS_STORED entries so that every
 * slot holds -1, the "empty" marker.
 */
void clear_actions_array(int my_actions[])
{
  int slot;

  for (slot = 0; slot < MAX_ACTIONS_STORED; slot++) {
    my_actions[slot] = -1;
  }
}
/*
 * Empty four of a sensor's action arrays: actions_to_take,
 * validation_for_actions_to_take, actions_taken and all_actions_taken.
 * printed_actions_taken is not touched here.
 */
void clear_actions(sensor_quality_t* sensor)
{
  int* lists[] = {
    sensor->actions_to_take,
    sensor->validation_for_actions_to_take,
    sensor->actions_taken,
    sensor->all_actions_taken
  };
  unsigned int k;

  for (k = 0; k < sizeof(lists) / sizeof(lists[0]); k++) {
    clear_actions_array(lists[k]);
  }
}
/*
 * Program entry point.
 *
 * Resets the global node and action tables, parses the command-line
 * options, creates the data/network packet devices and the status
 * devices, opens the sink link together with its "errors" device, and
 * finally calls g_main().
 *
 * Returns -1 if either packet device cannot be opened and 0 once
 * g_main() returns. Other fatal set-up errors leave through usage()
 * or exit(1).
 */
int main(int argc, char *argv[])
{
emrun_opts_t emrun_opts = {
shutdown: confidence_shutdown,
};
/* Shared options for both packet devices; devname and unparse are
 * set/replaced below before each g_packet_dev() call. */
packet_dev_opts_t p_opts = {
device: {
},
unparse: confidence_print_data,
};
char* config_file = NULL;
char* network_clusters = NULL;
char* data_clusters = NULL;
int i, j;
/* Init */
misc_init(&argc, argv, CVSTAG);
/* initialize the node structure */
memset (nodesQ, 0, sizeof(nodesQ));
memset (actions, 0, sizeof(actions));
/* Action arrays use -1 as "empty", so the zero fill above is not
 * enough for them; reset every one explicitly. */
for (i = 0; i < MAX_NODES; i++) {
for (j = 0; j < MAX_SENSORS; j++) {
clear_actions(&nodesQ[i].data_sensors[j]);
clear_actions_array(nodesQ[i].data_sensors[j].printed_actions_taken);
}
clear_actions(&nodesQ[i].network_sensor);
clear_actions_array(nodesQ[i].network_sensor.printed_actions_taken);
nodesQ[i].network_sensor.sensor = NETWORK_SENSOR;
// Initialize the last_updated timestamp for node_information,
// otherwise the times are all screwed up if a node dies in
// the very beginning.
for (j = 0; j < MAX_NETWORK_INFORMATION; j++) {
gettimeofday(&nodesQ[i].node_information[j].last_updated, NULL);
}
}
/* Get the network/data cluster arguments (-n / -d) */
network_clusters = misc_parse_out_option(&argc, argv, "network-clusters", 'n');
data_clusters = misc_parse_out_option(&argc, argv, "data-clusters", 'd');
init_clusters(network_clusters, data_clusters);
init_monitoring();
// Set this here, after misc_init has been called otherwise
// sim_path does not include SIM_GROUP/ID
p_opts.device.devname = CONFIDENCE_DATA_STREAM;
if ((g_packet_dev(&p_opts, &pd)) < 0) {
elog(LOG_ERR,"Unable to open packet-device: %s: %m\n",
p_opts.device.devname);
return -1;
}
/* Second packet device: same options, network stream name and printer. */
p_opts.device.devname = CONFIDENCE_NETWORK_STREAM;
p_opts.unparse = confidence_print_network;
if ((g_packet_dev(&p_opts, &pd_network)) < 0) {
elog(LOG_ERR,"Unable to open packet-device: %s: %m\n",
p_opts.device.devname);
return -1;
}
/* Get the config-file argument */
if ((config_file = misc_parse_out_option(&argc, argv, "config-file", 'c'))) {
init_config(config_file);
}
/* Call all of the initialization functions */
if (init_routing(&argc, argv) != 0) {
usage(argv);
}
init_collection();
if (init_ui() != 0) {
elog(LOG_ERR, "Could not initialize rules module!\n");
exit(1);
}
{
/* Link options: receive every packet type on the sink link. */
lu_opts_t lopts = {
opts: {
pkt_type: PKT_TYPE_ALL
},
receive: handle_good_packets,
};
pd_client_opts_t pd_opts = {
receive: handle_error_packets,
};
lopts.opts.name = DEFAULT_SINK_MAC_LINK;
/* Open regular link */
if (lu_open(&lopts, NULL) < 0)
{
elog(LOG_ERR, "UNable to open link device %s: %m!\n",
lopts.opts.name);
usage(argv);
}
/* Open errors link; failure here is only a warning */
pd_opts.devname = link_name_s(lopts.opts.name, "errors");
if (pd_client_open(&pd_opts, NULL) < 0) {
elog(LOG_WARNING, "Unable to open pd_errors dev %s: %m", pd_opts.devname);
}
}
emrun_init(&emrun_opts);
g_main();
return 0;
}
/*
 * Return the text of the action stored at the given index of the
 * global actions table, or NULL when the index is outside the table
 * or the entry is unused (total == 0).
 *
 * Fix: the index previously had no upper bound check, so a value past
 * the end of the table read out of bounds.
 */
char* get_action(int action)
{
  const int num_actions = (int) (sizeof(actions) / sizeof(actions[0]));

  if (action < 0 || action >= num_actions) {
    return NULL;
  }
  if (!actions[action].total) {
    return NULL;
  }
  return actions[action].action;
}
/* See more files for this project here */