Code Search for Developers
 
 
  

mhf_net.c from EmStar at Krugle


Show mhf_net.c syntax highlighted


/*
 *
 * Copyright (c) 2004 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
 
#include "mhf_i.h"


void mhf_routes_update(mhf_state_t *mh);


/*
 * retx "start_msg" callback: opens a new outgoing message for the link
 * stored as private data on the retx context.
 *
 * A broadcast link header carrying the parent's packet type is copied
 * into a fresh buffer, the link's current target is reset, and the
 * buffer is wrapped in a message iterator sized by the interface MTU.
 *
 * Returns the new iterator; it comes back as msg_state in the other
 * message callbacks.
 */
static
void *mhf_net_start_msg(retx_context_t *r)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);

  /* look up the MTU; MTU starts at 200 and a failed lookup is only logged */
  uint16_t MTU = 200;
  lu_multi_link_t *ml =
    lu_multi_index_link(link_state->parent->link_ref, link_state->iface);
  lu_context_t *lu_ctx = lu_multi_link_get_lu(ml);
  if (lu_get_mtu(lu_ctx, &MTU) < 0) {
    elog(LOG_WARNING, "can't get MTU for interface %d", link_state->iface);
  }

  /* broadcast header with our packet type; all other fields zero */
  link_pkt_t bcast_hdr = {
    .dst = { .id = LINK_BROADCAST },
    .type = link_state->parent->pkt_type
  };

  /* the outgoing buffer starts out holding just the link header */
  buf_t *out = buf_new();
  bufcpy(out, &bcast_hdr, sizeof(bcast_hdr));

  /* no target selected yet for this message */
  link_state->curr_target.byte = 0;

  /* build the iterator around the buffer and hand it back */
  ssync_msg_iter_t *msg_iter = ssync_msg_iter_new(NULL, 0, MTU);
  ssync_msg_iter_init_msg(msg_iter, 0, out);
  return msg_iter;
}


/*
 * retx "append" callback: adds one log entry for source `src` to the
 * message being built in msg_state.
 *
 * The flow is mapped into the message with a hop count of
 * src->hops_away + 1.  For RETX2_CTRL_NOP entries the caller's payload
 * is replaced by the source's current refresh seqno.
 *
 * Returns whatever ssync_msg_iter_append_msg() returns.
 */
static
int mhf_net_append_msg
(retx_context_t *r, void *msg_state, source_t *src,
 uint8_t list_index, log_seqno_t seqno, 
 uint8_t entry_type, char *entry_data, uint8_t entry_len)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *msg_iter = (ssync_msg_iter_t *)msg_state;

  /* make sure the flow is mapped in this message and get its index */
  uint8_t flow_slot = 0;
  ssync_msg_iter_maybe_flow_map(msg_iter, &flow_slot, &(src->flow_id),
                                src->hops_away + 1);

  /* payload defaults to what the caller gave us ... */
  char *payload = entry_data;
  uint8_t payload_len = entry_len;

  /* ... except for outgoing NOPs, which carry the refresh seqno */
  if (entry_type == RETX2_CTRL_NOP) {
    per_flow_state_t *flow = mhf_maybe_init_source(src);
    payload = (char *)&(flow->refresh_seqno);
    payload_len = sizeof(int16_t);
  }

  return ssync_msg_iter_append_msg(msg_iter, entry_type, flow_slot,
                                   link_state->curr_target,
                                   list_index, seqno, 0, 0,
                                   payload, payload_len);
}


/*
 * retx "append_nack" callback: adds a NACK for source `src` to the
 * message being built in msg_state.
 *
 * On a first attempt (retry == 0) with a known upstream neighbor, the
 * NACK is addressed to that neighbor; otherwise the target index stays
 * 0.  The chosen target is saved in the per-link state as curr_target.
 *
 * Returns whatever ssync_msg_iter_append_msg() returns.
 */
static
int mhf_net_append_nack
(retx_context_t *r, void *msg_state, source_t *src, int is_init,
 uint8_t list_index, log_seqno_t seqno, uint16_t nack_count, int retry)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *msg_iter = (ssync_msg_iter_t *)msg_state;
  per_flow_state_t *flow = mhf_maybe_init_source(src);

  /* pick the retransmitter: the upstream neighbor, unless retrying */
  cl_index_t chosen = { .byte = 0 };
  if (!retry && flow->upstream.if_id) {
    ssync_msg_iter_maybe_addr_map(msg_iter, &chosen, flow->upstream.if_id);
  }

  /* make sure the flow is mapped in this message and get its index */
  uint8_t flow_slot = 0;
  ssync_msg_iter_maybe_flow_map(msg_iter, &flow_slot, &(src->flow_id),
                                src->hops_away + 1);

  /* remember the target, then emit the NACK (no payload) */
  link_state->curr_target = chosen;
  return ssync_msg_iter_append_msg(msg_iter, RETX2_CTRL_NACK, flow_slot,
                                   link_state->curr_target,
                                   list_index, seqno,
                                   is_init, nack_count, NULL, 0);
}


/*
 * retx "complete_msg" callback: finalizes the message in msg_state and
 * sends it on this link if it holds more than the two headers.
 *
 * Returns -1 when nothing was sent, the (negative) lu_multi_send()
 * result on send failure, RETX_NODELAY after a successful send when the
 * parent has no_throttle set, and otherwise the lu_multi_send() result.
 * The iterator and the packet buffer are released in every case.
 */
static
int mhf_net_complete_msg(retx_context_t *r, void *msg_state)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *msg_iter = (ssync_msg_iter_t *)msg_state;
  int status = -1;

  buf_t *out = ssync_msg_iter_finalize_msg(msg_iter);

  /* a message consisting of headers only is not worth sending */
  int has_payload = out->len > (sizeof(link_pkt_t) + sizeof(ssync_hdr_t));

  if (has_payload) {
    /* the length handed to the link layer excludes the link header */
    status = lu_multi_send(link_state->parent->link_ref, link_state->iface,
                           (link_pkt_t *)out->buf,
                           out->len - sizeof(link_pkt_t));

    if (status < 0)
      elog(LOG_WARNING, "unable to send message to interface %d: %m",
           link_state->iface);
    else if (link_state->parent->no_throttle)
      status = RETX_NODELAY;
  }

  /* release the iterator and the buffer */
  ssync_msg_iter_destroy(msg_iter);
  buf_free(out);

  return status;
}


/*
 * retx "get_msg_len" callback: reports the length of the message under
 * construction, as given by ssync_msg_iter_get_len().  The retx context
 * itself is not consulted.
 */
static
int mhf_net_get_msg_len(retx_context_t *r, void *msg_state)
{
  return ssync_msg_iter_get_len((ssync_msg_iter_t *)msg_state);
}


/*
 * "new_link" callback: sets up per-link state for a newly reported link
 * and starts a retx protocol instance on it.
 *
 * The link's data pointer initially holds the mhf_state_t; it is
 * replaced by a freshly allocated mhf_per_link_t that keeps the
 * mhf_state_t as its parent.  Failure to obtain the interface id or to
 * start retx is fatal (exit(1)).
 */
static
void mhf_net_new_link(lu_multi_context_t *lu_m, lu_multi_link_t *l)
{
  /* read the parent out of the link data before overwriting it */
  mhf_state_t *owner = (mhf_state_t *)lu_multi_link_get_data(l);

  mhf_per_link_t *link_state = (mhf_per_link_t *)g_new0(mhf_per_link_t, 1);
  link_state->parent = owner;
  lu_multi_link_set_data(l, link_state);
  link_state->iface = lu_multi_link_get_index(l);

  if (lu_multi_get_if_id(lu_m, link_state->iface, &(link_state->if_id)) < 0) {
    elog(LOG_CRIT, "failed to get if id: %m");
    exit(1);
  }

  /* retx configuration: our message callbacks plus the parent's tunables */
  retx_opts_t retx_cfg = {
    .start_msg = mhf_net_start_msg,
    .append = mhf_net_append_msg,
    .append_nack = mhf_net_append_nack,
    .complete_msg = mhf_net_complete_msg,
    .get_msg_len = mhf_net_get_msg_len,
    .iface = link_state->iface,
    .private_data = link_state,
    .ssync = link_state->parent->ref,
    .refresh_msg_mode = 1,
    .fast_flood = link_state->parent->no_throttle,
    .fast_retx = link_state->parent->fast_refresh,
    .rate = link_state->parent->rate,
    .variance = link_state->parent->rate * 0.025,
  };

  if (retx_new(&retx_cfg, &(link_state->retx)) < 0) {
    elog(LOG_CRIT, "unable to start retx protocol");
    exit(1);
  }
}


/*
 * Timer callback armed by mhf_net_input(): fires when the expected
 * refresh for a source did not arrive in time.  `data` is the source_t.
 *
 * Destroys the timer event, marks the flow as timed out and hides the
 * source.  Always returns TIMER_DONE.
 */
static
int mhf_timed_out(void *data, int interval, g_event_t *ev)
{
  source_t *expired = (source_t *)data;
  per_flow_state_t *flow = mhf_maybe_init_source(expired);

  elog(LOG_NOTICE, "Source timed out!");

  /* drop the event first, then flag and hide the source */
  g_event_destroy(ev);
  flow->timed_out = 1;
  ssync_source_set_hidden(expired, 1);

  return TIMER_DONE;
}


/*
 * Link "receive" callback: processes one packet that arrived on a link.
 *
 * Packets whose source id equals this link's own interface id are
 * dropped as loops.  Packets of the configured type that are large
 * enough to hold an ssync header and carry version SSYNC_RETRANS_V2 are
 * walked entry by entry; for each entry that belongs to a multihop flow
 * (flow index != 0, non-zero flow source) the function:
 *   - looks up the source, creating it unless it names this node,
 *   - for non-local sources, handles refresh seqnos carried in NOPs and
 *     updates hop count / upstream neighbor when a shorter path is seen,
 *   - recomputes forwarding and visibility from the hop count,
 *   - re-arms the expected-refresh timer for non-local sources,
 *   - hands the entry to the retx protocol (NACKs always; other
 *     entries only for non-local sources).
 *
 * The packet is released with free() on every path, and EVENT_RENEW is
 * always returned.
 */
static
int mhf_net_input(lu_multi_link_t *lu_l, link_pkt_t *pkt, ssize_t data_len)
{  
  mhf_per_link_t *mhp = (mhf_per_link_t *)lu_multi_link_get_data(lu_l);
  mhf_state_t *mh = mhp->parent;

  /* check for looped packets */
  if (pkt->src.id == mhp->if_id) {
    elog(LOG_WARNING, "dropping looped packet");
    goto out;
  }

  /* handle statesync packets.  The length comparison is done in signed
   * arithmetic: comparing the ssize_t directly against sizeof() would
   * convert a negative data_len to a huge unsigned value and let it
   * through, making the payload length computed below wrap around. */
  if ((pkt->type == mh->pkt_type) &&
      (data_len >= (ssize_t)sizeof(ssync_hdr_t))) {
    ssync_hdr_t *msg = (ssync_hdr_t *)(pkt->data);
    if (msg->version == SSYNC_RETRANS_V2) {

      ssync_msg_iter_t *iter = 
        ssync_msg_iter_new((char *)msg->data, data_len - sizeof(*msg), 0);
      
      for (ssync_msg_iter_top(iter); ssync_msg_iter_valid(iter); 
           ssync_msg_iter_next(iter)) {
        
        /* we ignore some commands: */
        switch (iter->command) {
        case RETX2_CTRL_SEQNO: 
        case RETX2_CTRL_LIST: 
        case RETX2_CTRL_ADDR: 
        case RETX2_CTRL_FLOW: 
          goto skip;

        case 0xD:
        case 0xE:
          elog(LOG_WARNING, "unrecognized command %d", iter->command);
          goto skip;
          
        default:
          break;
        }

        /* skip flow index 0.. that's cluster sync */
        if (iter->flow_index == 0)
          goto skip;

        if (iter->curr_flow.id.src == 0) {
          elog(LOG_WARNING, "no flow id set");
          goto skip;
        }

        /* lookup or create source */
        source_t *target_src =
          ssync_source_lookup(mh->ref, &(iter->curr_flow.id));
        if (target_src == NULL) {
          if (iter->curr_flow.id.src != my_node_id)
            target_src = ssync_source_create(mh->ref, &(iter->curr_flow.id), 0, 0);
          else {
            elog(LOG_WARNING, "not creating local source from data from net!");
            goto skip;
          }

          /* everything below dereferences target_src, so a failed
           * creation must not fall through */
          if (target_src == NULL) {
            elog(LOG_WARNING, "unable to create source for incoming flow");
            goto skip;
          }
        }
        per_flow_state_t *pf = mhf_maybe_init_source(target_src);
        int maybe_refresh = 0;

        /* $$$ need to filter hopcount? */

        /* set hops and upstream neighbor */
        if (!ssync_source_is_local(target_src)) {
          
          /* if it's a NOP.. */
          if (iter->command == RETX2_CTRL_NOP) {
            
            /* verify length */
            if (iter->length == sizeof(int16_t)) {
                int16_t seqno;
                /* copy out byte-wise; the payload need not be aligned */
                memmove(&seqno, iter->msg, sizeof(int16_t));

                /* if this seqno is newer, forward it, and reset our min hopcount.
                 * A seqno more than 3 behind ours is also accepted. */
                if ((seqno - pf->refresh_seqno) > 0 ||
                    (seqno - pf->refresh_seqno) < -3) {
                  pf->refresh_seqno = seqno;
                  maybe_refresh = 1;
                  target_src->hops_away = -1;
                  elog(LOG_DEBUG(0), "forwarding refresh, hop %d/%d, from %d",
                       iter->curr_flow.hops, target_src->flow_id.max_hops,
                       target_src->flow_id.src);
                }
            }
            else 
              elog(LOG_WARNING, "refresh message, wrong length");
          }

          /* adopt this sender as upstream if we have no hop count yet
           * (negative) or it offers a shorter path */
          if (target_src->hops_away < 0 || target_src->hops_away > iter->curr_flow.hops) {
            target_src->hops_away = iter->curr_flow.hops;
            pf->upstream.if_id = pkt->src.id;
            pf->upstream.interface = mhp->iface;
            mhf_routes_update(mh);
          }
        }

        /* set forwarding based on hopcount */
        int in_hopcount = (target_src->hops_away <= target_src->flow_id.max_hops);
        int fwd_hopcount = (target_src->hops_away < target_src->flow_id.max_hops);
        memset(pf->forward_vector, boolify(fwd_hopcount), sizeof(pf->forward_vector));

        /* set pending if we got new data, and either we have overflood set
         * or we are still in the fwd hopcount region */
        if ((mh->overflood || fwd_hopcount) && maybe_refresh)
          target_src->log.refresh_pending = 1;  

        elog(LOG_DEBUG(0), "msg %d from %d to %d, fwd %d in %d",
             iter->command, target_src->flow_id.src, my_node_id,
             fwd_hopcount, in_hopcount);

        /* update the expect refresh timer */
        if (!ssync_source_is_local(target_src)) {
          pf->timed_out = 0;
          g_event_destroy(pf->expect_refresh_timer);
          int expire = MHF_SEQNO_REFRESH * 2.5;
          if (mh->slow_refresh) expire *= MHF_SLOW_REFRESH_FACTOR;
          g_timer_add(expire, mhf_timed_out, target_src, NULL, 
                      &(pf->expect_refresh_timer));
        }
        
        /* based on hopcount, (un)hide */
        ssync_source_set_hidden(target_src, pf->timed_out || !in_hopcount);
        
        /* if no target, or our if id targeted, set ourselves as the target */
        cl_id_t src_id = {};
        cl_id_t target_id = {};
        if ((iter->target.byte == 0) ||
            (mhp->if_id == iter->if_addr))
          target_id.node_id = my_node_id;
        
        /* process this into retx protocol */
        if (iter->command == RETX2_CTRL_NACK) 
          retx_process_nack(mhp->retx, &src_id, &target_id,
                            target_src, iter->nack_init,
                            iter->nack_list_index, iter->nack_seqno, 
                            iter->nack_count);
        
        else {
          if (!ssync_source_is_local(target_src)) {
            retx_process_msg(mhp->retx, &src_id, &target_id,
                             target_src, iter->command,
                             iter->list_index, iter->seqno, 
                             iter->msg, iter->length, 0);
          }
        }

      skip:
        continue;
      }
      
      ssync_msg_iter_destroy(iter);      
    }
    
    /* right packet type, but not the protocol version we speak */
    else 
      elog(LOG_DEBUG(0), "dropping non-sync packet");
  }
  
 out:
  free(pkt);
  return EVENT_RENEW;
}


/*
 * "printable" handler of the routes status device: writes a
 * human-readable routing table into buf.
 *
 * One section per known source: flow id with HIDDEN/SOURCED markers,
 * the next hop (visible, non-local sources with a known upstream only),
 * the interfaces set in the forward vector, and the retx state.
 *
 * Always returns STATUS_MSG_COMPLETE.
 */
static
int mhf_routes_print(status_context_t *info, buf_t *buf)
{
  mhf_state_t *mh = (mhf_state_t *) sd_data(info);
  source_t *src;

  bufprintf(buf, "Routing Table, node %d\n", my_node_id);
  bufprintf(buf, "-----------------------\n");

  for (src = ssync_sources_top(&(mh->ref->sources)); src != NULL;
       src = ssync_sources_next(src)) {
    per_flow_state_t *flow = mhf_maybe_init_source(src);
    int is_local = ssync_source_is_local(src);

    /* header line: flow id plus markers */
    bufprintf(buf, "%s: ", ssync_flowid_to_str(&(src->flow_id)));
    bufprintf(buf, "%s%s",
              src->hidden ? "**HIDDEN** " : "",
              is_local ? "**SOURCED** " : "");
    bufprintf(buf, "\n");

    /* next hop, when there is one worth showing */
    if (!src->hidden && !is_local && flow->upstream.if_id) {
      bufprintf(buf, "    NextHop: %s, iface %d, %d hops away\n", 
                print_if_id(flow->upstream.if_id), flow->upstream.interface,
                src->hops_away);
    }

    /* interfaces we forward this flow to; the label is printed once,
     * before the first interface */
    int listed = 0;
    int iface;
    for (iface = 0; iface < MAX_IFACES; iface++) {
      if (!flow->forward_vector[iface])
        continue;
      if (!listed) {
        bufprintf(buf, "    Forward to ifaces: ");
        listed = 1;
      }
      bufprintf(buf, "%d, ", iface);
    }
    if (listed)
      bufprintf(buf, "\n");

    /* retx state for this source */
    retx_state_to_buf(buf, src, "    ");
  }

  bufprintf(buf, "\n");
  return STATUS_MSG_COMPLETE;
}


/*
 * "binary" handler of the routes status device: writes the routing
 * table into buf as a sequence of route_entry_t records.
 *
 * A record is emitted for every source that is visible, not local, has
 * a known upstream neighbor and has not timed out.  The sequence is
 * closed by one empty-initialized record.
 *
 * Always returns STATUS_MSG_COMPLETE.
 */
static
int mhf_routes_bin(status_context_t *info, buf_t *buf)
{
  mhf_state_t *mh = (mhf_state_t *) sd_data(info);
  source_t *src;

  for (src = ssync_sources_top(&(mh->ref->sources)); src != NULL;
       src = ssync_sources_next(src)) {

    /* hidden and locally sourced flows have no route to report */
    if (src->hidden || ssync_source_is_local(src))
      continue;

    /* neither do flows without an upstream, or ones that timed out */
    per_flow_state_t *flow = mhf_maybe_init_source(src);
    if (!flow->upstream.if_id || flow->timed_out)
      continue;

    route_entry_t route = {
      .dst = src->flow_id.src,
      .next_hop = flow->upstream.if_id,
      .interface = flow->upstream.interface,
      .hops_metric = src->hops_away
    };
    bufcpy(buf, &route, sizeof(route));
  }

  /* trailing empty record closes the table */
  route_entry_t last = {};
  bufcpy(buf, &last, sizeof(last));

  return STATUS_MSG_COMPLETE;
}


/*
 * Signals the routes status device that the routing table has changed.
 * Called from mhf_net_input() whenever a source's hop count or upstream
 * neighbor is updated.
 */
void mhf_routes_update(mhf_state_t *mh)
{
  g_status_dev_notify(mh->routes_status);
}


/*
 * Initializes the network side of the multihop flooding module.
 *
 * Command-line handling (via the misc_parse_* helpers on argc/argv):
 *   no_throttle  switch, stored in mh->no_throttle
 *   pkt_type     optional packet type name; PKT_TYPE_STATESYNC otherwise
 *   send_rate    integer, stored in mh->rate; when it cannot be parsed
 *                and mh->fast_refresh is set, the rate becomes 50
 *   if_class     link class to open (LINK_CLASS_LINK if absent)
 *   neighbors    neighbor list passed through to the link layer
 *
 * It then opens the multi-link connection, with mhf_net_input and
 * mhf_net_new_link as callbacks, and creates the routes status device.
 *
 * Every failure reported at LOG_CRIT here is fatal and ends in exit(1).
 */
void mhf_net_init(mhf_state_t *mh, int *argc, char **argv)
{
  /* options */
  mh->no_throttle = misc_parse_out_switch(argc, argv, "no_throttle", 0);

  /* parse optional packet type */
  mh->pkt_type = PKT_TYPE_STATESYNC;
  char *type = misc_parse_out_option(argc, argv, "pkt_type", 0);
  if (type) mh->pkt_type = link_parse_pkt_type(type);
  if (mh->pkt_type < 0) {
    /* an unparseable type used to be logged and then used anyway;
     * treat it like the other critical setup failures in this file */
    elog(LOG_CRIT, "Unable to parse packet type '%s'", type);
    exit(1);
  }
  else if (type) {
    elog(LOG_NOTICE, "Using packet type %s (%d)", type, mh->pkt_type);
  }

  /* send rate: if it can't be parsed, fast refresh mode falls back to 50 */
  if (misc_parse_option_as_int(argc, argv, "send_rate", 0, &(mh->rate)) < 0) {
    if (mh->fast_refresh) {
      mh->rate = 50;
    }
  }  

  /* parse link config flags */
  mh->if_class = misc_parse_out_option(argc, argv, "if_class", 0);
  mh->neighbors = misc_parse_out_option(argc, argv, "neighbors", 0);

  /* options struct for opening a link user connection; `data` is what
   * mhf_net_new_link() later reads back as the link's initial data */
  lu_multi_opts_t opts = {
    .lu_opts = {
      .opts = {
        .pkt_type = mh->pkt_type,
        .data = mh
      },
    },
    .if_class = mh->if_class ? mh->if_class : LINK_CLASS_LINK,
    .neighbors_list = mh->neighbors,
    .receive = mhf_net_input,
    .new_link = mhf_net_new_link
  };
      
  if (lu_open_multi(&opts, &(mh->link_ref)) < 0) {
    elog(LOG_CRIT, "Can't open link class %s: %m", opts.if_class);
    exit(1);
  }

  /* create routing output */
  status_dev_opts_t t_opts = {
    .device = {
      .devname = ssync_path(mh->ref->opts.prefix, SSYNC_SUFFIX_ROUTES),
      .device_info = mh
    },
    .printable = mhf_routes_print,
    .binary = mhf_routes_bin
  };
  
  if (g_status_dev(&t_opts, &(mh->routes_status)) < 0) {
    elog(LOG_CRIT, "Unable to create routes status device %s: %m",
         t_opts.device.devname);
    exit(1);
  }

}





See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs — large-scale, long-lived systems that need self-organization and adaptivity — a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware, because designing an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding up the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  mhf_devs.c
  mhf_i.h
  mhf_main.c
  mhf_net.c