Show idr_util.c syntax highlighted
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* idr_util.c: Some utilities for the idr deamon
*
* $Id: idr_util.c,v 1.9 2003/07/11 22:29:52 cerpa Exp $
*/
char idr_util_c_cvsid[] = "$Id: idr_util.c,v 1.9 2003/07/11 22:29:52 cerpa Exp $";
#include <stdio.h>
#include <stdlib.h>
#include <link/link.h>
#include <link/neighbor.h>
#include "idr_i.h"
/*** Internal helper functions ***/
int8_t update_datarecord(idr_pkt_t *pkt, idr_state_t *s);
int8_t check_datarecord(node_id_t sender, idr_state_t *s);
uint8_t count_datarecords(idr_state_t *s);
int8_t stat_index(node_id_t sender, idr_state_t *s);
int handle_nack_reply(idr_state_t *s, node_id_t sender, int count);
int update_nackcount(idr_state_t *s, node_id_t id);
float assign_score(node_rec_t *noderec);
void sort_scores(node_rec_t noderec[], int count);
void sort_importance(node_rec_t noderec[], int count);
void log_stats(idr_state_t *s);
int best_score_nack(idr_state_t *s);
int combinatorial_nack(idr_state_t *s);
// TODO: add a timer based on the deadline or something, then use send_nack
// internally. For now, I use it at the command device callback
//void send_nack(idr_state_t *f, node_id_t toAddr, int nack_count);
/*******************************************************************/
/* NACK timer callback
* When this expires, the time_remaining is decremented by 'interval'
* and if it's nonzero, it gets scheduled again. If the max_nackno has not
* been reached, the selection algorithm is run again, then a nack is sent
*/
gboolean nack_timer(void *data, int interval, g_event_t *event)
{
idr_state_t *s = (idr_state_t *) data;
// do we have any time left?
if ((s->constr.time_remaining-interval) > 0 ) {
s->constr.time_remaining-=interval;
// See which method we are using
switch (s->nackmethod) {
case NO_NACKS:
// cool, no nacks, no work to do!
log_stats(s);
return TIMER_DONE;
break;
case BEST_SCORE:
// Any nacks left?
if (s->constr.nacks_remaining > 0) {
select_source_to_nack(s);
elog(LOG_NOTICE, "Time remaining: %d ms\n",
s->constr.time_remaining);
return TIMER_RENEW;
} else {
elog(LOG_NOTICE, "Max number of NACKs sent. "
"Timer disabled-%d ms early\n",
s->constr.time_remaining);
log_stats(s);
return TIMER_DONE;
}
break;
case COMBINATORIAL:
// have we met the accuracy constraint?
if (s->constr.min_accuracy>0) {
select_source_to_nack(s);
elog(LOG_NOTICE, "Time remaining: %d ms\n",
s->constr.time_remaining);
return TIMER_RENEW;
} else {
elog(LOG_NOTICE, "Minimum accuracy constraint met. "
"Timer disabled-%d ms early\n",
s->constr.time_remaining);
log_stats(s);
return TIMER_DONE;
}
break;
default:
elog(LOG_ERR, "Unknown NACK mehtod (%d)\n", s->nackmethod);
break;
}
}
elog(LOG_NOTICE, "Deadline expired\n");
log_stats(s);
return TIMER_DONE;
}
void log_stats(idr_state_t *s)
{
int fd,i,j;
char logbuf[255]={0};
j=sprintf(logbuf, LOGFILE);
j+=sprintf(logbuf+j, "%d", s->pot);
if ((fd=open(logbuf, O_WRONLY|O_CREAT|O_APPEND, 0644)) < 0) {
elog(LOG_ERR, "Unable to open %s\n", logbuf);
perror("open");
return;
}
// -1 = beginning of log
j=sprintf(logbuf, "-1:0:0:0\n");
write(fd, (char*)&logbuf, j);
// Logfile format:
// nodeid:importance:nack_count:retx_count
for (i=0;i<MAX_NEIGHBORS;i++) {
if (s->status.nodelist[i].id != 0) {
j=sprintf(logbuf, "%d:%d:%d:%d\n",
s->status.nodelist[i].id,
s->status.nodelist[i].importance,
s->status.nodelist[i].nack_count,
s->status.nodelist[i].retx_count);
write(fd, (char *)&logbuf, j);
}
}
// -2 = end of log
j=sprintf(logbuf, "-2:0:0:0\n");
write(fd, (char*)&logbuf, j);
close(fd);
}
/*
* Called once during the init, before event processing starts
*/
void idr_util_init(idr_state_t *f)
{
// setup a periodic ack timer callback
// g_timer_add(f->ack_period, ack_timer, f, NULL, NULL);
}
/*
* Called when a IDR packet with data comes in from the lower
* interface
*/
void idr_handle_data_pkt(idr_state_t *f, link_pkt_t *link_pkt,
ssize_t data_len)
{
idr_pkt_t *idr_pkt = (idr_pkt_t *) link_pkt->data;
idr_data_pkt_t *idr_data_hdr = (idr_data_pkt_t *) idr_pkt->next_hdr;
link_pkt_t up_link_hdr;
buf_t *up_pkt;
// Duplicate suppression and other handling goes here
if (idr_pkt==NULL) {
elog_g(LOG_CRIT, "NULL pointer\n");
return;
}
// if the toAddr is set (so this is a nack reply)
// and it has my name on it
if ((idr_data_hdr->toAddr!=0) && (idr_data_hdr->toAddr==my_node_id)) {
handle_nack_reply(f, idr_pkt->sender, idr_data_hdr->count);
}
// Construct the packet
up_link_hdr.src.id=link_pkt->src.id;
up_link_hdr.dst.id=link_pkt->dst.id;
up_link_hdr.rcv_time=link_pkt->rcv_time;
up_link_hdr.type=idr_data_hdr->inner_type;
// Send it up the stack
up_pkt = buf_new();
// first construct the link header
bufcpy(up_pkt, &up_link_hdr, sizeof(up_link_hdr));
// followed by user data
bufcpy(up_pkt, idr_data_hdr->data,
data_len - (sizeof(idr_pkt_t)+sizeof(idr_data_pkt_t)));
// send it up
pd_receive(f->pd_context, up_pkt->buf, up_pkt->len);
buf_free(up_pkt);
}
neighbor_t* find_neighbor(neighbor_t *nb_list, int nb_count, node_id_t id) {
int i;
if( nb_list )
for( i=0; i < nb_count; i++) {
if( nb_list[i].node_id==id)
return(&nb_list[i]);
}
return NULL;
}
/*
* Called when a IDR NACK packet arrives from the lower interface
*/
void idr_handle_nack_pkt(idr_state_t *f, link_pkt_t *link_pkt, ssize_t data_len)
{
idr_pkt_t *idr_pkt_hdr = (idr_pkt_t *) link_pkt->data;
idr_nack_pkt_t *idr_nack_pkt = (idr_nack_pkt_t *)(idr_pkt_hdr->next_hdr);
if (idr_nack_pkt->toAddr != my_node_id) {
elog(LOG_DEBUG(4), "NACK packet from node %d is not for me. "
"Ignoring it\n", idr_pkt_hdr->sender);
return;
}
elog(LOG_NOTICE, "Received NACK pkt from node %d\n",
idr_pkt_hdr->sender);
if (f->pktbuf!=NULL) {
// header stripping fun!
int n;
idr_data_pkt_t *idr_data_hdr;
n=sizeof(link_pkt_t)+sizeof(idr_pkt_t);
idr_data_hdr=(idr_data_pkt_t *)(f->pktbuf->buf+n);
// hopefully it now points to the right point...
idr_data_hdr->toAddr=idr_pkt_hdr->sender;
idr_data_hdr->count++;
elog(LOG_NOTICE, "RETRANSMITTING packet to node %d, %d time(s)\n",
idr_data_hdr->toAddr, idr_data_hdr->count);
idr_send_to_all(f, (link_pkt_t *)f->pktbuf->buf,
f->pktbuf->len - sizeof(link_pkt_t));
}
f->status.retno++;
}
int new_neighbor_list(neighbor_t *nb_list, int count, void *data)
{
idr_state_t *s=data;
elog(LOG_DEBUG(0), "Whoo hoo - new neighbor list received!");
// replace old list (if any) with a new one
if(s->nb_list) {
free(s->nb_list);
}
s->nb_list=nb_list;
s->nb_count=count;
return EVENT_RENEW;
}
// Syncing the conf nodelist with the status nodelist
int sync_conf_status(node_command_rec_t c[], node_rec_t s[], int max)
{
uint16_t i;
int retcode=0;
for (i=0; i<max; i++) {
if (c[i].id != 0) {
s[i].id = c[i].id;
s[i].importance = c[i].importance;
retcode++;
}
}
return retcode;
}
// insert an element in the nodelist of the status struct
// return the index to the nodelist or -1 if the list is full
int insert_in_status(node_command_rec_t c, node_rec_t s[], int max)
{
uint16_t i;
for (i=0; i<max; i++) {
if (s[i].id==0) {
s[i].id=c.id;
s[i].importance=c.importance;
return i;
}
}
elog(LOG_ERR, "Status nodelist FULL\n");
return -1;
}
int sync_neighb_status(neighbor_t *nb_list, node_rec_t s[], int max, int count)
{
uint16_t i,j;
int retcode=0;
elog(LOG_ERR,"%d\n", count);
// i is the s index
for (i=0;i<max;i++) {
if (s[i].id != 0) {
// j is the neighborlist index
for (j=0;j<count;j++) {
if (s[i].id == nb_list[j].node_id) {
// node IDs match, update info at status struct
s[i].conn_to = nb_list[j].conn_to;
s[i].conn_from = nb_list[j].conn_from;
retcode++;
// there shouldn't be a duplicate match anyway
break;
}
}
/*
// neighbor not found in neighborlist
// now print the list
s[i].conn_to = 0.0;
s[i].conn_from = 0.0;
*/
}
}
return retcode;
}
void send_nack(idr_state_t *f, node_id_t toAddr, int nack_count)
{
link_pkt_t link_hdr;
idr_pkt_t idr_hdr;
idr_nack_pkt_t idr_nack_hdr;
buf_t *pkt_out;
// Populate lower-level link header
link_hdr.dst.id=LINK_BROADCAST;
link_hdr.type=PKT_TYPE_IDR;
// Populate idr base header
idr_hdr.sender=my_node_id;
idr_hdr.sub_type=IDR_NACK;
// Populate idr nack header
// 'round' is probably useless
idr_nack_hdr.toAddr=toAddr;
idr_nack_hdr.count=nack_count;
elog(LOG_NOTICE, "Sending IDR NACK message to node %d\n", toAddr);
pkt_out = buf_new();
bufcpy(pkt_out, &link_hdr, sizeof(link_hdr));
bufcpy(pkt_out, &idr_hdr, sizeof(idr_hdr));
bufcpy(pkt_out, &idr_nack_hdr, sizeof(idr_nack_hdr));
idr_send_to_all(f, (link_pkt_t *)pkt_out->buf,
(pkt_out->len - sizeof(link_pkt_t)));
//
f->status.total_nacks++;
f->status.round_nacks++;
f->enable_monitor=1;
buf_free(pkt_out);
}
int update_nackcount(idr_state_t *s, node_id_t id)
{
int16_t i=0;
for (i=0; i<MAX_NEIGHBORS; i++) {
if (s->status.nodelist[i].id == id) {
s->status.nodelist[i].nack_count++;
return i;
}
}
return -1;
}
int get_index(idr_state_t *s, node_id_t id)
{
int16_t i=0;
for (i=0; i<MAX_NEIGHBORS; i++) {
if (s->status.nodelist[i].id == id) {
return i;
}
}
return -1;
}
int handle_nack_reply(idr_state_t *s, node_id_t sender, int count)
{
uint8_t i;
for (i=0; i<MAX_NEIGHBORS; i++) {
// Find the sender in the status table
if (s->status.nodelist[i].id == sender) {
s->status.nodelist[i].retx_count = count;
// Now update the accuracy constraint
s->constr.min_accuracy-=s->status.nodelist[i].importance;
break;
}
}
return i;
}
/* nack selection functions */
int select_source_to_nack(idr_state_t *s)
{
switch (s->nackmethod) {
case NO_NACKS:
elog(LOG_ERR, "NO_NACKS selected, not sending anything\n");
break;
case BEST_SCORE:
best_score_nack(s);
break;
case COMBINATORIAL:
combinatorial_nack(s);
break;
default:
elog(LOG_ERR, "Unknown nack method (%d)\n", s->nackmethod);
break;
}
return s->nackmethod;
}
/* Best-score greedy algorithm
* Used in BEST_SCORE nack method
*/
int best_score_nack(idr_state_t *s)
{
int i;
i=0;
for (i=0; i<MAX_NEIGHBORS; i++) {
s->status.nodelist[i].score=assign_score(&(s->status.nodelist[i]));
}
sort_scores(s->status.nodelist, MAX_NEIGHBORS);
if (s->constr.nacks_remaining > 0) {
// The array is sorted, max is on top
if (s->status.nodelist[0].score > 1e-6) {
// increment the nackcount for this sender
s->status.nodelist[0].nack_count++;
send_nack(s, s->status.nodelist[0].id,
s->status.nodelist[0].nack_count);
s->constr.nacks_remaining--;
} else {
elog(LOG_NOTICE, "Score too low\n");
}
} else {
elog(LOG_NOTICE, "Maximum number of NACKs exceeded\n");
}
return 0;
}
/* Combinatorial algorith
* Used in accuracy-guarantee, COMBINATORIAL nack method
* Uses a binomial tree as its primary data structure
*/
int combinatorial_nack(idr_state_t *s)
{
nodep head[MAX_NEIGHBORS]={NULL};
ep candidates[MAX_NEIGHBORS]={NULL};
struct info d[MAX_NEIGHBORS];
int i,j;
int nacks_out=0;
int num_elements=0; // the actual number of non-zero elements
int solutionindex=-1;
ep solution=NULL; // the element in the candidate linked list
// that is the actual solution
double max_success=0.0L;
node_rec_t nt[MAX_NEIGHBORS];
memset(d,0,(sizeof(struct info) * MAX_NEIGHBORS));
memset(nt, 0, (sizeof(node_rec_t) * MAX_NEIGHBORS));
// First, sort by importance
sort_importance(s->status.nodelist, MAX_NEIGHBORS);
// Count up to the first zero element. That's the number of
// elements on the binomial tree
for (i=0; i<MAX_NEIGHBORS; i++) {
if (s->status.nodelist[i].id!=0
&& s->status.nodelist[i].retx_count==0) {
num_elements++;
} else
break;
}
if (num_elements==0) {
elog(LOG_NOTICE, "0 Elements in nodelist\n");
// we're done, nothing to nack
return 0;
}
// Now assign values to the d array
elog(LOG_NOTICE, "%d elements in nodelist\n", num_elements);
for (i=0; i<num_elements; i++) {
d[i].data=s->status.nodelist[i].importance;
d[i].index=i;
}
// Create the trees
elog(LOG_NOTICE, "Creating tree with %d elements\n", num_elements);
head[0]=create_tree(d, num_elements);
// assign the rest of the head pointers
assign_heads(head);
// Now, for each head, calculate the sums of its nodes
// this also populates the candidates table of linked lists
elog(LOG_NOTICE, "Min_accuracy constraint=%d\n", s->constr.min_accuracy);
for (i=0; i<num_elements; i++) {
ep tmp;
calculate_sums_with_constraint(head[i], head[i]->degree,
s->constr.min_accuracy,
&candidates[i], head[i]);
tmp=candidates[i];
// printf("Candidate list # %d contents\n", i+1);
// ANY HANDLING OF THE LIST SHOULD GO HERE!
while(tmp!=NULL) {
struct node *n=tmp->element;
int sum=0;
double success=1.0L;
while (n!=NULL) {
/*
printf("%d(%d) ",
s->status.nodelist[n->data.index].importance,
s->status.nodelist[n->data.index].id);
*/
sum+=n->data.data;
if ((s->status.nodelist[n->data.index].conn_from) < 1e-6) {
success*=0.01; // make 0% 1% so as to avoid
// zeroes in the success rate
} else {
success*=(s->status.nodelist[n->data.index].conn_from/
(float)(100 *
s->status.nodelist[n->data.index].nack_count+1));
}
if ((s->status.nodelist[n->data.index].conn_to) < 1e-6) {
success*=0.1; // make 0% 1% so as to avoid
// zeroes in the success rate
} else {
success*=(s->status.nodelist[n->data.index].conn_to/
(float)100);
}
// if this is a full tree, then NULL is the terminating
// clause. If not, head[i] is the one (since there exists a
// parent of head[i], but is invalid for the subtree)
if (n!=head[i]) {
n=n->parent;
} else {
n=NULL;
}
}
success*=100;
if (success-max_success > 1e-20) {
max_success=success;
// NOTE: there is no tie-breaking meckanism
// if several success scores are equal, this will pick the
// first one
solutionindex=i;
solution=tmp;
}
/* printf(" -- Sum = %d, Success probability = %6.3f%%\n",
sum, success);
fflush(stdout);
*/
tmp=tmp->next;
}
}
if ( (solution != NULL)
&& (solutionindex > -1)
&& (solutionindex < num_elements)) {
// We have a solution!
struct node *n=solution->element;
printf("Solution (index=%d) --> ", solutionindex);
j=0;
while (n!=NULL) {
j++;
printf("%d(%d %7.4f %7.4f) ",
s->status.nodelist[n->data.index].id,
s->status.nodelist[n->data.index].importance,
s->status.nodelist[n->data.index].conn_from,
s->status.nodelist[n->data.index].conn_to);
// copy the solution nodes to their own table, then use
// that table to nack them
// Assign scores too
memcpy(&nt[j-1], &(s->status.nodelist[n->data.index]),
sizeof(node_rec_t));
nt[j-1].score=assign_score(&nt[j-1]);
printf("Score[%d]=%6.4f\n", j-1, nt[j-1].score);
if (n!=head[solutionindex]) {
n=n->parent;
} else {
n=NULL;
}
}
/*
// One last check
if (s->status.nodelist[n->data.index].retx_count==0) {
s->status.nodelist[n->data.index].nack_count++;
send_nack(s, s->status.nodelist[n->data.index].id,
s->status.nodelist[n->data.index].nack_count);
nacks_out++;
if (nacks_out>2) {
elog(LOG_ERR,
"Maximum number of nacks per round reached\n");
break;
}
} else {
elog(LOG_ERR,
"This element shouldn't be in the solution list!\n");
}
if (n!=head[solutionindex]) {
n=n->parent;
} else {
n=NULL;
}
}
*/
fflush(stdout);
} else {
printf("No solution?! (%d, %d)\n", (int)solution, solutionindex);
fflush(stdout);
}
printf("\n");
fflush(stdout);
// Sort the nt table based on the scores
sort_scores(nt, j);
// Now go ahead and nack up to MAX_NACKS_PER_ROUND
for (i=0; i<j; i++) {
int index=-1;
index=get_index(s, nt[i].id);
if (index>-1) {
s->status.nodelist[index].nack_count++;
send_nack(s, s->status.nodelist[index].id,
s->status.nodelist[index].nack_count);
nacks_out++;
} else {
elog(LOG_ERR, "Invalid index!\n");
}
if (nacks_out >MAX_NACKS_PER_ROUND) {
elog(LOG_ERR,
"Maximum number of nacks per round reached (%d)\n", nacks_out);
break;
}
}
// we're done, delete the lists
for (i=0; i<num_elements;i++) {
while (candidates[i]!=NULL) {
candidates[i]=delete_element(candidates[i]);
}
}
// Now delete the tree
delete_tree(&head[0]);
fflush(stdout);
return 0;
}
// This is the utility function
// score = a*importance - b*cost
// importance is normalized on a scale of 1-100
// cost is 100-link_status
// link status ranges from 0 to 100 and is a combination of conn_to and
// conn_from
//
// a and b are weighting factors
// currently, a and b are both 1
//
float assign_score(node_rec_t *noderec)
{
float retval=0.0;
if (noderec->retx_count == 0) {
retval = (float)((float)(noderec->importance)/(float)(10)) +
(float)(noderec->conn_from) + (float)(noderec->conn_to);
} // leave the value at zero if we got a reply
return retval;
}
// a simple insertsort algorithm, since the number of elements that need to
// be sorted is so small
// sorted elements are in decreasing order
void sort_scores(node_rec_t noderec[], int count)
{
int i,j;
node_rec_t tmp_element;
for (i=0; i<count; i++) {
/*
if (noderec[i].id==0 || noderec[i].retx_count > 0)
continue;
*/
for (j=i+1; j<count; j++) {
/*
if (noderec[j].id==0)
continue;
*/
if (noderec[j].score - noderec[i].score > 1e-10) {
// swap the elements
memcpy(&tmp_element, &(noderec[i]), sizeof(node_rec_t));
memcpy(&(noderec[i]), &(noderec[j]), sizeof(node_rec_t));
memcpy(&(noderec[j]), &tmp_element, sizeof(node_rec_t));
}
}
}
}
// same as above, only for importance
void sort_importance(node_rec_t noderec[], int count)
{
int i,j;
node_rec_t tmp_element;
for (i=0; i<count; i++) {
// imul and jmul are multipliers that bias the sorting algorithm
// If I have received this packet before, they will be set to 0 so
// the element will have a modified importance of 0 (for sorting
// purposes only)
int8_t imul;
if (noderec[i].retx_count==0) {
imul=1;
} else {
imul=0;
}
for (j=i+1; j<count; j++) {
int8_t jmul;
if (noderec[j].retx_count==0) {
jmul=1;
} else {
jmul=0;
}
if ((noderec[j].importance*jmul)>(noderec[i].importance*imul)) {
// swap the elements
memcpy(&tmp_element, &(noderec[i]), sizeof(node_rec_t));
memcpy(&(noderec[i]), &(noderec[j]), sizeof(node_rec_t));
memcpy(&(noderec[j]), &tmp_element, sizeof(node_rec_t));
}
}
}
}
See more files for this project here