Show deleteblock.c syntax highlighted
/*
Deleteblock's tasks are:
1. Delete block directories that have been around for too long.
2. Immediately delete block directories that have been acked.
Deleteblock subscribes to:
haveblock
ackblock
*/
char deleteblock_c_cvsid[] = "$Id: deleteblock.c,v 1.8 2005/06/21 07:44:15 adparker Exp $";
#define _GNU_SOURCE
#include <dirent.h>
#include <time.h>
#include <ftw.h>
#include <stdlib.h>
#include <glib.h>
#include <stdio.h>
#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "libmisc/misc_buf.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ackblock.h"
#include "devel/state/ssync.h"
#include "devel/state/cluster_map.h"
#include "libmisc/misc_sim.h"
extern int in_sim;
int use_mhsync;
char * use_prefix = SSYNC_CLUSTER_PREFIX;
typedef struct deleteblock_state_s
{
haveblock_table_t *all_haveblocks;
int all_haveblocks_count;
ackblock_table_t *all_ackblocks;
int all_ackblocks_count;
ssync_sub_t *haveblocks_sub;
ssync_sub_t *ackblocks_sub;
status_client_context_t *status_context;
g_event_t *timer_context;
uint8_t sinkstatus;
char * blocks_dir;
char * original_dir;
char * acked_dir;
char * assembled_dir;
} deleteblock_state_t;
cluster_map_t * cluster_map = NULL;
// The manual page gives no guidance as to whether or not I should
// free struct FTW * s. Googling for other examples suggests that I
// don't need to.
int
nftw_remove_files (const char * file, const struct stat * sb, int flag, struct FTW * s)
{
elog (LOG_NOTICE, "Removing file: %s", file);
remove (file);
return 0;
}
static deleteblock_state_t
init_state ()
{
buf_t *blockbuf = buf_new ();
buf_t *originalbuf = buf_new ();
buf_t *ackedbuf = buf_new ();
buf_t *assembledbuf = buf_new ();
bufprintf (blockbuf, BLOCK_DIR_PATTERN, my_node_id);
bufprintf (ackedbuf, ACKED_DIR_PATTERN, my_node_id);
if (in_sim)
{
bufprintf (originalbuf, INCOMING_DIR_PATTERN_SIM, my_node_id);
bufprintf (assembledbuf, ASSEMBLED_DIR_PATTERN_SIM, my_node_id);
}
else
{
bufprintf (originalbuf, INCOMING_DIR);
bufprintf (assembledbuf, ASSEMBLED_DIR);
}
deleteblock_state_t state = {
all_haveblocks:NULL,
all_haveblocks_count:0,
all_ackblocks:NULL,
all_ackblocks_count:0,
haveblocks_sub:NULL,
ackblocks_sub:NULL,
status_context:NULL,
timer_context:NULL,
sinkstatus:0,
blocks_dir:strdup (blockbuf->buf),
original_dir: strdup (originalbuf->buf),
acked_dir: strdup (ackedbuf->buf),
assembled_dir: strdup (assembledbuf->buf)
};
buf_free (originalbuf);
buf_free (ackedbuf);
buf_free (assembledbuf);
buf_free (blockbuf);
return state;
}
static void
deleteblock_dir_name (const char * dirname)
{
elog (LOG_NOTICE, "dirname is %s", dirname);
if (nftw (dirname, nftw_remove_files, 4, FTW_DEPTH) < 0)
{
// This is OK, since we may have already deleted this before.
elog (LOG_INFO, "ftw encountered an error on %s: %m", dirname);
}
return;
}
// Recursively deletes the parent directory of block.
static void
deleteblock_dir (deleteblock_state_t * state, const block_t * block)
{
// Create directory name.
elog (LOG_INFO, " ");
buf_t * dirname = buf_new ();
bufprintf (dirname, "%s/%s", state->blocks_dir, block->uid.name);
deleteblock_dir_name (dirname->buf);
buf_free (dirname);
}
// Checks both incoming dirs.
// One dir is used for testing, while the other is used "in the real world".
static void
deleteblock_original (deleteblock_state_t * state, const block_t * block)
{
// Create path name;
elog (LOG_INFO, " ");
buf_t * dirname = buf_new ();
bufprintf (dirname, "%s/%s", state->original_dir, block->uid.name);
elog (LOG_NOTICE, "Removing file: %s", dirname->buf);
remove (dirname->buf);
buf_free (dirname);
}
// Detail on acked blocks:
//
// If I'm a sink, skip.
// Get a list of local haveblocks.
// Get a list of ack blocks.
// Loop through the ack blocks:
// Loop through the haveblocks:
// IF the ack block has the same name as the haveblock
// THEN call block delete on the haveblock
// remove item from haveblock array.
// decrement index.
static void
deleteblock_acked (deleteblock_state_t * state)
{
elog (LOG_INFO, " ");
GArray * haveblock_array = haveblock_table_to_GArray (state->all_haveblocks,
state->all_haveblocks_count,
ONLY_LOCAL);
GArray * ackblock_array = ackblock_table_to_GArray (state->all_ackblocks,
state->
all_ackblocks_count,
ALLOW_ALL);
int have_index;
int ack_index;
for (ack_index = 0; ack_index < ackblock_array->len; ++ack_index)
{
block_t ackblock = g_array_index (ackblock_array, block_t, ack_index);
for (have_index = 0; have_index < haveblock_array->len; ++have_index)
{
block_t haveblock = g_array_index (haveblock_array, block_t, have_index);
if ( (ackblock.uid.node_id == haveblock.uid.node_id)
&& (strcmp (ackblock.uid.name, haveblock.uid.name) >= 0) )
{
// Delete /tmp/RDD_ID_16/block_repository/NAME
elog (LOG_NOTICE, "Deleting %s:%d", ackblock.uid.name,
haveblock.offset);
deleteblock_dir (state, &haveblock);
// Delete the original if I have it.
if (ackblock.uid.node_id == my_node_id)
{
deleteblock_original (state, &haveblock);
}
}
}
}
g_array_free (ackblock_array, 1);
g_array_free (haveblock_array, 1);
}
static int
scandir_filter (const struct dirent * ent)
{
elog (LOG_INFO, " ");
if (strcmp (ent->d_name, ".") == 0)
return 0;
if (strcmp (ent->d_name, "..") == 0)
return 0;
else
return 1;
}
static void
get_rid_of_it (const struct dirent * name, const char * fullpath)
{
elog (LOG_INFO, " ");
deleteblock_dir_name (fullpath);
/* if (DT_DIR == name->d_type) */
/* { */
/* deleteblock_dir_name (fullpath); */
/* } */
/* else */
/* { */
/* if (remove (fullpath) < 0) */
/* { */
/* elog (LOG_ERR, "Unable to remove file: %s: %m", */
/* fullpath); */
/* } */
/* } */
return;
}
// Scan the given directory for files and directories
// fstat them. If they're too old, delete them.
static void
deleteblock_old (deleteblock_state_t * state, const char * dir)
{
elog (LOG_INFO, " ");
struct dirent **namelist = NULL;
int n;
mkdir_with_parents (dir);
n = scandir (dir, &namelist, scandir_filter, NULL);
if (n < 0)
{
elog (LOG_CRIT, "scandir failed while scanning: %s: %m", dir);
goto final;
}
// successful scandir
while (n--)
{
struct stat file_stat;
buf_t * fullpath = buf_new ();
bufprintf (fullpath, "%s/%s", dir, namelist[n]->d_name);
if (stat (fullpath->buf , &file_stat) < 0)
{
elog (LOG_ERR, "Unable to stat file: %s", fullpath->buf);
}
else
{ // successful stat
time_t age = time (NULL) - file_stat.st_mtime;
if ( age > DELETE_MAX_AGE )
{ // delete it, whether it's a dir or file
elog (LOG_NOTICE, "Deleting file: %s: Since it's %ld seconds old",
fullpath->buf, age);
get_rid_of_it (namelist[n], fullpath->buf);
}
else
{
elog (LOG_INFO, "Skipping %s since it's %ld seconds old",
fullpath->buf, age);
}
}
// no matter what, free this element
free (namelist [n]);
buf_free (fullpath);
}
free (namelist);
final:
return;
}
// It's a lot simpler now.
//
// Remove any block that I have that's been acked.
//
// Clears out the following directories:
// BLOCK_DIR_PATTERN
// ACKED_DIR_PATTERN
static int
deleteblock_timer (gpointer data, int interval, g_event_t * ev)
{
elog (LOG_INFO, " ");
deleteblock_state_t * state = (deleteblock_state_t *) data;
deleteblock_old (state, state->blocks_dir);
deleteblock_old (state, state->acked_dir);
if (state->sinkstatus == 0)
{
deleteblock_acked (state);
}
return TIMER_RENEW;
}
static void
init_timer (deleteblock_state_t * state)
{
if (g_timer_add
(DELETE_TIMER, deleteblock_timer, state, NULL, &state->timer_context) < 0)
{
elog (LOG_ERR, "Unable to create timer event.");
exit (1);
}
}
// Handles incoming messages on ackblock channel.
// Copies (and possibly converts) data to state data structure.
// You don't have to call ackblock_table_convert_src_to_id because
// ackblocks are always subscribed via mhsync.
static int
ackblock_sub_handler (ssync_sub_t * sub, ackblock_table_t * table,
int count, void *data)
{
elog (LOG_INFO, " ");
deleteblock_state_t *state = (deleteblock_state_t *) data;
int new_count;
ackblock_table_t * new_table = NULL;
ackblock_remove_empty_blocks (table, count, &new_table, &new_count);
if (state->all_ackblocks != NULL)
{
free (state->all_ackblocks);
}
state->all_ackblocks = new_table;
state->all_ackblocks_count = new_count;
{
buf_t *buf = buf_new ();
ackblock_table_unparse (buf, new_table, new_count);
elog (LOG_NOTICE, "%s", buf->buf);
buf_free (buf);
}
free (table);
return EVENT_RENEW;
}
// Handles incoming messages on haveblock channel.
// Copies (and possibly converts) data to state data structure.
static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
int count, void *data)
{
elog (LOG_INFO, " ");
deleteblock_state_t *state = (deleteblock_state_t *) data;
if (!use_mhsync)
{
haveblock_table_convert_src_to_id(table, count, cluster_map);
}
int new_count;
haveblock_table_t * new_table = NULL;
haveblock_remove_empty_blocks (table, count, &new_table, &new_count);
if (state->all_haveblocks != NULL)
{
free (state->all_haveblocks);
}
state->all_haveblocks = new_table;
state->all_haveblocks_count = new_count;
{
buf_t *buf = buf_new ();
haveblock_table_unparse (buf, new_table, new_count);
elog (LOG_NOTICE, "%s", buf->buf);
buf_free (buf);
}
free (table);
return EVENT_RENEW;
}
// Handles subscribing to haveblock channel
static void
haveblock_sub (deleteblock_state_t * state)
{
elog (LOG_INFO, " ");
ssync_sub_opts_t opts =
{
reread_period:REREAD_PERIOD,
read_refractory: REFRACTORY
};
if (haveblock_sub_open_full (use_prefix, haveblock_sub_handler, state, &opts,
&state->haveblocks_sub)
< 0)
{
elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
exit (1);
}
return;
}
// Handles subscribing to ackblock channel
static void
ackblock_sub (deleteblock_state_t * state)
{
elog (LOG_INFO, " ");
ssync_sub_opts_t opts =
{
reread_period:REREAD_PERIOD,
read_refractory: REFRACTORY
};
if (ackblock_sub_open_full (SSYNC_MULTIHOP_PREFIX, ackblock_sub_handler, state,
&opts, &state->ackblocks_sub)
< 0)
{
elog (LOG_ERR, "Unable to subscribe to ackblock channel: %m");
exit (1);
}
return;
}
// When we get a status notify, it indicates a possible change in sink
// status. This function updates a global variable to reflect the
// current sink status.
static int
status_handler (void *new_buf, size_t size, void *data)
{
elog (LOG_INFO, "Received %s, size %d", (char *) new_buf, size);
// Input check
if (size == 0)
{
elog (LOG_ERR, "Size is not 0!");
goto final;
}
if (!new_buf)
{
elog (LOG_ERR, "new_buf is NULL");
goto final;
}
// Work
deleteblock_state_t *state = (deleteblock_state_t *) data;
int new_status = atoi ((char *) new_buf);
if ((new_status == 0) || (new_status == 1))
{
if (new_status != state->sinkstatus)
{
elog (LOG_INFO, "Changing status to %d", new_status);
state->sinkstatus = new_status;
}
else
{
elog (LOG_INFO, "Status is the same.");
}
}
else
{
elog (LOG_ERR, "Unrecognized status");
goto final;
}
final:
if (new_buf)
free (new_buf);
return EVENT_RENEW;
}
static void
status_init (deleteblock_state_t * state)
{
status_client_opts_t opts = {
devname:sim_path ("/dev/block/sinkstatus"),
handler:status_handler,
private_data:state
};
if (g_status_client_full (&opts, &state->status_context) < 0)
{
elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
exit (1);
}
}
static void
usage (char *name)
{
misc_print_usage (name, "", "");
exit (1);
}
int
main (int argc, char **argv)
{
misc_init (&argc, argv, CVSTAG);
{
emrun_opts_t emrun_opts = {
shutdown:block_generic_shutdown,
data:"deleteblock"
};
emrun_init (&emrun_opts);
}
elog (LOG_INFO, " ");
if (argc > 2)
{
usage (argv[0]);
}
use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
if (use_mhsync)
{
elog (LOG_CRIT,"Using mhsync");
use_prefix = SSYNC_MULTIHOP_PREFIX;
}
deleteblock_state_t state = init_state ();
haveblock_sub (&state);
ackblock_sub (&state);
init_timer (&state);
status_init (&state);
/* cluster map event */
cluster_map_opts_t cl_opts = {
private_data: &state
};
if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
elog(LOG_CRIT, "Can't connect to cluster map: %m");
exit(1);
}
g_main ();
return 1;
}
See more files for this project here