Code Search for Developers
 
 
  

emproxy.c from EmStar at Krugle


Show emproxy.c syntax highlighted

/*
 *
 * Copyright (c) 2003 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
 

/*
 *  emProxy
 *
 *  This program provides a UDP interface that enables a remote client
 *  to watch device interfaces on an em* system.
 *
 *  $Id: emproxy.c,v 1.28 2006/07/16 10:05:50 girod Exp $
 */

char emproxy_c_id[] = "$Id: emproxy.c,v 1.28 2006/07/16 10:05:50 girod Exp $";

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>

#include <libmisc/misc.h>
#include "emproxy_i.h"
#include <emrun/emrun.h>

#define _GNU_SOURCE
#include <getopt.h>
#undef _GNU_SOURCE

/*
 * prototypes
 *
 * Forward declarations: both functions are defined further down in this
 * file but are referenced before their definitions (the request template
 * stores emproxy_handle_device as its handler).
 */
static void emproxy_status_update();
static int emproxy_handle_device(void *new_buffer, size_t size, void *data);

/*
 * queue functions
 *
 * One instantiation per (element type, container type) pair.  The macro
 * itself is defined outside this file.
 * NOTE(review): presumably these expand to the emproxy_client_*,
 * emproxy_request_*, emproxy_command_* and emproxy_cached_node_* list
 * accessors (top/next/push/remove) used throughout this file -- confirm
 * against the macro definition.
 */
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_client, _, clients, struct emp_client, struct emp_state); 
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_request, _, requests, struct emp_request, struct emp_client);
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_command, _, commands, struct emp_command, struct emp_client);
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_cached_node, _, nodes, struct cached_emp_node, struct emp_state);

/* 
 * global emProxy state var 
 *
 * server_fd starts at -1 and is assigned the UDP socket descriptor by
 * emproxy_config_port(); all other fields start zeroed.
 */

struct emp_state emproxy_state = {
  server_fd: -1
};

/*
 * default parameters for requests
 *
 * emproxy_process_request() copies this template into every request it is
 * about to parse.  A data_id of -1 marks "no ID supplied";
 * emproxy_add_request() rejects requests whose data_id is still negative.
 * The handler is the callback that forwards device data to the client.
 * NOTE(review): the unit of read_refractory (1000) is not visible in this
 * file -- confirm against the status client options definition.
 */
struct emp_request template = {
  client_opts: {
    read_refractory: 1000,
    handler: emproxy_handle_device
  },
  data_id: -1
};

/*
 * Client lookup / creation code
 */

/*
 * Find the client record whose return address, protocol and interface
 * match the arguments; if none exists, allocate one, register it with the
 * state and the gateway summary, and return it.
 *
 *   es         proxy state holding the client list
 *   addr       return address of the sender
 *   addr_len   number of valid bytes at addr
 *   src_proto  EMPROXY_PROTO_* the message arrived on
 *   src_iface  interface index the message arrived on
 *
 * Returns the matching or newly created client.
 */
struct emp_client *
emproxy_lookup_client_by_addr(struct emp_state *es, struct sockaddr *addr, int addr_len, int src_proto, int src_iface)
{
  struct emp_client *cl = emproxy_client_top(es);

  /* walk the existing clients, comparing origin and return address */
  while (cl) {
    int same_origin = (cl->addr_len == addr_len) &&
      (cl->src_proto == src_proto) &&
      (cl->src_iface == src_iface);

    if (same_origin && memcmp(&(cl->src_addr), addr, addr_len) == 0)
      return cl;

    cl = emproxy_client_next(cl);
  }

  /* nothing matched: build a fresh, zeroed client record */
  cl = g_new0(struct emp_client, 1);
  cl->parent = es;
  cl->src_iface = src_iface;
  cl->src_proto = src_proto;
  cl->addr_len = addr_len;

  /* never copy more than the stored address can hold */
  if (cl->addr_len > sizeof(cl->src_addr)) {
    elog(LOG_WARNING, "Address length mismatch!");
    cl->addr_len = sizeof(cl->src_addr);
  }
  memmove(&(cl->src_addr), addr, cl->addr_len);

  cl->last_heard = time(0);

  /* publish the client: first in the list, then to the gateway code */
  emproxy_client_push(es, cl);
  emproxy_gw_update_client(cl, es);

  return cl;
}


/*
 * Produce a printable name for a client.
 *
 * UDP clients are rendered by net_unparse_address(); every other protocol
 * is rendered as "<interface id>:link" or "<interface id>:unknown" into a
 * buffer declared by DECLARE_STATIC_BUF_RING.
 * NOTE(review): the macro presumably rotates through 5 static 80-byte
 * buffers, so the result is only valid for a few subsequent calls --
 * confirm against its definition.
 */
char *emproxy_unparse_client(struct emp_client *client)
{
  DECLARE_STATIC_BUF_RING(str, 5, 80);

  if (client->src_proto == EMPROXY_PROTO_UDP)
    return net_unparse_address(&(client->src_addr));

  if (client->src_proto == EMPROXY_PROTO_LINK)
    sprintf(str, "%s:link", print_if_id(client->src_addr.sin_addr.s_addr));
  else
    sprintf(str, "%s:unknown", print_if_id(client->src_addr.sin_addr.s_addr));

  return str;
}


/*
 *  request comparison
 */

static
int emproxy_request_cmp(struct emp_request *r1, struct emp_request *r2) 
{
  struct emp_request R1 = *r1;
  struct emp_request R2 = *r2;

  /* clear string pointers */
  R1.client_opts.argument = NULL;
  R2.client_opts.argument = NULL;
  R1.client_opts.devname = NULL;
  R2.client_opts.devname = NULL;
  R1.client_opts.private_data = NULL;
  R2.client_opts.private_data = NULL;

  /* memcmp */
  if ((R1.dst_node != R2.dst_node) ||
      (R1.data_id != R2.data_id) ||
      memcmp(&(R1.client_opts), &(R2.client_opts), sizeof(R1.client_opts)) ||
      misc_safe_strcmp(r1->client_opts.devname, r2->client_opts.devname) ||
      misc_safe_strcmp(r1->client_opts.argument, r2->client_opts.argument))
    return 1;
  
  /* same */
  return 0;
}


/* 
 * destroy request destroys sub allocations, does not free request itself
 */

/*
 * Release the strings owned by a request (argument and devname) and reset
 * the pointers to NULL.  The request structure itself is not freed.
 */
static
void emproxy_request_destroy(struct emp_request *req)
{
  /* free(NULL) is a no-op, so no guards are needed */
  free(req->client_opts.argument);
  req->client_opts.argument = NULL;

  free(req->client_opts.devname);
  req->client_opts.devname = NULL;
}


/*
 *  request and client cancelation
 */

/*
 * Cancel one request of a client: stop its status-client event, unlink it
 * from the client's request queue, release its strings and free it.
 *
 *   client  owner of the request queue
 *   req     request to cancel; invalid after this call returns
 */
static 
void emproxy_cancel_request(struct emp_client *client, struct emp_request *req)
{
  /* cancel event */
  g_status_client_destroy(req->event);
  
  /* dequeue and release sub-allocations */
  emproxy_request_remove(client, req);
  emproxy_request_destroy(req);

  /* log while req is still valid: data_id must not be read after free() */
  elog(LOG_DEBUG(0), "Client: %s, canceled request id %d", 
       emproxy_unparse_client(client), req->data_id);

  free(req);
}


/*
 * Drop everything a client has outstanding: every queued request first,
 * then every queued command.
 */
static
void emproxy_cancel_all_pending(struct emp_client *client)
{
  struct emp_request *pending_req;
  struct emp_command *pending_cmd;

  /* requests: keep cancelling the head until the queue is empty */
  for (;;) {
    pending_req = emproxy_request_top(client);
    if (!pending_req)
      break;
    emproxy_cancel_request(client, pending_req);
  }

  /* commands: same approach */
  for (;;) {
    pending_cmd = emproxy_command_top(client);
    if (!pending_cmd)
      break;
    emproxy_command_cancel(pending_cmd);
  }
}


/*
 * Tear down a client completely: cancel its pending requests and
 * commands, destroy its aggregation timer, unlink it from the global
 * client list and free the record.
 */
static
void emproxy_cancel_client(struct emp_client *client)
{
  /* outstanding work goes first */
  emproxy_cancel_all_pending(client);

  /* stop the delayed-send timer */
  g_event_destroy(client->send_timer);

  /* unlink from the global state, then release the memory */
  emproxy_client_remove(&emproxy_state, client);
  free(client);
}


/*
 *  packing and sending to network
 */


/*
 * Transmit a finished reply buffer to a client over the protocol the
 * client originally used.
 *
 *   client  destination client
 *   buf     reply bytes to send
 *   len     number of bytes at buf
 *
 * For link clients the payload is wrapped in a link packet header
 * followed by the one-byte EMPROXY_LINK_REPLY_TYPE code; for UDP clients
 * it is sent with sendto() on the server socket.
 *
 * Returns the result of the underlying send call, or -1 when the protocol
 * is unknown.  A warning is logged whenever the result is negative.
 */
int emproxy_send_data(struct emp_client *client, char *buf, size_t len)
{
  int status = -1;

  switch (client->src_proto) {
  case EMPROXY_PROTO_LINK: {
    buf_t *pkt_buf = buf_new();
    
    link_pkt_t hdr = {
      dst: { id: client->src_addr.sin_addr.s_addr },
      type: PKT_TYPE_EMPROXY,
      max_hops: 10
    };
    char reply = EMPROXY_LINK_REPLY_TYPE;

    /* packet layout: link header, reply type code, payload */
    bufcpy(pkt_buf, &hdr, sizeof(hdr));
    bufcpy(pkt_buf, &reply, sizeof(reply));
    bufcpy(pkt_buf, buf, len);

    /* store into the function-level status (no shadowing local) so the
     * result reaches the error check and the caller */
    status = lu_multi_send(client->parent->link_ref, client->src_iface, 
			   (link_pkt_t*)pkt_buf->buf, pkt_buf->len - sizeof(link_pkt_t));
    buf_free(pkt_buf);
    break;
  }
  
  case EMPROXY_PROTO_UDP:
    status = sendto(emproxy_state.server_fd, buf, len, 0, (struct sockaddr *)&(client->src_addr), client->addr_len);
    break;

  default:
    elog(LOG_CRIT, "unknown protocol");
    break;
  }

  if (status < 0) {
    /* len is a size_t; cast so the argument matches the %d conversion */
    elog(LOG_WARNING, "Client: %s, error sending message len %d: %m",
	 emproxy_unparse_client(client), (int)len);
  }

  return status;
}


/* flush the current packet from the client's aggregation buffer */
/*
 * Send whatever is waiting in the client's aggregation buffer, empty the
 * buffer and destroy the pending send timer.
 *
 * Returns the result of emproxy_send_data(), or 0 if nothing was queued.
 */
static
int emproxy_flush_data(struct emp_client *client)
{ 
  int status = (client->udp_len > 0)
    ? emproxy_send_data(client, client->udp_buf, client->udp_len)
    : 0;

  client->udp_len = 0;
  g_event_destroy(client->send_timer);

  return status;
}

/* timeout to flush after a delay */
/*
 * Timer callback installed by emproxy_push_data(): flushes the client's
 * aggregation buffer once the aggregation delay has elapsed.  `data` is
 * the client; the timer is not renewed.
 */
static
int emproxy_do_send(void *data, int interval, g_event_t *event) 
{
  struct emp_client *client = (struct emp_client *)data;

  emproxy_flush_data(client);

  return TIMER_DONE;
}


/*
 * Queue one reply for a client, aggregating small replies into a single
 * outgoing packet.
 *
 *   client  destination client
 *   reply   reply header immediately followed by reply->data_length bytes
 *
 * The outgoing size limit is EMPROXY_UDP_THRESHOLD for UDP clients and
 * the link MTU minus one byte (for the link type code) for link clients.
 * If appending would exceed the limit the current buffer is flushed
 * first.  A reply smaller than the limit is appended (arming the send
 * timer when the buffer was empty); anything else is sent on its own
 * immediately.
 *
 * Returns 0, or -1 when the link MTU cannot be determined or is zero.
 */
int emproxy_push_data(struct emp_client *client, struct emproxy_reply_hdr *reply)
{
  uint16_t outbound_mtu = 0;
  int data_len;

  /* work out how large an outgoing packet may be */
  if (client->src_proto == EMPROXY_PROTO_UDP) {
    outbound_mtu = EMPROXY_UDP_THRESHOLD;
  }
  else if (client->src_proto == EMPROXY_PROTO_LINK) {
    lu_multi_link_t *lu = lu_multi_index_link(client->parent->link_ref, client->src_iface);

    if (lu == NULL || lu_get_mtu(lu_multi_link_get_lu(lu), &outbound_mtu) < 0) {
      elog(LOG_WARNING, "unable to get MTU for link: %m");
      return -1;
    }
    if (outbound_mtu <= 0) {
      elog(LOG_WARNING, "MTU for link too small %d: %m", outbound_mtu);
      return -1;
    }

    /* one byte is reserved for the link type code */
    outbound_mtu--;
  }
  else {
    elog(LOG_WARNING, "unknown protocol");
  }

  /* header plus payload */
  data_len = reply->data_length + sizeof(struct emproxy_reply_hdr);

  /* make room first if this reply would overflow the current packet */
  if ((data_len + client->udp_len) > outbound_mtu) 
    emproxy_flush_data(client);

  /* a reply that is not below the limit bypasses aggregation */
  if (data_len >= outbound_mtu) {
    emproxy_send_data(client, (char*)reply, data_len);
    return 0;
  }

  /* first entry in an empty buffer: arm the timer that guarantees a send */
  if (client->udp_len == 0) {
    g_timer_add(EMPROXY_AGGREGATION_DELAY, emproxy_do_send, client, NULL, &(client->send_timer));
  }

  /* append to the aggregation buffer */
  memmove(client->udp_buf + client->udp_len, (char*)reply, data_len);
  client->udp_len += data_len;    

  return 0;
}


/*
 *  process device data
 */

/*
 * Status-client callback: wrap freshly read device data in a reply header
 * and queue it for the requesting client.
 *
 *   new_buffer  heap buffer holding the device data (may be NULL); it is
 *               freed here
 *   size        number of bytes at new_buffer
 *   data        the struct emp_request this data belongs to
 *
 * Data larger than the local 64KB reply buffer (minus the header) is
 * truncated with a warning.  A NULL buffer is reported as an empty reply.
 *
 * Returns EVENT_RENEW so the event stays installed.
 */
static
int emproxy_handle_device(void *new_buffer, size_t size, void *data)
{
  struct emp_request *req = (struct emp_request *)data;
  int out_buf[65536/sizeof(int)];
  struct emproxy_reply_hdr *hdr = (struct emproxy_reply_hdr *)out_buf;
  /* size_t so that the comparison with `size` is unsigned on both sides */
  size_t max_size = sizeof(out_buf) - sizeof(struct emproxy_reply_hdr);

  elog(LOG_DEBUG(0), "Client: %s, got data on ID %d",
       emproxy_unparse_client(req->parent), req->data_id);

  /*
   * construct a return buffer and send it 
   */
  
  hdr->node_id = my_node_id;
  hdr->data_id = req->data_id;
  gettimeofday(&(hdr->report_time), NULL);
  req->last_heard = hdr->report_time.tv_sec;

  /* nothing to copy from a missing buffer */
  if (new_buffer == NULL)
    size = 0;

  /* max size.. */
  if (size > max_size) {
    /* cast: size is a size_t but the conversion is %d */
    elog(LOG_WARNING, "Truncating very large response from device %s (%d bytes)",
	 req->client_opts.devname, (int)size);
    size = max_size;
  }
  hdr->data_length = size;

  /* copy the data */
  if (size > 0)
    memmove(hdr->data, (char*)new_buffer, size);

  /* push the update */
  emproxy_push_data(req->parent, hdr);

  /* free the status report! (free(NULL) is a no-op) */
  free(new_buffer);

  return EVENT_RENEW;
}



/*
 *  process requests 
 */

/*
 * Validate a parsed request and attach it to a client.
 *
 *   client  client the request came from
 *   req     scratch request filled in by the parser
 *
 * The request is rejected when it carries no ID or no device name, and
 * ignored when an equivalent request is already queued (that existing
 * request's mark is cleared).  In those cases the strings owned by `req`
 * are released.  Otherwise `req` is copied onto the heap, queued, and --
 * if it targets this node and the proxy is not gateway-only -- a status
 * client is started for it.  The copy takes over the strings of `req`.
 */
static
void emproxy_add_request(struct emp_client *client, struct emp_request *req)
{
  struct emp_request *existing;
  struct emp_request *added;
  int for_this_node;
  int sim_allows;

  /* an ID is mandatory; stay silent for entirely empty requests */
  if (req->data_id < 0) {
    if (req->client_opts.devname) 
      elog(LOG_WARNING, "Client: %s, ignoring request with no ID", 
           emproxy_unparse_client(client));
    emproxy_request_destroy(req);
    return;
  }

  /* so is a device name */
  if (req->client_opts.devname == NULL) {
    elog(LOG_WARNING, "Client: %s, ignoring request with no device specified", 
         emproxy_unparse_client(client));
    emproxy_request_destroy(req);
    return;
  }

  /* an identical queued request only needs its mark cleared */
  for (existing = emproxy_request_top(client); existing;
       existing = emproxy_request_next(existing)) {
    if (emproxy_request_cmp(req, existing) != 0)
      continue;

    elog(LOG_DEBUG(0), "Request already present.. ignoring");
    existing->mark = 0;
    emproxy_request_destroy(req);
    return;
  }

  /* new request: the heap copy inherits the strings, so nothing is freed */
  added = g_copy(struct emp_request, req);
  added->mark = 0;
  added->parent = client;
  added->client_opts.private_data = added;
  emproxy_request_push(client, added);

  /* start the status client only when the request is meant for this node */
  for_this_node = (!added->dst_node || added->dst_node == my_node_id);
  sim_allows = (my_node_id != SIM_COMPONENT_ID) || (added->dst_node == my_node_id);

  if (for_this_node && sim_allows && !client->parent->gw_only)
    g_status_client_full(&(added->client_opts), &(added->event));
}


/*
 * CASE(str) opens an `if` that is true when the variable named `key` in
 * the enclosing scope BEGINS WITH str: only strlen(str) characters are
 * compared, so this is a prefix match, not an exact match.
 */
#define CASE(str) \
  if (strncmp(key, (str), strlen(str)) == 0)

/*
 * Apply one key/value pair from a request line to a request under
 * construction.
 *
 *   req    request being filled in
 *   key    option name; matched by prefix through the CASE macro
 *   value  option value, or NULL when the key had none
 *
 * "ascii" needs no value.  "dev", "arg", "reread", "ratelimit", "node"
 * and "id" are only honoured when a value is present.  A node value of -1
 * is translated to SIM_COMPONENT_ID.  The dev and arg strings are
 * duplicated onto the heap and owned by the request; if a key is repeated
 * on the same line the earlier string is released first so it does not
 * leak.
 */
static
void emproxy_parse_pair(struct emp_request *req, char *key, char *value)
{
  CASE("ascii") {
    req->client_opts.read_as_ascii = 1;
  }

  if (value) {
    CASE("dev") {
      /* drop any device name set by an earlier pair */
      free(req->client_opts.devname);
      req->client_opts.devname = strdup(sim_path(value));
    }

    CASE("arg") {
      /* drop any argument set by an earlier pair */
      free(req->client_opts.argument);
      req->client_opts.argument = strdup(value);
      req->client_opts.arg_len = strlen(value);
    }

    CASE("reread") {
      req->client_opts.reread_period = atoi(value);
    }

    CASE("ratelimit") {
      req->client_opts.read_refractory = atoi(value);
    }
    
    CASE("node") {
      req->dst_node = atoi(value); 
      if (req->dst_node == -1) 
	req->dst_node = SIM_COMPONENT_ID;
    }
    
    CASE("id") {
      req->data_id = atoi(value); 
    }
  }
}



/*
 * Apply one key/value pair from the header line of a client message.
 *
 *   client  sender of the message
 *   msg     message descriptor being filled in
 *   key     header field name; matched by prefix through the CASE macro
 *   value   header field value; a missing value is logged and ignored
 *
 * "session" carries the client's session nonce: when it differs from the
 * stored one, everything pending for the client is cancelled before the
 * new nonce is recorded.  "type" selects request or command processing.
 * "seq", "src" and "dest" are stored as integers.
 */
static
void emproxy_parse_header_pair(struct emp_client *client, struct client_message *msg, 
                               char *key, char *value)
{
  /* every header field needs a value */
  if (value == NULL) {
    elog(LOG_WARNING, "Parse error in header form client %s: no value for key %s",
         emproxy_unparse_client(client), key);
    return;
  }

  /* session nonce: a change invalidates everything the client had pending */
  CASE("session") {
    int nonce = atoi(value);

    if (nonce != client->session) {
      if (client->session) 
        elog(LOG_NOTICE, "Session nonce mismatch.. clearing old requests for client %s",
             emproxy_unparse_client(client));
      emproxy_cancel_all_pending(client);
    }
    client->session = nonce;
  }

  /* message type: the value itself is matched by prefix */
  CASE("type") {
    const char *kind = value;

    if (strncmp(kind, "request", strlen("request")) == 0) {
      msg->request_type=EMPROXY_REQ_REQUEST;
    }
    if (strncmp(kind, "command", strlen("command")) == 0) {
      msg->request_type=EMPROXY_REQ_COMMAND;
    }
    return;
  }

  /* plain integer fields */
  CASE("seq") {
    msg->seq_no = atoi(value);
  }

  CASE("src") {
    msg->src = atoi(value);
  }

  CASE("dest") {
    msg->dest_node = atoi(value);
  }
}


/*
 * Parse and act on one NUL-terminated client message.
 *
 *   es         proxy state
 *   request    message text, parsed with the colon key/value scheme
 *   req_len    length of the message (not referenced in this function)
 *   src_addr   return address of the sender
 *   addr_len   number of valid bytes at src_addr
 *   src_proto  EMPROXY_PROTO_* the message arrived on
 *   src_iface  interface index the message arrived on
 *
 * The first line is a header (session, type, seq, src, dest).  A message
 * addressed to a different node is dropped.  For a request message, the
 * remaining lines REPLACE the client's request set: existing requests are
 * marked, each parsed line is added (or un-marks an identical existing
 * request), and whatever is still marked afterwards is cancelled.  For a
 * command message, the first "cmd" key is handed to
 * emproxy_process_command().
 *
 * Paths that jump to `out` skip the status-device notification.
 */
static
void emproxy_process_request(struct emp_state *es, char *request, int req_len,
			     struct sockaddr *src_addr, int addr_len, int src_proto, int src_iface)
{
  parser_state_t *ps = misc_parse_init(request, MISC_PARSE_COLON_SCHEME);
  /* scratch request, reset to the template after every parsed line */
  struct emp_request new_req = template;
  struct emp_request *er;
  /* messages default to type "request" when the header names no type */
  struct client_message msg = {
    request_type: EMPROXY_REQ_REQUEST
  };
  
  /* lookup the client (created on first contact) */
  struct emp_client *client;
  client = emproxy_lookup_client_by_addr(es, src_addr, addr_len, src_proto, src_iface);
  
  /* update client heard time */
  client->last_heard = time(0);

  /* parse the request header 
   * format: session=4534534:type=request\n  */

  while (misc_parse_next_kvp(ps) >= 0) {
    emproxy_parse_header_pair(client, &msg, ps->key, ps->value);
    /* the header ends at the first newline or at end of input */
    if ((ps->pair_delimit == '\n') || (ps->pair_delimit == 0))
      break;
  }

  /* filter on destination node (0 means "not addressed to a specific node") */
  if (msg.dest_node && (my_node_id != msg.dest_node))
    goto out;

  /* handle request type... */
  switch (msg.request_type) {

  case EMPROXY_REQ_REQUEST:

    /* mark all requests; emproxy_add_request() clears the mark on any
     * request that is named again by this message */ 
    for (er = emproxy_request_top(client); er; er = emproxy_request_next(er)) 
      er->mark = 1;
    
    /* parse the request buffer
     * format: key=val:...\n */
    
    while (misc_parse_next_kvp(ps) >= 0) {
      emproxy_parse_pair(&new_req, ps->key, ps->value);
      /* end of line (or input): the accumulated request is complete */
      if ((ps->pair_delimit == '\n') || (ps->pair_delimit == 0)) {
	emproxy_add_request(client, &new_req);
	/* start the next line from the defaults again */
	memmove(&new_req, &template, sizeof(new_req));
      }
    }
    /* release strings of a trailing, never-submitted partial request */
    emproxy_request_destroy(&new_req);
    
    /* gc any marked requests; fetch the successor before cancelling,
     * because cancelling frees the current node */
    for (er = emproxy_request_top(client); er; ) {
      struct emp_request *tmp = er;
      er = emproxy_request_next(er);
      if (tmp->mark) {
	elog(LOG_DEBUG(0), "Removing old request %s", tmp->client_opts.devname);
	emproxy_cancel_request(client, tmp);
      }
    }
    break;

  case EMPROXY_REQ_COMMAND:

    /* ignore if from us and local */
    if (es->ignore_local_cmds && my_node_id == msg.src) {
      goto out;
    }

    /* only the first key that is exactly "cmd" is processed */
    while (misc_parse_next_kvp(ps) >= 0) {
      if (strcmp(ps->key, "cmd") == 0) {
	int length;
	char *raw = misc_parse_get_raw_value(ps, &length);
	emproxy_process_command(es, client, &msg, raw, length);
	break;
      }
    }
    break;

  default:
    elog(LOG_WARNING, "Unhandled request type %d", msg.request_type);
    break;

  }

  /* trigger client status update */
  emproxy_status_update();

 out:
  misc_parse_cleanup(ps);
}


/*
 * Event callback for the UDP server socket.
 *
 *   data   unused
 *   fd     the server socket
 *   cond   condition that fired; anything other than FUSD_NOTIFY_INPUT is
 *          treated as fatal
 *   event  unused
 *
 * Receives one datagram, optionally forwards it (prefixed with the
 * sender's sockaddr_in) to the demux device, and processes it as a client
 * request.  A receive error terminates the process.
 *
 * Returns EVENT_RENEW so the event stays installed.
 */
static 
int emproxy_request_handler(void *data, int fd, int cond, g_event_t *event)
{ 
  if (cond == FUSD_NOTIFY_INPUT) {
    char request[65536];
    int msg_len;
    socklen_t addr_len;
    struct sockaddr_in src_addr;
    
    /* receive message, leaving one byte free for the terminator */
    addr_len = sizeof(src_addr);
    msg_len = recvfrom(fd, request, sizeof(request)-1, 0, (struct sockaddr *)&src_addr, &addr_len);

    /* read error?  check before anything else so that the buffer is not
     * touched and errno is still intact for %m */
    if (msg_len < 0) {
      elog(LOG_CRIT, "Read error from server socket: %m");
      exit(1);
    }

    /* null-term before the buffer is used as a string */
    request[msg_len] = 0;

    elog(LOG_DEBUG(0), "Got request: %s", request);

    /* forward if we are demuxing: sender address followed by the payload */
    if (emproxy_state.proxy_demux) {
      buf_t *buf = buf_new();
      bufcpy(buf, (char*)&src_addr, sizeof(struct sockaddr_in));
      bufcpy(buf, request, msg_len);
      pd_receive(emproxy_state.proxy_demux, buf->buf, buf->len);
      buf_free(buf);
    }

    /* process request (length includes the terminator) */
    emproxy_process_request(&emproxy_state, request, msg_len+1, 
			    (struct sockaddr *)&src_addr, addr_len, EMPROXY_PROTO_UDP, 0);
  }
  
  else {
    elog(LOG_CRIT, "Exception from udp server socket: %m");
    exit(1);
  }

  return EVENT_RENEW;
}


/*
 * Receive callback for the network link class.
 *
 *   lu_l      link the packet arrived on
 *   pkt       received packet; freed here
 *   data_len  number of bytes in pkt->data
 *
 * A packet whose first data byte is EMPROXY_LINK_REQUEST_TYPE is treated
 * as a client request: the remaining bytes are shifted to the start of
 * the packet memory, NUL-terminated and processed.  The sender's link id
 * is carried in the sin_addr field of a sockaddr_in used as the client's
 * return address.  Anything else is dropped with a debug message.
 *
 * Returns EVENT_RENEW so the handler stays installed.
 */
static
int emproxy_link_handler(lu_multi_link_t *lu_l, link_pkt_t *pkt, ssize_t data_len)
{
  struct sockaddr_in src_addr;

  /* the whole structure is compared and stored by the client lookup, so
   * every byte must be deterministic, not only sin_addr */
  memset(&src_addr, 0, sizeof(src_addr));
  src_addr.sin_addr.s_addr = pkt->src.id;

  /* null term */
  char *buf = (char*)pkt;
  if (data_len > 0 && pkt->data[0] == EMPROXY_LINK_REQUEST_TYPE) {
    /* strip the type byte; the source id was already saved above */
    memmove(buf, pkt->data + 1, data_len - 1);
    buf[data_len-1] = 0;
    
    /* process request */
    emproxy_process_request(&emproxy_state, buf, data_len,
			    (struct sockaddr *)&src_addr, sizeof(src_addr), 
			    EMPROXY_PROTO_LINK, lu_multi_link_get_index(lu_l));
  }
  else 
    /* cast: data_len is an ssize_t but the conversion is %d */
    elog(LOG_DEBUG(0), "dropping message, len=%d, first char is 0x%x", 
	 (int)data_len, data_len > 0 ? pkt->data[0] : 0);
  
  free(pkt);
  return EVENT_RENEW;
}


/*
 * Receive callback for the demux device.
 *
 *   buffer  message from the demux device: a struct sockaddr_in followed
 *           by the original request bytes; freed here
 *   size    number of bytes at buffer
 *   client  packet-device client context; its opts->data is the proxy
 *           state
 *
 * Messages that are missing, shorter than a sockaddr_in or too large for
 * the local 64KB buffer are logged and dropped.  Otherwise the request is
 * NUL-terminated and processed as if it had arrived over UDP from the
 * embedded address.
 *
 * Returns EVENT_RENEW so the handler stays installed.
 */
static
int emproxy_demux_handler(void *buffer, ssize_t size, pd_client_context_t *client)
{
  struct emp_state *state = (struct emp_state *)(pd_client_opts(client)->data);
  char request[65536];
  struct sockaddr_in addr;
  /* signed copy of the prefix length so that a negative size fails the check */
  const ssize_t addr_size = (ssize_t)sizeof(struct sockaddr_in);

  /* check length */
  if (buffer == NULL || size < addr_size) {
    elog(LOG_WARNING, "Received bogus message from emproxy demux, len=%d", (int)size);
    goto out;
  }
  
  /* parse out the sockaddr */
  memmove(&addr, buffer, sizeof(struct sockaddr_in));

  /* parse out data and null-term; size is known to be >= 0 from here on */
  size -= addr_size;
  if ((size_t)size > sizeof(request)-1) {
    elog(LOG_WARNING, "Received msg too large from demux, %d", (int)size);
    goto out;
  }
  memmove(request, (char *)buffer + addr_size, (size_t)size);
  request[size] = 0;

  elog(LOG_DEBUG(0), "Got request: %s", request);

  /* process it (length includes the terminator) */
  emproxy_process_request(state, request, size+1, (struct sockaddr *)&addr,
			  sizeof(struct sockaddr_in), EMPROXY_PROTO_UDP, 0);
  
 out:
  free(buffer);
  return EVENT_RENEW;
}


/*
 * Link configuration callback: asks the link to enable loop mode via
 * PD_SET_LOOP_MODE.  A failure is only logged.
 *
 * Returns EVENT_RENEW in every case.
 */
static
int emproxy_link_configure(lu_context_t *lu)
{
  int loop_mode = 1;

  elog(LOG_DEBUG(0), "configuring link for loop mode");

  if (lu_ioctl(lu, PD_SET_LOOP_MODE, &loop_mode) < 0)
    elog(LOG_WARNING, "failed to configure link for loop mode: %m");

  return EVENT_RENEW;
}


/*
 * Open the network link class for emproxy packets.  Received packets are
 * delivered to emproxy_link_handler() and each link is configured through
 * emproxy_link_configure().  The resulting reference is stored in
 * emp->link_ref.
 *
 * Returns 0 on success, -1 (after logging a warning) on failure.
 */
static
int emproxy_open_link(struct emp_state *emp)
{
  lu_multi_opts_t link_opts = {
    .lu_opts = {
      .opts = {
        .pkt_type = PKT_TYPE_EMPROXY
      },
      .configure = emproxy_link_configure
    },
    .if_class = LINK_CLASS_NETWORK,
    .receive = emproxy_link_handler,
    .withhold_warning = 1
  };

  if (lu_open_multi(&link_opts, &(emp->link_ref)) >= 0)
    return 0;

  elog(LOG_WARNING, "Unable to open network link class: %m");
  return -1;
}


/*
 * Create the UDP socket and hook up the source of incoming requests.
 *
 * In simulation, when this process is not the sim component, requests are
 * read from the demux device and the socket is left unbound.  In every
 * other case the socket is bound to the emproxy port on all addresses and
 * an input/exception event is installed on it.
 *
 * Returns 0 on success and -1 when the port cannot be bound.  All other
 * failures terminate the process.
 */
static
int emproxy_config_port()
{
  /* the socket is needed in both modes */
  emproxy_state.server_fd = socket(AF_INET, SOCK_DGRAM,0);
  if (emproxy_state.server_fd < 0) {
    elog(LOG_CRIT, "Socket call failed: %m");
    exit(1);
  }

  /* simulated, non-master node: watch the master's demux device instead */
  if (in_sim && !sim_is_component()) {
    pd_client_opts_t demux_opts = {
      .devname = EMPROXY_DEMUX_DEV, 
      .receive = emproxy_demux_handler, 
      .data = &emproxy_state
    };

    if (pd_client_open(&demux_opts, NULL) < 0) {
      elog(LOG_CRIT, "Unable to open demux device: %m");
      exit(1);
    }

    return 0;
  }

  /* real node, or the sim master: bind the socket and watch it */
  g_event_opts_t event_opts = {
    .event_name = "udp server",
    .close_on_destroy = 1
  };

  struct sockaddr_in serv_addr = {};
  serv_addr.sin_family = AF_INET;
  serv_addr.sin_port = htons(emproxy_get_port());
  serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  if (bind(emproxy_state.server_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
    elog(LOG_CRIT, "Can't bind socket: %m");
    elog(LOG_CRIT, "... perhaps another emproxy is running?  I'll just hang out then");
    return -1;
  }

  if (g_event_add(emproxy_state.server_fd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
                  emproxy_request_handler, NULL, &event_opts, NULL) < 0) {
    elog(LOG_CRIT, "Can't install udp server event: %m");
    exit(1);
  }

  return 0;
}


/*
 *  garbage collection
 */

/*
 * Periodic garbage-collection timer.  For every client it collects old
 * commands, and it cancels any client that has not been heard from for
 * more than EMPROXY_CLIENT_TIMEOUT.  The status device is notified when
 * at least one client was removed.
 *
 * Returns TIMER_RENEW so the timer keeps firing.
 */
static
int emproxy_gc_timeout(void *data, int interval, g_event_t *event) 
{
  int now = time(0);
  int removed_any = 0;
  struct emp_client *cur = emproxy_client_top(&emproxy_state);

  while (cur) {
    /* remember the successor first: cur may be freed below */
    struct emp_client *following = emproxy_client_next(cur);

    /* gc the old commands */
    emproxy_command_gc_now(cur);

    /* age out silent clients */
    if ((now - cur->last_heard) > EMPROXY_CLIENT_TIMEOUT) {
      elog(LOG_DEBUG(0), "Client: %s, timing out!",
           emproxy_unparse_client(cur));
      emproxy_cancel_client(cur);
      removed_any = 1;
    }

    cur = following;
  }

  if (removed_any)
    emproxy_status_update();

  return TIMER_RENEW;
}


/*
 *  emproxy status device
 */


/* Append the one-line summary of a single request to the status text. */
static
void emproxy_status_print_request(buf_t *buf, struct emp_request *er, int now)
{
  bufprintf(buf, "  %d: %s, ", er->data_id, er->client_opts.devname);

  if (er->client_opts.argument)
    bufprintf(buf, "arg=%s, ", er->client_opts.argument);
  if (er->client_opts.read_as_ascii)
    bufprintf(buf, "ascii, ");
  if (er->client_opts.reread_period)
    bufprintf(buf, "reread=%d, ", er->client_opts.reread_period);
  if (er->client_opts.read_refractory)
    bufprintf(buf, "ratelimit=%d, ", er->client_opts.read_refractory);

  /* destination: symbolic for this node and for the sim component */
  if (er->dst_node) {
    if (er->dst_node == my_node_id)
      bufprintf(buf, "dst=US, ");
    else if (er->dst_node == SIM_COMPONENT_ID)
      bufprintf(buf, "dst=SIM, ");
    else
      bufprintf(buf, "dst=%d, ", er->dst_node);
  }

  if (!er->last_heard) {
    bufprintf(buf, "never read\n");
    return;
  }
  bufprintf(buf, "last read %d sec ago\n", now - er->last_heard);
}

/* Append the two-line summary of a single command to the status text. */
static
void emproxy_status_print_command(buf_t *buf, struct emp_command *cmd, int now)
{
  bufprintf(buf, "  \"%s\" (%d bytes)\n", cmd->command, cmd->command_length);
  bufprintf(buf, "    seqno=%d/dst=%d: ", cmd->msg.seq_no, cmd->msg.dest_node);

  if (cmd->time_started)
    bufprintf(buf, "Started %d secs ago, ", now - cmd->time_started);
  if (cmd->time_ended)
    bufprintf(buf, "Completed %d secs ago, ", now - cmd->time_ended);

  bufprintf(buf, "pid=%d ", cmd->child_pid);

  if (cmd->exited)
    bufprintf(buf, "exited, status=%x", cmd->exit_status);

  bufprintf(buf, "\n");
}

/*
 * "printable" callback of the emproxy status device: writes a
 * human-readable list of all active clients, each followed by its
 * requests and its commands, into buf.
 *
 * Returns STATUS_MSG_COMPLETE.
 */
static
int emproxy_status_printable(status_context_t *info, buf_t *buf)
{
  int now = time(0);
  struct emp_client *cl;

  bufprintf(buf, "Active Emproxy clients:\n"); 

  for (cl = emproxy_client_top(&emproxy_state); cl; cl = emproxy_client_next(cl)) {
    struct emp_request *er;
    struct emp_command *cmd;

    bufprintf(buf, " %s: (req %d sec ago) [session nonce=%x]\n", 
              emproxy_unparse_client(cl),
              now - cl->last_heard, cl->session);

    /* requests section, only when there is at least one */
    er = emproxy_request_top(cl);
    if (er) {
      bufprintf(buf, "Requests:\n");
      for (; er; er = emproxy_request_next(er))
        emproxy_status_print_request(buf, er, now);
    }

    /* commands section, only when there is at least one */
    cmd = emproxy_command_top(cl);
    if (cmd) {
      bufprintf(buf, "Commands:\n");
      /* command ages use a timestamp taken for this section */
      int cmd_now = time(0);
      for (; cmd; cmd = emproxy_command_next(cmd))
        emproxy_status_print_command(buf, cmd, cmd_now);
    }

    bufprintf(buf, "\n");
  }

  return STATUS_MSG_COMPLETE;
}


/*
 * Create the emproxy status device (EMPROXY_STATUS_DEV), whose printable
 * output comes from emproxy_status_printable().  The device reference is
 * kept in emproxy_state.proxy_status.
 */
static
void emproxy_status_init()
{
  status_dev_opts_t dev_opts = {
    .device = {
      .devname = EMPROXY_STATUS_DEV
    },
    .printable = emproxy_status_printable
  };

  g_status_dev(&dev_opts, &(emproxy_state.proxy_status));
}

/*
 * Notify the status device stored in emproxy_state.proxy_status.
 * Called after a message has been processed and after the garbage
 * collector has removed a client.
 */
static
void emproxy_status_update()
{
  g_status_dev_notify(emproxy_state.proxy_status);
}


/*
 *  proxy demux device for emsim
 */

/*
 * Create the proxy demux packet device (EMPROXY_DEMUX_DEV) and keep its
 * reference in emproxy_state.proxy_demux.  Failure is fatal.
 */
void emproxy_demux_init()
{
  packet_dev_opts_t dev_opts = {
    .device = {
      .devname = EMPROXY_DEMUX_DEV
    }
  };

  if (g_packet_dev(&dev_opts, &(emproxy_state.proxy_demux)) >= 0)
    return;

  elog(LOG_CRIT, "Unable to create proxy demux device: %m");
  exit(1);
}


/*
 *  Main Program and Initialization
 */

/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * `s` is the program name shown in the synopsis.
 */
void usage(char *s)
{
  fprintf(stderr, "Usage: %s [-h] [--gw-only] [--ignore-local-cmds]\n", s);
  fputs("  Use ignore-local-cmds to disable local execution of commands \n"
        "  (e.g. on control laptop) \n", stderr);
  exit(1);
}


/*
 * Shutdown callback registered with emrun in main(): logs a notice and
 * exits with status 0.  `data` is unused.
 */
void emproxy_shutdown(void *data)
{
  elog(LOG_NOTICE, "EmProxy shutting down..");
  exit(0);
}


/*
 * Program entry point.
 *
 * Parses the --gw-only and --ignore-local-cmds switches, sets up the
 * request source, creates the devices and timers, registers with emrun
 * and enters the event loop.  If the UDP port cannot be bound the process
 * stays "dormant": it skips the rest of the setup but still registers
 * with emrun and runs the event loop.
 *
 * Returns 1 if the event loop ever returns.
 */
int main(int argc, char *argv[])
{
  emrun_opts_t emrun_opts = {
    shutdown: emproxy_shutdown,
  };
  /* set when the port could not be bound; see below */
  int dormant = 0;

  /* emstar generic init */
  misc_init(&argc, argv, CVSTAG);

  /* enable demux if we are a sim component */
  if (sim_is_component())
    emproxy_state.sim_demux = 1;

  emproxy_state.gw_only = misc_parse_out_switch(&argc, argv, "gw-only", 0);  
  emproxy_state.ignore_local_cmds = misc_parse_out_switch(&argc, argv, "ignore-local-cmds", 0);  

  /* Bind to UDP port, install event handler */
  dormant = (emproxy_config_port() < 0);

  /* anything left on the command line is an error */
  if (misc_args_remain(&argc, argv)) {
    usage(argv[0]);
  }

  /* do rest of setup only if we're not "dormant" */
  if (!dormant) {
    /* open the network class (its return value is not checked here) */
    emproxy_open_link(&emproxy_state);

    /* Create client list device */
    emproxy_status_init();
    
    /* Create request summary device */
    emproxy_gw_init(&emproxy_state);

    /* Create demux device if requested */
    if (emproxy_state.sim_demux)
      emproxy_demux_init();
    
    /* create gc timer */
    g_timer_add(EMPROXY_GC_INTERVAL, emproxy_gc_timeout, NULL,
		NULL, NULL);

    /* set signal handler */
    emproxy_command_handler_init(&emproxy_state);
  }

  /* Register with emrun and start the event loop */
  emrun_init(&emrun_opts);
  elog(LOG_NOTICE, "Emproxy service starting%s",
       emproxy_state.sim_demux ? ", sim_demux mode" : "");
  g_main();

  /* g_main() is not expected to return */
  elog(LOG_CRIT, "event loop exited unexpectedly!");
  return 1;
}





See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  emproxy.c
  emproxy_cmd.c
  emproxy_gw.c
  emproxy_i.h