/* sympathy_main.c from EmStar at Krugle
 * Show sympathy_main.c syntax highlighted */
/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
*
* Author: Nithya Ramanthan
*
*/
/* NR TODO: don't always set check_for_failures in insert_neighbors - only
* set it if the recorded neighbors/routes differ from the reported ones.
* *** Don't just look for solid epochs; instead calculate the time since
* we last heard from a node - set a timer every time we get a packet, and
* if the timer triggers, then we haven't heard from that node in x minutes,
* so indicate it! */
#include "sympathy.h"
#include "sympathy_dev.h"
#include <tos-contrib/sympathy/tos/lib/Sympathy.h>
/* Link name used by main() when no --uses option is supplied. */
#define DEFAULT_ERRORS_LINK "mote0"
/* Non-negative results of check_rollover(); it returns -1 when neither
 * explanation fits and the packet is considered garbage. */
enum {
S_ROLLOVER_FINE = 0, /* the counter drop fits a plain roll-over */
S_ROLLOVER_IS_REBOOT, /* the counter drop fits a reboot of the node */
};
/* sizeof(Sneighbor_info_t); not referenced in the visible part of this file */
int sn_size = sizeof(Sneighbor_info_t);
/* Record sizes used by handle_sympathy_stats() to step through a packet */
int stats_size = sizeof(Sympathy_stats_t);
int app_stats_size = sizeof(Sympathy_comp_stats_t);
/* Global state of the sink process (links, timers, per-node records) */
sympathy_ctx_t sink = {};
/* Sequence number stamped on outgoing requests; post-incremented by
 * ping_node() and change_auto_period() */
uint8_t seqno = 0;
/* Incremented by expect_smetric() on each timer tick until it reaches 3 */
uint8_t expect_ctr = 0;
/* Number of metric-period windows kept per counter; main() overrides it
 * from the -e option */
int TRACK_FAILURE_WINDOW_SIZE = TRACK_FAILURE_WINDOW_SIZE_DEFAULT;
/* NOTE(review): never assigned in the visible part of this file */
int NUM_ITERATIONS_FOR_FAIL;
/* Forward declaration: insert_neighbors() calls add_src() before its
 * definition */
static sympathy_node_info_t* add_src(int src);
/*** Non-static functions ***/
/*
 * Look up the bookkeeping record of node `src`.
 *
 * Returns a pointer into sink.status_srcs, or NULL when the node is not
 * tracked.  Not finding the record of my_node_id itself is fatal: the
 * error is logged and the process exits with status 1.
 */
sympathy_node_info_t* find_status_ptr(if_id_t src)
{
  int idx = find_status(src);

  if (idx >= 0) return &sink.status_srcs[idx];

  if (src == my_node_id) {
    elog(LOG_ERR, "ERROR can't even find pointer for me: %d\n",
         my_node_id);
    exit(1);
  }
  return NULL;
}
/*
 * Return the index in sink.status_srcs of the entry whose addr equals
 * `src`, or -1 when there is none.  Address 0 is never searched for and
 * always yields -1.
 */
int find_status(if_id_t src)
{
  int pos = 0;

  if (src == 0) return -1;

  while (pos < sink.num_srcs) {
    if (sink.status_srcs[pos].addr == src) return pos;
    pos++;
  }
  return -1;
}
/**** STATIC FUNCS ****/
/*
 * Check `my_seqno` against the sequence numbers remembered for `stat`.
 *
 * Returns 1 if the number is already stored in stat->seqno.  Otherwise the
 * number is written at stat->seqno[stat->seqno_ctr], the write position is
 * advanced with inc_mod_int() (presumably wrapping at SNUM_SEQNO_STORE),
 * and 0 is returned.
 */
static
int duplicate_seqno(sympathy_node_info_t* stat, int my_seqno)
{
  int slot = 0;

  while (slot < SNUM_SEQNO_STORE) {
    if (stat->seqno[slot] == my_seqno) {
      elog(LOG_DEBUG(1), "found duplicate!\n");
      return 1;
    }
    slot++;
  }

  /* First sighting: remember it for later comparisons */
  stat->seqno[stat->seqno_ctr] = my_seqno;
  inc_mod_int(&stat->seqno_ctr, 1, SNUM_SEQNO_STORE);
  return 0;
}
/*
 * Fold a new absolute reading `new_val` into the running counter
 * `recorded_val`.
 *
 * recorded_val             counter to update; last_updated is always
 *                          stamped with the current time
 * new_val                  latest absolute value of the counter
 * treat_rollover_as_reboot if set and new_val is below the previous
 *                          reading, new_val is taken as an absolute value
 *                          instead of computing a difference
 * size_field               width in bytes of the reported field; the
 *                          difference to the previous reading is truncated
 *                          to that width so that roll-over of 1- and
 *                          2-byte fields yields the right increment.  Any
 *                          width other than 1 or 2 uses the full uint
 *                          difference.
 * interpolate              see below
 *
 * Interpolation addresses the case where an update arrives during period
 * x and the next one only at x + 10: without it the current window would
 * receive the whole difference, which falsely inflates it.  When
 * `interpolate` is set and the last update is older than the previous
 * period, increment / elapsed-periods is stored in the current window and
 * the windows before it (at most TRACK_FAILURE_WINDOW_SIZE of them).  The
 * total in `ctr` still receives the whole increment.
 * NOTE: this assumes the node transmits regularly; callers in this file
 * only set `interpolate` for counters reported by the node itself.
 */
static
void stats_ctr_inc(stats_ctr_t* recorded_val, uint new_val,
    int treat_rollover_as_reboot, uint8_t size_field, uint8_t interpolate)
{
  gettimeofday(&recorded_val->last_updated, NULL);
  /* Check if we have ever updated this counter before */
  if ((recorded_val->iter_updated > 0) || (recorded_val->ctr > 0))
  {
    /* If we detected a reboot - subtraction falsely inflates values, so
     * instead just treat the new value as an absolute */
    if ((new_val < recorded_val->prev_val) && (treat_rollover_as_reboot))
    {
      elog(LOG_DEBUG(1), "Rebooted!\n");
      recorded_val->agg_prev_metric_pd[sink.window] = new_val;
      recorded_val->ctr = new_val;
    }
    else
    {
      uint inc_val;
      /* Truncate the difference to the width of the reported field.  The
       * default branch also covers unexpected widths, which previously
       * left inc_val uninitialised. */
      switch (size_field) {
        case 1:
          inc_val = (uint8_t) (new_val - recorded_val->prev_val);
          break;
        case 2:
          inc_val = (uint16_t) (new_val - recorded_val->prev_val);
          break;
        default:
          inc_val = (uint) (new_val - recorded_val->prev_val);
          break;
      }
      /* We see if the last update we got is before the previous period.
       * If it is, then - if desired - we will interpolate to fill in empty
       * windows */
      if ((interpolate) && (1 + recorded_val->iter_updated < sink.metric_pd)) {
        uint num_pds = sink.metric_pd - recorded_val->iter_updated;
        uint start_ind = sink.window;
        uint max_pds = min(num_pds, TRACK_FAILURE_WINDOW_SIZE);
        /* Calculate the value that should be in each window */
        uint interpolate_val = inc_val / num_pds;
        elog(LOG_DEBUG(1), "num_pds %d, start-index %d, max_pds %d, interpolated val %d\n",
             num_pds, start_ind, max_pds, interpolate_val);
        /* Walk backwards from the current window */
        while (max_pds > 0) {
          if ((recorded_val->agg_prev_metric_pd[start_ind] > 0)
              || (recorded_val->agg_prev_metric_pd_updated[start_ind])) {
            /* A window that already holds data is reported and left alone */
            elog(LOG_ERR, "ERROR max_pds=%d, start_index=%d, counter val is %d, should be 0!!\n",
                 max_pds, start_ind,
                 recorded_val->agg_prev_metric_pd[start_ind]);
          }
          else {
            recorded_val->agg_prev_metric_pd[start_ind] = interpolate_val;
            recorded_val->agg_prev_metric_pd_updated[start_ind] = 1;
          }
          max_pds--;
          start_ind = dec_mod(start_ind, TRACK_FAILURE_WINDOW_SIZE);
          elog(LOG_DEBUG(1), "%d:start_ind %d\n", max_pds, start_ind);
        }
      }
      else {
        recorded_val->agg_prev_metric_pd[sink.window] += inc_val;
      }
      recorded_val->ctr += inc_val;
    }
  }
  else
  {
    /* First reading ever: take it as the absolute starting value */
    recorded_val->ctr = new_val;
    recorded_val->agg_prev_metric_pd[sink.window] = new_val;
  }
  recorded_val->agg_prev_metric_pd_updated[sink.window] = 1;
  recorded_val->iter_updated = sink.metric_pd;
  recorded_val->prev_val = new_val;
}
/*
 * Map a stats type onto its non-generic counterpart: values above
 * SCOMP_LAST are shifted down by SCOMP_LAST, all others are returned
 * unchanged.
 */
static
int non_generic(int type)
{
  return (type > SCOMP_LAST) ? (type - SCOMP_LAST) : type;
}
/*
 * Send a Sympathy packet to the routing component, which will send it out
 * on the correct layer.
 *
 * The outgoing buffer is a broadcast link header (source my_node_id, type
 * STO_NETWORK), followed by `pkt`, followed by `data_len` bytes of `data`
 * when data_len > 0.
 *
 * Returns 0 on success and -1 when lu_send() reports a failure (the
 * failure is also logged).
 */
static
int sympathy_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
{
  link_pkt_t lhdr = {
    .dst = {
      .id = LINK_BROADCAST,
    },
    .src = {
      .id = my_node_id,
    },
    .type = STO_NETWORK,
  };
  buf_t* buf = buf_new();
  int rc = 0;

  bufcpy(buf, &lhdr, sizeof(link_pkt_t));
  bufcpy(buf, pkt, sizeof(Spkt_t));
  if (data_len > 0) bufcpy(buf, data, data_len);

  /* The subtraction involves a size_t, so cast it to match %d */
  elog(LOG_DEBUG(1), "will send pkt of size %d\n",
       (int) (buf->len - sizeof(link_pkt_t)));
  if ((lu_send(sink.sympathy_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
  {
    elog(LOG_ERR, "ERROR Unable to send packet on sympathy link: %m!\n");
    rc = -1;
  }
  buf_free(buf);
  return rc;
}
/*
 * Send a SREQUEST_PING_NODE request whose value is `node`.  The request
 * carries the current global seqno, which is then incremented.
 */
static
void ping_node(int node)
{
  Spkt_t hdr = {
    .type = SREQUEST_PING_NODE,
  };
  Srequest_t req = {
    .seqno = seqno++,
    .val = node,
  };

  sympathy_send_pkt(&hdr, (void *)&req, sizeof(Srequest_t));
}
/*
 * Announce a new automatic-metric period.
 *
 * Reschedules sink.auto_pd_timer with period_secs * 1000 and sends a
 * SREQUEST_CHANGE_AUTO_PD request whose value is period_secs / 60 (integer
 * division).  The request carries the current global seqno, which is then
 * incremented.
 */
static
void change_auto_period(uint16_t period_secs)
{
  Spkt_t hdr = {
    .type = SREQUEST_CHANGE_AUTO_PD,
  };
  Srequest_t req = {
    .seqno = seqno++,
    .val = (period_secs / 60),
  };

  g_timer_resched(sink.auto_pd_timer, period_secs * 1000);
  sympathy_send_pkt(&hdr, (void *)&req, sizeof(Srequest_t));
}
/*
 * Timer callback: run track_lost_nodes() if sink.check_for_failures has
 * been raised since the last tick, then clear the flag.  Always returns
 * EVENT_RENEW.  The arguments are not used.
 */
static
int check_for_failures(void* data, int interval, g_event_t* event)
{
  if (sink.check_for_failures != 0) track_lost_nodes();

  /* Cleared only after the call, as before */
  sink.check_for_failures = 0;
  return EVENT_RENEW;
}
/*
 * Timer callback: count one more expected Sympathy metrics packet.
 *
 * Bumps expect_ctr until it reaches 3 and adds one to
 * sink.expected_num_sympathy_metrics (no reboot handling, no
 * interpolation).  Always returns EVENT_RENEW.  The arguments are not
 * used.
 */
static
int expect_smetric(void* data, int interval, g_event_t* event)
{
  stats_ctr_t* expected = &sink.expected_num_sympathy_metrics;

  if (expect_ctr < 3) expect_ctr++;

  stats_ctr_inc(expected, expected->ctr + 1, 0,
                sizeof(expected->ctr), 0);
  return EVENT_RENEW;
}
/*
 * Return the index within stat->neighbors of the entry whose node_id is
 * `neighbor`, or -1 if the first stat->num_neighbors entries do not
 * contain it.
 */
int find_neighbor(sympathy_node_info_t* stat, Saddr_t neighbor)
{
  int pos = 0;

  while (pos < stat->num_neighbors) {
    if (stat->neighbors[pos].node_id == neighbor) return pos;
    pos++;
  }
  return -1;
}
/*
 * Replace the neighbor list recorded for `stat` with the reported
 * `neighbor_table` (num_neighbors entries).
 *
 * Entries with address 0 are skipped and at most MAX_NEIGHBORS entries
 * are stored.  For a neighbor that was already on the old list the
 * original time_added is kept and num_samples is incremented; both are
 * read from a snapshot of the old record, because the live arrays are
 * overwritten while the new list is built.  Every listed node that is not
 * yet tracked is added with add_src().  Finally the list is pushed to
 * emview (if not empty) and sink.check_for_failures is raised.
 */
static void insert_neighbors(Sneighbor_info_t* neighbor_table,
    int num_neighbors, sympathy_node_info_t* stat)
{
  int t, i;
  Squality_t my_egress = 0;
  sympathy_node_info_t prev;  /* the record as it was before this update */
  struct timeval now;

  memcpy(&prev, stat, sizeof(sympathy_node_info_t));
  gettimeofday(&now, NULL);
  if (num_neighbors > MAX_NEIGHBORS) {
    elog(LOG_CRIT, "WARNING, Cannot add %d neighbors, max is: %d!!\n",
         num_neighbors, MAX_NEIGHBORS);
  }
  stat->num_neighbors = 0;
  elog(LOG_DEBUG(1), "node %d, num-neighbors: %d\n",
       stat->addr, num_neighbors);

  for (i = 0; i < num_neighbors; i++) {
    sneighbor_info_t tmp_neighbor = {
      .time_added = now,
      .time_max_change = now,
    };
    if (stat->num_neighbors >= MAX_NEIGHBORS) {
      stat->num_neighbors = MAX_NEIGHBORS;
      break;
    }
    if (neighbor_table[i].addr > 0) {
      /* If there is a previous entry for this node:
       * - get old egress value if current info is not providing it
       * - don't update the time_added, but update other info
       * `t` indexes the snapshot, so the old values must be read from
       * the snapshot too: stat's own slots may already hold new data. */
      if ((t = find_neighbor(&prev, neighbor_table[i].addr)) >= 0) {
        if (my_egress == 0) my_egress = prev.neighbors[t].conn_to;
        tmp_neighbor.num_samples = prev.neighbor_info[t].num_samples + 1;
        tmp_neighbor.time_added = prev.neighbor_info[t].time_added;
      }
      stat->neighbors[stat->num_neighbors].node_id = neighbor_table[i].addr;
      memcpy(&stat->neighbor_info[stat->num_neighbors], &tmp_neighbor,
             sizeof(sneighbor_info_t));
      stat->neighbors[stat->num_neighbors].if_id = neighbor_table[i].addr;
      stat->neighbors[stat->num_neighbors].conn_to = my_egress;
      stat->neighbors[stat->num_neighbors].state = ACTIVE;
      stat->num_neighbors++;
      /* Sympathy should be tracking all nodes that are on all neighbor
       * lists. So, if a node is on a neighbor list, but we haven't
       * heard it directly, still include it */
      if (!find_status_ptr(neighbor_table[i].addr)) {
        add_src(neighbor_table[i].addr);
      }
    }
  }

  /* Stamp the slot just past the stored entries, as before, but never
   * index beyond MAX_NEIGHBORS - 1.
   * NOTE(review): assumes stat->neighbors holds MAX_NEIGHBORS entries, as
   * the clamp above suggests - confirm against the struct definition. */
  if (stat->num_neighbors < MAX_NEIGHBORS) {
    gettimeofday(&stat->neighbors[stat->num_neighbors].last_heard, NULL);
  }
  elog(LOG_DEBUG(1), "CHECK node %d, recorded num-neighbors: %d\n",
       stat->addr, stat->num_neighbors);
  /* Push neighbor information to emview */
  if (stat->num_neighbors > 0) sympathy_emview_neighbors(stat);
  /* Call failure diagnosis algorithm because we have new info now */
  sink.check_for_failures = 1;
}
/*
 * Record the route information reported for `stat`.  In this file it is
 * called for PATH_ADVERT packets and for the route part of a metrics
 * update.
 *
 * A next hop of 0 leaves the stored next hop untouched.  A next hop that
 * differs from the stored one stamps next_hop_info.time_changed, counts
 * the change and raises sink.check_for_failures.  Sink and quality are
 * always copied, and the route is pushed to emview.
 */
static
void handle_route_info(sympathy_node_info_t* stat, Snext_hop_t* route)
{
  if (route->next_hop > 0) {
    int changed = (stat->next_hop.next_hop != route->next_hop);

    if (changed) {
      gettimeofday(&stat->next_hop_info.time_changed, NULL);
      stat->next_hop_info.num_changes++;
      /* Failure detection is only triggered by an actual route change */
      sink.check_for_failures = 1;
    }
    stat->next_hop.next_hop = route->next_hop;
  }

  stat->next_hop.sink = route->sink;
  stat->next_hop.quality = route->quality;
  sympathy_emview_route(stat);
}
/*
 * Decide whether a reported time value is plausible, given the wall-clock
 * time that has passed since `time_prev_pkt`.
 *
 * The elapsed time is converted to whole minutes.  The result is
 *   S_ROLLOVER_FINE       if reported_time - recorded_time (taken as an
 *                         unsigned value) does not exceed the elapsed
 *                         minutes,
 *   S_ROLLOVER_IS_REBOOT  else, if reported_time itself does not exceed
 *                         the elapsed minutes (counting up from 0),
 *   -1                    otherwise: the packet is garbage.
 */
static
int check_rollover(struct timeval *time_prev_pkt, int reported_time, int recorded_time)
{
  struct timeval now;
  unsigned long elapsed_usec;
  int elapsed_mins;
  int verdict = -1;
  uint delta = reported_time - recorded_time;

  gettimeofday(&now, NULL);
  elapsed_usec = misc_tv_offset(&now, time_prev_pkt);
  elapsed_mins = elapsed_usec / (MILLION_I * 60);

  if (delta <= elapsed_mins) {
    /* The gap to our recorded value fits into the elapsed time */
    verdict = S_ROLLOVER_FINE;
  }
  else if (reported_time <= elapsed_mins) {
    /* The gap to 0 fits into the elapsed time */
    verdict = S_ROLLOVER_IS_REBOOT;
  }
  return verdict;
}
/*
 * Maintain the per-epoch aggregate of a counter.
 *
 * clear != 0: reset the value and the updated-flag of window sink.window
 *             (the window index has already been advanced, so this is
 *             the "next" window) and do nothing else.
 * clear == 0: recompute ctr->agg_prev_epoch as the sum of all windows
 *             that were updated.  If `interpolate` is set and at least
 *             one window was updated, the average over the updated
 *             windows is added once for every window that was not.
 *             This is only meant for expected values.
 */
void stats_ctr_update(stats_ctr_t* ctr, uint8_t clear, uint8_t interpolate)
{
  int w;
  int filled = 0;
  int avg;

  if (clear) {
    ctr->agg_prev_metric_pd[sink.window] = 0;
    ctr->agg_prev_metric_pd_updated[sink.window] = 0;
    return;
  }

  ctr->agg_prev_epoch = 0;
  for (w = 0; w < TRACK_FAILURE_WINDOW_SIZE; w++) {
    if (!ctr->agg_prev_metric_pd_updated[w]) continue;
    filled++;
    ctr->agg_prev_epoch += ctr->agg_prev_metric_pd[w];
  }

  if (!interpolate || filled == 0) return;

  avg = ctr->agg_prev_epoch/filled;
  elog(LOG_DEBUG(1), "avg %d\n", avg);
  /* Every window without an update contributes the average instead */
  for (w = 0; w < TRACK_FAILURE_WINDOW_SIZE; w++) {
    if (ctr->agg_prev_metric_pd_updated[w]) continue;
    elog(LOG_DEBUG(1), "adding avg to window %d\n", w);
    ctr->agg_prev_epoch += avg;
  }
}
/*
 * Return the index in sink.app_type registered for `type`, registering it
 * first if necessary.
 *
 * The type is normalised with non_generic() before the lookup.  Returns
 * -1 for SCOMP_STATS_SYMPATHY (which is logged as an error) and when the
 * type is unknown and SCOMP_LAST - 1 types are already registered.
 */
static
int find_app_type(int type)
{
  int slot;
  int munged_type = non_generic(type);

  if (type == SCOMP_STATS_SYMPATHY) {
    elog(LOG_ERR, "ERROR Something is wrong - why are you sending Sympathy stats type %d!\n", type);
    return -1;
  }

  for (slot = 0; slot < sink.num_apps_registered; slot++) {
    if (sink.app_type[slot] == munged_type) return slot;
  }

  /* Unknown type: register it unless the table is full */
  if (!(sink.num_apps_registered < SCOMP_LAST - 1)) return -1;

  slot = sink.num_apps_registered;
  sink.app_type[slot] = munged_type;
  sink.num_apps_registered++;
  return slot;
}
/*
 * Record one component statistics block reported by a node.
 *
 * stat    record of the reporting node
 * stats   the reported component counters
 * type    stats type, mapped to an application slot with find_app_type()
 * reboot  passed to stats_ctr_inc() as treat_rollover_as_reboot for the
 *         node-reported counters
 *
 * The count of received stats blocks is increased by one; the five
 * node-reported counters are folded in with interpolation enabled.  When
 * no slot can be found the block is dropped with an error message.
 */
static
void record_app_stats(sympathy_node_info_t* stat, Sympathy_comp_stats_t* stats,
    stats_pkt_type_t type, int reboot)
{
  sympathy_node_app_info_t* app;
  int app_idx = find_app_type(type);

  if (app_idx < 0)
  {
    elog(LOG_ERR, "Can't fit app: %d\n", type);
    return;
  }
  app = &stat->node_app_info[app_idx];

  /* One more stats block received from this node for this application */
  stats_ctr_inc(&app->app_stats_rx_from_node,
                app->app_stats_rx_from_node.ctr+1, 0,
                sizeof(app->app_stats_rx_from_node.ctr), 0);

  /* Counters as reported by the node itself */
  stats_ctr_inc(&app->node_num_pkts_rx,
                (uint) stats->num_pkts_rx, reboot,
                sizeof(stats->num_pkts_rx), 1);
  stats_ctr_inc(&app->node_send_failures,
                (uint) stats->send_failures, reboot,
                sizeof(stats->send_failures), 1);
  stats_ctr_inc(&app->node_max_queue_occupancy,
                (uint) stats->max_queue_occupancy, reboot,
                sizeof(stats->max_queue_occupancy), 1);
  stats_ctr_inc(&app->node_num_pkts_dropped,
                (uint) stats->num_pkts_dropped, reboot,
                sizeof(stats->num_pkts_dropped), 1);
  stats_ctr_inc(&app->node_num_pkts_tx,
                (uint) stats->num_pkts_tx, reboot,
                sizeof(stats->num_pkts_tx), 1);
}
/*
 * Record the Sympathy statistics block reported by a node.
 *
 * If the reported time awake is below the previous reading, the drop is
 * checked with check_rollover(); a packet it rejects is dropped and -1 is
 * returned without recording anything.  A detected reboot sets
 * stat->rebooted, which is then passed to stats_ctr_inc() for every
 * node-reported counter.
 *
 * Returns S_ROLLOVER_FINE, S_ROLLOVER_IS_REBOOT or -1 (garbage).
 */
static
int record_sympathy_stats(sympathy_node_info_t* stat, Sympathy_stats_t* stats)
{
  int verdict = S_ROLLOVER_FINE;

  if (stats->time_awake_mins < stat->time_awake_mins.prev_val)
  {
    /* The time awake went backwards: find out whether that is a
     * believable roll-over/reboot or a corrupt packet */
    verdict = check_rollover(&stat->sympathy_stats_rx.last_updated,
                             stats->time_awake_mins,
                             stat->time_awake_mins.ctr);
    if (verdict < S_ROLLOVER_FINE)
    {
      elog(LOG_WARNING, "WARNING: Packet is garbage!\n");
      return -1;
    }
  }
  if (verdict == S_ROLLOVER_IS_REBOOT) stat->rebooted = 1;

  /* Counters as reported by the node itself (interpolated) */
  stats_ctr_inc(&stat->time_awake_mins, stats->time_awake_mins,
                stat->rebooted, sizeof(stats->time_awake_mins), 1);
  stats_ctr_inc(&stat->num_metrics_tx, stats->num_metrics_tx,
                stat->rebooted, sizeof(stats->num_metrics_tx), 1);
  stats_ctr_inc(&stat->num_stats_tx, stats->num_stats_tx,
                stat->rebooted, sizeof(stats->num_stats_tx), 1);
  stats_ctr_inc(&stat->num_pkts_tx_succeeded, stats->num_pkts_tx_succeeded,
                stat->rebooted, sizeof(stats->num_pkts_tx_succeeded), 1);
  stats_ctr_inc(&stat->num_pkts_rx, stats->num_pkts_rx,
                stat->rebooted, sizeof(stats->num_pkts_rx), 1);
  stats_ctr_inc(&stat->num_pkts_dropped, stats->num_pkts_dropped,
                stat->rebooted, sizeof(stats->num_pkts_dropped), 1);
  stats_ctr_inc(&stat->num_pkts_tx_failed, stats->num_pkts_tx_failed,
                stat->rebooted, sizeof(stats->num_pkts_tx_failed), 1);
  stats_ctr_inc(&stat->num_pkts_crc_error, stats->num_pkts_crc_error,
                stat->rebooted, sizeof(stats->num_pkts_crc_error), 1);

  /* One more Sympathy stats block received at the sink */
  stats_ctr_inc(&stat->sympathy_stats_rx, stat->sympathy_stats_rx.ctr + 1,
                0, sizeof(stat->sympathy_stats_rx.ctr), 0);
  return verdict;
}
/*
 * Forward `len` bytes of component-specific statistics on
 * sink.sympathy_link.
 *
 * The payload is prefixed with a broadcast link header whose source is
 * `addr`, whose type is SCOMP_STATS and whose ext_type is `type`.  A send
 * failure is logged only.
 */
static
void send_stats_pkt(int addr, void* stats, int type, int len)
{
  link_pkt_t hdr = {
    .dst = {
      .id = LINK_BROADCAST
    },
    .src = {
      .id = addr
    },
    .ext_type = type,
    .type = SCOMP_STATS,
  };
  buf_t* out = buf_new();

  bufcpy(out, &hdr, sizeof(link_pkt_t));
  bufcpy(out, stats, len);
  elog(LOG_DEBUG(1), "buflen: %d\n", out->len);

  if (lu_send(sink.sympathy_link, (link_pkt_t *)out->buf, out->len - sizeof(link_pkt_t)) < 0)
  {
    elog(LOG_ERR, "Unable to send pkt: %m!\n");
  }
  buf_free(out);
}
/*
 * Parse a stats packet, which is a sequence of {type byte, record}
 * entries, and record every entry for node `stat`.
 *
 *  - SCOMP_STATS_SYMPATHY: a Sympathy_stats_t, recorded with
 *    record_sympathy_stats().  If that rejects the packet as garbage,
 *    parsing stops; otherwise its reboot verdict is applied to the
 *    component records that follow.
 *  - SCOMP_STATS2..6: a Sympathy_comp_stats_t, recorded with
 *    record_app_stats().
 *  - SCOMP_STATS2_GEN..6_GEN: a generic_comp_stats_t whose payload is
 *    forwarded with send_stats_pkt().
 *
 * A record that does not fit into the remaining bytes ends parsing,
 * because everything after it cannot be interpreted reliably.  Unknown
 * type bytes are logged and skipped one byte at a time, as before.
 */
static
void handle_sympathy_stats(sympathy_node_info_t* stat, uint8_t* data, ssize_t len)
{
  stats_pkt_type_t type;
  int offset = 0;
  Sympathy_stats_t* s_stats;
  Sympathy_comp_stats_t* a_stats;
  int reboot = 0;
  int verdict;
  generic_comp_stats_t* gen_pkt;

  /* This is a stats-packet of the format: {type, data} */
  if (len <= 0) return;

  while (offset < len-1)
  {
    type = data[offset++];
    elog(LOG_DEBUG(1), "offset %d, pkt len: %d, first type: %d\n", offset,
         (int) len, type);
    switch (type)
    {
      case (SCOMP_STATS_SYMPATHY):
        if (len - offset < stats_size)
        {
          elog(LOG_ERR, "Type is %d, but remaining size: %d\n", type,
               (int) (len - offset));
          return;
        }
        s_stats = (Sympathy_stats_t *)&data[offset];
        verdict = record_sympathy_stats(stat, s_stats);
        /* -1 means "garbage"; it must not be passed on as a (truthy)
         * reboot flag, and the rest of the packet is not trusted */
        if (verdict < 0) return;
        reboot = (verdict == S_ROLLOVER_IS_REBOOT);
        offset += stats_size;
        break;
      case (SCOMP_STATS2):
      case (SCOMP_STATS3):
      case (SCOMP_STATS4):
      case (SCOMP_STATS5):
      case (SCOMP_STATS6):
        if (len - offset < app_stats_size)
        {
          elog(LOG_ERR, "Type is %d, but remaining size: %d\n", type,
               (int) (len - offset));
          return;
        }
        a_stats = (Sympathy_comp_stats_t *)&data[offset];
        record_app_stats(stat, a_stats, type, reboot);
        offset += app_stats_size;
        break;
      case (SCOMP_STATS2_GEN):
      case (SCOMP_STATS3_GEN):
      case (SCOMP_STATS4_GEN):
      case (SCOMP_STATS5_GEN):
      case (SCOMP_STATS6_GEN):
        gen_pkt = (generic_comp_stats_t *) &data[offset];
        elog(LOG_DEBUG(1), "comp-specific stats with len: %d!\n",
             gen_pkt->len);
        /* The length comes from the network: never trust it beyond the
         * bytes that are left.
         * NOTE(review): whether len counts the record header cannot be
         * seen from here, so only this looser bound is enforced. */
        if ((ssize_t) gen_pkt->len > (ssize_t) (len - offset))
        {
          elog(LOG_ERR, "Type is %d, but generic len %d > remaining size: %d\n",
               type, (int) gen_pkt->len, (int) (len - offset));
          return;
        }
        send_stats_pkt(stat->addr, gen_pkt->data, type, gen_pkt->len);
        offset += gen_pkt->len;
        break;
      default:
        elog(LOG_ERR, "Unidintifiable stats-type: %d\n", type);
        break;
    }
  }
}
/*
 * Process an automatic metrics update received from node `stat`.
 *
 * metrics   the update: route information followed by num_neighbors
 *           neighbor entries
 * data_len  number of bytes available at `metrics`
 *
 * The packet is counted in stat->metrics_rx before any validation.  If
 * automatic metrics are switched off (period 0), a fault is recorded and
 * another cancel request is sent.  The route is handled once the fixed
 * part fits into data_len, the neighbor list once all announced entries
 * fit as well; otherwise an error is logged and processing stops.
 */
static
void handle_sympathy_metrics(sympathy_node_info_t* stat, Smetrics_t *metrics,
    ssize_t data_len)
{
  size_t needed;

  stats_ctr_inc(&stat->metrics_rx, stat->metrics_rx.ctr + 1, 0,
                sizeof(stat->metrics_rx.ctr), 0);
  /* First check if we are even supposed to be getting these! */
  if (sink.sympathy_auto_metric_period== 0)
  {
    elog(LOG_ERR, "auto-period = %d! Shouldn't get auto-metric pkts Send another cancel!\n",
         sink.sympathy_auto_metric_period);
    bufprintf(stat->fault_buf,
              "Node %d BAD: auto-period = %d! Shouldn't get auto-metric pkts. Send another cancel!\n",
              stat->addr, sink.sympathy_auto_metric_period);
    change_auto_period(0);
  }
  /* Get the next-hop info */
  /* First sanity check.  The comparison is done on unsigned values, so a
   * negative length is rejected explicitly; the casts make the arguments
   * match the %d conversions. */
  if ((data_len < 0) || ((size_t) data_len < sizeof(Smetrics_t))) {
    elog(LOG_ERR, "Pkt-size: %d < Smetrics_t size (%d)\n",
         (int) data_len, (int) sizeof(Smetrics_t));
    return;
  }
  handle_route_info(stat, &metrics->route);
  /* Get the neighbor-list info */
  elog(LOG_DEBUG(1), "node %d, data-len %d, metrics num-neighbors: %d\n",
       stat->addr, (int) data_len, metrics->num_neighbors);
  /* First sanity check */
  needed = sizeof(Smetrics_t)
           + (sizeof(Sneighbor_info_t) * metrics->num_neighbors);
  if ((size_t) data_len < needed) {
    elog(LOG_ERR, "Pkt-size: %d < Smetrics_t + neighbors size (%d)\n",
         (int) data_len, (int) needed);
    return;
  }
  insert_neighbors(metrics->neighbors, metrics->num_neighbors, stat);
}
/*
 * Dispatch a Sympathy packet received from node `stat`.
 *
 * spkt     the packet, header first
 * pkt_len  number of bytes available at `spkt`
 *
 * A packet shorter than the Spkt_t header is logged and dropped before
 * its type is read.  The payload length handed on is pkt_len minus the
 * header size, kept at full width (it used to be squeezed into a uint8_t,
 * which wrapped for short packets and truncated long ones).
 * SMETRICS_UPDATE goes to handle_sympathy_metrics(), SYMPATHY_STATS to
 * handle_sympathy_stats(); any other type is logged as unexpected.
 */
void handle_sympathy_data(sympathy_node_info_t* stat,
    Spkt_t* spkt, ssize_t pkt_len)
{
  ssize_t spkt_len;

  if (pkt_len < (ssize_t) sizeof(Spkt_t)) {
    elog(LOG_WARNING, "Pkt-size: %d < Spkt_t size (%d)\n",
         (int) pkt_len, (int) sizeof(Spkt_t));
    return;
  }
  spkt_len = pkt_len - (ssize_t) sizeof(Spkt_t);

  switch (spkt->type)
  {
    case(SMETRICS_UPDATE):
      handle_sympathy_metrics(stat, (Smetrics_t *)spkt->data, spkt_len);
      break;
    case (SYMPATHY_STATS):
      handle_sympathy_stats(stat, spkt->data, spkt_len);
      break;
    default:
      elog(LOG_WARNING, "Got unexpected packet-type: %d\n", spkt->type);
      break;
  }
}
static sympathy_node_info_t* add_src(int src)
{
int sctr = sink.status_ctr;
if (src == 0) {
elog(LOG_ERR, "ERROR: Unable to add nodes with address %d!!\n", src);
return NULL;
}
sink.status_srcs[sctr].addr = src;
inc_mod(&sink.status_ctr, 1, SMAX_SRCS);
sink.num_srcs = min(sink.num_srcs + 1, SMAX_SRCS);
sympathy_update_emview(src);
return &sink.status_srcs[sctr];
}
/*
 * Callback for packets that were received with errors.
 * (NR: move this to the routing layer!)
 *
 * The error is charged to the last-hop source found in the link header,
 * not to the source recorded in the multihop header; if that node is not
 * tracked, it is charged to the sink's own record instead.  The sink's
 * CRC-error counter is increased as well, to detect congestion at the
 * sink.  The packet is freed here.  Always returns EVENT_RENEW.
 */
static
int handle_error_packets(void *pkt, ssize_t len,
    pd_client_context_t* pdc)
{
  link_pkt_t *hdr = (link_pkt_t *)pkt;
  sympathy_node_info_t* mystat = find_status_ptr(my_node_id);
  sympathy_node_info_t* stat;

  elog(LOG_DEBUG(1), "error from: %d\n", hdr->src.id);

  /* Unknown senders fall into the general category of the sink */
  stat = find_status_ptr(hdr->src.id);
  if (stat == NULL) stat = mystat;

  stats_ctr_inc(&stat->errs_rx, stat->errs_rx.ctr + 1, 0,
                sizeof(stat->errs_rx.ctr), 0);
  stats_ctr_inc(&mystat->num_pkts_crc_error,
                mystat->num_pkts_crc_error.ctr + 1, 0,
                sizeof(mystat->num_pkts_crc_error.ctr), 0);

  g_free(pkt);
  return EVENT_RENEW;
}
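/* NOTE: the original comment here, "Receive a packet from sympathy link
 * status device", does not describe usage() below; it presumably belongs
 * to status_receive(). */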
/*
 * Print the command-line help for program `name` through
 * misc_print_usage() and terminate the process with exit status 1.
 */
static
void usage(char *name)
{
  buf_t* help = buf_new();

  bufprintf(help, "--uses <link> to monitor packets received with errors [default=%s]\n \t-e number metric-pds per epoch [default=%d]\n",
            DEFAULT_ERRORS_LINK, TRACK_FAILURE_WINDOW_SIZE_DEFAULT);
  misc_print_usage(name, "[-U <link> -e <failure-window>]\n", help->buf);
  buf_free(help);
  exit(1);
}
/*
 * Shutdown handler registered through emrun_opts in main(): logs a notice
 * and exits with status 0.  `data` is not used.
 */
static
void sympathy_shutdown(void *data)
{
  (void) data;

  elog(LOG_NOTICE, "sympathy-sink shutting down");
  exit(0);
}
/*
 * Handler for writes to the command device.
 *
 * `cmd` is parsed into key/value pairs:
 *   ping         ping the node whose id is the value
 *   auto_period  store the value as the new automatic metric period and
 *                announce it with change_auto_period()
 * The first unrecognized key stops processing.
 *
 * Returns EVENT_RENEW, or EVENT_ERROR(EINVAL) after an unrecognized key.
 * `size` and `data` are not used.
 */
static
int sympathy_command(char* cmd, size_t size, void* data)
{
  parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
  int retval = EVENT_RENEW;

  elog(LOG_DEBUG(1), "command: %s\n", cmd);

  /* parse message */
  while (misc_parse_next_kvp(ps) >= 0)
  {
    if (strcmp(ps->key, "ping") == 0) {
      int node = atoi(ps->value);

      elog(LOG_DEBUG(1), "key: %s is ping %d\n",
           ps->key, node);
      ping_node(node);
      continue;
    }

    if (strcmp(ps->key, "auto_period") == 0) {
      int period = atoi(ps->value);

      elog(LOG_DEBUG(1), "key: %s is auto_period %d\n",
           ps->key, period);
      sink.sympathy_auto_metric_period = (uint16_t) period;
      change_auto_period(sink.sympathy_auto_metric_period);
      continue;
    }

    elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
    retval = EVENT_ERROR(EINVAL);
    break;
  }

  misc_parse_cleanup(ps);
  return retval;
}
static char* sympathy_usage(void *data)
{
buf_t buf = {};
int i;
bufprintf(&buf, "Sympathy command interface:\n");
bufprintf(&buf, " ping=<node-id> Gather info about a node\n");
bufprintf(&buf, " auto_period=<period-in-seconds> Change period of automatic metric floods\n");
bufprintf(&buf, " (pd=0 => stop metric floods)\n");
bufprintf(&buf, "\nCurrent values:\nnum-apps-registered=%d\tauto_period=%d[secs]\n", sink.num_apps_registered, sink.sympathy_auto_metric_period);
bufprintf(&buf, "\n**** Data Returned From Pings:\n");
for (i=0; i < sink.num_srcs; i++) {
if (sink.status_srcs[i].neighbor_buf->len > 0) {
bufprintf(&buf, "%s", sink.status_srcs[i].neighbor_buf->buf);
}
}
return buf.buf;
}
/*
 * Apply a status report from a component on the sink to the record of
 * node `stat`.
 *
 * info                the reported counters and component description
 * pkt_type            application type, mapped to a slot with
 *                     find_app_type(); the update is dropped with an
 *                     error message if no slot is available
 * update_only_pkt_tx  if set, only the transmitted-packet counter is
 *                     updated
 *
 * Otherwise the received and expected-received counters, the time of the
 * last reception, the component name and - unless locked - the reception
 * percentage are updated as well.
 */
static
void update_sympathy_sink(sympathy_node_info_t* stat,
    sympathy_status_info_t* info, int pkt_type, int update_only_pkt_tx)
{
  sympathy_node_app_info_t* sninfo;
  size_t name_len;
  int actr;

  if ((actr = find_app_type(pkt_type)) < 0)
  {
    elog(LOG_ERR, "Can't find struct for app_type: %d!\n",
         pkt_type);
    return;
  }
  sninfo = &stat->node_app_info[actr];
  stats_ctr_inc(&sninfo->sink_pkt_tx, info->pkt_tx, 0, sizeof(info->pkt_tx), 0);
  if (update_only_pkt_tx) return;

  stats_ctr_inc(&sninfo->sink_pkt_rx, info->pkt_rx, 0, sizeof(info->pkt_rx), 0);
  sninfo->sink_pkt_rx.last_updated = info->time_rx_last_pkt;
  stats_ctr_inc(&sninfo->sink_pkt_expected_rx, info->pkt_expected_rx, 0,
                sizeof(info->pkt_expected_rx), 0);

  name_len = strlen(info->component_name);
  if (name_len > 0) {
    /* A name of exactly SMAX_COMP_NAME_SIZE characters does not fit
     * together with its terminator either, so it is treated as too long
     * (it used to go through an unbounded sprintf).
     * NOTE(review): assumes comp_name holds SMAX_COMP_NAME_SIZE bytes, as
     * the snprintf bound already implied - confirm against the struct. */
    if (name_len >= SMAX_COMP_NAME_SIZE) {
      elog(LOG_WARNING,"WARN len of component-name: %s(%d) is > max (%d)\n",
           info->component_name, (int) name_len,
           SMAX_COMP_NAME_SIZE);
    }
    /* Bounded copy in both cases; truncates over-long names */
    snprintf(sninfo->comp_name, SMAX_COMP_NAME_SIZE, "%s",
             info->component_name);
  }
  if ((info->pkt_reception_percent > 0)
      && (!sninfo->pkt_reception_locked)) {
    sninfo->pkt_reception_percent = info->pkt_reception_percent;
  }
}
/*
 * Receive callback of the regular link: counts the packet in the sink's
 * own num_pkts_rx counter (used to check for congestion at the sink) and
 * frees it.  Always returns EVENT_RENEW.  `lu` and `data_len` are not
 * used.
 */
static
int handle_good_packets(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  sympathy_node_info_t* me = find_status_ptr(my_node_id);

  stats_ctr_inc(&me->num_pkts_rx, me->num_pkts_rx.ctr + 1, 0,
                sizeof(me->num_pkts_rx.ctr), 0);

  g_free(pkt);
  return EVENT_RENEW;
}
/*
 * Receive callback of the Sympathy status link, through which the
 * components on the sink pass status to this process.
 *
 * The record of the packet's source is looked up and created on demand.
 * A broadcast source is only accepted for SSINK_UPDATE packets.
 *
 *   SCOMP_ASCII_STATS  payload is appended to the generic stats buffer of
 *                      the application slot
 *   SSINK_UPDATE       sink-side counters; from the broadcast address the
 *                      update is applied to all tracked nodes (tx counter
 *                      only), otherwise to the source node
 *   SNETWORK_PKT       a packet seen on the network: it is counted
 *                      (unless it is a duplicate or the sink's own
 *                      neighbor update) and then handled according to its
 *                      ext_type
 *
 * The packet is always freed here.  Always returns EVENT_RENEW.  `lu` is
 * not used.
 */
static
int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  sympathy_status_info_t* info = (sympathy_status_info_t *)pkt->data;
  sympathy_node_info_t* stat = NULL;
  sympathy_node_app_info_t* sninfo;
  int ictr;
  int msg_len = data_len;

  elog(LOG_DEBUG(1), "src %d, Pkt type/comp: %d/%d, datalen: %d, msg_len %d\n",
       pkt->src.id, pkt->type, pkt->ext_type, (int) data_len, msg_len);
  if (pkt->src.id == LINK_BROADCAST) {
    if (pkt->type != SSINK_UPDATE) {
      elog(LOG_ERR, "ERROR, Need valid source addr for all pkt-type but %d\n",
           SSINK_UPDATE);
      goto done;
    }
  }
  else {
    /* Find index for original source of packet */
    if (!(stat = find_status_ptr(pkt->src.id))) {
      if (!(stat = add_src(pkt->src.id))) {
        elog(LOG_ERR, "Can't find struct for node %d\n", pkt->src.id);
        goto done;
      }
    }
  }
  /* From here on `stat` is only NULL for a broadcast SSINK_UPDATE */
  switch(pkt->type) {
    case (SCOMP_ASCII_STATS):
      elog(LOG_DEBUG(2), "pkt_type: %d\n", pkt->type);
      if ((ictr = find_app_type(pkt->type)) < 0)
      {
        elog(LOG_ERR, "Can't find info for app-type %d for src %d!\n",
             pkt->type, pkt->src.id);
        goto done;
      }
      sninfo = &stat->node_app_info[ictr];
      bufcpy(sninfo->generic_stats_buf, pkt->data, msg_len);
      break;
    case (SSINK_UPDATE):
      /* Treat this case differently - update all nodes */
      if (pkt->src.id == LINK_BROADCAST) {
        int i;
        for (i = 0; i < sink.num_srcs; i++) {
          update_sympathy_sink(&sink.status_srcs[i], info,
                               pkt->ext_type, 1);
        }
        goto done;
      }
      else {
        update_sympathy_sink(stat, info, pkt->ext_type, 0);
      }
      break;
    case (SNETWORK_PKT):
      elog(LOG_DEBUG(2), "have network packet!\n");
      /* First check for duplicate sequence number */
      if ((((pkt->ext_type >= SNON_ROUTING_PKT)
            && (!duplicate_seqno(stat, pkt->seqno)))
           || pkt->ext_type < SNON_ROUTING_PKT)
          /* Last case is special case - when we get a neighbor-advert
           * from my_node_id - with seqno of -1, then this is not a packet
           * the sink is sending, instead this is a neighbor-update
           * received from a status device on the sink */
          && (!((stat->addr == my_node_id)
                && (pkt->seqno == 255)))) {
        stats_ctr_inc(&stat->packet, stat->packet.ctr+1, 0,
                      sizeof(stat->packet.ctr), 0);
        /* NOTE(review): ext_type indexes tos_packets without a bounds
         * check; the array size is not visible here - confirm */
        stats_ctr_inc(&stat->tos_packets[pkt->ext_type],
                      stat->tos_packets[pkt->ext_type].ctr+1, 0,
                      sizeof(stat->tos_packets[pkt->ext_type].ctr), 0);
      }
      /* We don't need a separate case for beacon and non-sympathy
       * app packets because we just need a count of those */
      switch (pkt->ext_type)
      {
        case(SNEIGHBOR_ADVERT): /* neighbor data */
          {
            sympathy_neighbor_t* n = (sympathy_neighbor_t *) pkt->data;
            insert_neighbors(n->neighbor, n->num_neighbors, stat);
          }
          break;
        case(SPATH_ADVERT): /* route data */
          handle_route_info(stat, (Snext_hop_t *) (pkt->data));
          break;
        case(SYMPATHY_PKT): /* Multihop app layer */
          handle_sympathy_data(stat, (Spkt_t*)pkt->data, data_len);
          break;
        default:
          elog(LOG_DEBUG(3), "Ignoring packet w ext-type: %d\n", pkt->ext_type);
          break;
      }
      /* Without this break every network packet fell through into the
       * default case and was reported as ignored */
      break;
    default:
      elog(LOG_DEBUG(3), "Ignoring pkt type %d\n", pkt->type);
      break;
  }
done:
  g_free(pkt);
  return EVENT_RENEW;
}
/*
 * Entry point of the sympathy sink.
 *
 * In this order: opens the command device, performs the generic program
 * initialization, opens the status link, the regular link and its errors
 * device, parses the remaining options (-h/--help, -e/--epoch and, with
 * USE_BAYES, -r/--root), installs the timers, initializes emview and the
 * per-node records, registers the sink itself as a source and enters the
 * event loop.  Returning from the event loop is logged as abnormal.
 */
int main(int argc, char **argv)
{
char *epoch;
#ifdef USE_BAYES
char *emstar_root;
#endif
emrun_opts_t emrun_opts = {
shutdown: sympathy_shutdown,
silent: 1
};
cmd_dev_opts_t copts = {
device: {
},
command: sympathy_command,
usage: sympathy_usage,
};
/* Open command device */
/* A failure to open it is logged only; start-up continues */
copts.device.devname = sympathy_device(SCMD_DEVICE);
if (g_command_dev(&copts, &sink.cmd) < 0) {
elog(LOG_ERR, "Unable to open command-device %s: %m\n", copts.device.devname);
}
/* Default automatic metric period, in seconds */
sink.sympathy_auto_metric_period = METRICS_PERIOD_MSEC/1000;
/* generic initialization common to all programs */
misc_init(&argc, argv, CVSTAG);
gettimeofday(&start_time, NULL);
initialize_devices();
/* Link-user to pass status between components on the sink */
{
lu_opts_t lopts = {
opts: {
name: SYMPATHY_STATS_DEVICE,
pkt_type: PKT_TYPE_ALL
},
receive: status_receive
};
/* Without the status link the program prints its usage and exits */
if (lu_open(&lopts, &sink.sympathy_link) < 0)
{
elog(LOG_ERR, "UNable to open status device: %s: %m!\n",
lopts.opts.name);
usage(argv[0]);
}
}
/* Regular link (good packets) and its errors device (bad packets) */
{
lu_opts_t lopts = {
opts: {
pkt_type: PKT_TYPE_ALL
},
receive: handle_good_packets,
};
pd_client_opts_t pd_opts = {
data: &sink,
receive: handle_error_packets,
};
/* Get the --uses name */
if (!(lopts.opts.name = link_parse_uses(&argc, argv, NULL))) {
lopts.opts.name = DEFAULT_ERRORS_LINK;
}
/* Open regular link */
if (lu_open(&lopts, NULL) < 0)
{
elog(LOG_ERR, "UNable to open link device %s: %m!\n",
lopts.opts.name);
usage(argv[0]);
}
/* Open errors link */
/* A failure here is only a warning */
pd_opts.devname = link_name_s(lopts.opts.name, "errors");
if (pd_client_open(&pd_opts, &sink.errors_pc) < 0) {
elog(LOG_WARNING, "Unable to open pd_errors dev %s: %m", pd_opts.devname);
}
}
if (misc_parse_out_option(&argc, argv, "help", 'h')) usage(argv[0]);
/* NOTE(review): the -e value is taken from atoi() without any range
 * check, although it is used as a window count and modulus elsewhere in
 * this file - confirm which values are acceptable */
if ((epoch = misc_parse_out_option(&argc, argv, "epoch", 'e'))) {
TRACK_FAILURE_WINDOW_SIZE = atoi(epoch);
}
#ifdef USE_BAYES
if ((emstar_root = misc_parse_out_option(&argc, argv, "root", 'r'))) {
elog(LOG_DEBUG(1), "emstar root : %s \n", emstar_root);
/* bayes engine initialization */
initialize_bayes(emstar_root);
}
#endif
if (misc_args_remain(&argc, argv)) {
elog(LOG_CRIT, "Additional unparsed arguments!");
usage(argv[0]);
}
/* Timers: expected-metrics bookkeeping (this one is kept in
 * sink.auto_pd_timer so that change_auto_period() can reschedule it),
 * call_track_lost_nodes every metrics period, and the check_for_failures
 * callback every 10 * 1000 */
g_timer_add(sink.sympathy_auto_metric_period * 1000, expect_smetric,
NULL, NULL, &sink.auto_pd_timer);
g_timer_add(METRICS_PERIOD_MSEC, call_track_lost_nodes,
NULL, NULL, NULL);
g_timer_add(10 * 1000, check_for_failures,
NULL, NULL, NULL);
/* Emview initialization */
sympathy_init_emview();
/* Initialize our structs */
/* Every record gets its buffers up front, together with the default
 * reception threshold of each application slot */
{
sympathy_node_info_t* stat;
sympathy_node_app_info_t* snode;
int i, j;
for (i = 0; i < SMAX_SRCS; i++) {
stat = &sink.status_srcs[i];
stat->fault_buf = buf_new();
stat->neighbor_buf = buf_new();
for (j = 0; j < SCOMP_LAST; j++) {
snode = &stat->node_app_info[j];
snode->fault_buf = buf_new();
snode->generic_stats_buf = buf_new();
snode->pkt_reception_percent = SMSG_RECEPTION_THRESH_DEFAULT;
}
}
}
/* Add the sink to the srcs list! */
/* find_status_ptr(my_node_id) exits the program if this entry is missing */
add_src(my_node_id);
emrun_init(&emrun_opts);
g_main();
elog_g(LOG_CRIT, "event loop terminated abnormally!");
return 0;
}
/* See more files for this project here */