Code Search for Developers
 
 
  

dts_exec.c from EmStar at Krugle


Show dts_exec.c syntax highlighted



/* CVS "Id" keyword string recording this file's name, revision, date and
 * last committer. */
char dts_exec_c_cvsid[] = "$Id: dts_exec.c,v 1.7 2007/01/08 01:06:24 mlukac Exp $";

#include "dts_i.h"


/*
  TODO: 
  garbage collect commands - need to add timer to dts_main
*/

/*
 * Event callback that drains and discards whatever is still arriving on a
 * shell pipe (installed by dts_exec_command_cancel on a cancelled command's
 * output descriptor).
 *
 *   data, ev - unused
 *   fd       - descriptor to read from
 *   cond     - event condition flags that triggered the callback
 *
 * Returns EVENT_RENEW while the pipe may still deliver data (bytes were
 * read, or the read reported EAGAIN).  In every other case the descriptor
 * is closed and EVENT_DONE is returned.
 */
static
int dts_exec_toss_it(void *data, int fd, int cond, g_event_t *ev)
{
  char scratch[4096];
  int keep_watching = 0;

  if (cond == FUSD_NOTIFY_INPUT) {
    int nread = read(fd, scratch, sizeof(scratch));

    if (nread > 0) {
      /* data thrown away; wait for more */
      keep_watching = 1;
    }
    else if (nread < 0) {
      if (errno == EAGAIN) {
        keep_watching = 1;
      }
      else {
        elog(LOG_WARNING, "unusual error reading from shell pipe: %m"); 
      }
    }
    /* nread == 0: end of file, fall through and close */
  }
  else if (cond != 0) {
    elog(LOG_WARNING, "exception on shell pipe");
  }
  else {
    elog(LOG_WARNING, "event handler triggered with no event flag?");
  }

  if (keep_watching) {
    return EVENT_RENEW;
  }

  close(fd);
  return EVENT_DONE;
}


/*
 * Tear down an exec command and detach it from its command element.
 *
 *   dtsce - command element that owns the command; its exec_cmd pointer is
 *           cleared once the command has been freed
 *   cmd   - command to destroy; when NULL nothing is done (dtsce is left
 *           untouched)
 *
 * Frees the command string and the response buffer, destroys the response
 * event and frees cmd itself.  If the command's output descriptor is still
 * open it is handed to dts_exec_toss_it, which discards further output and
 * closes the descriptor.
 */
void dts_exec_command_cancel(dts_command_el_t *dtsce, exec_command_t *cmd)
{
  elog(LOG_DEBUG(15), "command cancel");

  if (!cmd) {
    return;
  }

  /* free(NULL) is a no-op, so no guard is needed for the command string */
  free(cmd->command);
  if (cmd->response != NULL) {
    buf_free(cmd->response);
  }

  g_event_destroy(cmd->response_event);

  if (cmd->output_fd >= 0) {
    int drain_failed =
      g_event_add(cmd->output_fd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
                  dts_exec_toss_it, NULL, NULL, NULL) < 0;

    if (drain_failed) {
      elog(LOG_WARNING, "Can't create event %m");
    }
  }

  free(cmd);
  dtsce->exec_cmd = NULL;
}

/*
 * Publish the outcome of an exec command as a DTS response.
 *
 *   dts          - DTS instance the command belongs to
 *   cmd          - command whose response buffer is reported; its
 *                  time_ended field is stamped with the current time
 *   return_value - status code stored in the response
 *
 * The response is built from cmd->response, registered through
 * dts_response_find_or_create, stored on disk and pushed out.  The
 * temporary response message is freed here.  When commands are still
 * waiting (command_exec_counter > 0) the next execution is triggered.
 */
static
void dts_exec_command_reply(dts_t *dts, exec_command_t *cmd, int return_value)
{
  dts_response_t *resp_msg;
  dts_response_el_t *resp_el;

  elog(LOG_DEBUG(15), "command reply");

  /* remember when the command finished */
  cmd->time_ended = time(0);

  resp_msg = dts_response_create_response(dts, cmd->dtsce, cmd->response,
                                          return_value);

  /* size passed along: header plus response bytes plus one extra byte */
  resp_el = dts_response_find_or_create(dts, resp_msg,
                                        sizeof(dts_response_t) +
                                        buf_len(cmd->response) + 1,
                                        NULL);

  dts_ondisk_store_new_response(dts, resp_el);
  dts_response_push_responses(dts);

  free(resp_msg);

  if (!(dts->command_exec_counter > 0)) {
    return;
  }

  dts_command_trigger_exec(dts);
}

/*
 * Event callback that collects output from a running command's shell pipe.
 * It is also called directly by dts_exec_command_handle_sigchild (with
 * ev == NULL) to pick up remaining output once the child has exited.
 *
 *   data - the exec_command_t the pipe belongs to
 *   fd   - read end of the pipe
 *   cond - event condition flags that triggered the callback
 *   ev   - unused
 *
 * Output is appended to cmd->response until the buffer reaches
 * EXEC_COMMAND_MAXRESP bytes; at that point a "- DTS: Max len" marker is
 * appended and a reply with status -255 is sent.  Anything read after that
 * is discarded.
 *
 * Returns EVENT_RENEW while more data may arrive (data was read, or the
 * read reported EAGAIN).  On end of file, read error, exception or a
 * callback without flags the descriptor is closed, cmd->output_fd is set to
 * -1 and EVENT_DONE is returned.
 */
static
int dts_exec_command_handle_response(void *data, int fd, int cond, g_event_t *ev)
{
  elog(LOG_DEBUG(15), "handel response");

  exec_command_t *cmd = (exec_command_t *) data;
  char buf[4096];
  memset(buf, 0, sizeof(buf));
  if (cond == FUSD_NOTIFY_INPUT) {
    /* Read at most sizeof(buf) - 1 bytes so the zero-filled buffer always
     * keeps a terminating NUL for the "%s" in the debug log below. */
    int status = read(fd, buf, sizeof(buf) - 1);
    /* Logging may change errno; keep read()'s value for the checks. */
    int read_errno = errno;
    elog(LOG_DEBUG(5), "got some result: %i: %s", status, buf);
    if (status < 0) {
      errno = read_errno;  /* also what %m below should report */
      if (errno == EAGAIN) goto out;
      if (errno == EPIPE) goto done;
      elog(LOG_WARNING, "unusual error reading from shell pipe: %m"); 
      goto done;
    }
    else if (status == 0)
      goto done;
    else {
      if (cmd->response->len < EXEC_COMMAND_MAXRESP) {
	bufcpy(cmd->response, buf, status);
	if (cmd->response->len >= EXEC_COMMAND_MAXRESP) {
	  /* limit reached: flag it in the response and report right away */
	  bufcpy(cmd->response, "- DTS: Max len", 14); 
	  dts_exec_command_reply(cmd->dts, cmd, -255);
	}
      }
      goto out;
    }
  }

  else if (cond) {
    elog(LOG_WARNING, "exception on shell pipe: %m");
    goto done;
  }

  else {
    elog(LOG_WARNING, "event handler triggered with no event flag?");
    goto done;
  }

 done:
  close(fd);
  /* mark the descriptor as gone so nobody reads from or re-registers it */
  cmd->output_fd = -1;
  return EVENT_DONE;

 out:
  return EVENT_RENEW;
}



/* Close *fd if it is open and mark it closed, so that no descriptor is
 * ever closed twice. */
static
void dts_exec_close_fd(int *fd)
{
  if (*fd >= 0) {
    close(*fd);
    *fd = -1;
  }
}

/*
 * Run the shell command carried by a DTS command element.
 *
 *   dts   - DTS instance
 *   dtsce - command element; marked executed and linked to the newly
 *           created exec_command_t through dtsce->exec_cmd
 *
 * The command text (everything in the element after the dts_command_t
 * header) is written to the standard input of a freshly forked /bin/sh.
 * The shell's stdout and stderr come back over a pipe that is watched by
 * dts_exec_command_handle_response.
 *
 * Returns 0 when the shell was started and the command was stored on disk.
 * Returns 1 on failure; in that case a "- DTS: Failed" response with status
 * -253 has been sent and the command has been cancelled (dtsce->exec_cmd is
 * NULL again).
 */
int dts_exec_process_command(dts_t *dts, dts_command_el_t *dtsce)
{
  exec_command_t *cmd = NULL;
  int req_len = dtsce->length - sizeof(dts_command_t);
  /* -1 means "not open"; lets the failure path close only what is open */
  int in_fds[2] = { -1, -1 };    /* child output -> parent */
  int out_fds[2] = { -1, -1 };   /* parent -> child input  */
  pid_t pid;

  dtsce->executed = 1;

  /* create new command struct */
  cmd = g_new0(exec_command_t, 1);
  cmd->child_pid = -1;
  cmd->output_fd = -1;
  cmd->dts = dts;
  cmd->dtsce = dtsce;
  dtsce->exec_cmd = cmd;

  /* Create the response buffer first: every failure path below appends an
   * error marker to it. */
  cmd->response = buf_new();

  if (req_len < 0) {
    elog(LOG_WARNING, "Command element shorter than its header");
    goto fail;
  }

  cmd->command = malloc(req_len + 1);
  if (cmd->command == NULL) {
    elog(LOG_WARNING, "Can't allocate command buffer: %m");
    goto fail;
  }
  memmove(cmd->command, dtsce->command->data, req_len);
  cmd->command[req_len] = 0;
  cmd->command_length = req_len;
  cmd->time_started = time(0);

  /* spawn the process.. */

  /* create the pipes */
  if (pipe(in_fds) < 0) {
    in_fds[0] = in_fds[1] = -1;
    elog(LOG_WARNING, "Can't create pipe: %m");
    goto fail;
  }
  if (pipe(out_fds) < 0) {
    out_fds[0] = out_fds[1] = -1;
    elog(LOG_WARNING, "Can't create pipe: %m");
    goto fail;
  }

  /* fork */
  pid = fork();

  /* parent.. */
  if (pid > 0) {
    /* close the child's side of the pipes */
    dts_exec_close_fd(&in_fds[1]);
    dts_exec_close_fd(&out_fds[0]);

    cmd->output_fd = in_fds[0];
    cmd->child_pid = pid;
    set_nonblock(in_fds[0], 1);

    /* write the commands */
    if (write_to_fd(out_fds[1], cmd->command, cmd->command_length) < 0) {
      elog(LOG_WARNING, "Can't write command strings: %m");
      goto fail;
    }
    /* closing our write end gives the shell end-of-file on its stdin */
    dts_exec_close_fd(&out_fds[1]);

    /* set up event to read back.. */
    if (g_event_add(in_fds[0], FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
		    dts_exec_command_handle_response, 
		    cmd, NULL, &(cmd->response_event)) < 0) {
      elog(LOG_WARNING, "Can't create event: %m");
      goto fail;
    }
  }

  /* child... */
  else if (pid == 0) {
    struct rlimit limit;
    rlim_t n;
    int in = out_fds[0];
    int out = in_fds[1];

    /* reinstate default signal handlers */
    signal(SIGCHLD, SIG_DFL);
    signal(SIGTERM, SIG_DFL);
    signal(SIGHUP,  SIG_DFL);
    signal(SIGINT,  SIG_DFL);
 
    /* clear the signal mask - may cause a sigint to be delivered */
    sigset_t sigset;
    sigemptyset(&sigset);
    sigprocmask(SIG_SETMASK, &sigset, NULL);

    /* detach from parent's signals */
    setpgrp();
    
    /* close all file descriptors.  If rlimit doesn't work, or reports
     * "no limit", guess there are 256 of them. */
    if (getrlimit(RLIMIT_NOFILE, &limit) < 0 ||
	limit.rlim_cur == RLIM_INFINITY)
      limit.rlim_cur = 256;

    /* close everything except pipes */
    for (n = 0; n < limit.rlim_cur; n++) {
      int fd = (int) n;
      if (fd != in && fd != out) close(fd);
    }
    
    /* NOTE(review): if the pipe ends themselves are descriptors 0-2 (parent
     * started with stdio closed) the dup2 sequence below can overwrite one
     * pipe end with the other - confirm the daemon always has 0-2 open. */

    /* connect stdout and stderr to pipe; _exit() so that the parent's
     * stdio buffers and atexit handlers are not run in the child */
    if (out != STDOUT_FILENO && dup2(out, STDOUT_FILENO) != STDOUT_FILENO)
      _exit(253);
    if (out != STDERR_FILENO && dup2(out, STDERR_FILENO) != STDERR_FILENO)
      _exit(252);
    if (in != STDIN_FILENO && dup2(in, STDIN_FILENO) != STDIN_FILENO)
      _exit(251);

    /* close orig.. */
    if (out != STDOUT_FILENO && out != STDERR_FILENO && out != STDIN_FILENO)
      close(out);
    if (in != STDOUT_FILENO && in != STDERR_FILENO && in != STDIN_FILENO)
      close(in);

    /* ok, exec the shell.. */
    execl("/bin/sh", "/bin/sh", (char *) NULL);
    
    /* failed?? */
    _exit(250);
  }
  
  /* fork failed */
  else {
    elog(LOG_WARNING, "Can't fork!!!");
    goto fail;
  }

  dts_ondisk_store_exec_command(dts, dtsce);

  return 0;

 fail:
  /* close whatever is still open, exactly once */
  dts_exec_close_fd(&in_fds[0]);
  dts_exec_close_fd(&in_fds[1]);
  dts_exec_close_fd(&out_fds[0]);
  dts_exec_close_fd(&out_fds[1]);
  /* the read end is gone; keep cancel from registering a closed fd */
  cmd->output_fd = -1;
  bufcpy(cmd->response, "- DTS: Failed", 13);
  dts_exec_command_reply(dts, cmd, -253);
  dts_exec_command_cancel(dtsce, cmd);
  return 1;
}

/*
 * Time out a single command that has been running for too long.
 *
 *   dts   - DTS instance
 *   dtsce - command element; dtsce->exec_cmd must not be NULL
 *
 * A command that has a start time, has no end time yet and started more
 * than EXEC_COMMAND_MAXWAIT ago gets a "- DTS: Timeout" marker appended to
 * its response and is answered with status -254.  The command itself is
 * not cancelled here.
 *
 * Always returns 0.
 */
static
int dts_exec_command_gc_now(dts_t *dts, dts_command_el_t *dtsce)
{
  exec_command_t *running = dtsce->exec_cmd;
  int now = time(0);

  /* already finished */
  if (running->time_ended != 0) {
    return 0;
  }

  /* never started */
  if (!running->time_started) {
    return 0;
  }

  /* still within the allowed run time */
  if (!((now - running->time_started) > EXEC_COMMAND_MAXWAIT)) {
    return 0;
  }

  elog(LOG_WARNING, "timing out on command");
  bufcpy(running->response, "- DTS: Timeout", 14);
  dts_exec_command_reply(dts, running, -254);

  return 0;
}

/*
 * Periodic timer callback: walk the command queue and apply the timeout
 * check to every command that was executed and still has an exec command
 * attached.
 *
 *   data            - the dts_t instance
 *   interval, event - unused
 *
 * Always returns EVENT_RENEW.
 */
int dts_exec_command_gc_timer(void *data, int interval, g_event_t *event)
{
  dts_t *dts = (dts_t *) data;
  dts_command_el_t *cur = dts_commands_top(dts);

  while (cur != NULL) {
    if (cur->executed && cur->exec_cmd) {
      dts_exec_command_gc_now(dts, cur);
    }
    cur = dts_commands_next(cur);
  }

  return EVENT_RENEW;
}


/*
 * SIGCHLD callback: reap every child that has exited and answer the
 * command each one belonged to.
 *
 *   data      - the dts_t instance
 *   signo     - signal number (only used for logging)
 *   sig_event - unused
 *
 * For each reaped pid the command queue is searched for an exec command
 * with that child_pid.  A match gets its exit status recorded, one more
 * read of its output pipe (if still open), and a reply carrying the raw
 * wait status.  Unknown pids are logged and skipped.
 *
 * Always returns EVENT_RENEW.
 */
static
int dts_exec_command_handle_sigchild(void *data, int signo, g_signal_context_t *sig_event)
{
  dts_t *dts = (dts_t *)data;
  pid_t pid;
  int status;

  elog(LOG_DEBUG(15), "command handle sigchild");

  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    exec_command_t *cmd = NULL;
    dts_command_el_t *dtsce = dts_commands_top(dts);

    /* search for command that matches this PID */
    while (dtsce != NULL) {
      if (dtsce->exec_cmd && dtsce->exec_cmd->child_pid == pid) {
        cmd = dtsce->exec_cmd;
        break;
      }
      dtsce = dts_commands_next(dtsce);
    }

    if (cmd == NULL) {
      /* not one of ours */
      elog(LOG_ERR, "error: process with unknown pid %d exited!", pid);
      continue;
    }

    elog(LOG_DEBUG(0), "caught %s (pid %d), exit status=%d, signal=%d", 
         signal_name(signo), pid,
         WEXITSTATUS(status), WTERMSIG(status));
    cmd->exit_status = status;
    cmd->exited = 1;

    /* read any remaining data.. */
    if (cmd->output_fd >= 0) {
      dts_exec_command_handle_response(cmd, cmd->output_fd, FUSD_NOTIFY_INPUT, NULL);
    }

    /* reply */
    dts_exec_command_reply(dts, cmd, status);
  }

  /* ECHILD only means there is nothing left to reap */
  if (pid < 0 && errno != ECHILD) {
    elog(LOG_ERR, "calling waitpid: %m");
  }

  return EVENT_RENEW;
}


/*
 * Install the pieces the exec subsystem needs:
 *   - a SIGCHLD handler (dts_exec_command_handle_sigchild), and
 *   - a 2000 ms timer (dts_exec_command_gc_timer) whose handle is stored in
 *     dts->command_gc_timer.
 *
 *   dts - DTS instance handed to both callbacks
 *
 * Either registration failing is fatal: the error is logged and the process
 * exits with status 1.
 */
void dts_exec_command_handler_init(dts_t *dts)
{
  g_signal_opts_t sigchld_opts = {
    .signo = SIGCHLD,
    .callback = dts_exec_command_handle_sigchild,
    .data = dts
  };

  if (g_signal_handler(&sigchld_opts, NULL) < 0) {
    elog(LOG_CRIT, "can't attach to sigchld signal: %m");
    exit(1);
  }

  if (g_timer_add(2000, dts_exec_command_gc_timer,
                  dts, NULL, &(dts->command_gc_timer)) < 0) {
    elog(LOG_CRIT, "Unable to create command gc timer: %m");
    exit(1);
  }
}


int sort_dts_commands(const void *a, const void *b)
{
  return ( ((dts_command_el_t *) b)->command->sequence_number -
	   ((dts_command_el_t *) a)->command->sequence_number );
}

/*
 * Timer callback that picks the next queued command and executes it.
 *
 *   data            - the dts_t instance
 *   interval, event - unused
 *
 * The queue is copied into an array and sorted with sort_dts_commands.  The
 * first not-yet-executed command is chosen for which either
 *   - its sequence number equals the value returned by
 *     dts_sequence_numbers_find_for_node() for its source node, or
 *   - the queue holds one command from the same source node for every
 *     sequence number between that value and its own.
 * Commands whose source node has no sequence number (0) are skipped.
 *
 * The chosen command is acknowledged without being run when it originates
 * from this node with dont_exec_localy set, or when it is addressed to a
 * different node.  Otherwise it is dispatched by command type.
 *
 * Always returns EVENT_RENEW.  Unless a command was skipped or failed while
 * more are pending (command_exec_counter > 0), the exec timer is first
 * rescheduled to fire in 15000 ms.
 */
int dts_command_exec_timer_fired(void *data, int interval, g_event_t *event)
{   
  dts_t * dts = (dts_t *) data;
  
  elog(LOG_DEBUG(15), "exec timer fire");
 
  int num_cmds = dts_commands_qlen(dts);

  uint32_t oldest_seqno = 0;
  dts_command_el_t *exec_me = NULL;
  /* A variable length array must have at least one element, so reserve one
   * slot even when the queue is empty (the array is not used in that case). */
  dts_command_el_t *(command_view[num_cmds > 0 ? num_cmds : 1]);
  
  if (num_cmds == 0) {
    goto done;
  }

  dts_command_fill_array(dts, command_view);

  
  qsort(command_view, num_cmds, sizeof(dts_command_el_t *),
	sort_dts_commands);


  /* test print */
  int k = 0;
  for (k = 0; k < num_cmds; ++k) {
    if (!command_view[k]->executed)
      elog(LOG_WARNING, "To exec: Cmd %i: src: %i seq: %i",
	   k, command_view[k]->command->command_src_node,
	   command_view[k]->command->sequence_number);
  }


  /* NOT HAPPY WITH THIS!!! */

  int i = 0;
  for (i = 0; i < num_cmds; ++i) {
    if (command_view[i]->executed == 0) {
      oldest_seqno = dts_sequence_numbers_find_for_node(dts, 
							command_view[i]->command->command_src_node);
      /* nothing recorded for this source node: cannot order it yet */
      if (oldest_seqno == 0)
	continue;
      if (oldest_seqno == command_view[i]->command->sequence_number) {
	exec_me = command_view[i];
	break;
      }
      /* find if everything between oldest_seqno and this commands
	 sequence number is present in the queue.
	 NOTE(review): the count does not look at the executed flag of the
	 intermediate commands - confirm that this is intended. */
      int j = 0;
      int needed = command_view[i]->command->sequence_number - oldest_seqno;
      for (j = 0; j < num_cmds; ++j) {
	if ( (command_view[j]->command->command_src_node
	      == command_view[i]->command->command_src_node)
	     && 
	     (command_view[j]->command->sequence_number 
	      < command_view[i]->command->sequence_number)
	     &&
	     (command_view[j]->command->sequence_number 
	      >= oldest_seqno) ) {
	  needed -= 1;
	}
      }
      if (needed == 0) {
	exec_me = command_view[i];
	break;
      }
    }
  }
  
  if (exec_me == NULL) {
    goto done;
  }

  dts->command_exec_counter -= 1;

  elog(LOG_DEBUG(4), "csn: %i el: %i cdst: %i me: %i",
       exec_me->command->command_src_node,
       exec_me->dont_exec_localy,
       exec_me->command->command_dst_node,
       my_node_id
       );
  /* final check... make sure we do this locally */
  if (exec_me->command->command_src_node == my_node_id &&
      exec_me->dont_exec_localy) {
    exec_me->executed = 1;
    dts_response_do_response_ack(dts, exec_me);
    elog(LOG_WARNING, "No local set, not executing");
    goto again;
  }

  /* a destination of 0 is accepted by every node */
  if (exec_me->command->command_dst_node != 0 &&
      exec_me->command->command_dst_node != my_node_id) {
    exec_me->executed = 1;
    dts_response_do_response_ack(dts, exec_me);
    elog(LOG_WARNING, "Not the dts, not executing");
    goto again;
  }

  int res = 0;
  switch(exec_me->command->command_type) {
  case DTS_COMMAND_TYPE_DEFAULT:
    elog(LOG_WARNING, "Executing %i:%i", 
	 exec_me->command->command_src_node,
	 exec_me->command->sequence_number);
    res = dts_exec_process_command(dts, exec_me);
    break;
  case DTS_COMMAND_TYPE_TRANSFER:
    res = dts_exec_xfer_process_xfer(dts, exec_me);
    break;
  case DTS_COMMAND_TYPE_STATUS:
    res = dts_exec_status_process_status(dts, exec_me);
    break;
  default:
    elog(LOG_WARNING, "Unknown command type %x",
	 exec_me->command->command_type);
    res = 1;
  }
  /* a non-zero result means something failed so we cannot rely on
     someone else to startup the timer again */
   
  if (res == 0) {
    goto done;
  }


 again:
  /* more commands pending: fire again without changing the interval */
  if (dts->command_exec_counter > 0) {
    return EVENT_RENEW;
  }

 done:
  elog(LOG_DEBUG(10), "Done exec for now... resched for 15 seconds");
  g_timer_resched(dts->command_exec_timer, 15000);
  return EVENT_RENEW;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  TODO
  dts_command.c
  dts_exec.c
  dts_exec.h
  dts_exec_status.c
  dts_exec_xfer.c
  dts_i.h
  dts_main.c
  dts_md5.c
  dts_neighbor.c
  dts_ondisk.c
  dts_report.c
  dts_response.c
  dts_sequence_numbers.c
  dts_shell.c
  dts_shell.h
  dts_status.c
  dts_tester.c
  dtsh.c