Code Search for Developers
 
 
  

haveblock.c from EmStar at Krugle


Show haveblock.c syntax highlighted

// Maintain table of blocks. 
//
// Publishes a list of blocks that represents, for each file, the last
// of a continguous segment of blocks starting from the beginning of
// that file.


char haveblock_c_cvsid[] =
  "$Id: haveblock.c,v 1.8 2005/06/11 03:52:02 adparker Exp $";

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <glib.h>
#include <ftw.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "libmisc/elog.h"
#include "emrun/emrun.h"
#include "libdev/glib_dev.h"
#include "link/link.h"
#include "link/link_headers.h"
#include "devel/state/ssync.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ioutils.h"
#include "devel/state/cluster_map.h"


int use_mhsync = 0;
char * use_prefix = SSYNC_CLUSTER_PREFIX;
cluster_map_t * cluster_map = NULL;

#define COUNT 3
#define FILE_COUNT_MAX 32

typedef struct haveblock_state_s
{
  char *blocks_dir;
  GArray *available_blocks;
  int file_count;
  lu_context_t * link_context;
} haveblock_state_t;

static haveblock_state_t *global_state = NULL;

// Checks the blocks directory and updates block info if required.
// * check to see if it's a regular file 
// * open file for reading
// * read in sizeof(block_t) bytes
// * append this to the state's array
int
update_blocks (const char *file, const struct stat *sb, int flag)
{
  
  elog (LOG_NOTICE, " ");
  haveblock_state_t *state = global_state;
  block_t block;
  FILE *fp = NULL;

  if (FTW_F != flag)
    {
      goto done;
    }

  fp = fopen (file, "rb");
  if (fp == NULL)
    {
      elog (LOG_CRIT, "Unable to open file: %s: %m", file);
      goto done;
    }

  size_t items_read = fread (&block, sizeof (block_t), 1, fp);
  if (items_read != 1)
    {
      if (feof (fp))
        elog (LOG_CRIT, "Unexpected EOF while reading %s", file);

      if (ferror (fp) && !feof (fp))
        elog (LOG_CRIT, "Error while reading %s: %m", file);

      goto done;
    }
  g_array_append_val (state->available_blocks, block);
done:
  if (fp != NULL)
    fclose (fp);
  return 0;
}

// The end result is a list of blocks that represents, for each file,
// the last block of a continguous segment of blocks starting from the
// beginning of that file.
static void
collapse_available_blocks(GArray ** array) // array of block_t
{
  // sort the array:
  g_array_sort_with_data(*array, block_GCompareData, NULL);
  GArray * cumulative_array = g_array_new(0, 1, sizeof (block_t) );
  
  // loop through the array

  // yes, I don't need both looking_for_state and looking_for_end, but
  // sometimes I like to be explicit, so I'm using both.

  int looking_for_start = 1;
  int looking_for_end = 0;
  block_t candidate_block;
  block_t current_block;
  memset(&candidate_block,0,sizeof(block_t));
  int i;
  for (i = 0; i < (*array)->len; ++i)
    {
     current_block = g_array_index(*array, block_t, i);
      if (looking_for_start && (current_block.offset == 0))
	{
	  candidate_block = current_block;
	  looking_for_start = 0;
	  looking_for_end = 1;
	}
      else if (looking_for_end)
	{
	  if ((block_uid_GCompareData(&current_block.uid, &candidate_block.uid,
				      NULL))
	      || (current_block.offset
		  > (candidate_block.offset + candidate_block.length)))
	    { // candidate was the last of the segment
	      g_array_append_val (cumulative_array, candidate_block);
	      looking_for_start = 1;
	      looking_for_end = 0;
	      --i; // try this block again
	    }
	  else 
	    {
	      candidate_block = current_block;
	    }
	}
    }
  
  if (looking_for_end)
    {
      // the candidate_block is the end of the segment.
      g_array_append_val (cumulative_array, candidate_block);
    }
  g_array_free (*array, 1);

  *array = cumulative_array;
}

// Timer cb that periodically traverses the blocks dir and updates the 
// list of available blocks 
static int
update_blocks_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_INFO, " ");
  haveblock_state_t *state = (haveblock_state_t *) data;
  state->file_count = 0;
  g_array_free (state->available_blocks, 1);
  state->available_blocks = g_array_new (0, 1, sizeof (block_t));

  if (ftw (state->blocks_dir, update_blocks, 4) < 0)
    {
      elog (LOG_CRIT, "ftw encountered an error on %s: %m",
            state->blocks_dir);
    }
  else
    {
      // collapse state->available_blocks down to containing only
      // the last block that they have.
      collapse_available_blocks(&state->available_blocks);
    }
  int delay = 1000 + (state->available_blocks->len) * DELAY_PER_FILE;
  elog (LOG_NOTICE, "Number of total files: %d", state->available_blocks->len);
  elog (LOG_NOTICE, "Delaying for %d ms.",delay);
  return TIMER_RENEW;
}


static void
print_table_entry (void *entry)
{
  buf_t *buf = buf_new ();
  block_unparse (buf, entry, "");
  elog (LOG_INFO, "Publishing the following...\n");
  elog (LOG_INFO, "%s\n", buf->buf);
  buf_free (buf);
}

// Returns allocated memory. Caller must free.
// State has a pointer to a GArray block_t's. This function just converts that
// GArray into a haveblock_table_t.
static haveblock_table_t *
create_table (const haveblock_state_t * state)
{
  elog (LOG_NOTICE, " ");
  int count = state->available_blocks->len;

  haveblock_table_t *table =
    (haveblock_table_t *) malloc (sizeof (haveblock_table_t) * count);
  memset (table, 0, sizeof (haveblock_table_t) * count);

  // This loop creates individual entries: haveblock_table_t gets block_t's.
  int i;
  for (i = 0; i < count; ++i)
    {
      table[i].haveblock = g_array_index (state->available_blocks,
                                          block_t, i);
      print_table_entry (&table[i].haveblock);
    }
  return table;
}


static int
publish_blocks_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");
  haveblock_state_t *state = (haveblock_state_t *) data;
  flow_id_t fid = {
    src_if:1,
    dst_if:1,
    dst:LINK_BROADCAST,
    max_hops:MHSYNC_ONE_HOP
  };
  if (!use_mhsync) {
    if (lu_get_if_id(state->link_context, &fid.src) != 0)
      {
	elog (LOG_ERR, "Unable to get if_id");
	exit(1);
      }
  }
  int count = state->available_blocks->len;
  haveblock_table_t *table = create_table (state);
  
  if (haveblock_pub_add_empty (use_prefix, table, count, &fid) < 0)
    {
      elog (LOG_CRIT, "Unable to publish.");
    }
  if (table)
    free (table);
  return TIMER_RENEW;
}


static haveblock_state_t
haveblock_init ()
{
  buf_t *buf = buf_new ();
  bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);

  haveblock_state_t state = {
  blocks_dir:strdup (buf->buf),
  available_blocks:g_array_new (0, 1, sizeof (block_t))
  };

  buf_free (buf);
  return state;
}


static void
usage (char *name)
{
  misc_print_usage
    (name, "-U <device>", "  --uses <device>: Specify link device to use.");
  exit (1);
}


static void
init_link (char *uses, char **argv, haveblock_state_t * state)
{
  if (uses == NULL)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t lu_opts = {
  opts:{
    name:uses,
  data:state}
    ,
  };
  if (lu_open (&lu_opts, &state->link_context) < 0)
    {
      elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
      exit (1);
    }
  return;
}

int
main (int argc, char *argv[])
{

  misc_init (&argc, argv, CVSTAG);
  {
    emrun_opts_t emrun_opts = {
    shutdown:block_generic_shutdown,
    data:"haveblock"
    };
    emrun_init (&emrun_opts);
  }
  
  char * uses = link_parse_uses(&argc, argv, NULL);
  if(argc > 2)
    {
      usage (argv[0]);
    }
  use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
  if (use_mhsync)
    {
      elog (LOG_CRIT, "Using mhsync");
      use_prefix = SSYNC_MULTIHOP_PREFIX;
    }
  
  haveblock_state_t state = haveblock_init ();
  global_state = &state;

  init_link(uses, argv, &state);
  
  if (mkdir_with_parents (state.blocks_dir) < 0)
    {
      elog (LOG_CRIT, "Failed to make directory: %s, %m", state.blocks_dir);
      exit (1);
    }

  if (g_timer_add (PUBLISH_BLOCKS_TIMER, publish_blocks_timer_cb,
		   &state, NULL, NULL)
      < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }
  if (g_timer_add (UPDATE_BLOCKS_TIMER, update_blocks_timer_cb, &state, NULL, NULL)
      < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }

   /* cluster map event */
   cluster_map_opts_t cl_opts = {
     private_data: &state
   };
   if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
     elog(LOG_CRIT, "Can't connect to cluster map: %m");
     exit(1);
   }

  g_main ();
  elog (LOG_ALERT, "Event system terminated abnoramlly.");
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  haveblock.c
  haveblock_type.c