Show ackblock.c syntax highlighted
/*
ackblock uses the haveblock table to determine when a file has been
completely downloaded.
Ack's job is to acknowledge the "highest" seen file thus far, on a
per originating-node basis.
Ack maintains an array of acks.
Periodically, ack will check the list of haveblocks. For any
completed file, ack will move the directory to ACKED_DIR_PATTERN,
update its array of acks, and publish the ackblock table. Simple huh?
*/
char ackblock_c_cvsid[] = "$Id: ackblock.c,v 1.6 2005/06/15 13:13:18 adparker Exp $";
#include <glib.h>
#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
#include "devel/block_tree/haveblock.h"
#include "devel/block_tree/ackblock.h"
#include "devel/state/ssync.h"
#include "link/link.h"
#include "devel/state/cluster_map.h"
int use_mhsync = 0;
char * use_prefix = SSYNC_CLUSTER_PREFIX;
typedef struct ackblock_state_s
{
haveblock_table_t *all_haveblocks;
int all_haveblocks_count;
ssync_sub_t *haveblocks_sub;
status_client_context_t *status_context;
g_event_t *timer_context;
uint8_t sinkstatus;
flow_id_t fid;
lu_context_t * link_context;
char * blocks_dir;
char * acked_dir;
GArray * ackblocks;
} ackblock_state_t;
static ackblock_state_t
init_state ()
{
buf_t * buf = buf_new ();
bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);
buf_t * buf2 = buf_new ();
bufprintf (buf2, ACKED_DIR_PATTERN, my_node_id);
ackblock_state_t state = {
all_haveblocks:NULL,
all_haveblocks_count:0,
haveblocks_sub:NULL,
status_context:NULL,
timer_context:NULL,
sinkstatus:0,
fid:{
src:my_node_id,
dst:LINK_BROADCAST,
max_hops:MAX_HOPS,
},
blocks_dir: strdup (buf->buf),
acked_dir: strdup (buf2->buf)
};
state.ackblocks = g_array_new (0, 1, sizeof (block_t));
return state;
}
cluster_map_t * cluster_map = NULL;
// Allocates memory.
//
// The array is assumed to contain blocks that we have on disk. For
// each completed file, get_finished_blocks returns the last block of
// that file.
static GArray *
get_finished_blocks (GArray * array)
{
GArray * finished_array = g_array_new (0, 1, sizeof (block_t));
int i;
for (i = 0; i < array->len; ++i)
{
block_t block = g_array_index (array, block_t, i);
if ((block.offset + block.length)
== (block.uid.total_length))
{
g_array_append_val (finished_array, block);
}
}
return finished_array;
}
static void
move_finished_blocks_helper (ackblock_state_t * state, block_t block)
{
elog (LOG_INFO, " ");
// Create the old and new directory name.
buf_t * olddir = buf_new ();
buf_t * newdir = buf_new ();
int old_exists = 0;
int new_exists = 0;
DIR * foo = NULL;
bufprintf(olddir, "%s/%s", state->blocks_dir, block.uid.name);
bufprintf(newdir, "%s/%s", state->acked_dir, block.uid.name);
if (mkdir_with_parents (state->acked_dir) < 0)
{
elog (LOG_CRIT, "Unable to create directory to place acked blocks: %s: %m",
state->acked_dir);
goto final;
}
foo = opendir (olddir->buf);
if (foo) old_exists = 1;
closedir (foo);
foo = opendir (newdir->buf);
if (foo) new_exists = 1;
closedir (foo);
if ( old_exists && !new_exists && (rename (olddir->buf, newdir->buf) < 0) )
{
elog (LOG_CRIT, "Unable to carry out the following rename: %s --> %s: %m",
olddir->buf, newdir->buf);
goto final;
}
final:
buf_free (newdir);
buf_free (olddir);
}
// finished_blocks should contain a list of the last blocks of
// completed files.
//
// It simply moves the block's directory to ACKED_DIR_PATTERN
static void
move_finished_blocks (ackblock_state_t * state, GArray * finished_blocks)
{
elog (LOG_INFO, " ");
int i;
for (i = 0; i < finished_blocks->len; ++i)
{
block_t block = g_array_index (finished_blocks, block_t, i);
move_finished_blocks_helper (state, block);
}
}
static void
update_ackblock_table (ackblock_state_t * state, GArray * finished_blocks)
{
elog (LOG_NOTICE, " ");
// For each of the finished blocks
// Search for an ack block with the same origin node id.
// if one is found,
// then if ack block is < finish, replace ack with finish.
// else
// none is found. so append finish to ack array.
int finished_index;
for (finished_index = 0; finished_index < finished_blocks->len;
++finished_index)
{
int found = 0;
int ack_index = 0;
block_t finished_block = g_array_index (finished_blocks, block_t,
finished_index);
for (ack_index = 0; ack_index < state->ackblocks->len; ++ack_index)
{
block_t ackblock = g_array_index (state->ackblocks, block_t, ack_index);
if (ackblock.uid.node_id == finished_block.uid.node_id)
{
found = 1;
// If finished_block's name is greater than ackblock's,
// then replace ackblock with finished_block in the array.
if (strcmp (finished_block.uid.name, ackblock.uid.name) > 0 )
{
g_array_remove_index (state->ackblocks, ack_index);
g_array_insert_val (state->ackblocks, ack_index, finished_block);
}
// Regardless, break out of the for loop, since there
// should only be at most one match between ack and
// finished blocks, at the level of node_ids.
break;
} // end if
} // end for (ack_index ...)
// If no matching block was found in the ackblock array, append
// the finished block.
if (found == 0)
{
g_array_append_val (state->ackblocks, finished_block);
}
} // end for (finished_index ...
return;
}
// This is where the real work happens.
//
// Filter the haveblock table down to completed files.
// For each of the completed files, move to ACKED_DIR_PATTERN.
// For each of the completed files, update the ackblock table.
static void
publish_ackblock_table (ackblock_state_t * state)
{
int count = state->ackblocks->len;
ackblock_table_t *ackblock_table =
ackblock_create_table_from_array (state->ackblocks);
if (ackblock_pub_add_empty (SSYNC_MULTIHOP_PREFIX, ackblock_table, count,
&state->fid)
< 0)
{
elog (LOG_CRIT, "Unable to publish ackblock_table");
}
{
buf_t *buf = buf_new ();
bufprintf (buf, "Publishing the following ACKBLOCK_TABLE...");
ackblock_table_unparse (buf, ackblock_table, count);
elog (LOG_NOTICE, "%s", buf->buf);
buf_free (buf);
}
free (ackblock_table);
return;
}
// If I'm not the sink, bail.
//
// Move the finished blocks over to ACKED_DIR_PATTERN
// Update ackblock table and publish.
static int
publish_ackblock_timer (gpointer data, int interval, g_event_t * ev)
{
elog (LOG_NOTICE, " ");
ackblock_state_t * state = (ackblock_state_t *)data;
if (state->sinkstatus == 0)
{
return TIMER_RENEW;
}
elog (LOG_NOTICE,"I'm a sink... taking action...");
// convert haveblocks to local array
GArray *haveblock_local_array =
haveblock_table_to_GArray (state->all_haveblocks,
state->all_haveblocks_count, ONLY_LOCAL);
// Copy over those that have the last block.
GArray * finished_blocks = get_finished_blocks (haveblock_local_array);
// Move over those finished blocks to ACKED_DIR_PATTERN
move_finished_blocks (state, finished_blocks);
// Update ackblock table with the latest info.
update_ackblock_table (state, finished_blocks);
// Publish
publish_ackblock_table (state);
g_array_free (finished_blocks, 1);
g_array_free (haveblock_local_array, 1);
return TIMER_RENEW;
}
// When there's a new haveblock table, update my copy, possibly
// converting source interface ids to node ids.
static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
int count, void *data)
{
elog (LOG_NOTICE, " ");
ackblock_state_t *state = (ackblock_state_t *) data;
if (!use_mhsync)
haveblock_table_convert_src_to_id(table, count, cluster_map);
int new_count;
haveblock_table_t * new_table = NULL;
haveblock_remove_empty_blocks (table, count, &new_table, &new_count);
if (state->all_haveblocks != NULL)
free (state->all_haveblocks);
state->all_haveblocks = new_table;
state->all_haveblocks_count = new_count;
{
buf_t *buf = buf_new ();
haveblock_table_unparse (buf, new_table, new_count);
elog (LOG_NOTICE, "%s", buf->buf);
buf_free (buf);
}
free (table);
return EVENT_RENEW;
}
// Timer to periodically publish the ackblock table.
static void
init_timer (ackblock_state_t * state)
{
if (g_timer_add
(ACKBLOCK_TIMER, publish_ackblock_timer, state, NULL, &state->timer_context) < 0)
{
elog (LOG_CRIT, "Unable to create timer event.");
exit (1);
}
}
// haveblock channel is used to indicate when a file has completed its
// download.
//
// The assumption is that haveblock reports the last block of a
// contiguous series of blocks starting from the beginning.
//
// We can tell when a file has completely downloaded when haveblock
// reports the last block of a file.
static void
haveblock_sub (ackblock_state_t * state)
{
ssync_sub_opts_t opts = {
reread_period: REREAD_PERIOD,
read_refractory: REFRACTORY
};
if (haveblock_sub_open_full
(use_prefix, haveblock_sub_handler, state, &opts, &state->haveblocks_sub) < 0)
{
elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
exit (1);
}
return;
}
// Notice when our sink status changes, and updates state.
static int
status_handler (void *new_buf, size_t size, void *data)
{
elog (LOG_NOTICE,"Received %s, size %d", (char *) new_buf, size);
if (size == 0)
{
elog (LOG_ERR, "Size is 0!");
goto final;
}
if (!new_buf)
{
elog (LOG_ERR, "new_buf is NULL");
goto final;
}
ackblock_state_t *state = (ackblock_state_t *) data;
int new_status = atoi ((char *) new_buf);
if ((new_status == 0) || (new_status == 1))
{
if (new_status != state->sinkstatus)
{
elog (LOG_NOTICE, "Changing status to %d", new_status);
state->sinkstatus = new_status;
}
else
{
elog (LOG_NOTICE, "Status is the same.");
}
}
else
{
elog (LOG_ERR, "Unrecognized status");
goto final;
}
final:
if (new_buf)
free (new_buf);
return EVENT_RENEW;
}
// Status client of /dev/block/sinkstatus, to notify me when sink
// status is changed.
static void
status_init (ackblock_state_t * state)
{
status_client_opts_t opts = {
devname:sim_path ("/dev/block/sinkstatus"),
handler:status_handler,
private_data:state
};
if (g_status_client_full (&opts, &state->status_context) < 0)
{
elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
exit (1);
}
}
static void
usage (char *name)
{
misc_print_usage
(name, "-U <device>", " --uses <device>: Specify link device to use.");
exit (1);
}
// Initialize a link client. This might be left over from something,
// since I don't seem to be using the link for anything...
static void
init_link (char *uses, char **argv, ackblock_state_t * state)
{
if (uses == NULL)
{
elog (LOG_CRIT, "Please specify a link to use!");
usage (argv[0]);
}
lu_opts_t lu_opts = {
opts:{
name:uses,
data:state}
,
};
if (lu_open (&lu_opts, &state->link_context) < 0)
{
elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
exit (1);
}
return;
}
// emrun init
// parse out which link to use
// parse out if we're using mhsync or cluster sync directly.
// initialize state data structure
// subscribe to haveblock channel
// initialize timer
// initialize status client (client of /dev/block/sinkstatus)
// initialize link (Now that I look at it, I'm not sure why I need the link...)
// cluster map stuff (Need to convert interface id to node id
int
main (int argc, char **argv)
{
misc_init (&argc, argv, CVSTAG);
{
emrun_opts_t emrun_opts = {
shutdown: block_generic_shutdown, data:"ackblock"
};
emrun_init (&emrun_opts);
}
elog (LOG_NOTICE, " ");
char * uses = link_parse_uses (&argc, argv, NULL);
if (argc > 2)
{
usage (argv[0]);
}
use_mhsync = misc_parse_out_switch (&argc, argv, "use_mhsync",'\0');
if (use_mhsync)
{
elog (LOG_CRIT,"Using mhsync");
use_prefix = SSYNC_MULTIHOP_PREFIX;
}
ackblock_state_t state = init_state ();
haveblock_sub (&state);
init_timer (&state);
status_init (&state);
init_link(uses, argv, &state);
/* cluster map event */
cluster_map_opts_t cl_opts = {
private_data: &state
};
if (cluster_map_new (&cl_opts, &cluster_map) < 0) {
elog(LOG_CRIT, "Can't connect to cluster map: %m");
exit(1);
}
g_main ();
return 1;
}
See more files for this project here