/* broadcastblock.c from EmStar at Krugle */
/* Show broadcastblock.c syntax highlighted */
char broadcastblock_c_cvsid[] =
"$Id: broadcastblock.c,v 1.14 2005/04/07 05:09:03 adparker Exp $";
#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "emrun/emrun.h"
#include "devel/block/block.h"
#include "devel/block/haveblock.h"
#include "devel/block/needblock.h"
#include "devel/state/ssync.h"
#include "link/link.h"
#include "libdev/status_client.h"
#include "devel/block/rddhop.h"
#include "devel/state/cluster_map.h"
/* Set in main() from the "use_mhsync" command-line switch.  While it is 0 the
 * subscription handlers run the *_table_convert_src_to_id helpers on incoming
 * tables; when non-zero they skip that step. */
int use_mhsync = 0;
/* Prefix handed to haveblock_sub_open_full / needblock_sub_open_full.
 * main() replaces it with SSYNC_MULTIHOP_PREFIX when use_mhsync is set. */
char * use_prefix = SSYNC_CLUSTER_PREFIX;
/*
 * Per-process state shared by the subscription handlers, the status handler
 * and the broadcast timer callback.  A single instance is created by
 * init_state() and lives on main()'s stack.
 */
typedef struct broadcastblock_state_s
{
/* Most recent table delivered to haveblock_sub_handler; the previous one is
 * freed when a new table arrives. */
haveblock_table_t *all_haveblocks;
/* Number of entries in all_haveblocks. */
int all_haveblocks_count;
/* Output of rddhop_filter_needblock_table for the most recent needblock
 * update; the previous one is freed when a new update arrives. */
needblock_table_t *all_needblocks;
/* Number of entries in all_needblocks; the timer callback does nothing
 * while this is 0. */
int all_needblocks_count;
/* Subscription handle filled in by haveblock_sub_open_full. */
ssync_sub_t *haveblocks_sub;
/* Subscription handle filled in by needblock_sub_open_full. */
ssync_sub_t *needblocks_sub;
/* Link handle filled in by lu_open; used by lu_send when broadcasting. */
lu_context_t *link_context;
/* Link header copied to the front of every outgoing packet.  init_state()
 * sets dst.id to LINK_BROADCAST and type to PKT_TYPE_RDD_BLOCK. */
link_pkt_t hdr;
/* strdup'd directory name built from BLOCK_DIR_PATTERN and my_node_id;
 * passed to block_get_from_disk. */
char *blocks_dir;
/* Buffer most recently handed to status_handler, treated as an array of
 * rddhop_t; the previous buffer is freed on each update. */
rddhop_t * downstream;
/* Number of whole rddhop_t records in downstream (size / sizeof(rddhop_t)). */
int downstream_count;
/* Status client handle filled in by g_status_client_full. */
status_client_context_t * status_context;
} broadcastblock_state_t;
/* Cluster map handle created in main() by cluster_map_new.  The subscription
 * handlers pass it to the *_table_convert_src_to_id helpers when use_mhsync
 * is 0. */
cluster_map_t * cluster_map = NULL;
/*
 * Subscription callback for needblock updates.
 *
 * When use_mhsync is 0 the incoming table is first run through
 * needblock_table_convert_src_to_id with the global cluster map.  The table
 * is then passed to rddhop_filter_needblock_table together with the state's
 * downstream list; the filter's output replaces the needblock table held in
 * the state (the old one is freed) and is logged at LOG_NOTICE.
 *
 * sub   - subscription that delivered the update (not used here)
 * table - incoming needblock table; freed before returning
 * count - number of entries in table
 * data  - the broadcastblock_state_t registered with the subscription
 *
 * Always returns EVENT_RENEW.
 */
static int
needblock_sub_handler (ssync_sub_t * sub, needblock_table_t * table,
                       int count, void *data)
{
  broadcastblock_state_t *st = data;
  needblock_table_t *kept = NULL;
  int kept_count = 0;
  buf_t *dump;

  elog (LOG_NOTICE, " ");

  if (use_mhsync == 0)
    {
      needblock_table_convert_src_to_id (table, count, cluster_map);
    }

  rddhop_filter_needblock_table (st->downstream, st->downstream_count,
                                 table, count, &kept, &kept_count);

  /* free (NULL) is a no-op, so the previous table needs no NULL guard. */
  free (st->all_needblocks);
  st->all_needblocks = kept;
  st->all_needblocks_count = kept_count;

  /* Log the table that was just installed. */
  dump = buf_new ();
  needblock_table_unparse (dump, kept, kept_count);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  /* Only the filter output is retained; the raw table is released here. */
  free (table);
  return EVENT_RENEW;
}
/*
 * Subscription callback for haveblock updates.
 *
 * When use_mhsync is 0 the incoming table is first run through
 * haveblock_table_convert_src_to_id with the global cluster map.  The table
 * itself (not a copy) then replaces the haveblock table held in the state;
 * the old one is freed.  The new table is logged at LOG_NOTICE.
 *
 * sub   - subscription that delivered the update (not used here)
 * table - incoming haveblock table; ownership moves into the state
 * count - number of entries in table
 * data  - the broadcastblock_state_t registered with the subscription
 *
 * Always returns EVENT_RENEW.
 */
static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
                       int count, void *data)
{
  broadcastblock_state_t *st = data;
  buf_t *dump;

  elog (LOG_NOTICE, " ");

  if (use_mhsync == 0)
    {
      haveblock_table_convert_src_to_id (table, count, cluster_map);
    }

  /* free (NULL) is a no-op, so the previous table needs no NULL guard. */
  free (st->all_haveblocks);
  st->all_haveblocks = table;
  st->all_haveblocks_count = count;

  /* Log the table that was just installed. */
  dump = buf_new ();
  haveblock_table_unparse (dump, table, count);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  return EVENT_RENEW;
}
static broadcastblock_state_t
init_state ()
{
buf_t *buf = buf_new ();
bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);
broadcastblock_state_t state = {
all_haveblocks:NULL,
all_haveblocks_count:0,
all_needblocks:NULL,
all_needblocks_count:0,
haveblocks_sub:NULL,
needblocks_sub:NULL,
link_context:NULL,
blocks_dir:strdup (buf->buf),
hdr:{
dst:{
id:LINK_BROADCAST}
,
type:PKT_TYPE_RDD_BLOCK},
status_context:NULL,
downstream:NULL,
downstream_count:0
};
return state;
}
/*
 * Print the command-line usage text through misc_print_usage and terminate
 * the process with exit status 1.
 *
 * name - program name to show in the usage text (callers pass argv[0])
 */
static void
usage (char *name)
{
  misc_print_usage (name,
                    "-U <device>",
                    " --uses <device>: Specify link device to use.");
  exit (1);
}
/*
 * Open the link device and store its handle in state->link_context.
 *
 * uses  - link device name; when NULL an error is logged and usage() is
 *         called, which exits
 * argv  - program arguments; only argv[0] is used, for the usage text
 * state - program state; also passed to lu_open as the link's data pointer
 *
 * Exits with status 1 if lu_open fails.
 */
static void
init_link (char *uses, char **argv, broadcastblock_state_t * state)
{
  if (!uses)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t link_opts = {
    .opts = {
      .name = uses,
      .data = state
    }
  };

  if (lu_open (&link_opts, &state->link_context) >= 0)
    {
      return;
    }

  elog (LOG_CRIT, "Can't open %s: %m", link_name (&link_opts.opts, NULL));
  exit (1);
}
/*
 * Open the haveblock subscription under use_prefix, with
 * haveblock_sub_handler as the callback and state as its data pointer.
 * The handle is stored in state->haveblocks_sub.
 *
 * Exits with status 1 if the subscription cannot be opened.
 */
static void
haveblock_sub (broadcastblock_state_t * state)
{
  ssync_sub_opts_t sub_opts = {
    .reread_period = REREAD_PERIOD,
    .read_refractory = REFRACTORY
  };

  if (haveblock_sub_open_full (use_prefix, haveblock_sub_handler, state,
                               &sub_opts, &state->haveblocks_sub) >= 0)
    {
      return;
    }

  elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
  exit (1);
}
/*
 * Open the needblock subscription under use_prefix, with
 * needblock_sub_handler as the callback and state as its data pointer.
 * The handle is stored in state->needblocks_sub.
 *
 * Exits with status 1 if the subscription cannot be opened.
 */
static void
needblock_sub (broadcastblock_state_t * state)
{
  ssync_sub_opts_t sub_opts = {
    .reread_period = REREAD_PERIOD,
    .read_refractory = REFRACTORY
  };

  if (needblock_sub_open_full (use_prefix, needblock_sub_handler, state,
                               &sub_opts, &state->needblocks_sub) >= 0)
    {
      return;
    }

  elog (LOG_ERR, "Unable to subscribe to needblock channel: %m");
  exit (1);
}
/*
 * Pick one block uniformly at random from candidate_tree, load it from
 * state->blocks_dir and broadcast it on the link.
 *
 * The outgoing packet is the state's link header followed by the block
 * (sizeof (block_t) plus data_block->length bytes).  The length passed to
 * lu_send excludes the link header.
 *
 * state          - program state (link handle, header template, block dir)
 * candidate_tree - blocks eligible for broadcast; nothing is sent when empty
 *
 * Failures (block not readable from disk, send error) are logged and the
 * function returns normally.
 */
static void
broadcastblock_send_random (broadcastblock_state_t * state,
                            GTree * candidate_tree)
{
  /* random () returns values in [0, 2^31 - 1] regardless of RAND_MAX, so
   * scale by 2^31 rather than by RAND_MAX + 1. */
  const double random_span = 2147483648.0;
  int candidates;

  elog (LOG_NOTICE, " ");
  candidates = g_tree_nnodes (candidate_tree);
  elog (LOG_NOTICE, "Number of candidates: %d", candidates);
  if (candidates == 0)
    {
      return;
    }

  block_t *data_block = NULL;
  buf_t *packet = buf_new ();
  GArray *array = block_tree_to_array (candidate_tree);
  int count = array->len;
  int index = 0;

  /* Defensive: never index into an empty array. */
  if (count <= 0)
    {
      goto done;
    }

  /* randomly select an index between [0, count-1] */
  index = (int) (1.0 * count * random () / random_span);
  elog (LOG_NOTICE, "Randomly selected index %d of %d", index, count - 1);

  /* retrieve random block */
  block_t block = g_array_index (array, block_t, index);

  /* read the file into memory. */
  data_block = block_get_from_disk (state->blocks_dir, &block);
  if (data_block == NULL)
    {
      goto done;
    }

  bufcpy (packet, &state->hdr, sizeof (state->hdr));
  bufcpy (packet, data_block, sizeof (block_t) + data_block->length);

  /* Log what is about to be sent. */
  {
    buf_t *buf = buf_new ();
    block_unparse (buf, data_block, "");
    elog (LOG_NOTICE, "BROADCASTING block: %s", buf->buf);
    buf_free (buf);
  }

  /* send off the packet; only report success when the send succeeded */
  if (lu_send (state->link_context, (link_pkt_t *) packet->buf,
               packet->len - sizeof (state->hdr)) < 0)
    {
      elog (LOG_CRIT, "Unable to broadcast message! : %m");
    }
  else
    {
      elog (LOG_NOTICE, "SUCCESSFUL BROADCAST");
    }

done:
  free (data_block);            /* free (NULL) is a no-op */
  buf_free (packet);
  g_array_free (array, 1);
  return;
}
/*
 * Timer callback: work out which locally held blocks are needed by someone
 * else and broadcast one of them.
 *
 * Does nothing while the needblock table is empty.  Otherwise:
 *   1. build two identical trees of local haveblocks (ONLY_LOCAL);
 *   2. remove the non-local needblocks from the second tree, leaving the
 *      local blocks that no one needs;
 *   3. remove those unneeded blocks from the first tree, leaving the local
 *      blocks that are needed;
 *   4. strip the empty block from that tree and hand it to
 *      broadcastblock_send_random.
 *
 * data     - the broadcastblock_state_t registered with the timer
 * interval - not used here
 * ev       - not used here
 *
 * Always returns EVENT_RENEW.
 */
static int
broadcastblock_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  broadcastblock_state_t *st = data;

  elog (LOG_NOTICE, " ");

  if (st->all_needblocks_count == 0)
    {
      return EVENT_RENEW;
    }

  GTree *needed_tree =
    haveblock_table_to_GTreeFull (st->all_haveblocks,
                                  st->all_haveblocks_count, ONLY_LOCAL);
  GTree *unneeded_tree =
    haveblock_table_to_GTreeFull (st->all_haveblocks,
                                  st->all_haveblocks_count, ONLY_LOCAL);
  GArray *remote_needs =
    needblock_table_to_GArray (st->all_needblocks,
                               st->all_needblocks_count, ONLY_NONLOCAL);

  /* unneeded_tree := local blocks minus everything a remote node needs */
  block_remove_array_from_tree (remote_needs, unneeded_tree);

  /* needed_tree := local blocks minus the unneeded ones */
  GArray *unneeded_list = block_tree_to_array (unneeded_tree);
  block_remove_array_from_tree (unneeded_list, needed_tree);

  /* the empty block is never a broadcast candidate */
  block_remove_empty_from_tree (needed_tree);

  broadcastblock_send_random (st, needed_tree);

  g_tree_destroy (needed_tree);
  g_tree_destroy (unneeded_tree);
  g_array_free (remote_needs, 1);
  g_array_free (unneeded_list, 1);
  return EVENT_RENEW;
}
/*
 * Register broadcastblock_timer_cb as a timer with period BROADCAST_TIMER,
 * with state as its data pointer.
 *
 * Exits with status 1 if the timer cannot be created.
 */
static void
init_timer (broadcastblock_state_t * state)
{
  if (g_timer_add (BROADCAST_TIMER, broadcastblock_timer_cb, state,
                   NULL, NULL) >= 0)
    {
      return;
    }

  elog (LOG_CRIT, "Unable to create timer event.");
  exit (1);
}
/*
 * Status-client callback: install a new downstream hop list.
 *
 * The buffer is stored in the state as an array of rddhop_t and the
 * previously stored buffer is freed, so the state takes ownership of
 * new_buf.  The entry count is size / sizeof (rddhop_t), rounded down.
 *
 * new_buf - buffer delivered by the status client
 * size    - length of new_buf in bytes
 * data    - the broadcastblock_state_t registered with the status client
 *
 * Returns EVENT_RENEW.  Exits with status 1 if size is non-zero but too
 * small to hold a single rddhop_t.
 */
static int
status_handler (void *new_buf, size_t size, void *data)
{
  broadcastblock_state_t *state = data;

  elog (LOG_INFO, " ");

  free (state->downstream);     /* free (NULL) is a no-op */
  state->downstream = new_buf;
  state->downstream_count = size / (sizeof (rddhop_t));

  if ((size > 0) && (state->downstream_count == 0))
    {
      /* size_t values are cast to unsigned long and printed with %lu;
       * passing them to %d is a format/argument mismatch. */
      elog (LOG_ERR,
            "Somekind of division problem...count:%d = size:%lu / rddhop_size: %lu",
            state->downstream_count,
            (unsigned long) size,
            (unsigned long) sizeof (rddhop_t));
      exit (1);
    }

  return EVENT_RENEW;
}
/*
 * Open a status client on sim_path ("/dev/block/down_bin") with
 * status_handler as the callback and state as its private data.  The handle
 * is stored in state->status_context.
 *
 * Exits with status 1 if the client cannot be opened.
 */
static void
status_init (broadcastblock_state_t * state)
{
  status_client_opts_t client_opts = {
    .devname = sim_path ("/dev/block/down_bin"),
    .handler = status_handler,
    .private_data = state
  };

  if (g_status_client_full (&client_opts, &state->status_context) >= 0)
    {
      return;
    }

  elog (LOG_ERR, "Unable to open status client to %s", client_opts.devname);
  exit (1);
}
/*
 * Program entry point.
 *
 * Order of work: misc/emrun initialisation, command-line parsing (link
 * device, then the "use_mhsync" switch), state creation, haveblock and
 * needblock subscriptions, link open, broadcast timer, status client,
 * cluster map, and finally the g_main event loop.
 *
 * Returns 1 if g_main ever returns.
 */
int
main (int argc, char **argv)
{
  misc_init (&argc, argv, CVSTAG);

  {
    /* EmRun will trigger this callback on shutdown; the string is the
     * pointer passed to the shutdown handler. */
    emrun_opts_t shutdown_opts = {
      .shutdown = block_generic_shutdown,
      .data = "broadcastblock"
    };
    emrun_init (&shutdown_opts);
  }

  char *link_dev = link_parse_uses (&argc, argv, NULL);

  elog (LOG_NOTICE, " ");

  if (argc > 2)
    {
      usage (argv[0]);
    }

  use_mhsync = misc_parse_out_switch (&argc, argv, "use_mhsync", '\0');
  if (use_mhsync != 0)
    {
      elog (LOG_CRIT, "Using mhsync");
      use_prefix = SSYNC_MULTIHOP_PREFIX;
    }

  /* The state lives on this stack frame for the whole run; its address is
   * handed to every callback registered below. */
  broadcastblock_state_t state = init_state ();

  haveblock_sub (&state);
  needblock_sub (&state);
  init_link (link_dev, argv, &state);
  init_timer (&state);
  status_init (&state);

  /* cluster map event */
  cluster_map_opts_t map_opts = {
    .private_data = &state
  };
  if (cluster_map_new (&map_opts, &cluster_map) < 0)
    {
      elog (LOG_CRIT, "Can't connect to cluster map: %m");
      exit (1);
    }

  g_main ();
  return 1;
}
/* See more files for this project here */