Code Search for Developers
 
 
  

fragmenter.c from EmStar at Krugle


Show fragmenter.c syntax highlighted

// Fragmenter: files dropped off in one directory are fragmented and
// copied into another directory.

// CVS "$Id$" keyword string for this file, kept as a global so the
// revision information is carried in the compiled object.
char fragmenter_c_cvsid[] =
"$Id: fragmenter.c,v 1.6 2005/06/21 09:47:24 adparker Exp $";

#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <glib.h>
#include <ftw.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "libmisc/elog.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"

// errno comes from <errno.h>, where it may be a macro rather than a
// plain int, so it must not be redeclared here.

// Defined elsewhere.  When non-zero, the timer callback builds the
// incoming directory name from INCOMING_DIR_PATTERN_SIM instead of
// INCOMING_DIR.
extern int in_sim;

// Base directory under which the per-file fragment directories are
// created.  It is allocated and filled in by main() from
// BLOCK_DIR_PATTERN and my_node_id, and stays NULL until then.
// Later, this is to be read from a configuration file.

buf_t * OUTPUT_DIR = NULL;


// State shared between the timer callback and the status handler; a
// pointer to it is registered as the private data of both.
typedef struct 
{
  // Last accepted sink status (0 or 1), written by status_handler.
  // The timer callback does no work while this is 1.
  uint8_t sinkstatus;
  // Handle filled in by g_status_client_full() in init_status().
  status_client_context_t * status_context;
} fragmenter_state_t;


// Fixed-size file name holder.  Wrapping the array in a struct lets a
// whole name be stored by value as one element of fname_array.
typedef struct
{
  // NUL-terminated path; collect_fnames only stores names shorter
  // than MAX_FILE_STRLEN.
  char  str_buf [MAX_FILE_STRLEN];
} str;


// Array of `str` elements filled by collect_fnames.  It has to be a
// global because the ftw() callback takes no user-data argument, so
// this is the only way for that callback to return data.  It is
// created and freed by fragmenter_timer_helper around each ftw() walk.
GArray * fname_array = NULL;

// fopen() wrapper that logs a LOG_ERR message when the open fails.
//   fname: path to open.
//   opts:  fopen() mode string.
// Returns the stream from fopen(), or NULL on failure.
static FILE *
my_fopen (const char *fname, const char *opts)
{
  FILE *stream;

  elog (LOG_NOTICE, " ");

  stream = fopen (fname, opts);
  if (!stream)
    elog (LOG_ERR, "Error while trying to open %s: %m", fname);

  return stream;
}


// Writes one fragment of `data` to its own file.
//
//   block:        header template, passed by value; offset and length
//                 are filled in here before it is written.
//   block_number: index of the fragment; the fragment starts at byte
//                 block_number * FRAG_SIZE of data->buf.
//   length:       number of payload bytes in this fragment.
//   dir:          currently unused; the path is built from OUTPUT_DIR
//                 and block.uid.name instead.
//   data:         buffer holding the complete input file.
//
// The output file is
//   OUTPUT_DIR / block.uid.name / (offset)_(length)_(total_length)
// and contains the raw block_t header followed by the payload bytes.
// Failures are logged; nothing is returned to the caller.
static void
write_frag (block_t block, int block_number, int length, const char * dir,
	    const buf_t * data)
{
  elog (LOG_NOTICE, " ");
  buf_t * file_base = buf_new ();
  buf_t * file_name_buf = buf_new ();
  const char * file_name = NULL;
  FILE * fout = NULL;

  // Setup block info
  block.offset = block_number * FRAG_SIZE;
  block.length = length;

  // Build the output file name.
  bufprintf (file_base, "%d_%d_%d", block.offset, block.length,
	      block.uid.total_length);
  bufprintf (file_name_buf, "%s/%s/%s", OUTPUT_DIR->buf, block.uid.name,
	     file_base->buf);
  file_name = file_name_buf->buf;

  fout = my_fopen (file_name, "w");
  if (fout == NULL)
    {
      elog (LOG_ERR, "Error opening file for write: %s: %m", file_name);
      goto done;
    }

  // Header first, then the payload slice.
  size_t items_written = fwrite (&block, sizeof (block_t), 1, fout);
  if (items_written != 1)
    {
      elog (LOG_ERR, "Error writing block to head of fragment: %s: %m",
	    file_name);
      goto done;
    }
  items_written = fwrite (&data->buf[block.offset], 1, block.length, fout);
  if (items_written != block.length)
    {
      elog (LOG_ERR, "Error writing data to fragment: %s: %m",
	    file_name);
      goto done;
    }

 done:
  // fout is still NULL when the open failed; fclose (NULL) is undefined
  // behaviour, so only close a stream that was actually opened.
  if ((fout != NULL) && (fclose (fout) != 0))
    {
      elog (LOG_ERR, "Error closing FILE handle for %s: %m", file_name);
    }
  buf_free (file_name_buf);
  buf_free (file_base);

  return;
}


// Splits `file` into fragments of at most FRAG_SIZE bytes.
//
//   file: path of the input file; it is read completely into memory.
//   dir:  passed through unchanged to write_frag.
//
// Steps:
//   - build a block header (name = basename of file, node id, total
//     length taken from stat)
//   - read the file into a buffer and check its length against stat
//   - call write_frag once per full FRAG_SIZE chunk, then once more
//     for any remainder
// Errors are logged and end the function early; nothing is returned.
static void
frag_file_to_dir (const char * file, const char * dir)
{
  elog (LOG_NOTICE, " ");

  // Setup
  block_t block;
  struct stat mystat;
  buf_t * input_buf = buf_new ();
  gchar * basename = NULL;

  // write_frag writes the whole struct to disk, so clear it first:
  // otherwise the bytes of uid.name after the terminator, and any field
  // not assigned below, would be uninitialised stack contents.
  memset (&block, 0, sizeof (block));

  if (stat (file, &mystat) < 0)
    {
      elog (LOG_ERR, "Unable to stat file: %s: %m", file);
      goto done;
    }

  basename = g_path_get_basename (file);
  // Length includes the terminating NUL, which is copied as well.
  int basename_length = strlen ((char*)basename) + 1;
  if (basename_length > BLOCK_UID_NAME_LEN)
    {
      elog (LOG_ERR, "Basename of %s has length: %d, which is longer than BLOCK_UID_NAME_LEN: %d.", basename, basename_length, BLOCK_UID_NAME_LEN);
      goto done;
    }
  memcpy (block.uid.name, basename, basename_length);

  block.uid.node_id      = my_node_id;
  block.uid.total_length = mystat.st_size;
  int filetobuf_res = file_to_buf (input_buf, file);

  if (filetobuf_res < 0)
    {
      elog (LOG_ERR, "Unable to read file: %s: %m", file);
      goto done;
    }
  if (block.uid.total_length != input_buf->len)
    {
      elog (LOG_ERR,
	    "Weird. The input_buf size != size reported by stat for file %s",
	    file);
      goto done;
    }

  div_t div_result = div (block.uid.total_length, FRAG_SIZE);
  int total_blocks = div_result.quot;

  // Real work: full-size fragments first ...
  int block_number;
  for (block_number = 0; block_number < total_blocks; ++block_number)
    {
      write_frag (block, block_number, FRAG_SIZE, dir, input_buf);
    }
  // ... then the short trailing fragment, if there is one.
  if (div_result.rem > 0)
    {
      write_frag (block, block_number, div_result.rem, dir, input_buf);
    }

 done:
  buf_free (input_buf);
  g_free (basename);
  return;
}


//   if the directory LOCAL_OUTPUT_DIR/file_name doesn't exist:
//      create it (and parents) 
static void
frag_file (const char * file)
{
  elog (LOG_NOTICE, " ");
  // Setup
  gchar * file_base           = g_path_get_basename ((const gchar*)file);

  buf_t * new_directory_name_buf = buf_new ();
  bufprintf (new_directory_name_buf, "%s/%s", OUTPUT_DIR->buf, file_base);
  const char * new_directory_name = new_directory_name_buf->buf;
  DIR   * new_directory       = opendir (new_directory_name);

  // Work
  if ((new_directory == NULL) &&  (ENOENT == errno))
    {
      // Directory doesn't exist, so create and populate
      if (mkdir_with_parents (new_directory_name) != 0)
	{
	  elog (LOG_ERR, "Unable to create directory with parents: %s: %m",
		new_directory_name);
	  goto done;
	}
      frag_file_to_dir (file, new_directory_name);
    }

  // Done
 done:
  if (new_directory && (closedir (new_directory) != 0))
    {
      elog (LOG_ERR, "Error closing directory handle for %s: %m",
	    new_directory_name);
    }
  buf_free (new_directory_name_buf);
  g_free (file_base);
  return;
}


// ftw() callback: appends `file` to the global fname_array.
//
// Only entries flagged FTW_F or FTW_SL are recorded.  Names that do
// not fit in a `str` (length >= MAX_FILE_STRLEN) are logged and
// skipped.  `sb` is not used.
// Always returns 0, so the walk is never stopped from here.
static int
collect_fnames (const char * file, const struct stat * sb, int flag)
{
  elog (LOG_NOTICE, " ");

  int wanted = (flag == FTW_F) || (flag == FTW_SL);
  if (wanted)
    {
      if (strlen (file) < MAX_FILE_STRLEN)
        {
          // Copy by value; the array keeps its own copy of the name.
          str entry;
          strncpy (entry.str_buf, file, MAX_FILE_STRLEN);
          g_array_append_val (fname_array, entry);
        }
      else
        {
          elog (LOG_CRIT, "Encountered a file whose string length is longer "
                          "than expected:%s \n Skipping it. I'm lame", file);
        }
    }

  return 0;
}


// Scans `incoming_dir` and fragments at most one file from it.
//
//   - A fresh fname_array is created.
//   - ftw() walks incoming_dir, calling collect_fnames to record the
//     names of files and symlinks.
//   - If the walk succeeded and found anything, frag_file is called on
//     the first recorded name.  The names are not sorted, so "first"
//     means first in the order ftw() reported them.
//   - The array is freed again before returning.
static void
fragmenter_timer_helper (const char * incoming_dir)
{
  elog (LOG_NOTICE, " ");

  fname_array = g_array_new (0, 1, sizeof (str));

  int walk_rc = ftw (incoming_dir, collect_fnames, 4);
  if (walk_rc >= 0)
    {
      if (fname_array->len > 0)
        {
          frag_file (g_array_index (fname_array, str, 0).str_buf);
        }
    }
  else
    {
      elog (LOG_CRIT, "ftw encountered an error on %s: %m", incoming_dir);
    }

  g_array_free (fname_array, 1);
}


// Timer callback: runs one scan of the incoming directory.
//
//   data:     the fragmenter_state_t registered with the timer.
//   interval: unused.
//   ev:       unused.
//
// No scan is done while state->sinkstatus is 1.  Otherwise the
// incoming directory name is built (from INCOMING_DIR_PATTERN_SIM and
// my_node_id when in_sim is set, from INCOMING_DIR otherwise) and
// handed to fragmenter_timer_helper.
// Always returns TIMER_RENEW.
static int
fragmenter_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");
  fragmenter_state_t * st = (fragmenter_state_t *) data;

  if (st->sinkstatus != 1)
    {
      buf_t * incoming = buf_new ();

      if (!in_sim)
        {
          bufprintf (incoming, INCOMING_DIR);
        }
      else
        {
          bufprintf (incoming, INCOMING_DIR_PATTERN_SIM, my_node_id);
        }

      fragmenter_timer_helper (incoming->buf);
      buf_free (incoming);
    }

  return TIMER_RENEW;
}


// Prints the usage message through misc_print_usage and exits with
// status 1; it does not return.  The two description strings passed
// along are still "TODO" placeholders.
//   name: program name, as given in argv[0].
static void
usage (char *name)
{
  misc_print_usage (name, "TODO", "TODO");
  exit (1);
}

// Returns the initial fragmenter state: no status client yet and a
// sink status of 0.
static fragmenter_state_t
init_state (void)
{
  // C99 designated initialisers; the older "field: value" form is an
  // obsolete GNU extension.
  fragmenter_state_t state = {
    .status_context = NULL,
    .sinkstatus = 0
  };
  return state;
}

// Status-client callback: notices when our sink status changes and
// updates the state.
//
//   new_buf: status text; this handler frees it before returning.
//   size:    number of bytes in new_buf.
//   data:    the fragmenter_state_t registered in init_status.
//
// The text must start with the number 0 or 1 (leading whitespace and
// trailing characters are tolerated); anything else is logged as
// unrecognized and leaves the state untouched.
// Always returns EVENT_RENEW.
static int
status_handler (void *new_buf, size_t size, void *data)
{
  fragmenter_state_t *state = (fragmenter_state_t *) data;
  char text[32];
  char *end = NULL;
  size_t text_len;
  long new_status;

  if (size == 0)
    {
      elog (LOG_ERR, "Size is 0!");
      goto final;
    }
  if (!new_buf)
    {
      elog (LOG_ERR, "new_buf is NULL");
      goto final;
    }

  // Work on a bounded, NUL-terminated copy: nothing here guarantees
  // that new_buf itself is terminated within `size` bytes.
  text_len = (size < sizeof (text)) ? size : sizeof (text) - 1;
  memcpy (text, new_buf, text_len);
  text[text_len] = '\0';

  // Logged only after validation, and with size converted to match %d.
  elog (LOG_NOTICE,"Received %s, size %d", text, (int) size);

  // strtol reports "no digits" through `end`, so non-numeric input is
  // rejected instead of being read as status 0.
  new_status = strtol (text, &end, 10);
  if ((end == text) || ((new_status != 0) && (new_status != 1)))
    {
      elog (LOG_ERR, "Unrecognized status");
      goto final;
    }

  if (new_status != state->sinkstatus)
    {
      elog (LOG_NOTICE, "Changing status to %d", (int) new_status);
      state->sinkstatus = new_status;
    }
  else
    {
      elog (LOG_NOTICE, "Status is the same.");
    }

 final:
  // free (NULL) is a no-op, so no guard is needed.
  free (new_buf);
  return EVENT_RENEW;
}


// Opens a status client on /dev/block/sinkstatus (path mapped through
// sim_path) so that status_handler is notified when the sink status
// changes.
//
//   state: handed to status_handler as its private data; the client
//          context is stored in state->status_context.
//
// Logs and exits with status 1 if the client cannot be opened.
static void
init_status (fragmenter_state_t * state)
{
  // C99 designated initialisers; the older "field: value" form is an
  // obsolete GNU extension.
  status_client_opts_t opts = {
    .devname = sim_path ("/dev/block/sinkstatus"),
    .handler = status_handler,
    .private_data = state
  };
  if (g_status_client_full (&opts, &state->status_context) < 0)
    {
      elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
      exit (1);
    }
}


// Entry point.  Initialises the misc and emrun layers, rejects any
// remaining command-line arguments, registers the periodic fragmenter
// timer and the sink-status client, then runs the event loop.
// g_main() is not expected to return, so reaching the end is logged
// as an abnormal termination and 1 is returned.
int
main (int argc, char * argv[])
{
  misc_init (&argc, argv, CVSTAG);
  {
    // C99 designated initialisers; the older "field: value" form is an
    // obsolete GNU extension.
    emrun_opts_t emrun_opts = {
      .shutdown = block_generic_shutdown,
      .data = "fragmenter"
    };
    emrun_init (&emrun_opts);
  }

  // No arguments are accepted beyond those consumed by misc_init.
  if (argc > 1)
    {
      usage (argv [0]);
    }

  // `state` lives on main's stack; that is fine because the callbacks
  // using it only run inside g_main() below.
  fragmenter_state_t state = init_state();
  if (g_timer_add (FRAGMENTER_TIMER, fragmenter_timer_cb, &state, NULL, NULL)
      < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }

  OUTPUT_DIR = buf_new();
  bufprintf (OUTPUT_DIR, BLOCK_DIR_PATTERN, my_node_id);


  init_status (&state);
  g_main ();
  elog (LOG_ALERT, "Event system terminated abnormally.");
  buf_free (OUTPUT_DIR);
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs—large-scale, long-lived systems that need self-organization and adaptivity—a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C, Shell Script
License: other

  fragmenter.c