/* sinkgradient_with_LENT.c -- from the EmStar project (copy retrieved via the Krugle code search site). */
char sinkgradient_c_cvsid[] = "$Id: sinkgradient_with_LENT.c,v 1.1 2006/03/24 23:48:56 adparker Exp $";
#include <assert.h>
#include <stdint.h>
#include <glib.h>
#include <time.h>
#include "link/link.h"
#include "emrun/emrun.h"
#include "libmisc/misc.h"
#include "libmisc/misc_buf.h"
#include "devel/state/cluster_map.h"
#include "devel/state/ssync.h"
#include "libdev/status_dev.h"
#include "devel/block/rddhop.h"
/* Initial interval handed to g_timer_add() for sinkgradient_timer (presumably milliseconds -- confirm against g_timer_add). */
#define SINKGRADIENT_TIMER 1000
/* Cluster map handle: created in main(), consulted by convert_src_to_id(). */
cluster_map_t * cluster_map = NULL;
/* Non-zero when the "use_mhsync" switch was parsed out of the command line in main(). */
int use_mhsync = 0;
/* State-sync prefix used for subscribing and publishing; main() replaces it with SSYNC_MULTIHOP_PREFIX when use_mhsync is set. */
char * use_prefix = SSYNC_CLUSTER_PREFIX;
/** state sync stuff start **/
/*
static gint
rddhop_compare (gconstpointer a, gconstpointer b)
{
const rddhop_t *aa = (const rddhop_t *) a;
const rddhop_t *bb = (const rddhop_t *) b;
return aa->hop_number - bb->hop_number;
}
*/
/*
 * Instantiate the state-sync helper macros for the rddhop_t record type under
 * the channel name "rddhop".
 * NOTE(review): presumably these expansions provide rddhop_table_t,
 * rddhop_pub() and rddhop_sub_open_full() used below -- confirm in
 * devel/state/ssync.h.
 */
SSYNC_PUB_TYPESAFE_INLINES (rddhop, rddhop_t, "rddhop", sizeof (rddhop_t));
SSYNC_PUB_TYPESAFE_FUNCS (rddhop, rddhop_t);
/* Append a one-line textual rendering of a single hop record to buf. */
static inline void
rddhop_unparse (buf_t * LENT buf, rddhop_t * LENT rddhop)
{
    const char *host_text = print_if_id (rddhop->host);

    bufprintf (buf, "\thop_number: %d, host: %s, ETX*100: %d\n",
               rddhop->hop_number, host_text, rddhop->etx);
}
/*
 * Append a textual dump of `count` table entries to buf: for each entry the
 * source, hop distance and age (now minus header.rcv_time), followed by the
 * hop record itself.
 */
static inline void
rddhop_table_unparse (buf_t * LENT buf, rddhop_table_t * LENT table, int count)
{
    elog(LOG_NOTICE," ");

    int idx = 0;
    while (idx < count) {
        /* Age is recomputed per entry against the current wall clock. */
        int age_secs = (int) (time (0) - table[idx].header.rcv_time);
        const char *age_text = misc_secs_to_str (age_secs);

        bufprintf (buf, " \n\tSRC:%-15s",
                   print_if_id (table[idx].header.flow_id.src));
        bufprintf (buf, " HOPS: %d ", table[idx].header.hops_away);
        bufprintf (buf, " SECONDS_OLD: %s \n", age_text);
        rddhop_unparse (buf, &table[idx].rddhop);
        ++idx;
    }
}
/** state sync stuff end **/
/* Per-process state shared by the timer, subscription and status callbacks. */
typedef struct sinkgradient_state_s
{
rddhop_table_t *all_rddhops; /* latest table handed to rddhop_sub_handler(); the previous one is freed when replaced */
int all_rddhops_count; /* number of entries in all_rddhops */
ssync_sub_t *rddhops_sub; /* subscription handle filled in by rddhop_sub() */
flow_id_t fid; /* flow id passed when publishing; fid.src is set in init_link() */
uint8_t sinkstatus; /* 0 or 1, updated by status_handler(); non-zero makes the timer publish sink_path() */
status_client_context_t * status_context; /* status client opened in status_init() */
status_context_t * status_dev_context; /* status devices created in main() via status_dev_init() */
status_context_t * status_dev_up_bin_context;
status_context_t * status_dev_down_bin_context;
status_context_t * status_dev_up_ascii_context;
status_context_t * status_dev_down_ascii_context;
rddhop_t * etx_lower; /* malloc'ed array rebuilt by update_etx_arrays() */
int etx_lower_count; /* number of entries in etx_lower */
rddhop_t * etx_higher; /* malloc'ed array rebuilt by update_etx_arrays() */
int etx_higher_count; /* number of entries in etx_higher */
uint32_t myetx; /* this node's ETX as last found by update_etx_arrays(); init_state() stores -1 */
lu_context_t * link_context; /* link handle opened in init_link() */
} sinkgradient_state_t;
/****************** stuff for rddhop_path_* **/
/* Key type of the path tree: one tree node per distinct table-entry source (header.flow_id.src). */
typedef struct rddhop_path_key_s
{
uint32_t src;
} rddhop_path_key_t;
/* Key destructor installed on the path tree; releases a heap-allocated key. */
static void
key_destroy_notify (gpointer data)
{
    /* free(NULL) is defined as a no-op, so no explicit guard is needed. */
    free (data);
}
/*
 * Three-way comparison of two rddhop_path_key_t by their `src` field, for use
 * as the path tree's ordering function.
 *
 * Returns a negative, zero or positive value when a sorts before, equal to or
 * after b.  The fields are compared directly rather than subtracted: the
 * difference of two uint32_t values does not fit in a gint and yields the
 * wrong sign once the keys are 2^31 or more apart, which would make the tree
 * ordering inconsistent.  user_data is unused.
 */
static gint
key_comp (gconstpointer LENT a, gconstpointer LENT b, gpointer user_data)
{
    const rddhop_path_key_t *lhs = (const rddhop_path_key_t *) a;
    const rddhop_path_key_t *rhs = (const rddhop_path_key_t *) b;

    (void) user_data;
    return (lhs->src > rhs->src) - (lhs->src < rhs->src);
}
/* Value type of the path tree: the hop records collected for one source. */
typedef struct rddhop_path_data_s
{
GArray *array; // of rddhop_t
} rddhop_path_data_t;
/*
 * Allocate a zeroed rddhop_path_data_t holding a new, empty GArray of
 * rddhop_t.  The allocation result is not checked.
 */
static rddhop_path_data_t *
data_new ()
{
    rddhop_path_data_t *fresh = calloc (1, sizeof (rddhop_path_data_t));

    /* not zero-terminated, new elements cleared */
    fresh->array = g_array_new (0, 1, sizeof (rddhop_t));
    return fresh;
}
/*
 * Destructor for a rddhop_path_data_t created by data_new(); installed as the
 * path tree's value destructor and also reached via rddhop_path_data_free().
 *
 * Frees the hop array together with its element storage, then the wrapper
 * struct itself.  Previously only the array was released, so every wrapper
 * allocated by data_new() was leaked.  A NULL argument is ignored.
 */
static void
data_destroy_notify (gpointer data)
{
    rddhop_path_data_t *d = (rddhop_path_data_t *) data;

    if (d == NULL)
        return;

    if (d->array)
        g_array_free (d->array, 1);
    free (d);
}
/********************** end rddhop_path_* **/
/*
 * Printable callback of the sinkgradient status device: writes a header line
 * followed by a dump of the currently stored rddhop table into buf.
 */
int
sinkgradient_printable(status_context_t * LENT info, buf_t * LENT buf)
{
    sinkgradient_state_t * LENT state = (sinkgradient_state_t *) sd_data (info);
    rddhop_table_t *current = state->all_rddhops;
    int entries = state->all_rddhops_count;

    bufprintf (buf, "RDDHOPS_TABLE\n");
    rddhop_table_unparse (buf, current, entries);
    return STATUS_MSG_COMPLETE;
}
/*
 * Rewrite header.flow_id.src of every table entry in place: when the cluster
 * map has an entry for that address, src is replaced by the entry's node_id;
 * otherwise it is left untouched.
 */
static void
convert_src_to_id(rddhop_table_t * table, int count)
{
    int idx;

    for (idx = 0; idx < count; ++idx) {
        cl_id_t *mapped =
            cluster_map_lookup_by_address (cluster_map,
                                           table[idx].header.flow_id.src, 1, 0);
        if (mapped == NULL)
            continue;
        table[idx].header.flow_id.src = mapped->node_id;
    }
}
/*
 * Subscription callback for the rddhop channel.  Replaces the stored table
 * with the newly delivered one (freeing the previous table), converts source
 * addresses to node ids unless multihop sync is in use, logs the new table and
 * notifies the main status device.  Always asks for the event to be renewed.
 */
static int
rddhop_sub_handler (ssync_sub_t * sub, rddhop_table_t * LENT table,
                    int count, void *data)
{
    elog (LOG_NOTICE, " ");

    sinkgradient_state_t *state = (sinkgradient_state_t *) data;

    /* Drop the previous snapshot; free(NULL) is harmless on the first call. */
    free (state->all_rddhops);
    state->all_rddhops = table;
    state->all_rddhops_count = count;

    if (use_mhsync == 0)
        convert_src_to_id (table, count);

    buf_t *dump = buf_new ();
    rddhop_table_unparse (dump, state->all_rddhops, state->all_rddhops_count);
    elog (LOG_NOTICE, "%s", dump->buf);
    buf_free (dump);

    g_status_dev_notify (state->status_dev_context);
    return EVENT_RENEW;
}
/*
 * Build the initial program state by value: no table, no subscription or
 * device contexts, not a sink, empty ETX arrays, myetx set to -1, and a flow
 * id addressed to LINK_BROADCAST whose source is filled in later.
 */
static sinkgradient_state_t
init_state ()
{
    sinkgradient_state_t fresh = {
        .all_rddhops = NULL,
        .all_rddhops_count = 0,
        .rddhops_sub = NULL,
        .fid = {
            /* .src (my_node_id / interface id) is assigned later */
            .dst = LINK_BROADCAST,
            .src_if = 1,
            .dst_if = 1,
            .max_hops = MHSYNC_ONE_HOP
        },
        .sinkstatus = 0,
        .status_context = NULL,
        .status_dev_context = NULL,
        .status_dev_up_bin_context = NULL,
        .status_dev_down_bin_context = NULL,
        .status_dev_up_ascii_context = NULL,
        .status_dev_down_ascii_context = NULL,
        .etx_lower = NULL,
        .etx_lower_count = 0,
        .etx_higher = NULL,
        .etx_higher_count = 0,
        .myetx = -1,
    };

    return fresh;
}
/* Print the command-line usage text for program `name`, then exit with status 1. */
static void
usage (char * LENT name)
{
    misc_print_usage (name,
                      "-U <device>",
                      " --uses <device>: Specify link device to use.");
    exit (1);
}
/*
 * Open the link device named by `uses` and record this node's source id in
 * state->fid.src (the interface id, or my_node_id when multihop sync is on).
 * Any failure is logged and terminates the process with status 1; a missing
 * `uses` prints the usage text.
 */
static void
init_link (char * LENT uses, char ** LENT argv, sinkgradient_state_t * LENT state)
{
    if (uses == NULL) {
        elog (LOG_CRIT, "Please specify a link to use!");
        usage (argv[0]);
    }

    lu_opts_t lu_opts = {
        .opts = {
            .name = uses,
            .data = state
        },
    };

    if (lu_open (&lu_opts, &state->link_context) < 0) {
        elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
        exit (1);
    }

    if (lu_get_if_id (state->link_context, &state->fid.src) != 0) {
        elog (LOG_CRIT, "Error looking up my interface id.");
        exit (1);
    }

    /* With multihop sync the node id overrides the interface id. */
    if (use_mhsync)
        state->fid.src = my_node_id;
}
/*
 * Subscribe to the rddhop channel under use_prefix, with rddhop_sub_handler
 * as callback and `state` as its private data.  Logs and exits with status 1
 * when the subscription cannot be opened.
 */
static void
rddhop_sub (sinkgradient_state_t * LENT state)
{
    ssync_sub_opts_t opts = {
        .reread_period = REREAD_PERIOD,
        .read_refractory = REFRACTORY
    };

    if (rddhop_sub_open_full (use_prefix, rddhop_sub_handler, state,
                              &opts, &state->rddhops_sub) >= 0)
        return;

    elog (LOG_ERR, "Unable to subscribe to rddhop channel: %m");
    exit (1);
}
/* Release a key obtained from key_new()/key_dup()/key_init(); delegates to key_destroy_notify(). */
static void
key_free (rddhop_path_key_t * key)
{
key_destroy_notify (key);
}
/* Allocate a zero-filled key on the heap.  The allocation result is not checked. */
static rddhop_path_key_t *
key_new ()
{
    rddhop_path_key_t *fresh = calloc (1, sizeof (rddhop_path_key_t));

    return fresh;
}
/* Return a heap-allocated copy of *key; the caller owns the copy. */
static rddhop_path_key_t *
key_dup (const rddhop_path_key_t * LENT key)
{
    rddhop_path_key_t *copy = key_new ();

    *copy = *key;
    return copy;
}
/* Allocate a key whose src is the source of the given table entry (header.flow_id.src). */
static rddhop_path_key_t *
key_init (const rddhop_table_t table)
{
    rddhop_path_key_t *fresh = key_new ();

    fresh->src = table.header.flow_id.src;
    return fresh;
}
/* static rddhop_path_key_t
key_copy (const rddhop_path_key_t * key)
{
return *key;
} */
static rddhop_path_key_t
key_copy_src (uint32_t src)
{
rddhop_path_key_t key = {
src:src
};
return key;
}
// Look up the data stored under `key`, creating an empty entry on a miss.
//
// Returns a pointer to memory owned by the tree.  The caller keeps ownership
// of `key`: on a miss the tree receives its own duplicate.  The key is only
// duplicated when an insertion actually happens, so a hit costs no
// allocation (previously every call duplicated the key and freed it again).
rddhop_path_data_t *
key_get_data (GTree * LENT tree, const rddhop_path_key_t * LENT key)
{
    rddhop_path_data_t *data = g_tree_lookup (tree, key);

    if (data == NULL)
    {
        data = data_new ();
        // the tree takes ownership of both the duplicated key and the data
        g_tree_insert (tree, key_dup (key), data);
    }
    return data;
}
/* Append a copy of `hop` to the end of data's hop array. */
static void
data_append_hop (rddhop_path_data_t * LENT data, rddhop_t hop)
{
    GArray *hops = data->array;

    g_array_append_val (hops, hop);
}
/*
 * Group a flat rddhop table by source.
 *
 * The returned tree is keyed by rddhop_path_key_t (src taken from
 * header.flow_id.src) and maps to a rddhop_path_data_t whose array collects,
 * in table order, a copy of every hop record published by that source.
 * Keys and values are released by the tree's destroy callbacks.
 */
GTree *
rddhop_table_to_path_tree (const rddhop_table_t * LENT table, int count)
{
    GTree *paths = g_tree_new_full (key_comp, NULL,
                                    key_destroy_notify, data_destroy_notify);
    int idx = 0;

    while (idx < count) {
        /* temporary lookup key for this entry's source */
        rddhop_path_key_t *probe = key_init (table[idx]);
        /* fetch (or create) the per-source bucket */
        rddhop_path_data_t *bucket = key_get_data (paths, probe);

        data_append_hop (bucket, table[idx].rddhop);
        key_free (probe);
        ++idx;
    }
    return paths;
}
/*
 * Return a newly allocated copy of `table` with one extra zero-filled entry
 * appended (count + 1 entries in total).  The caller owns the result and
 * releases it with free().
 *
 * Returns NULL when count is negative or the allocation fails.  A NULL table
 * or a count of 0 yields a single zeroed entry; memcpy is skipped in that
 * case so that no null pointer is passed to it.
 */
static inline rddhop_table_t *
rddhop_add_empty_block (rddhop_table_t * table, int count)
{
    if (count < 0)
        return NULL;

    /* calloc zero-fills and checks the size multiplication for overflow */
    rddhop_table_t *new_table = calloc ((size_t) count + 1, sizeof (rddhop_table_t));
    if (new_table == NULL)
        return NULL;

    if (table != NULL && count > 0)
        memcpy (new_table, table, sizeof (rddhop_table_t) * (size_t) count);
    return new_table;
}
/*
 * Publish `table` unchanged and return rddhop_pub()'s result.  The padding
 * step via rddhop_add_empty_block() that the name suggests is currently
 * disabled.
 */
static inline int
rddhop_pub_add_empty (char * LENT prefix,
                      rddhop_table_t * LENT table, int count, flow_id_t * LENT fid)
{
    return rddhop_pub (prefix, table, count, fid);
}
/*
 * Publish a table, routing the empty case (count == 0) through
 * rddhop_pub_add_empty() and everything else straight to rddhop_pub().
 * Returns whatever the chosen publish call returns.
 */
static inline int
rddhop_pub_helper(char * LENT prefix,
                  rddhop_table_t * LENT table, int count, flow_id_t * LENT fid)
{
    if (count != 0)
        return rddhop_pub (prefix, table, count, fid);

    return rddhop_pub_add_empty (prefix, table, count, fid);
}
/*
 * g_tree_foreach() callback: if any hop on this path has host == 0, append a
 * copy of the path's key to the reject array passed in `data`.  Always
 * returns FALSE so the traversal continues.
 */
gboolean
traverse_reject_nulls (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
    GArray *rejects = (GArray *) data;
    rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
    rddhop_path_data_t *path = (rddhop_path_data_t *) value;
    guint pos = 0;

    while (pos < path->array->len) {
        rddhop_t hop = g_array_index (path->array, rddhop_t, pos);
        if (hop.host == 0) {
            /* one rejection per key is enough */
            g_array_append_val (rejects, *path_key);
            break;
        }
        ++pos;
    }
    return FALSE;
}
/*
 * g_tree_foreach() callback: if any hop on this path has host == my_node_id,
 * append a copy of the path's key to the reject array passed in `data`.
 * Always returns FALSE so the traversal continues.
 */
gboolean
traverse_reject_containing_self (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
    GArray *rejects = (GArray *) data;
    rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
    rddhop_path_data_t *path = (rddhop_path_data_t *) value;
    guint pos = 0;

    while (pos < path->array->len) {
        rddhop_t hop = g_array_index (path->array, rddhop_t, pos);
        if (hop.host == my_node_id) {
            /* one rejection per key is enough */
            g_array_append_val (rejects, *path_key);
            break;
        }
        ++pos;
    }
    return FALSE;
}
/*
 * Remove from `tree` every key listed in `array` (an array of
 * rddhop_path_key_t).  Does nothing when either argument is NULL.
 */
static void
path_tree_remove_key_array (GTree * LENT tree, GArray * LENT array)
{
    if (tree == NULL || array == NULL)
        return;

    guint pos;
    for (pos = 0; pos < array->len; ++pos) {
        rddhop_path_key_t doomed = g_array_index (array, rddhop_path_key_t, pos);
        g_tree_remove (tree, &doomed);
    }
}
/* Add `etx` to the etx field of every hop stored in data's array, in place. */
static void
rddhop_path_data_update_etx (rddhop_path_data_t * LENT data, uint32_t etx)
{
    assert (data);

    guint pos = 0;
    while (pos < data->array->len) {
        g_array_index (data->array, rddhop_t, pos).etx += etx;
        ++pos;
    }
}
/*
 * g_tree_foreach() callback: bump the ETX of every hop on this path by one.
 * Always returns FALSE so the traversal continues.
 */
gboolean
traverse_update_etx (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
    rddhop_path_data_update_etx ((rddhop_path_data_t *) value, 1);
    return FALSE;
}
// g_tree_foreach() callback used to collect candidate next hops.
// For each path, the first hop record whose host equals the path's own
// source (key->src) is appended to the rddhop_t array passed in `data`;
// that record carries the source's etx.  Always returns FALSE so the
// traversal continues.
gboolean
traverse_find_src_hop (gpointer LENT key, gpointer LENT value, gpointer LENT data)
{
    GArray *collected = (GArray *) data;
    rddhop_path_key_t *path_key = (rddhop_path_key_t *) key;
    rddhop_path_data_t *path = (rddhop_path_data_t *) value;
    guint pos = 0;

    while (pos < path->array->len) {
        rddhop_t hop = g_array_index (path->array, rddhop_t, pos);
        if (hop.host == path_key->src) {
            g_array_append_val (collected, hop);
            break;
        }
        ++pos;
    }
    return FALSE;
}
/* ETX this node would have if `hop` were chosen as next hop: one more than hop.etx. */
uint32_t
rddhop_etx_if_next_hop(rddhop_t hop)
{
    uint32_t through_hop = 1 + hop.etx;

    return through_hop;
}
// Return the hop with the smallest rddhop_etx_if_next_hop() value; on ties
// the earliest element wins.
// The array must not be empty (enforced by assert).
static rddhop_t
rddhop_array_min_hop (GArray * LENT array)
{
    assert (array->len > 0);

    rddhop_t best = g_array_index (array, rddhop_t, 0);
    guint pos;

    /* element 0 is the initial candidate, so scanning starts at 1 */
    for (pos = 1; pos < array->len; ++pos) {
        rddhop_t candidate = g_array_index (array, rddhop_t, pos);
        if (rddhop_etx_if_next_hop (candidate) < rddhop_etx_if_next_hop (best))
            best = candidate;
    }
    return best;
}
/*
 * Append this node to the end of a path: the new record's hop_number is the
 * current path length, its host is my_node_id, and its etx is the etx this
 * node gets by going through `nexthop`.
 */
static void
rddhop_path_data_append_self (rddhop_t nexthop, rddhop_path_data_t * LENT data)
{
    assert (data);
    assert (data->array);

    rddhop_t self_hop = {
        .hop_number = data->array->len,
        .host = my_node_id,
        .etx = rddhop_etx_if_next_hop (nexthop)
    };

    g_array_append_val (data->array, self_hop);
}
/* Return a newly allocated copy of `data` with the same hops in the same order; the caller owns it. */
static rddhop_path_data_t *
rddhop_path_data_copy (rddhop_path_data_t * LENT data)
{
    assert(data);

    rddhop_path_data_t *clone = data_new ();
    guint pos = 0;

    while (pos < data->array->len) {
        rddhop_t hop = g_array_index (data->array, rddhop_t, pos);
        g_array_append_val (clone->array, hop);
        ++pos;
    }
    return clone;
}
/*
 * g_array_sort() comparator ordering rddhop_t records by ascending etx.
 *
 * Returns a negative, zero or positive value when a sorts before, equal to or
 * after b.  The fields are compared directly instead of being subtracted, so
 * the result cannot overflow or wrap for widely separated values, and the
 * const qualifier of the arguments is preserved.
 */
static gint
rddhop_compare_etx_low_first(gconstpointer LENT a, gconstpointer LENT b)
{
    const rddhop_t *lhs = (const rddhop_t *) a;
    const rddhop_t *rhs = (const rddhop_t *) b;

    return (lhs->etx > rhs->etx) - (lhs->etx < rhs->etx);
}
/*
 * g_array_sort() comparator ordering rddhop_t records by descending etx.
 *
 * Returns a negative value when a has the larger etx (and so sorts first),
 * zero when both are equal, and a positive value otherwise.  The fields are
 * compared directly instead of being subtracted, so the result cannot
 * overflow or wrap, and the const qualifier of the arguments is preserved.
 */
static gint
rddhop_compare_etx_high_first(gconstpointer LENT a, gconstpointer LENT b)
{
    const rddhop_t *lhs = (const rddhop_t *) a;
    const rddhop_t *rhs = (const rddhop_t *) b;

    return (rhs->etx > lhs->etx) - (rhs->etx < lhs->etx);
}
/*
 * Collect the rddhop record of each of the `count` table entries into a new
 * GArray of rddhop_t, preserving order.  The caller frees the array.
 */
GArray *
rddhop_table_to_GArray(rddhop_table_t * LENT table, int count)
{
    GArray *hops = g_array_new (0, 1, sizeof (rddhop_t));
    int idx = 0;

    while (idx < count) {
        g_array_append_val (hops, table[idx].rddhop);
        ++idx;
    }
    return hops;
}
static void
update_etx_arrays(sinkgradient_state_t * LENT state)
{
GTree * tree = rddhop_table_to_path_tree(state->all_rddhops,
state->all_rddhops_count);
//// create an array of rddhop_t nexthops, with etx
// reflecting etx to their sink.
GArray * neighbor_array = g_array_new(0,1,sizeof(rddhop_t));
// create reject_array
GArray * reject_array = g_array_new(0,1,sizeof(rddhop_t));
g_tree_foreach(tree, traverse_reject_nulls, reject_array);
// remove array from tree
path_tree_remove_key_array(tree, reject_array);
g_tree_foreach(tree, traverse_find_src_hop, neighbor_array);
// Copy neighbor_array
GArray * array = g_array_new(0,1,sizeof(rddhop_t));
int i;
for (i = 0; i < neighbor_array->len; ++i)
g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );
g_array_sort(array, rddhop_compare_etx_low_first);
int myetx = 0;
int etxfound = 0;
for (i = 0; i < array->len; ++i)
{
rddhop_t hop = g_array_index(array, rddhop_t, i);
if (hop.host == my_node_id) {
myetx = state->myetx = hop.etx;
etxfound = 1;
}
if ((etxfound) && (hop.etx != myetx)) // prune the rest
{
g_array_remove_range(array, i, array->len - i);
break;
}
}
if (state->etx_lower) free(state->etx_lower);
state->etx_lower_count = array->len;
state->etx_lower = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_lower_count);
memset(state->etx_lower,0,sizeof(rddhop_t) * state->etx_lower_count);
for (i = 0; i < array->len; ++i)
{
memcpy(&state->etx_lower[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
}
g_array_free(array,1);
//////////////// copy array again
array = g_array_new(0,1,sizeof(rddhop_t));
for (i = 0; i < neighbor_array->len; ++i)
g_array_append_val(array, g_array_index(neighbor_array,rddhop_t,i) );
g_array_sort(array,rddhop_compare_etx_high_first);
myetx = 0;
etxfound = 0;
for (i = 0; i < array->len; ++i)
{
rddhop_t hop = g_array_index(array, rddhop_t, i);
if (hop.host == my_node_id) {
myetx = state->myetx = hop.etx;
etxfound = 1;
}
if ((etxfound) && (hop.etx != myetx)) // prune the rest
{
g_array_remove_range(array, i, array->len - i);
break;
}
}
if (state->etx_higher) free(state->etx_higher);
state->etx_higher_count = array->len;
state->etx_higher = (rddhop_t*)malloc(sizeof(rddhop_t)*state->etx_higher_count);
memset(state->etx_higher,0,sizeof(rddhop_t)*state->etx_higher_count);
for (i = 0; i < array->len; ++i)
{
memcpy(&state->etx_higher[i], &g_array_index(array,rddhop_t,i), sizeof(rddhop_t));
}
g_array_free(array, 1);
g_array_free(neighbor_array,1);
g_array_free(reject_array, 1);
g_tree_destroy(tree);
}
/* Binary callback of the up_bin status device: refresh the ETX arrays and emit etx_higher as raw rddhop_t records. */
int
up_bin(status_context_t * LENT info, buf_t * LENT buf)
{
    sinkgradient_state_t *state = (sinkgradient_state_t *) sd_data (info);

    update_etx_arrays (state);

    size_t payload_bytes = state->etx_higher_count * sizeof (rddhop_t);
    bufcpy (buf, state->etx_higher, payload_bytes);
    return STATUS_MSG_COMPLETE;
}
/* Binary callback of the down_bin status device: refresh the ETX arrays and emit etx_lower as raw rddhop_t records. */
int
down_bin(status_context_t * LENT info, buf_t * LENT buf)
{
    sinkgradient_state_t *state = (sinkgradient_state_t *) sd_data (info);

    update_etx_arrays (state);

    size_t payload_bytes = state->etx_lower_count * sizeof (rddhop_t);
    bufcpy (buf, state->etx_lower, payload_bytes);
    return STATUS_MSG_COMPLETE;
}
/* Printable callback of the up_ascii status device: refresh the ETX arrays, then list this node's ETX and every etx_higher entry. */
int
up_ascii(status_context_t * LENT info, buf_t * LENT buf)
{
    sinkgradient_state_t *state = (sinkgradient_state_t *) sd_data (info);
    int idx = 0;

    update_etx_arrays (state);
    bufprintf (buf, "My ETX: %d: Those that are HIGHER...\n", state->myetx);

    while (idx < state->etx_higher_count) {
        bufprintf (buf, "host: %s, etx %d\n",
                   print_if_id (state->etx_higher[idx].host),
                   state->etx_higher[idx].etx);
        ++idx;
    }
    return STATUS_MSG_COMPLETE;
}
/* Printable callback of the down_ascii status device: refresh the ETX arrays, then list this node's ETX and every etx_lower entry. */
int
down_ascii(status_context_t * LENT info, buf_t * LENT buf)
{
    sinkgradient_state_t *state = (sinkgradient_state_t *) sd_data (info);
    int idx = 0;

    update_etx_arrays (state);
    bufprintf (buf, "My ETX: %d: Those that are LOWER...\n", state->myetx);

    while (idx < state->etx_lower_count) {
        bufprintf (buf, "host: %s, etx %d\n",
                   print_if_id (state->etx_lower[idx].host),
                   state->etx_lower[idx].etx);
        ++idx;
    }
    return STATUS_MSG_COMPLETE;
}
// Pick the best path to publish from a path tree.
//  - returns newly allocated data owned by the caller
//  - modifies the tree: paths containing a host of 0 or this node's id are
//    removed (the latter are treated as loops), and this node is appended to
//    the chosen path inside the tree
//  - assumes this node is not the sink
//  - returns a path of length zero when no usable path exists
static rddhop_path_data_t *
path_tree_create_best_path (GTree * LENT tree)
{
    assert (tree);

    rddhop_path_data_t *chosen = NULL;
    GArray *candidates = g_array_new (0, 1, sizeof(rddhop_t));
    // keys of the paths to throw away
    GArray *rejects = g_array_new (0, 1, sizeof(rddhop_path_key_t));

    g_tree_foreach (tree, traverse_reject_nulls, rejects);
    g_tree_foreach (tree, traverse_reject_containing_self, rejects);
    path_tree_remove_key_array (tree, rejects);

    if (g_tree_nnodes (tree) > 0) {
        // gather each surviving source's own hop record
        g_tree_foreach (tree, traverse_find_src_hop, candidates);
        if (candidates->len > 0) {
            // cheapest next hop, then the path stored under its source
            rddhop_t cheapest = rddhop_array_min_hop (candidates);
            rddhop_path_key_t lookup = key_copy_src (cheapest.host);
            rddhop_path_data_t *via = g_tree_lookup (tree, &lookup);

            rddhop_path_data_append_self (cheapest, via);
            chosen = rddhop_path_data_copy (via);
        }
    }

    if (chosen == NULL) {
        // no known route to the sink: hand back an empty path
        chosen = data_new ();
    }

    g_array_free (rejects, 1);
    g_array_free (candidates, 1);
    return chosen;
}
/*
 * Convert a path into a malloc'ed rddhop_table_t array with one zeroed entry
 * per hop, whose rddhop field is a copy of the corresponding hop.  The caller
 * frees the table; its length equals data->array->len.
 */
static rddhop_table_t *
rddhop_path_data_to_table (const rddhop_path_data_t * LENT data)
{
    assert(data);
    GArray LENT * hops = data->array;
    assert(hops);
    elog(LOG_INFO," ");

    int entries = hops->len;
    rddhop_table_t *result = calloc (entries, sizeof (rddhop_table_t));
    int idx = 0;

    while (idx < entries) {
        result[idx].rddhop = g_array_index (hops, rddhop_t, idx);
        ++idx;
    }
    return result;
}
/* Dispose of a path obtained from data_new() or rddhop_path_data_copy(); delegates to data_destroy_notify(). */
static void
rddhop_path_data_free(rddhop_path_data_t * data)
{
data_destroy_notify(data);
}
/*
 * Build the path a sink publishes: a single hop with hop_number 0, host
 * my_node_id and etx 0.  The caller owns the returned data.  `state` is not
 * used.
 */
static rddhop_path_data_t *
sink_path(sinkgradient_state_t * LENT state)
{
    rddhop_t origin = {
        .hop_number = 0,
        .host = my_node_id,
        .etx = 0
    };
    rddhop_path_data_t *path = data_new ();

    data_append_hop (path, origin);
    return path;
}
/*
 * Periodic timer callback.  Computes the path to publish (just this node
 * when it is a sink, otherwise the best path derived from the stored table),
 * publishes it under use_prefix, logs it, releases all temporaries and
 * notifies every status device.  The return value reschedules the timer with
 * a delay derived from the published path length.
 */
static int
sinkgradient_timer (gpointer LENT data, int interval, g_event_t * ev)
{
    elog (LOG_NOTICE, " ");

    sinkgradient_state_t *state = (sinkgradient_state_t *) data;
    GTree *paths = NULL;
    rddhop_path_data_t *best_path;

    if (!state->sinkstatus) {
        elog(LOG_NOTICE,"Not a sink. So I have to calculate...");
        // regroup the flat table by source, then let the selector choose
        paths = rddhop_table_to_path_tree (state->all_rddhops,
                                           state->all_rddhops_count);
        best_path = path_tree_create_best_path (paths);
    } else {
        // a sink simply advertises itself
        elog(LOG_NOTICE,"I'm a sink, so pushing sink_path.");
        best_path = sink_path (state);
    }

    rddhop_table_t *table = rddhop_path_data_to_table (best_path);
    int count = best_path->array->len;

    if (rddhop_pub_helper (use_prefix, table, count, &state->fid))
        elog (LOG_CRIT, "Unable to publish rddhop_table");

    buf_t *dump = buf_new ();
    bufprintf (dump, "Publishing the following RDDHOP_TABLE...");
    rddhop_table_unparse (dump, table, count);
    elog (LOG_NOTICE, "%s", dump->buf);
    buf_free (dump);

    free (table);
    rddhop_path_data_free (best_path);
    if (paths != NULL)
        g_tree_destroy (paths);

    g_status_dev_notify (state->status_dev_context);
    g_status_dev_notify (state->status_dev_up_bin_context);
    g_status_dev_notify (state->status_dev_down_bin_context);
    g_status_dev_notify (state->status_dev_up_ascii_context);
    g_status_dev_notify (state->status_dev_down_ascii_context);

    return TIMER_RENEW_MS(TIMER_DELAY(count));
}
/* Register sinkgradient_timer with `state` as its argument; logs and exits with status 1 on failure. */
static void
init_timer (sinkgradient_state_t * LENT state)
{
    if (g_timer_add (SINKGRADIENT_TIMER, sinkgradient_timer, state, NULL, NULL) >= 0)
        return;

    elog (LOG_CRIT, "Unable to create timer event.");
    exit (1);
}
/* Shutdown callback handed to emrun: log a notice and exit with status 0.  `data` is unused. */
void
sinkgradient_shutdown (void * LENT data)
{
    elog (LOG_NOTICE,
          "sinkgradient shutting down.");
    exit (0);
}
/*
 * Status-client callback for the sinkstatus device.
 *
 * new_buf holds `size` bytes of text expected to start with "0" or "1"; it is
 * owned by this function and always freed before returning.  `data` is the
 * sinkgradient_state_t whose sinkstatus is updated.  Always returns
 * EVENT_RENEW.
 *
 * The payload is copied into a bounded, NUL-terminated local buffer before
 * parsing, because nothing guarantees that new_buf itself is terminated.
 * Parsing uses strtol so that input containing no number at all is reported
 * as "Unrecognized status" instead of silently being read as 0.
 */
static int
status_handler(void*new_buf, size_t size, void * LENT data)
{
    sinkgradient_state_t *state = (sinkgradient_state_t *) data;
    char text[32];
    char *end = NULL;
    size_t len;
    long new_status;

    elog(LOG_INFO," ");
    if (size == 0)
    {
        elog(LOG_ERR,"Size is 0!");
        goto final;
    }
    if (!new_buf)
    {
        elog(LOG_ERR,"new_buf is NULL");
        goto final;
    }

    /* Bounded copy: a valid status is a single digit, so 31 bytes is ample. */
    len = size < sizeof (text) - 1 ? size : sizeof (text) - 1;
    memcpy (text, new_buf, len);
    text[len] = '\0';

    new_status = strtol (text, &end, 10);
    if (end == text || (new_status != 0 && new_status != 1))
    {
        elog(LOG_ERR,"Unrecognized status");
        goto final;
    }

    if (new_status != state->sinkstatus)
    {
        elog(LOG_NOTICE,"Changing status to %d", (int) new_status);
        state->sinkstatus = (uint8_t) new_status;
    }
    else
    {
        elog(LOG_NOTICE,"Status is the same.");
    }

final:
    free(new_buf);   /* free(NULL) is a no-op */
    return EVENT_RENEW;
}
/*
 * Open a status client on /dev/block/sinkstatus with status_handler as the
 * callback and `state` as its private data.  Logs and exits with status 1 on
 * failure.
 */
static void
status_init (sinkgradient_state_t * LENT state)
{
    status_client_opts_t opts = {
        .devname = sim_path("/dev/block/sinkstatus"),
        .handler = status_handler,
        .private_data = state
    };

    if (g_status_client_full (&opts, &state->status_context) >= 0)
        return;

    elog (LOG_ERR,"Unable to open status client to %s", opts.devname);
    exit(1);
}
/*
 * Create the status device `dev_name` with the given printable and binary
 * callbacks (either may be NULL) and `state` as device info, store its
 * context in *context and send an initial notification.  Logs and exits with
 * status 1 if the device cannot be created.
 */
void
status_dev_init(sinkgradient_state_t * LENT state, char * LENT dev_name, status_response_cb_t printable_cb, status_response_cb_t binary_cb, status_context_t ** context)
{
    status_dev_opts_t status_opts = {
        .device = {
            .devname = sim_path (dev_name),
            .device_info = state
        },
        .printable = printable_cb,
        .binary = binary_cb
    };

    if (g_status_dev (&status_opts, context) < 0) {
        elog (LOG_CRIT, "Can't create status device %s: %m",
              status_opts.device.devname);
        exit (1);
    }

    g_status_dev_notify (*context);
}
/*
 * Program entry point: parse the command line, set up subscription, timer,
 * status client, status devices, link and cluster map, then hand control to
 * g_main().  Every setup helper exits the process on failure.
 */
int
main (int argc, char ** LENT argv)
{
misc_init (&argc, argv, CVSTAG);
/* register sinkgradient_shutdown as the emrun shutdown callback */
emrun_opts_t emrun_opts = {
shutdown:sinkgradient_shutdown
};
emrun_init (&emrun_opts);
/* link device name from the command line; NULL is rejected later by init_link() */
char * uses = link_parse_uses(&argc, argv, NULL);
if (argc > 2)
usage (argv[0]);
/* optional switch selecting the multihop prefix for publish/subscribe */
use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
if (use_mhsync) {
use_prefix = SSYNC_MULTIHOP_PREFIX;
elog (LOG_CRIT,"Using mhsync");
}
/* state lives on main's stack; callbacks receive a pointer to it */
sinkgradient_state_t state = init_state ();
rddhop_sub (&state);
init_timer (&state);
status_init (&state);
/* one printable device for the raw table, plus binary/ascii views of the up and down ETX arrays */
status_dev_init(&state, "/dev/block/sinkgradient", sinkgradient_printable,
NULL, &state.status_dev_context);
status_dev_init(&state,"/dev/block/up_bin", NULL, up_bin,&state.status_dev_up_bin_context);
status_dev_init(&state,"/dev/block/down_bin", NULL, down_bin,&state.status_dev_down_bin_context);
status_dev_init(&state,"/dev/block/up_ascii", up_ascii,NULL, &state.status_dev_up_ascii_context);
status_dev_init(&state, "/dev/block/down_ascii", down_ascii, NULL, &state.status_dev_down_ascii_context);
/* opens the link and fills in state.fid.src */
init_link(uses, argv, &state);
/* cluster map event */
cluster_map_opts_t cl_opts = {
private_data: &state
};
if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
elog(LOG_CRIT, "Can't connect to cluster map: %m");
exit(1);
}
g_status_dev_notify(state.status_dev_context);
/* NOTE(review): presumably runs the event loop and does not return in normal operation -- confirm */
g_main ();
return 1;
}
/* See more files for this project here */