Code Search for Developers
 
 
  

deleteblock.c from EmStar at Krugle


Show deleteblock.c syntax highlighted

/*
  Deleteblock's tasks are:
    1. Delete block directories that have been around for too long.
    2. Immediately delete block directories that have been acked.

  Deleteblock subscribes to: 
    haveblock
    ackblock
*/

// CVS "$Id$" keyword string identifying the revision of this source file.
char deleteblock_c_cvsid[] = "$Id: deleteblock.c,v 1.8 2005/06/21 07:44:15 adparker Exp $";

#define _GNU_SOURCE

#include <dirent.h>
#include <time.h>
#include <ftw.h>
#include <stdlib.h>
#include <glib.h>
#include <stdio.h>
#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "libmisc/misc_buf.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ackblock.h"
#include "devel/state/ssync.h"
#include "devel/state/cluster_map.h"
#include "libmisc/misc_sim.h"

// Defined outside this file. When non-zero, init_state() builds the
// incoming and assembled directory names from the *_SIM patterns.
extern int in_sim;

// Set in main() from the "use_mhsync" command-line switch. When it is
// zero, haveblock_sub_handler() converts table sources to ids using the
// cluster map.
int use_mhsync;
// Prefix handed to haveblock_sub_open_full(). main() replaces it with
// SSYNC_MULTIHOP_PREFIX when use_mhsync is set.
char * use_prefix = SSYNC_CLUSTER_PREFIX;


// All mutable state of the deleteblock process. A single instance is
// created in main() and passed as the private data pointer to the
// subscription, timer, and status callbacks.
typedef struct deleteblock_state_s
{
  // Most recent haveblock table stored by haveblock_sub_handler()
  // (after empty blocks were removed), and its entry count.
  haveblock_table_t *all_haveblocks;
  int all_haveblocks_count;

  // Most recent ackblock table stored by ackblock_sub_handler()
  // (after empty blocks were removed), and its entry count.
  ackblock_table_t *all_ackblocks;
  int all_ackblocks_count;

  // Subscription handles filled in by haveblock_sub() / ackblock_sub().
  ssync_sub_t *haveblocks_sub;
  ssync_sub_t *ackblocks_sub;
  // Status client handle filled in by status_init().
  status_client_context_t *status_context;
  // Timer event handle filled in by init_timer().
  g_event_t *timer_context;
  // 0 or 1, updated by status_handler(). deleteblock_timer() only runs
  // deleteblock_acked() while this is 0.
  uint8_t sinkstatus;
  // Directory names built (and strdup'ed) in init_state():
  char * blocks_dir;    // from BLOCK_DIR_PATTERN
  char * original_dir;  // from INCOMING_DIR or INCOMING_DIR_PATTERN_SIM
  char * acked_dir;     // from ACKED_DIR_PATTERN
  char * assembled_dir; // from ASSEMBLED_DIR or ASSEMBLED_DIR_PATTERN_SIM
} deleteblock_state_t;

// Created in main() by cluster_map_new(). haveblock_sub_handler() passes
// it to haveblock_table_convert_src_to_id() when use_mhsync is not set.
cluster_map_t * cluster_map = NULL;

// The manual page gives no guidance as to whether or not I should
// free struct FTW * s. Googling for other examples suggests that I
// don't need to.
int
nftw_remove_files (const char * file, const struct stat * sb, int flag, struct FTW * s)
{
  elog (LOG_NOTICE, "Removing file: %s", file);
  remove (file);
  return 0;
}

// Builds the initial process state.
//
// All table pointers, handles and counters start out NULL / 0. The four
// directory names are formatted into temporary buffers and then copied
// with strdup(), so the returned struct owns those strings:
//   blocks_dir    - BLOCK_DIR_PATTERN with my_node_id
//   acked_dir     - ACKED_DIR_PATTERN with my_node_id
//   original_dir  - INCOMING_DIR, or INCOMING_DIR_PATTERN_SIM with
//                   my_node_id when in_sim is set
//   assembled_dir - ASSEMBLED_DIR, or ASSEMBLED_DIR_PATTERN_SIM with
//                   my_node_id when in_sim is set
//
// Exits the process if any of the directory names cannot be duplicated,
// since the rest of the program passes them on without NULL checks.
static deleteblock_state_t
init_state ()
{
  buf_t *blockbuf = buf_new ();
  buf_t *originalbuf = buf_new ();
  buf_t *ackedbuf = buf_new ();
  buf_t *assembledbuf = buf_new ();

  bufprintf (blockbuf, BLOCK_DIR_PATTERN, my_node_id);
  bufprintf (ackedbuf, ACKED_DIR_PATTERN, my_node_id);

  if (in_sim)
    {
      bufprintf (originalbuf, INCOMING_DIR_PATTERN_SIM, my_node_id);
      bufprintf (assembledbuf, ASSEMBLED_DIR_PATTERN_SIM, my_node_id);
    }
  else
    {
      bufprintf (originalbuf, INCOMING_DIR);
      bufprintf (assembledbuf, ASSEMBLED_DIR);
    }

  deleteblock_state_t state = {
    .all_haveblocks = NULL,
    .all_haveblocks_count = 0,
    .all_ackblocks = NULL,
    .all_ackblocks_count = 0,
    .haveblocks_sub = NULL,
    .ackblocks_sub = NULL,
    .status_context = NULL,
    .timer_context = NULL,
    .sinkstatus = 0,
    .blocks_dir = strdup (blockbuf->buf),
    .original_dir = strdup (originalbuf->buf),
    .acked_dir = strdup (ackedbuf->buf),
    .assembled_dir = strdup (assembledbuf->buf)
  };

  buf_free (originalbuf);
  buf_free (ackedbuf);
  buf_free (assembledbuf);
  buf_free (blockbuf);

  // strdup() returns NULL when it cannot allocate; fail loudly at startup
  // instead of handing a NULL path to the directory scanning code later.
  if (state.blocks_dir == NULL || state.original_dir == NULL
      || state.acked_dir == NULL || state.assembled_dir == NULL)
    {
      elog (LOG_CRIT, "Unable to allocate directory names: %m");
      exit (1);
    }

  return state;
}

static void
deleteblock_dir_name (const char * dirname)
{
  elog (LOG_NOTICE, "dirname is %s", dirname);
  if (nftw (dirname, nftw_remove_files, 4, FTW_DEPTH) < 0)
    {
      // This is OK, since we may have already deleted this before.
      elog (LOG_INFO, "ftw encountered an error on %s: %m", dirname);
    }
  return;
}
// Recursively deletes the repository directory that holds block, i.e.
// "<state->blocks_dir>/<block->uid.name>".
static void
deleteblock_dir (deleteblock_state_t * state, const block_t * block)
{
  buf_t * block_path;

  elog (LOG_INFO, " ");

  // Compose the block's directory path under the block repository.
  block_path = buf_new ();
  bufprintf (block_path, "%s/%s", state->blocks_dir, block->uid.name);

  deleteblock_dir_name (block_path->buf);

  buf_free (block_path);
}

// Removes the original copy of block from the incoming directory, i.e.
// "<state->original_dir>/<block->uid.name>".
// original_dir is either the simulation directory or the "real world"
// one; init_state() picks which. The result of remove() is not checked.

static void
deleteblock_original (deleteblock_state_t * state, const block_t * block)
{
  buf_t * original_path;

  elog (LOG_INFO, " ");

  // Compose the path of the original file.
  original_path = buf_new ();
  bufprintf (original_path, "%s/%s", state->original_dir, block->uid.name);

  elog (LOG_NOTICE, "Removing file: %s", original_path->buf);
  remove (original_path->buf);

  buf_free (original_path);
}


// Deletes every locally held block that has been acked.
//
// The caller (deleteblock_timer) skips this function while this node is
// a sink.
//
// Get a list of local haveblocks (ONLY_LOCAL).
// Get a list of ack blocks (ALLOW_ALL).
// Loop through the ack blocks:
//   Loop through the haveblocks:
//     IF the ack block has the same node id and the same name as the
//        haveblock
//     THEN delete the haveblock's directory under blocks_dir, and
//          if the block's node id is my_node_id, also delete the
//          original file under original_dir.
//
// The name comparison is an exact match: a haveblock is only deleted when
// an ack for that very name has been received.
//
// Matched haveblocks are left in the array, so a haveblock matched by
// several ack entries is passed to the delete helpers more than once.
static void
deleteblock_acked (deleteblock_state_t * state)
{

  elog (LOG_INFO, " ");

  GArray * haveblock_array = haveblock_table_to_GArray (state->all_haveblocks,
                                                        state->all_haveblocks_count,
                                                        ONLY_LOCAL);
  GArray * ackblock_array = ackblock_table_to_GArray (state->all_ackblocks,
                                                      state->all_ackblocks_count,
                                                      ALLOW_ALL);
  // GArray->len is unsigned, so the indices are unsigned as well.
  guint have_index;
  guint ack_index;
  for (ack_index = 0; ack_index < ackblock_array->len; ++ack_index)
    {
      block_t ackblock = g_array_index (ackblock_array, block_t, ack_index);
      for (have_index = 0; have_index < haveblock_array->len; ++have_index)
        {
          block_t haveblock = g_array_index (haveblock_array, block_t, have_index);
          if (  (ackblock.uid.node_id == haveblock.uid.node_id)
                && (strcmp (ackblock.uid.name, haveblock.uid.name) == 0) )
            {
              // Delete /tmp/RDD_ID_16/block_repository/NAME
              elog (LOG_NOTICE, "Deleting %s:%d", ackblock.uid.name,
                    haveblock.offset);
              deleteblock_dir (state, &haveblock);

              // Delete the original if I have it.
              if (ackblock.uid.node_id == my_node_id)
                {
                  deleteblock_original (state, &haveblock);
                }

            }
        }
    }

  g_array_free (ackblock_array, 1);
  g_array_free (haveblock_array, 1);

}


// scandir() filter: returns 0 for the "." and ".." entries so they are
// left out of the result, and 1 for every other entry.
static int
scandir_filter (const struct dirent * ent)
{
  elog (LOG_INFO, " ");

  const char * entry_name = ent->d_name;
  int is_self = (strcmp (entry_name, ".") == 0);
  int is_parent = (strcmp (entry_name, "..") == 0);

  return (is_self || is_parent) ? 0 : 1;
}

// Deletes the directory entry at fullpath by handing it to
// deleteblock_dir_name(), without looking at what kind of entry it is.
// The dirent argument is not used.
static void
get_rid_of_it (const struct dirent * name, const char * fullpath)
{
  (void) name;

  elog (LOG_INFO, " ");
  deleteblock_dir_name (fullpath);
}
// Deletes every entry of dir whose modification time is more than
// DELETE_MAX_AGE seconds in the past.
//
// dir is created first (mkdir_with_parents) and then listed with
// scandir(), skipping "." and "..". Each entry is stat()ed:
//   - stat failure: logged, entry skipped;
//   - older than DELETE_MAX_AGE: removed via get_rid_of_it(), whether it
//     is a file or a directory;
//   - otherwise: logged and left alone.
// The state argument is not used.
static void
deleteblock_old (deleteblock_state_t * state, const char * dir)
{
  elog (LOG_INFO, " ");
  struct dirent **namelist = NULL;
  int n;

  mkdir_with_parents (dir);

  n = scandir (dir, &namelist, scandir_filter, NULL);
  if (n < 0)
    {
      elog (LOG_CRIT, "scandir failed while scanning: %s: %m", dir);
      return;
    }

  // successful scandir: walk the list from the last entry to the first,
  // releasing each entry as soon as it has been handled.
  while (n--)
    {
      struct stat file_stat;
      buf_t * fullpath = buf_new ();
      bufprintf (fullpath, "%s/%s", dir, namelist[n]->d_name);

      if (stat (fullpath->buf , &file_stat) < 0)
        {
          elog (LOG_ERR, "Unable to stat file: %s", fullpath->buf);
        }
      else
        { // successful stat
          time_t age = time (NULL) - file_stat.st_mtime;

          // time_t is not guaranteed to be long, so it is cast explicitly
          // wherever it is handed to a %ld conversion.
          if (  age > DELETE_MAX_AGE )
            { // delete it, whether it's a dir or file
              elog (LOG_NOTICE, "Deleting file: %s:  Since it's %ld seconds old",
                    fullpath->buf, (long) age);
              get_rid_of_it (namelist[n], fullpath->buf);
            }
          else
            {
              elog (LOG_INFO, "Skipping %s since it's %ld seconds old",
                    fullpath->buf, (long) age);
            }
        }
      // no matter what, free this element
      free (namelist [n]);
      buf_free (fullpath);
    }
  free (namelist);
}
// Periodic timer callback; data is the deleteblock_state_t.
//
// On every tick:
//   1. Entries that are too old are cleared out of the block directory
//      (built from BLOCK_DIR_PATTERN) and the acked directory (built from
//      ACKED_DIR_PATTERN).
//   2. Unless sinkstatus is non-zero, every block I have that has been
//      acked is removed.
//
// Always returns TIMER_RENEW.

static int
deleteblock_timer (gpointer data, int interval, g_event_t * ev)
{
  deleteblock_state_t * state;

  elog (LOG_INFO, " ");
  state = (deleteblock_state_t *) data;

  deleteblock_old (state, state->blocks_dir);
  deleteblock_old (state, state->acked_dir);

  if (state->sinkstatus != 0)
    {
      return TIMER_RENEW;
    }

  deleteblock_acked (state);
  return TIMER_RENEW;
}



// Registers deleteblock_timer() as a timer event with period DELETE_TIMER
// and state as its private data; the event handle is stored in
// state->timer_context. Exits the process if the timer cannot be created.
static void
init_timer (deleteblock_state_t * state)
{
  if (!(g_timer_add (DELETE_TIMER, deleteblock_timer, state, NULL,
                     &state->timer_context) < 0))
    {
      return;
    }

  elog (LOG_ERR, "Unable to create timer event.");
  exit (1);
}

// Subscription callback for the ackblock channel; data is the
// deleteblock_state_t.
//
// A copy of the incoming table with empty blocks removed replaces the
// table kept in the state (the previous one is freed), the new table is
// logged, and the incoming table is freed. Always returns EVENT_RENEW.
//
// There is no call to ackblock_table_convert_src_to_id here because
// ackblocks are always subscribed via mhsync.
static int
ackblock_sub_handler (ssync_sub_t * sub, ackblock_table_t * table,
                      int count, void *data)
{
  deleteblock_state_t *state;
  ackblock_table_t * filtered = NULL;
  int filtered_count;
  buf_t *dump;

  elog (LOG_INFO, " ");
  state = (deleteblock_state_t *) data;

  ackblock_remove_empty_blocks (table, count, &filtered, &filtered_count);

  // free (NULL) is a no-op, so the previous table needs no NULL guard.
  free (state->all_ackblocks);
  state->all_ackblocks = filtered;
  state->all_ackblocks_count = filtered_count;

  // Log the table that was just stored.
  dump = buf_new ();
  ackblock_table_unparse (dump, filtered, filtered_count);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  free (table);
  return EVENT_RENEW;
}

// Subscription callback for the haveblock channel; data is the
// deleteblock_state_t.
//
// When mhsync is not in use, the incoming table's sources are first
// converted to ids with the cluster map. A copy of the table with empty
// blocks removed then replaces the table kept in the state (the previous
// one is freed), the new table is logged, and the incoming table is
// freed. Always returns EVENT_RENEW.
static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
                       int count, void *data)
{
  deleteblock_state_t *state;
  haveblock_table_t * filtered = NULL;
  int filtered_count;
  buf_t *dump;

  elog (LOG_INFO, " ");
  state = (deleteblock_state_t *) data;

  if (use_mhsync == 0)
    {
      haveblock_table_convert_src_to_id (table, count, cluster_map);
    }

  haveblock_remove_empty_blocks (table, count, &filtered, &filtered_count);

  // free (NULL) is a no-op, so the previous table needs no NULL guard.
  free (state->all_haveblocks);
  state->all_haveblocks = filtered;
  state->all_haveblocks_count = filtered_count;

  // Log the table that was just stored.
  dump = buf_new ();
  haveblock_table_unparse (dump, filtered, filtered_count);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  free (table);
  return EVENT_RENEW;
}

// Subscribes to the haveblock channel under use_prefix, with
// haveblock_sub_handler() as the callback and state as its private data.
// The subscription handle is stored in state->haveblocks_sub.
// Exits the process if the subscription cannot be opened.
static void
haveblock_sub (deleteblock_state_t * state)
{
  elog (LOG_INFO, " ");

  ssync_sub_opts_t sub_opts =
    {
      .reread_period = REREAD_PERIOD,
      .read_refractory = REFRACTORY
    };

  if (!(haveblock_sub_open_full (use_prefix, haveblock_sub_handler, state,
                                 &sub_opts, &state->haveblocks_sub) < 0))
    {
      return;
    }

  elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
  exit (1);
}

// Subscribes to the ackblock channel under SSYNC_MULTIHOP_PREFIX, with
// ackblock_sub_handler() as the callback and state as its private data.
// The subscription handle is stored in state->ackblocks_sub.
// Exits the process if the subscription cannot be opened.
static void
ackblock_sub (deleteblock_state_t * state)
{
  elog (LOG_INFO, " ");

  ssync_sub_opts_t sub_opts =
    {
      .reread_period = REREAD_PERIOD,
      .read_refractory = REFRACTORY
    };

  if (!(ackblock_sub_open_full (SSYNC_MULTIHOP_PREFIX, ackblock_sub_handler,
                                state, &sub_opts, &state->ackblocks_sub) < 0))
    {
      return;
    }

  elog (LOG_ERR, "Unable to subscribe to ackblock channel: %m");
  exit (1);
}

// When we get a status notify, it indicates a possible change in sink
// status. This function updates state->sinkstatus (data is the
// deleteblock_state_t) to reflect the current sink status.
//
// new_buf holds the status as decimal text; only the values 0 and 1 are
// accepted. Empty, NULL, non-numeric, or out-of-range input is logged and
// ignored. new_buf is freed here on every path where it is non-NULL.
// Always returns EVENT_RENEW.
//
// NOTE(review): strtol, like the atoi it replaces, assumes new_buf is
// NUL-terminated -- confirm against the status client.
static int
status_handler (void *new_buf, size_t size, void *data)
{
  deleteblock_state_t *state = (deleteblock_state_t *) data;
  char *text = (char *) new_buf;
  char *end = NULL;
  long parsed;

  // Input check. This happens before anything dereferences the buffer.
  if (!new_buf)
    {
      elog (LOG_ERR, "new_buf is NULL");
      return EVENT_RENEW;
    }
  if (size == 0)
    {
      elog (LOG_ERR, "Size is 0!");
      goto final;
    }

  elog (LOG_INFO, "Received %s, size %d", text, (int) size);

  // Work
  parsed = strtol (text, &end, 10);
  // end == text means no digits were consumed; atoi would have silently
  // turned such input into status 0.
  if ((end == text) || ((parsed != 0) && (parsed != 1)))
    {
      elog (LOG_ERR, "Unrecognized status");
      goto final;
    }

  if (parsed != state->sinkstatus)
    {
      elog (LOG_INFO, "Changing status to %d", (int) parsed);
      state->sinkstatus = (uint8_t) parsed;
    }
  else
    {
      elog (LOG_INFO, "Status is the same.");
    }

final:
  free (new_buf);
  return EVENT_RENEW;
}



// Opens a status client on sim_path ("/dev/block/sinkstatus") with
// status_handler() as the callback and state as its private data.
// The client handle is stored in state->status_context.
// Exits the process if the client cannot be opened.
static void
status_init (deleteblock_state_t * state)
{
  status_client_opts_t client_opts = {
    .devname = sim_path ("/dev/block/sinkstatus"),
    .handler = status_handler,
    .private_data = state
  };

  if (!(g_status_client_full (&client_opts, &state->status_context) < 0))
    {
      return;
    }

  elog (LOG_CRIT, "Unable to open status client to %s", client_opts.devname);
  exit (1);
}

// Prints the usage message for program name via misc_print_usage() (with
// empty option strings), then exits with status 1.
static void
usage (char *name)
{
  misc_print_usage (name, "", "");

  exit (1);
}

// Program entry point.
//
// Order of operations:
//   1. misc_init() and emrun_init() (shutdown hook block_generic_shutdown,
//      data "deleteblock").
//   2. More than one command-line argument prints the usage and exits.
//   3. The "use_mhsync" switch is parsed out; when present, use_prefix
//      becomes SSYNC_MULTIHOP_PREFIX.
//   4. The state is built, then the haveblock and ackblock subscriptions,
//      the timer and the status client are set up (each exits on failure).
//   5. The cluster map is created (exits on failure).
//   6. g_main() is entered; if it ever returns, main returns 1.
int
main (int argc, char **argv)
{
  emrun_opts_t emrun_opts = {
    .shutdown = block_generic_shutdown,
    .data = "deleteblock"
  };

  misc_init (&argc, argv, CVSTAG);
  emrun_init (&emrun_opts);

  elog (LOG_INFO, " ");

  if (argc > 2)
    {
      usage (argv[0]);
    }

  use_mhsync = misc_parse_out_switch (&argc, argv, "use_mhsync", '\0');
  if (use_mhsync != 0)
    {
      elog (LOG_CRIT,"Using mhsync");
      use_prefix = SSYNC_MULTIHOP_PREFIX;
    }

  deleteblock_state_t state = init_state ();

  haveblock_sub (&state);
  ackblock_sub (&state);
  init_timer (&state);
  status_init (&state);

  /* cluster map event */
  cluster_map_opts_t map_opts = {
    .private_data = &state
  };
  if (cluster_map_new (&map_opts, &cluster_map) < 0)
    {
      elog (LOG_CRIT, "Can't connect to cluster map: %m");
      exit (1);
    }

  g_main ();

  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C, Shell Script
License: other

  deleteblock.c