Show haveblock.c syntax highlighted
// Maintain table of blocks.
//
// Publishes a list of blocks that represents, for each file, the last
// block of a contiguous segment of blocks starting from the beginning of
// that file.
// CVS "$Id$" keyword string recording this file's revision.  It is not
// referenced anywhere else in the visible source; presumably it exists so
// the revision can be recovered from the compiled object -- TODO confirm.
char haveblock_c_cvsid[] =
"$Id: haveblock.c,v 1.8 2005/06/11 03:52:02 adparker Exp $";
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <glib.h>
#include <ftw.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "libmisc/elog.h"
#include "emrun/emrun.h"
#include "libdev/glib_dev.h"
#include "link/link.h"
#include "link/link_headers.h"
#include "devel/state/ssync.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ioutils.h"
#include "devel/state/cluster_map.h"
// Nonzero when the "use_mhsync" command-line switch was given (set in main).
// When zero, publish_blocks_timer_cb() looks up the source interface id via
// lu_get_if_id() before publishing.
int use_mhsync = 0;
// Prefix handed to haveblock_pub_add_empty() when publishing; main switches
// it to SSYNC_MULTIHOP_PREFIX when use_mhsync is set.
char * use_prefix = SSYNC_CLUSTER_PREFIX;
// Handle filled in by cluster_map_new() in main.
cluster_map_t * cluster_map = NULL;
// NOTE(review): COUNT and FILE_COUNT_MAX are not referenced in the visible
// source -- confirm whether they are still needed.
#define COUNT 3
#define FILE_COUNT_MAX 32
// State shared by the two timer callbacks and the ftw() callback.
typedef struct haveblock_state_s
{
// Directory walked by ftw() for block files (strdup'ed in haveblock_init).
char *blocks_dir;
// GArray of block_t; discarded and rebuilt from blocks_dir on every run of
// update_blocks_timer_cb(), then read by create_table() when publishing.
GArray *available_blocks;
// Reset to 0 on every run of update_blocks_timer_cb(); not otherwise used
// in the visible source.
int file_count;
// Link context filled in by lu_open() in init_link().
lu_context_t * link_context;
} haveblock_state_t;
// Points at main's state object.  update_blocks() reads it because its
// ftw() callback signature carries no user-data argument.
static haveblock_state_t *global_state = NULL;
// ftw() callback invoked for every entry found under the blocks directory.
//
// For regular files (flag == FTW_F) it reads one block_t from the start of
// the file and appends it to global_state->available_blocks.  Anything that
// is not a regular file is ignored.  Open and read failures are logged and
// the entry is skipped.
//
//   file - path of the entry being visited
//   sb   - stat information supplied by ftw(); unused here
//   flag - entry type reported by ftw()
//
// Always returns 0, so a bad file never aborts the directory walk.
int
update_blocks (const char *file, const struct stat *sb, int flag)
{
    elog (LOG_NOTICE, " ");

    if (flag != FTW_F) {
        return 0;
    }

    FILE *stream = fopen (file, "rb");
    if (!stream) {
        elog (LOG_CRIT, "Unable to open file: %s: %m", file);
        return 0;
    }

    block_t header;
    if (fread (&header, sizeof header, 1, stream) == 1) {
        g_array_append_val (global_state->available_blocks, header);
    } else if (feof (stream)) {
        // File is shorter than one block_t.
        elog (LOG_CRIT, "Unexpected EOF while reading %s", file);
    } else if (ferror (stream)) {
        elog (LOG_CRIT, "Error while reading %s: %m", file);
    }

    fclose (stream);
    return 0;
}
// Replaces *array (a GArray of block_t) with a new GArray that holds, for
// each file, only the last block of the contiguous run of blocks that
// starts at offset 0 of that file.
//
// The input is sorted in place with block_GCompareData first; the scan
// below assumes that ordering groups blocks of the same uid together in
// ascending offset order -- TODO confirm against block_GCompareData.
//
// A run begins at a block whose offset is 0.  It ends when the next block
// has a different uid (block_uid_GCompareData returns nonzero) or starts
// beyond candidate.offset + candidate.length, i.e. there is a gap.  Blocks
// seen while no run is open and whose offset is not 0 are dropped.
//
// The old array is freed (including its element storage) and *array is
// pointed at the collapsed array.
static void
collapse_available_blocks(GArray ** array) // array of block_t
{
    g_array_sort_with_data (*array, block_GCompareData, NULL);

    GArray *collapsed = g_array_new (0, 1, sizeof (block_t));

    // Nonzero while candidate_block holds the tail of an open run.
    int in_segment = 0;
    block_t candidate_block;
    block_t current_block;
    memset (&candidate_block, 0, sizeof (block_t));

    // The index only advances once a block has been consumed, so a block
    // that terminates a run is examined again as a possible start of the
    // next one.
    guint i = 0;
    while (i < (*array)->len)
    {
        current_block = g_array_index (*array, block_t, i);

        if (!in_segment)
        {
            if (current_block.offset == 0)
            {
                candidate_block = current_block;
                in_segment = 1;
            }
            ++i;
        }
        else if (block_uid_GCompareData (&current_block.uid,
                                         &candidate_block.uid, NULL)
                 || (current_block.offset
                     > (candidate_block.offset + candidate_block.length)))
        {
            // candidate was the last block of its run
            g_array_append_val (collapsed, candidate_block);
            in_segment = 0;
            // do not advance: try this block again as a run start
        }
        else
        {
            // still contiguous: extend the run
            candidate_block = current_block;
            ++i;
        }
    }

    if (in_segment)
    {
        // the input ended while a run was open; its tail is the result
        g_array_append_val (collapsed, candidate_block);
    }

    g_array_free (*array, 1);
    *array = collapsed;
}
// Timer callback: rescans the blocks directory and rebuilds the list of
// available blocks.
//
//   data     - the haveblock_state_t registered with the timer
//   interval - unused
//   ev       - unused
//
// The previous list is thrown away, ftw() refills it through
// update_blocks(), and on a successful walk the list is collapsed to the
// last contiguous block per file.  If the walk fails, whatever was
// gathered so far is left uncollapsed.
//
// NOTE(review): the computed delay is only logged here; the callback
// returns TIMER_RENEW regardless of its value.
static int
update_blocks_timer_cb (gpointer data, int interval, g_event_t * ev)
{
    haveblock_state_t *hb = data;

    elog (LOG_INFO, " ");

    // Start from an empty list on every scan.
    hb->file_count = 0;
    g_array_free (hb->available_blocks, 1);
    hb->available_blocks = g_array_new (0, 1, sizeof (block_t));

    if (ftw (hb->blocks_dir, update_blocks, 4) >= 0) {
        // keep only the last block of each file's leading contiguous run
        collapse_available_blocks (&hb->available_blocks);
    } else {
        elog (LOG_CRIT, "ftw encountered an error on %s: %m",
              hb->blocks_dir);
    }

    int delay = 1000 + (hb->available_blocks->len) * DELAY_PER_FILE;

    elog (LOG_NOTICE, "Number of total files: %d", hb->available_blocks->len);
    elog (LOG_NOTICE, "Delaying for %d ms.",delay);

    return TIMER_RENEW;
}
// Logs one table entry at LOG_INFO: the entry is rendered into a temporary
// buffer with block_unparse() and the resulting text is written to the log.
//
//   entry - pointer handed straight to block_unparse()
static void
print_table_entry (void *entry)
{
    buf_t *rendered = buf_new ();

    block_unparse (rendered, entry, "");

    elog (LOG_INFO, "Publishing the following...\n");
    elog (LOG_INFO, "%s\n", rendered->buf);

    buf_free (rendered);
}
// Returns allocated memory. Caller must free.
// State has a pointer to a GArray block_t's. This function just converts that
// GArray into a haveblock_table_t.
static haveblock_table_t *
create_table (const haveblock_state_t * state)
{
elog (LOG_NOTICE, " ");
int count = state->available_blocks->len;
haveblock_table_t *table =
(haveblock_table_t *) malloc (sizeof (haveblock_table_t) * count);
memset (table, 0, sizeof (haveblock_table_t) * count);
// This loop creates individual entries: haveblock_table_t gets block_t's.
int i;
for (i = 0; i < count; ++i)
{
table[i].haveblock = g_array_index (state->available_blocks,
block_t, i);
print_table_entry (&table[i].haveblock);
}
return table;
}
// Timer callback: publishes the current list of available blocks.
//
//   data     - the haveblock_state_t registered with the timer
//   interval - unused
//   ev       - unused
//
// Builds a broadcast flow id, and unless use_mhsync is set fills in its
// source from the link context (the process exits if that lookup fails).
// The block list is then converted to a table and handed to
// haveblock_pub_add_empty() under use_prefix.  If the table could not be
// built for a non-empty block list, this round of publishing is skipped
// instead of passing a NULL table with a nonzero count.
//
// Always returns TIMER_RENEW.
static int
publish_blocks_timer_cb (gpointer data, int interval, g_event_t * ev)
{
    elog (LOG_NOTICE, " ");

    haveblock_state_t *state = (haveblock_state_t *) data;

    flow_id_t fid = {
        .src_if = 1,
        .dst_if = 1,
        .dst = LINK_BROADCAST,
        .max_hops = MHSYNC_ONE_HOP
    };

    if (!use_mhsync)
    {
        if (lu_get_if_id (state->link_context, &fid.src) != 0)
        {
            elog (LOG_ERR, "Unable to get if_id");
            exit (1);
        }
    }

    int count = state->available_blocks->len;
    haveblock_table_t *table = create_table (state);

    if (table == NULL && count > 0)
    {
        // No table for a non-empty list: nothing valid to publish.
        elog (LOG_CRIT, "Unable to build block table; not publishing.");
    }
    else if (haveblock_pub_add_empty (use_prefix, table, count, &fid) < 0)
    {
        elog (LOG_CRIT, "Unable to publish.");
    }

    free (table);   // free(NULL) is a no-op
    return TIMER_RENEW;
}
static haveblock_state_t
haveblock_init ()
{
buf_t *buf = buf_new ();
bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);
haveblock_state_t state = {
blocks_dir:strdup (buf->buf),
available_blocks:g_array_new (0, 1, sizeof (block_t))
};
buf_free (buf);
return state;
}
// Prints the command-line usage text through misc_print_usage() and then
// terminates the process with exit status 1.  Does not return.
//
//   name - program name to show in the usage text
static void
usage (char *name)
{
    char *synopsis = "-U <device>";
    char *details = " --uses <device>: Specify link device to use.";

    misc_print_usage (name, synopsis, details);
    exit (1);
}
// Opens the link device named by `uses` and stores the resulting context
// in state->link_context.
//
//   uses  - link device name; when NULL an error is logged and usage() is
//           called, which exits
//   argv  - program arguments; only argv[0] is used, for the usage text
//   state - passed to lu_open() as the option data and receives the context
//
// Exits with status 1 if lu_open() fails.
static void
init_link (char *uses, char **argv, haveblock_state_t * state)
{
    if (!uses) {
        elog (LOG_CRIT, "Please specify a link to use!");
        usage (argv[0]);
    }

    lu_opts_t link_opts = {
        .opts = {
            .name = uses,
            .data = state,
        },
    };

    if (lu_open (&link_opts, &state->link_context) >= 0) {
        return;
    }

    elog (LOG_CRIT, "Can't open %s: %m", link_name (&link_opts.opts, NULL));
    exit (1);
}
// Program entry point.
//
// In order: initialises the misc and emrun layers, parses the link device
// and the "use_mhsync" switch from the command line, builds the state,
// opens the link, creates the blocks directory, registers the publish and
// update timers, connects to the cluster map and finally enters the event
// loop.  Any failure along the way is logged and ends the process with
// exit status 1.
//
// g_main() is not expected to return; if it does, that is logged and 1 is
// returned.
int
main (int argc, char *argv[])
{
    misc_init (&argc, argv, CVSTAG);

    {
        emrun_opts_t run_opts = {
            .shutdown = block_generic_shutdown,
            .data = "haveblock"
        };
        emrun_init (&run_opts);
    }

    char *link_dev = link_parse_uses (&argc, argv, NULL);
    if (argc > 2) {
        usage (argv[0]);
    }

    use_mhsync = misc_parse_out_switch (&argc, argv, "use_mhsync", '\0');
    if (use_mhsync) {
        elog (LOG_CRIT, "Using mhsync");
        use_prefix = SSYNC_MULTIHOP_PREFIX;
    }

    // The state lives in main's frame for the life of the process; the
    // timers and the ftw() callback reach it through pointers.
    haveblock_state_t hb_state = haveblock_init ();
    global_state = &hb_state;
    init_link (link_dev, argv, &hb_state);

    if (mkdir_with_parents (hb_state.blocks_dir) < 0) {
        elog (LOG_CRIT, "Failed to make directory: %s, %m", hb_state.blocks_dir);
        exit (1);
    }

    if (g_timer_add (PUBLISH_BLOCKS_TIMER, publish_blocks_timer_cb,
                     &hb_state, NULL, NULL) < 0) {
        elog (LOG_CRIT, "Unable to create timer event.");
        exit (1);
    }

    if (g_timer_add (UPDATE_BLOCKS_TIMER, update_blocks_timer_cb,
                     &hb_state, NULL, NULL) < 0) {
        elog (LOG_CRIT, "Unable to create timer event.");
        exit (1);
    }

    /* cluster map event */
    cluster_map_opts_t map_opts = {
        .private_data = &hb_state
    };
    if (cluster_map_new (&map_opts, &cluster_map) < 0) {
        elog (LOG_CRIT, "Can't connect to cluster map: %m");
        exit (1);
    }

    g_main ();

    elog (LOG_ALERT, "Event system terminated abnoramlly.");
    return 1;
}
See more files for this project here