Code Search for Developers
 
 
  

ackblock.c from EmStar at Krugle


Show ackblock.c syntax highlighted

/*
  ackblock uses the haveblock table to determine when a file has been
  completely downloaded.

  Ack's job is to acknowledge the "highest" seen file thus far, on a
  per originating-node basis.

  Ack maintains an array of acks. 

  Periodically, ack will check the list of haveblocks. For any
  completed file, ack will move the directory to ACKED_DIR_PATTERN,
  update its array of acks, and publish the ackblock table. Simple huh?

*/


char ackblock_c_cvsid[] = "$Id: ackblock.c,v 1.6 2005/06/15 13:13:18 adparker Exp $";

#include <stdlib.h>
#include <string.h>

#include <glib.h>

#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ackblock.h"
#include "devel/state/ssync.h"
#include "link/link.h"
#include "devel/state/cluster_map.h"

// Set in main() from the "use_mhsync" command-line switch.  When it is
// zero, haveblock_sub_handler runs the incoming table through
// haveblock_table_convert_src_to_id with the cluster map.
int use_mhsync = 0;
// Prefix handed to haveblock_sub_open_full when subscribing.  main()
// replaces it with SSYNC_MULTIHOP_PREFIX when use_mhsync is set.
char * use_prefix = SSYNC_CLUSTER_PREFIX;
// All of the state shared between the handlers in this file.  A single
// instance is created by init_state() and passed by pointer to every
// callback.
typedef struct ackblock_state_s
{
  // Most recent haveblock table stored by haveblock_sub_handler; the
  // previous table is freed each time a new one arrives.
  haveblock_table_t *all_haveblocks;
  // Number of entries in all_haveblocks.
  int all_haveblocks_count;

  // Subscription handle filled in by haveblock_sub_open_full.
  ssync_sub_t *haveblocks_sub;
  // Status client handle filled in by g_status_client_full.
  status_client_context_t *status_context;
  // Timer event handle filled in by g_timer_add.
  g_event_t *timer_context;
  // 0 or 1, updated by status_handler; publish_ackblock_timer does
  // nothing while this is 0.
  uint8_t sinkstatus;
  // Flow id passed to ackblock_pub_add_empty when publishing.
  flow_id_t fid;
  // Link handle filled in by lu_open.
  lu_context_t * link_context;
  // Directory path built from BLOCK_DIR_PATTERN and my_node_id.
  char * blocks_dir;
  // Directory path built from ACKED_DIR_PATTERN and my_node_id.
  char * acked_dir;
  // Array of block_t acks maintained by update_ackblock_table, which
  // keeps at most one entry per uid.node_id.
  GArray * ackblocks;
} ackblock_state_t;


static ackblock_state_t
init_state ()
{
  buf_t * buf = buf_new ();
  bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);
  buf_t * buf2 = buf_new ();
  bufprintf (buf2, ACKED_DIR_PATTERN, my_node_id);
  
  ackblock_state_t state = {
  all_haveblocks:NULL,
  all_haveblocks_count:0,
  haveblocks_sub:NULL,
  status_context:NULL,
  timer_context:NULL,
  sinkstatus:0,
  fid:{
    src:my_node_id,
    dst:LINK_BROADCAST,
    max_hops:MAX_HOPS,
  },
  blocks_dir: strdup (buf->buf),
  acked_dir: strdup (buf2->buf)
  };

  state.ackblocks = g_array_new (0, 1, sizeof (block_t));
  
  return state;
}

// Cluster map handle.  Created in main() by cluster_map_new and passed to
// haveblock_table_convert_src_to_id from haveblock_sub_handler.
cluster_map_t * cluster_map = NULL;




// Returns a newly allocated GArray of block_t; the caller releases it.
//
// `array` is expected to hold the blocks we have on disk.  A block is
// copied into the result when its offset plus its length equals the
// total_length stored in its uid, i.e. when it is the last block of a
// completed file.
static GArray *
get_finished_blocks (GArray * array)
{
  GArray * last_blocks = g_array_new (0, 1, sizeof (block_t));
  guint idx = 0;

  while (idx < array->len)
    {
      block_t candidate = g_array_index (array, block_t, idx);
      ++idx;

      if ((candidate.offset + candidate.length)
	  != (candidate.uid.total_length))
	{
	  continue;
	}
      g_array_append_val (last_blocks, candidate);
    }

  return last_blocks;
}

// Returns 1 when `path` can be opened as a directory, 0 otherwise.
// The directory handle is only closed when it was actually opened.
static int
ackblock_dir_exists (const char * path)
{
  DIR * dir = opendir (path);
  if (dir == NULL)
    {
      return 0;
    }
  closedir (dir);
  return 1;
}

// Moves the directory of one finished file from the blocks directory to
// the acked directory.
//
// state: supplies blocks_dir and acked_dir.
// block: its uid.name is the per-file directory name under both roots.
//
// The acked root is created first if needed.  The rename is attempted
// only when the source directory exists and the destination does not;
// in every other case the function does nothing.  Failures are logged
// and otherwise ignored.
static void
move_finished_blocks_helper (ackblock_state_t * state, block_t block)
{
  elog (LOG_INFO, " ");
  // Create the old and new directory name.
  buf_t * olddir = buf_new ();
  buf_t * newdir = buf_new ();

  bufprintf(olddir, "%s/%s", state->blocks_dir, block.uid.name);
  bufprintf(newdir, "%s/%s", state->acked_dir, block.uid.name);

  if (mkdir_with_parents (state->acked_dir) < 0)
    {
      elog (LOG_CRIT, "Unable to create directory to place acked blocks: %s: %m",
	    state->acked_dir);
      goto final;
    }

  // The original code passed a NULL handle to closedir whenever one of
  // the directories was missing; the probe helper avoids that.
  if (ackblock_dir_exists (olddir->buf)
      && !ackblock_dir_exists (newdir->buf)
      && (rename (olddir->buf, newdir->buf) < 0))
    {
      elog (LOG_CRIT, "Unable to carry out the following rename: %s --> %s: %m",
	    olddir->buf, newdir->buf);
    }

 final:
  buf_free (newdir);
  buf_free (olddir);
}

// Relocates every finished file into the ACKED_DIR_PATTERN directory.
//
// finished_blocks is expected to hold the last block of each completed
// file; every entry is handed, in order, to move_finished_blocks_helper.
static void
move_finished_blocks (ackblock_state_t * state, GArray * finished_blocks)
{
  guint idx = 0;

  elog (LOG_INFO, " ");

  while (idx < finished_blocks->len)
    {
      move_finished_blocks_helper (state,
				   g_array_index (finished_blocks, block_t, idx));
      idx++;
    }
}


// Merges the finished blocks into state->ackblocks.
//
// For every finished block, the ack array is scanned for the first entry
// with the same uid.node_id:
//   - no such entry: the finished block is appended;
//   - entry found:   it is overwritten only when the finished block's
//                    uid.name compares greater (strcmp) than the entry's.
static void
update_ackblock_table (ackblock_state_t * state, GArray * finished_blocks)
{
  guint f;

  elog (LOG_NOTICE, " ");

  for (f = 0; f < finished_blocks->len; f++)
    {
      block_t done = g_array_index (finished_blocks, block_t, f);
      GArray * acks = state->ackblocks;
      guint a = 0;

      // Advance to the first ack that shares the originating node id;
      // only that first match is ever considered.
      while (a < acks->len
	     && (g_array_index (acks, block_t, a).uid.node_id
		 != done.uid.node_id))
	{
	  a++;
	}

      if (a == acks->len)
	{
	  // Nothing acked for this node yet.
	  g_array_append_val (acks, done);
	  continue;
	}

      block_t * current = &g_array_index (acks, block_t, a);
      if (strcmp (done.uid.name, current->uid.name) > 0)
	{
	  // Same slot, newer file: overwrite the entry in place.
	  *current = done;
	}
    }
}

// Overview of the periodic work carried out by publish_ackblock_timer:
//
// Filter the haveblock table down to completed files.
// For each of the completed files, move to ACKED_DIR_PATTERN.
// For each of the completed files, update the ackblock table.
// Finally, publish the ackblock table (publish_ackblock_table).

// Publishes the current ack array.
//
// A table is built from state->ackblocks, handed to
// ackblock_pub_add_empty under SSYNC_MULTIHOP_PREFIX with state->fid,
// then dumped to the log and freed.  A publish failure is logged only.
static void
publish_ackblock_table (ackblock_state_t * state)
{
  int entries = state->ackblocks->len;
  ackblock_table_t *table = ackblock_create_table_from_array (state->ackblocks);
  buf_t *dump;

  if (ackblock_pub_add_empty (SSYNC_MULTIHOP_PREFIX, table, entries,
			      &state->fid) < 0)
    {
      elog (LOG_CRIT, "Unable to publish ackblock_table");
    }

  // Log a readable rendering of what was just handed to the publisher.
  dump = buf_new ();
  bufprintf (dump, "Publishing the following ACKBLOCK_TABLE...");
  ackblock_table_unparse (dump, table, entries);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  free (table);
}

// Timer callback; `data` is the ackblock_state_t.  Always returns
// TIMER_RENEW.
//
// While sinkstatus is 0 nothing happens.  Otherwise: take the local
// entries of the haveblock table, keep the ones that are the last block
// of a file, move those files to ACKED_DIR_PATTERN, merge them into the
// ack array and publish it.
static int
publish_ackblock_timer (gpointer data, int interval, g_event_t * ev)
{
  ackblock_state_t * state = (ackblock_state_t *)data;

  elog (LOG_NOTICE, " ");

  if (state->sinkstatus != 0)
    {
      elog (LOG_NOTICE,"I'm a sink... taking action...");

      // Local view of the haveblock table, then the completed files in it.
      GArray *local_blocks =
	haveblock_table_to_GArray (state->all_haveblocks,
				   state->all_haveblocks_count, ONLY_LOCAL);
      GArray *completed = get_finished_blocks (local_blocks);

      move_finished_blocks (state, completed);
      update_ackblock_table (state, completed);
      publish_ackblock_table (state);

      g_array_free (completed, 1);
      g_array_free (local_blocks, 1);
    }

  return TIMER_RENEW;
}


// Subscription callback for the haveblock channel; `data` is the
// ackblock_state_t.  Always returns EVENT_RENEW.
//
// Unless mhsync is in use, the incoming table first has its source ids
// converted through the cluster map.  The table with empty blocks
// removed then replaces the copy held in the state, is dumped to the
// log, and the incoming table is freed.
static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
                       int count, void *data)
{
  ackblock_state_t *state = (ackblock_state_t *) data;
  haveblock_table_t * filtered = NULL;
  int filtered_count;
  buf_t *dump;

  elog (LOG_NOTICE, " ");

  if (use_mhsync == 0)
    {
      haveblock_table_convert_src_to_id(table, count, cluster_map);
    }

  haveblock_remove_empty_blocks (table, count, &filtered, &filtered_count);

  // Drop the previous copy (free accepts NULL) and adopt the new one.
  free (state->all_haveblocks);
  state->all_haveblocks = filtered;
  state->all_haveblocks_count = filtered_count;

  dump = buf_new ();
  haveblock_table_unparse (dump, filtered, filtered_count);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  free (table);
  return EVENT_RENEW;
}


// Registers publish_ackblock_timer with period ACKBLOCK_TIMER, storing
// the event handle in state->timer_context.  Exits the process when the
// timer cannot be created.
static void
init_timer (ackblock_state_t * state)
{
  if (g_timer_add (ACKBLOCK_TIMER, publish_ackblock_timer, state, NULL,
		   &state->timer_context) >= 0)
    {
      return;
    }

  elog (LOG_CRIT, "Unable to create timer event.");
  exit (1);
}


// Subscribes to the haveblock channel under use_prefix, with
// haveblock_sub_handler as the callback.  Exits the process when the
// subscription cannot be opened.
//
// The haveblock channel tells us when a file has finished downloading:
// it is assumed to report the last block of a contiguous run of blocks
// starting at the beginning of the file, so a file is complete once the
// reported block is the file's last one.
static void
haveblock_sub (ackblock_state_t * state)
{
  ssync_sub_opts_t sub_opts = {
    .reread_period = REREAD_PERIOD,
    .read_refractory = REFRACTORY
  };

  if (haveblock_sub_open_full (use_prefix, haveblock_sub_handler, state,
			       &sub_opts, &state->haveblocks_sub) >= 0)
    {
      return;
    }

  elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
  exit (1);
}


// Status-client callback: notices when our sink status changes and
// updates state->sinkstatus.
//
// new_buf: status text of `size` bytes; freed here before returning.
// size:    number of bytes in new_buf.
// data:    the ackblock_state_t.
// Always returns EVENT_RENEW.
//
// The payload is copied into a bounded, NUL-terminated local buffer
// before it is logged or parsed, so the handler never reads past `size`
// bytes and never formats a NULL pointer.  Only payloads that begin
// with the number 0 or 1 are accepted; anything else, including text
// with no leading number, is reported as unrecognized and leaves the
// status untouched.  The log line shows at most the first 31 bytes.
static int
status_handler (void *new_buf, size_t size, void *data)
{
  ackblock_state_t *state = (ackblock_state_t *) data;
  char text[32];
  char *end = NULL;
  size_t copy_len;
  long new_status;

  if (!new_buf)
    {
      elog (LOG_ERR, "new_buf is NULL");
      goto final;
    }

  copy_len = (size < sizeof text) ? size : sizeof text - 1;
  memcpy (text, new_buf, copy_len);
  text[copy_len] = '\0';

  elog (LOG_NOTICE,"Received %s, size %lu", text, (unsigned long) size);

  if (size == 0)
    {
      elog (LOG_ERR, "Size is 0!");
      goto final;
    }

  // strtol reports "no digits" through `end`, which atoi cannot do.
  new_status = strtol (text, &end, 10);
  if ((end == text) || ((new_status != 0) && (new_status != 1)))
    {
      elog (LOG_ERR, "Unrecognized status");
      goto final;
    }

  if (new_status != state->sinkstatus)
    {
      elog (LOG_NOTICE, "Changing status to %d", (int) new_status);
      state->sinkstatus = (uint8_t) new_status;
    }
  else
    {
      elog (LOG_NOTICE, "Status is the same.");
    }

final:
  free (new_buf);
  return EVENT_RENEW;
}

// Opens a status client on /dev/block/sinkstatus (resolved through
// sim_path) with status_handler as its callback, so that sink status
// changes reach the state.  Exits the process when the client cannot be
// opened.
static void
status_init (ackblock_state_t * state)
{
  status_client_opts_t client_opts = {
    .devname = sim_path ("/dev/block/sinkstatus"),
    .handler = status_handler,
    .private_data = state
  };

  if (g_status_client_full (&client_opts, &state->status_context) >= 0)
    {
      return;
    }

  elog (LOG_CRIT, "Unable to open status client to %s", client_opts.devname);
  exit (1);
}


// Hands the usage text for this program to misc_print_usage, then exits
// with status 1.  `name` is the program name (argv[0] at the call sites).
static void
usage (char *name)
{
  misc_print_usage (name,
		    "-U <device>",
		    "  --uses <device>: Specify link device to use.");
  exit (1);
}


// Opens a link client on the device named by `uses`, storing the handle
// in state->link_context.  With no device name, an error is logged and
// usage() is called with argv[0].  Exits the process when the link
// cannot be opened.
//
// This might be left over from something, since the link does not seem
// to be used for anything else in this file.
static void
init_link (char *uses, char **argv, ackblock_state_t * state)
{
  if (!uses)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t link_opts = {
    .opts = {
      .name = uses,
      .data = state
    }
  };

  if (lu_open (&link_opts, &state->link_context) >= 0)
    {
      return;
    }

  elog (LOG_CRIT, "Can't open %s: %m", link_name (&link_opts.opts, NULL));
  exit (1);
}


// Program entry point.  In order:
//   - misc and emrun initialization
//   - parse out which link to use
//   - parse out whether we use mhsync or cluster sync directly
//   - initialize the state data structure
//   - subscribe to the haveblock channel
//   - initialize the timer
//   - initialize the status client (client of /dev/block/sinkstatus)
//   - initialize the link (it is not clear the link is still needed)
//   - connect to the cluster map (needed to convert interface ids to
//     node ids)
//   - run the main loop
int
main (int argc, char **argv)
{
  misc_init (&argc, argv, CVSTAG);
  {
    emrun_opts_t run_opts = {
      .shutdown = block_generic_shutdown,
      .data = "ackblock"
    };
    emrun_init (&run_opts);
  }
  elog (LOG_NOTICE, " ");

  char * uses = link_parse_uses (&argc, argv, NULL);
  if (argc > 2)
    {
      usage (argv[0]);
    }

  use_mhsync = misc_parse_out_switch (&argc, argv, "use_mhsync",'\0');
  if (use_mhsync != 0)
    {
      elog (LOG_CRIT,"Using mhsync");
      use_prefix = SSYNC_MULTIHOP_PREFIX;
    }

  ackblock_state_t state = init_state ();
  haveblock_sub (&state);
  init_timer (&state);
  status_init (&state);
  init_link(uses, argv, &state);

  /* cluster map event */
  cluster_map_opts_t map_opts = {
    .private_data = &state
  };
  if (cluster_map_new (&map_opts, &cluster_map) < 0)
    {
      elog(LOG_CRIT, "Can't connect to cluster map: %m");
      exit(1);
    }

  g_main ();
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  ackblock.c
  ackblock_type.c