Code Search for Developers
 
 
  

reassembler.c from EmStar at Krugle


Show reassembler.c syntax highlighted

/* Reassembler: reassemble files dropped off in ACKED_DIR_PATTERN.
   Periodically scan ACKED_DIR for entries using scandir.

   For each entry NAME found:

   fopen a temporary file, ASSEMBLED_DIR/TEMPFILE.

   Loop through the block files under ACKED_DIR/NAME using ftw.

   Write each block to the temporary file using fseek / fwrite;
   close it when finished.
   Rename the temporary file to ASSEMBLED_DIR/NAME,
   then delete ACKED_DIR/NAME.
*/

#define _GNU_SOURCE

#include <ftw.h>
#include <stdlib.h>
#include <glib.h>
#include <stdint.h>
#include <errno.h>
#include <dirent.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "emrun/emrun.h"
#include "libmisc/misc_sim.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"

// NOTE(review): <errno.h> is already included above and declares errno;
// this explicit extern declaration looks redundant -- consider removing it.
extern int errno;
// Defined outside this file.  init_state() tests it to choose between the
// simulation and non-simulation assembled-directory names.
extern int in_sim;



// Run-time state of the reassembler.  A single instance lives in main().
typedef struct
{
  // Last status accepted by status_handler() (only 0 or 1 are stored).
  // reassembler_timer_cb() does no work while this is 0.
  uint8_t sinkstatus;
  // Handle filled in by g_status_client_full() in init_status().
  status_client_context_t * status_context;
  // Directory that receives the reassembled files (strdup'ed in init_state()).
  char * assembled_dir;
  // Directory scanned for entries to reassemble (strdup'ed in init_state()).
  char * acked_dir;
  // Temporary output file currently being written; opened and closed by
  // reassembler_process_dir() and written to by ftw_process_file().
  FILE * fp; 
} reassembler_state_t;

// Set by main() to its state object.  ftw_process_file() reads it because
// the ftw() callback signature has no argument for caller data.
reassembler_state_t * global_state = NULL;


// Thin wrapper around fopen() that logs a message when the open fails.
// Returns the opened stream, or NULL on failure.
static FILE *
my_fopen (const char *fname, const char *opts)
{
  FILE *stream;

  elog (LOG_INFO, " ");
  stream = fopen (fname, opts);
  if (stream != NULL)
    return stream;

  elog (LOG_ERR, "Error while trying to open %s: %m", fname);
  return NULL;
}

// ftw() callback: copy one block file into the temporary output file.
//
// Reads the file into a buf_t, sanity-checks the block header (size,
// offset and length), then writes the block's data at block->offset in
// global_state->fp.  The buf_t is freed on every path.
//
// file: path of the entry reported by ftw().
// sb:   stat information supplied by ftw() (unused).
// flag: ftw() entry type; anything other than FTW_F is skipped.
//
// Always returns 0 so that ftw() keeps walking even when one block is
// bad; problems are only logged.
int
ftw_process_file (const char * file, const struct stat * sb, int flag)
{
  elog (LOG_INFO, " ");

  // ftw() also reports the directory being walked (and entries it could
  // not stat or read).  Those are not block files, so do not try to read
  // them and log a bogus error.
  if (flag != FTW_F)
    return 0;

  // Setup
  reassembler_state_t * state = global_state;
  buf_t * block_buf = buf_new ();
  if (file_to_buf (block_buf, file) < 0)
    {
      elog (LOG_ERR, "Unable to read file: %s: %m", file);
      goto final;
    }

  // Check
  if (block_buf->len < sizeof (block_t))
    {
      elog (LOG_ERR, "File is smaller than block_t: %s ", file);
      goto final;
    }
  block_t * block = (block_t *) block_buf->buf;
  if (block_buf->len != (sizeof (block_t) + block->length))
    {
      elog (LOG_ERR, "File size does not equal (block_t size + block->length): %s",
            file);
      goto final;
    }
  // Same condition as "offset + length > total_length", but written so
  // the sum is never formed and therefore cannot wrap around.
  // NOTE(review): block_t is declared elsewhere; this matters if its
  // fields are fixed-width unsigned integers -- confirm.
  if ( (block->length > block->uid.total_length)
       || (block->offset > (block->uid.total_length - block->length)) )
    {
      elog (LOG_ERR, "Block claims to have an offset and length that goes beyond the block's total_length: file: %s, offset: %d, length: %d, total_length: %d",
            file, block->offset, block->length, block->uid.total_length);
      goto final;
    }

  // Work
  // A failed seek would leave the stream wherever the previous block
  // ended, so the data must not be written in that case.
  if (fseek (state->fp, block->offset, SEEK_SET) != 0)
    {
      elog (LOG_ERR,
            "Error seeking to offset %d for block file: %s: %m",
            block->offset, file);
      goto final;
    }
  size_t items_written = fwrite (block->data, 1, block->length, state->fp);
  if (items_written != block->length)
    {
      elog (LOG_ERR,
            "Error writing data from block to file: %s, offset %d, length %d",
            file, block->offset, block->length);
      goto final;
    }

 final:
  buf_free (block_buf);
  return 0;
}


// nftw() callback: delete the visited entry with remove(), logging (but
// otherwise ignoring) a failure.  Always returns 0, so a failed removal
// does not make the callback report an error to nftw().
int
nftw_remove_files (const char * file, const struct stat * sb, int flag,
                   struct FTW * s)
{
  int rc;

  elog (LOG_INFO, "Removing file: %s", file);
  rc = remove (file);
  if (rc < 0)
    {
      elog (LOG_ERR, "Unable to remove file %s", file);
    }

  return 0;
}


// Remove DIR and everything below it by walking it with nftw() and
// calling nftw_remove_files() on each entry.  FTW_DEPTH requests a
// post-order walk, so a directory's contents are visited before the
// directory itself.  A failed walk is only logged.
//
// NOTE(review): FTW_PHYS is not set, so nftw() follows symbolic links;
// consider whether links can ever appear under DIR.
static void
delete_dir (const char * dir)
{
  int walk_rc;

  elog (LOG_INFO, " ");
  walk_rc = nftw (dir, nftw_remove_files, 4, FTW_DEPTH);
  if (walk_rc >= 0)
    return;

  elog (LOG_INFO, "nftw encountered an error on %s: %m", dir);
}

// Reassemble one entry of the acked directory into the assembled directory.
//
// Steps:
//   1. Make sure the assembled directory exists.
//   2. Open <assembled_dir>/TEMPFILE for writing (kept in state->fp so
//      that ftw_process_file() can reach it).
//   3. Walk <acked_dir>/<dirname> with ftw(); each file is written into
//      the temporary file by ftw_process_file().
//   4. Close the temporary file and rename it to <assembled_dir>/<dirname>.
//   5. Clean up: remove the temporary file if it was created but never
//      renamed, and delete <acked_dir>/<dirname>.
//
// state:   reassembler state; state->fp is NULL again on return.
// dirname: name of the entry inside state->acked_dir.
//
// NOTE(review): the source directory is deleted on every path, including
// failures before any block was copied.  That is the original behaviour
// and is kept; confirm it is intended.
static void
reassembler_process_dir (reassembler_state_t * state, const char * dirname)
{
  elog (LOG_INFO, " ");
  int tmp_created = 0;   // non-zero while TEMPFILE exists and needs removal
  int close_rc;
  buf_t * dirpath = buf_new ();
  buf_t * tmpfile = buf_new ();
  buf_t * finalfile = buf_new ();

  bufprintf (dirpath, "%s/%s", state->acked_dir, dirname);
  bufprintf (tmpfile, "%s/TEMPFILE", state->assembled_dir);
  bufprintf (finalfile, "%s/%s", state->assembled_dir, dirname);

  // make sure the parent directory exists before renaming tmp file
  if (mkdir_with_parents (state->assembled_dir) < 0)
    {
      elog (LOG_ERR, "Unable to create directory: %s", state->assembled_dir);
      goto final;
    }

  state->fp = my_fopen (tmpfile->buf, "w");
  if (state->fp == NULL)
    {
      elog (LOG_ERR, "Error opening file: %s: %m", tmpfile->buf);
      goto final;
    }
  tmp_created = 1;

  if (ftw (dirpath->buf, ftw_process_file, 4) < 0)
    {
      elog (LOG_CRIT, "ftw encountered an error on %s: %m",
            dirpath->buf);
      goto final;
    }

  // fclose() flushes buffered data; if that fails the file is incomplete
  // and must not be published under its final name.
  close_rc = fclose (state->fp);
  state->fp = NULL;
  if (close_rc != 0)
    {
      elog (LOG_ERR, "Error closing file: %s: %m", tmpfile->buf);
      goto final;
    }

  if (rename (tmpfile->buf, finalfile->buf) < 0)
    {
      elog (LOG_ERR, "Unable to rename %s --> %s: %m", tmpfile->buf, finalfile->buf);
      goto final;
    }
  // The temporary name no longer exists, so there is nothing to remove.
  tmp_created = 0;

 final:
  if (state->fp)
    fclose (state->fp);
  state->fp = NULL;

  // Clean up tmp file and directory.  Only try to remove the tmp file
  // when it was created and not renamed; otherwise remove() would fail
  // and log an error on every successful run.
  if (tmp_created && remove (tmpfile->buf) < 0)
    {
      elog (LOG_ERR, "Unable to remove file: %s", tmpfile->buf);
    }
  delete_dir (dirpath->buf);
  buf_free (dirpath);
  buf_free (tmpfile);
  buf_free (finalfile);
}

// scandir() filter: keep every entry except "." and "..".
// Returns 1 to keep the entry, 0 to drop it.  The entry type is not
// examined, so non-directory entries are kept as well.
static int
scandir_filter (const struct dirent * ent)
{
  const char * name = ent->d_name;

  elog (LOG_INFO, " ");
  return (strcmp (name, ".") != 0) && (strcmp (name, "..") != 0);
}

// Scan state->acked_dir with scandir() and hand every entry accepted by
// scandir_filter() to reassembler_process_dir(), walking the returned
// list from its last element to its first.  Each entry, and then the
// list itself, is freed here.  A scandir() failure is logged and nothing
// else happens.
static void
reassembler_timer_helper (reassembler_state_t * state)
{
  struct dirent **entries = NULL;
  int count;
  int i;

  elog (LOG_INFO, " ");
  count = scandir (state->acked_dir, &entries, scandir_filter, NULL);
  if (count < 0)
    {
      elog (LOG_CRIT, "scandir failed while scanning: %s: %m", state->acked_dir);
      return;
    }

  for (i = count - 1; i >= 0; i--)
    {
      reassembler_process_dir (state, entries[i]->d_name);
      free (entries[i]);
    }
  free (entries);
}


// Timer callback: run one reassembly pass, but only while
// state->sinkstatus is non-zero.  Always returns TIMER_RENEW.
// data is the reassembler_state_t registered with g_timer_add().
static int
reassembler_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  reassembler_state_t * state = (reassembler_state_t *) data;

  elog (LOG_INFO, " ");
  if (state->sinkstatus != 0)
    {
      reassembler_timer_helper (state);
    }

  return TIMER_RENEW;
}


// Print the usage message for program NAME via misc_print_usage() and
// terminate the process with exit status 1.  Both descriptive strings
// are still "TODO" placeholders.
static void
usage (char *name)
{
  const char *args_text = "TODO";
  const char *desc_text = "TODO";

  misc_print_usage (name, args_text, desc_text);
  exit (1);
}

// Build the initial reassembler state and return it by value.
// acked_dir is formatted from ACKED_DIR_PATTERN and my_node_id;
// assembled_dir comes from ASSEMBLED_DIR_PATTERN_SIM (with my_node_id)
// when in_sim is set, and from ASSEMBLED_DIR otherwise.  Both strings
// are strdup()'ed copies owned by the returned state.
static reassembler_state_t
init_state ()
{
  elog (LOG_INFO, " ");

  buf_t * acked = buf_new ();
  bufprintf (acked, ACKED_DIR_PATTERN, my_node_id);

  buf_t * assembled = buf_new ();
  if (!in_sim)
    {
      bufprintf (assembled, ASSEMBLED_DIR);
    }
  else
    {
      bufprintf (assembled, ASSEMBLED_DIR_PATTERN_SIM, my_node_id);
    }

  reassembler_state_t state = {
    .sinkstatus = 0,
    .status_context = NULL,
    .assembled_dir = strdup (assembled->buf),
    .acked_dir = strdup (acked->buf),
    .fp = NULL
  };

  // The temporary buffers are no longer needed once copied.
  buf_free (assembled);
  buf_free (acked);

  return state;
}


// Status-client callback: notice when our sink status changes and update
// the state.
//
// new_buf: text of the new status; freed here on every path.
//          NOTE(review): treated as a NUL-terminated string, as the
//          original code did -- confirm the status client guarantees it.
// size:    number of bytes reported for new_buf.
// data:    the reassembler_state_t registered in init_status().
//
// Only the values 0 and 1 are accepted.  Text that does not start with a
// number is rejected as "Unrecognized status" rather than being read as
// 0.  Always returns EVENT_RENEW.
static int
status_handler (void *new_buf, size_t size, void *data)
{
  reassembler_state_t *state = (reassembler_state_t *) data;
  const char *text = (const char *) new_buf;
  char *end = NULL;
  long new_status;

  // Passing NULL for %s is undefined, and size_t does not match %d, so
  // substitute a placeholder and convert the size explicitly.
  elog (LOG_INFO, "Received %s, size %lu",
        text ? text : "(null)", (unsigned long) size);
  if (size == 0)
    {
      elog (LOG_ERR, "Size is 0!");
      goto final;
    }
  if (!new_buf)
    {
      elog (LOG_ERR, "new_buf is NULL");
      goto final;
    }

  // strtol() reports through `end' whether any digits were consumed,
  // which atoi() cannot do.
  new_status = strtol (text, &end, 10);
  if ((end == text) || ((new_status != 0) && (new_status != 1)))
    {
      elog (LOG_ERR, "Unrecognized status");
      goto final;
    }

  if (new_status != state->sinkstatus)
    {
      elog (LOG_NOTICE, "Changing status to %d", (int) new_status);
      state->sinkstatus = (uint8_t) new_status;
    }
  else
    {
      elog (LOG_NOTICE, "Status is the same.");
    }

 final:
  free (new_buf);
  return EVENT_RENEW;
}


// Open a status client on the sink-status device (path passed through
// sim_path()) with status_handler() as its handler and STATE as the
// handler's private data.  The client context is stored in
// state->status_context.  If the client cannot be opened, the failure is
// logged and the process exits with status 1.
static void
init_status (reassembler_state_t * state)
{
  elog (LOG_INFO, " ");

  status_client_opts_t opts = {
    .devname = sim_path ("/dev/block/sinkstatus"),
    .handler = status_handler,
    .private_data = state
  };

  int rc = g_status_client_full (&opts, &state->status_context);
  if (rc >= 0)
    return;

  elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
  exit (1);
}


// Program entry point.
// Initialises the misc and emrun libraries, rejects any command-line
// arguments, builds the reassembler state, registers the periodic timer
// and the status client, publishes the state through global_state, and
// then runs the event loop.  g_main() is not expected to return, so
// reaching the end is logged and reported with exit status 1.
int
main (int argc, char * argv[])
{
  emrun_opts_t emrun_opts = {
    .shutdown = block_generic_shutdown,
    .data = "reassembler"
  };

  misc_init (&argc, argv, CVSTAG);
  emrun_init (&emrun_opts);

  // No arguments are accepted; usage() does not return.
  if (argc > 1)
    usage (argv [0]);

  reassembler_state_t state = init_state();

  int timer_rc = g_timer_add (REASSEMBLER_TIMER, reassembler_timer_cb,
                              &state, NULL, NULL);
  if (timer_rc < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }

  init_status (&state);
  global_state = &state;

  g_main ();
  elog (LOG_ALERT, "Event system terminated abnormally.");
  return 1;
}
  





See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  reassembler.c