/* Show needblock.c syntax highlighted  (web-viewer residue, not part of the program) */
/* Revision identification string for this file; the literal carries the
 * CVS "$Id: ...$" keyword text (file name, revision, date, author). */
char needblock_c_cvsid[] =
"$Id: needblock.c,v 1.19 2005/04/07 03:20:44 adparker Exp $";
#include <time.h>
#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "emrun/emrun.h"
#include "devel/block/block.h"
#include "devel/block/haveblock.h"
#include "devel/block/needblock.h"
#include "devel/block/ackblock.h"
#include "devel/state/ssync.h"
#include "link/link.h"
#include "libdev/status_client.h"
#include "devel/block/rddhop.h"
#include "devel/state/cluster_map.h"
/* Non-zero when the "use_mhsync" switch was given on the command line
 * (set in main).  When set, main switches use_prefix to
 * SSYNC_MULTIHOP_PREFIX, init_link uses my_node_id as the flow source, and
 * haveblock_sub_handler skips the src-to-id conversion of incoming tables. */
int use_mhsync = 0;
/* Prefix passed to haveblock_sub_open_full and needblock_pub_add_empty;
 * defaults to the cluster prefix. */
char * use_prefix = SSYNC_CLUSTER_PREFIX;
/* Per-process state shared (via the callbacks' private data pointer) by the
 * subscription handlers, the status handler and the publish timer. */
typedef struct needblock_state_s
{
/* Latest filtered haveblock table, stored by haveblock_sub_handler; the
 * previous table is freed when a new one is stored. */
haveblock_table_t *all_haveblocks;
/* Number of entries in all_haveblocks. */
int all_haveblocks_count;
/* Latest ackblock table, stored by ackblock_sub_handler; the previous table
 * is freed when a new one is stored. */
ackblock_table_t *all_ackblocks;
/* Number of entries in all_ackblocks. */
int all_ackblocks_count;
/* Subscription handle filled in by haveblock_sub_open_full in main. */
ssync_sub_t *haveblocks_sub;
/* Subscription handle filled in by ackblock_sub_open_full in main. */
ssync_sub_t *ackblocks_sub;
/* Flow id handed to needblock_pub_add_empty when publishing; its src field
 * is filled in by init_link. */
flow_id_t fid;
/* Status client context filled in by g_status_client_full in status_init. */
status_client_context_t * status_context;
/* Buffer of rddhop_t records delivered to status_handler; used to filter
 * incoming haveblock tables. */
rddhop_t * upstream;
/* Number of rddhop_t records in upstream (buffer size / sizeof(rddhop_t)). */
int upstream_count;
/* Link context filled in by lu_open in init_link. */
lu_context_t * link_context;
} needblock_state_t;
/* Cluster map handle, filled in by cluster_map_new in main and passed to
 * haveblock_table_convert_src_to_id by haveblock_sub_handler. */
cluster_map_t * cluster_map = NULL;
/*
 * Subscription callback for the haveblock channel.
 *
 * sub    - subscription the update arrived on (unused here)
 * table  - incoming haveblock table; freed by this function before returning
 * count  - number of entries in table
 * data   - the needblock_state_t registered with the subscription
 *
 * Unless use_mhsync is set, the table's sources are first converted to ids
 * using the global cluster_map.  The table is then filtered against the
 * current upstream list; the filtered result replaces the table held in the
 * state (the old one is released) and is logged at LOG_INFO.
 *
 * Always returns EVENT_RENEW.
 */
int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
                       int count, void *data)
{
  elog (LOG_NOTICE, " ");

  needblock_state_t *st = (needblock_state_t *) data;
  haveblock_table_t *kept = NULL;
  int kept_count = 0;

  if (use_mhsync == 0)
    {
      haveblock_table_convert_src_to_id (table, count, cluster_map);
    }

  rddhop_filter_haveblock_table (st->upstream, st->upstream_count,
                                 table, count,
                                 &kept, &kept_count);

  /* Swap in the new filtered table; free(NULL) is a no-op, so no guard. */
  free (st->all_haveblocks);
  st->all_haveblocks = kept;
  st->all_haveblocks_count = kept_count;

  /* Log a textual rendering of what was kept. */
  buf_t *dump = buf_new ();
  haveblock_table_unparse (dump, kept, kept_count);
  elog (LOG_INFO, "%s", dump->buf);
  buf_free (dump);

  /* Only the filtered table is retained; the raw input is released here. */
  free (table);
  return EVENT_RENEW;
}
/*
 * Subscription callback for the ackblock channel.
 *
 * sub    - subscription the update arrived on (unused here)
 * table  - incoming ackblock table; this function keeps it in the state and
 *          releases it when the next table arrives
 * count  - number of entries in table
 * data   - the needblock_state_t registered with the subscription
 *
 * The new table is logged at LOG_INFO.  Always returns EVENT_RENEW.
 */
int
ackblock_sub_handler (ssync_sub_t * sub, ackblock_table_t * table,
                      int count, void *data)
{
  elog (LOG_NOTICE, " ");

  needblock_state_t *st = (needblock_state_t *) data;

  /* Drop the previous table (free(NULL) is a no-op) and adopt the new one. */
  free (st->all_ackblocks);
  st->all_ackblocks_count = count;
  st->all_ackblocks = table;

  buf_t *dump = buf_new ();
  ackblock_table_unparse (dump, table, count);
  elog (LOG_INFO, "%s", dump->buf);
  buf_free (dump);

  return EVENT_RENEW;
}
/*
 * Build the initial needblock state, returned by value.
 *
 * All table, subscription, context and upstream pointers start out NULL and
 * all counts start at zero.  The flow id is preset to interface 1 on both
 * ends, destination LINK_BROADCAST and max_hops MHSYNC_ONE_HOP; its remaining
 * fields are zero-initialised.
 */
needblock_state_t
init_state ()
{
  needblock_state_t fresh = {
    .all_haveblocks = NULL,
    .all_haveblocks_count = 0,
    .all_ackblocks = NULL,
    .all_ackblocks_count = 0,
    .haveblocks_sub = NULL,
    .ackblocks_sub = NULL,
    .fid = {
      .src_if = 1,
      .dst_if = 1,
      .dst = LINK_BROADCAST,
      .max_hops = MHSYNC_ONE_HOP,
    },
    .status_context = NULL,
    .upstream = NULL,
    .upstream_count = 0,
    /* Spelled out for clarity; an omitted member is zero-initialised anyway. */
    .link_context = NULL,
  };

  return fresh;
}
/*
 * Timer callback: compute and publish the current needblock table.
 *
 *   needblock = all_have - i_have - acked
 *
 * A tree is built from the haveblock entries selected by ONLY_NONLOCAL; the
 * entries of the ackblock array (ALLOW_ALL) and of the ONLY_LOCAL haveblock
 * array are then removed from it.  What remains is converted to a needblock
 * table, published under use_prefix with the state's flow id, and logged.
 *
 * data     - the needblock_state_t registered with the timer
 * interval - unused here
 * ev       - unused here
 *
 * Returns TIMER_RENEW_MS(TIMER_DELAY(n)) where n is the number of entries
 * left in the tree.
 */
static int
publish_needblock_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");

  needblock_state_t *st = (needblock_state_t *) data;

  /* Candidate set: haveblock entries selected with ONLY_NONLOCAL. */
  GTree *wanted = haveblock_table_to_GTreeFull (st->all_haveblocks,
                                                st->all_haveblocks_count,
                                                ONLY_NONLOCAL);
  /* Sets to subtract: everything acked, and the ONLY_LOCAL haveblocks. */
  GArray *acked = ackblock_table_to_GArray (st->all_ackblocks,
                                            st->all_ackblocks_count,
                                            ALLOW_ALL);
  GArray *local_have = haveblock_table_to_GArray (st->all_haveblocks,
                                                  st->all_haveblocks_count,
                                                  ONLY_LOCAL);

  block_remove_array_from_tree (acked, wanted);
  block_remove_array_from_tree (local_have, wanted);

  int n_needed = g_tree_nnodes (wanted);
  needblock_table_t *needed = needblock_create_table_from_tree (wanted);

  /* A publish failure is logged but does not stop the timer. */
  if (needblock_pub_add_empty (use_prefix, needed, n_needed, &st->fid) < 0)
    {
      elog (LOG_CRIT, "Unable to publish needblock_table");
    }

  buf_t *dump = buf_new ();
  bufprintf (dump, "Publishing the following NEEDBLOCK_TABLE...");
  needblock_table_unparse (dump, needed, n_needed);
  elog (LOG_NOTICE, "%s", dump->buf);
  buf_free (dump);

  /* Release every temporary built for this round. */
  free (needed);
  g_tree_destroy (wanted);
  g_array_free (acked, 1);
  g_array_free (local_have, 1);

  return TIMER_RENEW_MS (TIMER_DELAY (n_needed));
}
/*
 * Status-client callback: receives a new buffer of upstream rddhop_t records.
 *
 * new_buf - buffer holding the records; this function stores it in the state
 *           and frees it when the next buffer arrives
 * size    - size of new_buf in bytes
 * data    - the needblock_state_t registered in status_init
 *
 * The record count is size / sizeof(rddhop_t).  A non-empty buffer that is
 * too small to hold even one record is treated as fatal: the problem is
 * logged and the process exits with status 1.
 *
 * Returns EVENT_RENEW.
 */
static int
status_handler(void*new_buf, size_t size, void * data)
{
  elog (LOG_INFO, " ");

  needblock_state_t *state = (needblock_state_t *) data;

  /* Replace the previous upstream list; free(NULL) is a no-op. */
  free (state->upstream);
  state->upstream = new_buf;
  state->upstream_count = size / (sizeof (rddhop_t));

  if ((size > 0) && (state->upstream_count == 0))
    {
      /* size_t values must not be passed to %d (undefined behaviour where
       * size_t is wider than int); cast to unsigned long and print with %lu. */
      elog (LOG_ERR,
            "Somekind of division problem...count:%d = size:%lu / rddhop_size: %lu",
            state->upstream_count,
            (unsigned long) size,
            (unsigned long) sizeof (rddhop_t));
      exit (1);
    }

  return EVENT_RENEW;
}
/*
 * Print the command-line usage text for the -U / --uses option via
 * misc_print_usage, then terminate the process with exit status 1.
 * This function does not return.
 *
 * name - program name passed through to misc_print_usage
 */
static void
usage (char *name)
{
misc_print_usage
(name, "-U <device>", " --uses <device>: Specify link device to use.");
exit (1);
}
/*
 * Open the status client on /dev/block/up_bin (path resolved through
 * sim_path) with status_handler as its callback and the given state as the
 * callback's private data.  The resulting context is stored in
 * state->status_context.
 *
 * On failure the error is logged and the process exits with status 1.
 */
static void
status_init (needblock_state_t * state)
{
  status_client_opts_t client_opts = {
    .devname = sim_path ("/dev/block/up_bin"),
    .handler = status_handler,
    .private_data = state,
  };

  if (g_status_client_full (&client_opts, &state->status_context) >= 0)
    {
      return;
    }

  elog (LOG_ERR,"Unable to open status client to %s", client_opts.devname);
  exit (1);
}
/*
 * Open the link device and determine the flow source id.
 *
 * uses  - name of the link device to use; if NULL, an error is logged and
 *         usage() terminates the process
 * argv  - program arguments; argv[0] is used for the usage message
 * state - receives the link context and the flow id source
 *
 * The interface id reported by lu_get_if_id is stored in state->fid.src;
 * when use_mhsync is set it is then overwritten with my_node_id.
 * Failure to open the link or to look up the interface id is fatal
 * (logged, then exit status 1).
 */
static void
init_link (char *uses, char **argv, needblock_state_t * state)
{
  if (!uses)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t link_opts = {
    .opts = {
      .name = uses,
      .data = state,
    },
  };

  if (lu_open (&link_opts, &state->link_context) < 0)
    {
      elog (LOG_CRIT, "Can't open %s: %m", link_name (&link_opts.opts, NULL));
      exit (1);
    }

  if (lu_get_if_id (state->link_context, &state->fid.src))
    {
      elog (LOG_CRIT, "Error looking up my interface id.");
      exit (1);
    }

  /* With mhsync the node id, not the interface id, identifies the source. */
  if (use_mhsync)
    state->fid.src = my_node_id;
}
int
main (int argc, char **argv)
{
misc_init (&argc, argv, CVSTAG);
{ // EmRun will trigger this callback on shutdown
emrun_opts_t emrun_opts = {
shutdown:block_generic_shutdown,
// shutdown handler
data:"needblock" // pointer passed to shutdown handler
};
emrun_init (&emrun_opts);
}
elog (LOG_NOTICE, " ");
char * uses = link_parse_uses(&argc, argv, NULL);
needblock_state_t state = init_state ();
if (argc > 2)
{
usage("needblock");
}
// subscribe to haveblock_table_t
use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
if (use_mhsync)
{
elog (LOG_CRIT,"Using mhsync");
use_prefix = SSYNC_MULTIHOP_PREFIX;
}
ssync_sub_opts_t opts = {
reread_period: REREAD_PERIOD,
read_refractory: REFRACTORY
};
if (haveblock_sub_open_full
(use_prefix, haveblock_sub_handler, &state, &opts, &state.haveblocks_sub) < 0)
{
elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
exit (1);
}
// subscribe to ackblock_table_t
if (ackblock_sub_open_full
(SSYNC_MULTIHOP_PREFIX, ackblock_sub_handler, &state, &opts, &state.ackblocks_sub) < 0)
{
elog (LOG_ERR, "Unable to subscribe to ackblock channel: %m");
exit (1);
}
// set up timer for publishing
if (g_timer_add (NEEDBLOCK_TIMER, publish_needblock_timer_cb, &state, NULL, NULL) < 0)
{
elog (LOG_CRIT, "Unable to create timer event.");
exit (1);
}
status_init(&state);
init_link(uses, argv, &state);
/* cluster map event */
cluster_map_opts_t cl_opts = {
private_data: &state
};
if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
elog(LOG_CRIT, "Can't connect to cluster map: %m");
exit(1);
}
g_main ();
return 1;
}
/* See more files for this project here  (web-viewer residue, not part of the program) */