// state_info.c
// Version-control identification string for this file.
// NOTE(review): the keyword is spelled "$Id:" with no closing '$'; confirm
// that the version-control system still expands it in this form.
char bundleserver_state_info_c_cvsid [] = "$Id:";
#include <stdlib.h>
#include "emrun/emrun.h"
#include "libmisc/misc.h"
#include "libmisc/misc_init.h"
#include "libdev/query_dev.h"
#include "fusd/fusd.h"
#include "link/link.h"
#include "state_info.h"
// ---- Forward declarations of the file-local helpers ----

// Prints the usage text and exits.
static void usage(char *name);

// Returns 1 if a > b, else returns 0
static int bs_seq_no_greater(bundleserver_seq_no_t a,
                             bundleserver_seq_no_t b);

// Produces the initial module state with every member set to NULL.
static bundleserver_state_state_t bs_state_state_init();

// Pulls the link "uses" argument out of the command line.
static char *get_uses_arg(int argc, char *argv[]);

// List-search callbacks: return 0 when the two node IDs match, 1 otherwise.
static int find_sink_node_id(gconstpointer list_data,
                             gconstpointer sink_pkt);
static int find_ack_array_node_id(gconstpointer list_data,
                                  gconstpointer ack_array_pkt);

// Merges one received sink entry into state->sink_list.
static void bundleserver_state_sink_link_receive_for_each(
    bundleserver_state_state_t *state,
    bundleserver_state_sink_pkt_t *sink_pkt);

// Validates a received ack-array packet and hands it to the update step.
static void bundleserver_state_ack_array_link_receive(lu_context_t *lu,
                                                      link_pkt_t *pkt,
                                                      ssize_t data_len);

// Merges one received ack array into state->ack_array_list.
static void bundleserver_state_ack_array_link_receive_update(
    bundleserver_state_state_t *state,
    bundleserver_state_ack_array_pkt_t *array_pkt,
    ssize_t pkt_len);

// Validates a received sink-status packet and merges each entry it carries.
static void bundleserver_state_sink_link_receive(lu_context_t *lu,
                                                 link_pkt_t *pkt,
                                                 ssize_t data_len);

// Link receive callback: dispatches on the state packet's type field.
static int bundleserver_state_link_receive(lu_context_t *lu,
                                           link_pkt_t *pkt,
                                           ssize_t data_len);

// Opens the link named by `uses` with the given receive callback.
static void setup_link(char *uses,
                       bundleserver_state_state_t *state,
                       lu_pkt_receive_cb_t receive_cb);

// Creates a query device with the given processing callback.
static void setup_query_dev(char *device_name,
                            bundleserver_state_state_t *state,
                            query_process_cb_t process_cb,
                            query_context_t **query_context_ref);
// Query-device handler for the sink-state device.
//
// q:        query context; qdev_data(q) yields the module state.
// command:  either empty/NULL or a string holding a decimal node ID.
// buf_size: not used by this handler.
// print:    not used by this handler (only binary output is produced).
// bin:      output buffer that receives the binary copy of the sink list.
//
// Returning a single node's entry is not implemented yet: if a node ID
// parses out of `command`, the process logs a critical error and exits.
// Otherwise every entry of state->sink_list is appended to `bin`, one
// bundleserver_state_sink_pkt_t after another, in list order. An empty
// list leaves `bin` untouched.
//
// Returns QUERY_DONE.
static int
bundleserver_state_sink_process (query_context_t * q,
                                 char * command,
                                 size_t buf_size,
                                 buf_t * print,
                                 buf_t * bin)
{
    bundleserver_state_state_t * state
        = (bundleserver_state_state_t *) qdev_data (q);
    int find_node_id = 0;
    GSList * node;

    // Parse out the ID that they want.
    // TODO: honour a specific node ID instead of bailing out.
    if ((command != NULL)
        && (sscanf (command, "%d", &find_node_id) == 1))
    {
        elog (LOG_CRIT,"Returning a specific ID is not yet implemented.");
        exit (1);
    }

    // Assume they want binary: copy every stored sink entry into bin.
    // NOTE(review): bufcpy's argument order (buf, src, length) is taken
    // from the original author's note in this function; its return value,
    // if it has one, is not checked here -- confirm against libmisc.
    for (node = state->sink_list; node != NULL; node = node->next)
    {
        bufcpy (bin, node->data, sizeof (bundleserver_state_sink_pkt_t));
    }

    return QUERY_DONE;
}
// Program entry point, declared ahead of its definition further down.
int
main (int argc, char * argv[]);
///////////////////////////////////////////////////////////
// Print the program's usage text and terminate.
//
// name: program name handed on to misc_print_usage.
//
// Does not return: the process exits with status 1.
static void usage(char *name)
{
    misc_print_usage(name, "", "");
    exit(1);
}
// Compare two sequence numbers field by field: sec first, then usec,
// then increment. Returns 1 if a > b, else returns 0 (equal sequence
// numbers therefore yield 0).
static int bs_seq_no_greater(bundleserver_seq_no_t a,
                             bundleserver_seq_no_t b)
{
    int greater;

    if (a.sec > b.sec) {
        greater = 1;
    } else if (a.sec < b.sec) {
        greater = 0;
    } else if (a.usec > b.usec) {
        greater = 1;
    } else if (a.usec < b.usec) {
        greater = 0;
    } else if (a.increment > b.increment) {
        greater = 1;
    } else {
        greater = 0;
    }

    return greater;
}
// Build the initial module state: both lists empty (NULL) and both
// query-device references NULL. The state is returned by value.
//
// Uses standard designated initializers (.field = value) instead of the
// obsolete GNU "field: value" spelling.
static bundleserver_state_state_t bs_state_state_init(void)
{
    bundleserver_state_state_t state = {
        .sink_list = NULL,
        .ack_array_list = NULL,
        .sink_query_dev_ref = NULL,
        .ack_array_query_dev_ref = NULL,
    };

    return state;
}
// Obtain the link "uses" argument from the command line via
// link_parse_uses.
//
// argc, argv: the program's argument vector; a pointer to this function's
//             local copy of argc is what gets passed to link_parse_uses.
//
// Returns the string produced by link_parse_uses. When that is NULL a
// critical message is logged and usage() is called, which exits.
static char *get_uses_arg(int argc, char *argv[])
{
    char *link_uses = link_parse_uses(&argc, argv, NULL);

    if (link_uses != NULL) {
        return link_uses;
    }

    elog(LOG_CRIT, "PLease specify a link to use!");
    usage(argv[0]);
    return link_uses;
}
// Search callback used with g_slist_find_custom on the sink list.
//
// list_data: a stored bundleserver_state_sink_pkt_t.
// sink_pkt:  the bundleserver_state_sink_pkt_t being looked up.
//
// Returns 0 when both carry the same node_id, 1 otherwise.
static int find_sink_node_id(gconstpointer list_data,
                             gconstpointer sink_pkt)
{
    const bundleserver_state_sink_pkt_t *entry = list_data;
    const bundleserver_state_sink_pkt_t *wanted = sink_pkt;

    return (entry->node_id == wanted->node_id) ? 0 : 1;
}
// Search callback used with g_slist_find_custom on the ack-array list.
//
// list_data:     a stored bundleserver_state_ack_array_pkt_t.
// ack_array_pkt: the bundleserver_state_ack_array_pkt_t being looked up.
//
// Returns 0 when both carry the same node_id, 1 otherwise.
static int find_ack_array_node_id(gconstpointer list_data,
                                  gconstpointer ack_array_pkt)
{
    const bundleserver_state_ack_array_pkt_t *entry = list_data;
    const bundleserver_state_ack_array_pkt_t *wanted = ack_array_pkt;

    return (entry->node_id == wanted->node_id) ? 0 : 1;
}
// Merge one received sink entry into state->sink_list.
//
// state:    module state owning the sink list.
// sink_pkt: the received entry; it is copied, never stored by reference.
//
// If an entry with the same node_id already exists it is overwritten in
// place, but only when the received seq_no is strictly greater than the
// stored one. Otherwise a heap copy of the entry is appended to the list.
//
// Allocation failure is fatal (logged, then exit(1)), in line with how the
// rest of this file treats unrecoverable errors; previously a failed
// malloc led straight into memcpy on a NULL pointer.
static void bundleserver_state_sink_link_receive_for_each(
    bundleserver_state_state_t *state,
    bundleserver_state_sink_pkt_t *sink_pkt)
{
    bundleserver_state_sink_pkt_t *known;
    bundleserver_state_sink_pkt_t *copy;

    // Search for entry with matching ID.
    GSList *found_item = g_slist_find_custom(state->sink_list,
                                             sink_pkt,
                                             find_sink_node_id);

    // If ID exists, update entry if seq no is greater.
    if (found_item != NULL) {
        known = (bundleserver_state_sink_pkt_t *) found_item->data;
        if (bs_seq_no_greater(sink_pkt->seq_no, known->seq_no)) {
            memcpy(known, sink_pkt, sizeof *known);
        }
        return;
    }

    // If ID doesn't exist, create new entry.
    copy = malloc(sizeof *copy);
    if (copy == NULL) {
        elog(LOG_CRIT, "Out of memory while storing a sink state entry.");
        exit(1);
    }
    memcpy(copy, sink_pkt, sizeof *copy);
    state->sink_list = g_slist_append(state->sink_list, copy);
}
// Validate a received ack-array packet and merge it into the module state.
//
// lu:       link context; lu_data(lu) yields the module state.
// pkt:      received link packet; pkt->data holds a bundleserver_state_pkt_t
//           whose data in turn holds a bundleserver_state_ack_array_pkt_t
//           followed by bundleserver_uid_t-sized elements.
// data_len: number of bytes available at pkt->data.
//
// Any validation failure is fatal: it is logged at LOG_CRIT and the
// process exits with status 1.
//
// Lengths are checked as unsigned values only after ruling out a negative
// data_len. Comparing the signed length directly against sizeof converted
// a negative value to a huge unsigned one, which slipped past the checks.
// The element-size check uses % on size_t rather than div(), which
// truncates its arguments to int.
static void bundleserver_state_ack_array_link_receive(lu_context_t *lu,
                                                      link_pkt_t *pkt,
                                                      ssize_t data_len)
{
    bundleserver_state_state_t *state;
    bundleserver_state_pkt_t *state_pkt;
    bundleserver_state_ack_array_pkt_t *ack_array_pkt;
    size_t state_pkt_data_len;
    size_t ack_array_data_size;

    elog(LOG_NOTICE, " ");

    state = (bundleserver_state_state_t *) lu_data(lu);

    if (data_len < 0
        || (size_t) data_len < sizeof(bundleserver_state_pkt_t)) {
        elog(LOG_CRIT,
             "link_pkt->data length is < bundleserver_state_pkt_t");
        exit(1);
    }

    // Dig out the data.
    // The data is bundleserver_ack_array_pkt_t
    state_pkt = (bundleserver_state_pkt_t *) pkt->data;
    if (state_pkt->type != BS_STATE_PKT_TYPE_ACK_ARRAY) {
        elog(LOG_CRIT, "Type of packet is not an ack packet!!");
        exit(1);
    }

    // Bytes that follow the state packet header.
    state_pkt_data_len
        = (size_t) data_len - sizeof(bundleserver_state_pkt_t);
    if (state_pkt_data_len < sizeof(bundleserver_state_ack_array_pkt_t)) {
        elog(LOG_CRIT, "state_pkt->data length is < "
                       "bundleserver_state_ack_array_pkt_t");
        exit(1);
    }

    ack_array_pkt = (bundleserver_state_ack_array_pkt_t *) state_pkt->data;

    // Check that the length of the ack array makes sense.
    ack_array_data_size
        = state_pkt_data_len - sizeof(bundleserver_state_ack_array_pkt_t);
    if (ack_array_data_size % sizeof(bundleserver_uid_t) != 0) {
        elog(LOG_CRIT,
             "Length of data is not a multiple of"
             " sizeof(bundleserver_bundle_uid_t)");
        exit(1);
    }

    // Now possibly update my entry. The length handed on covers the
    // ack-array header plus its trailing elements.
    bundleserver_state_ack_array_link_receive_update(
        state, ack_array_pkt, (ssize_t) state_pkt_data_len);
}
// Merge one received ack array into state->ack_array_list.
//
// state:     module state owning the ack-array list.
// array_pkt: received ack-array packet (header plus trailing elements);
//            it is copied, never stored by reference.
// pkt_len:   total number of bytes to copy from array_pkt.
//
// If no entry with the same node_id exists, a heap copy is appended. If
// one exists, it is replaced only when the received seq_no is strictly
// greater than the stored one; otherwise the packet is ignored.
//
// Fixes relative to the previous version:
//  - the replaced entry's packet is now freed; g_slist_delete_link only
//    releases the list node, so the old copy used to leak;
//  - the malloc result is checked; failure is fatal (log + exit(1)), in
//    line with the rest of this file.
static void bundleserver_state_ack_array_link_receive_update(
    bundleserver_state_state_t *state,
    bundleserver_state_ack_array_pkt_t *array_pkt,
    ssize_t pkt_len)
{
    bundleserver_state_ack_array_pkt_t *stored;
    bundleserver_state_ack_array_pkt_t *copy;

    // Search for entry with matching node ID.
    GSList *found_item = g_slist_find_custom(state->ack_array_list,
                                             array_pkt,
                                             find_ack_array_node_id);

    // A stored entry that is at least as recent wins: nothing to do.
    if (found_item != NULL) {
        stored = (bundleserver_state_ack_array_pkt_t *) found_item->data;
        if (!bs_seq_no_greater(array_pkt->seq_no, stored->seq_no)) {
            return;
        }
    }

    // Take the copy before touching the list.
    copy = malloc((size_t) pkt_len);
    if (copy == NULL) {
        elog(LOG_CRIT, "Out of memory while storing an ack array entry.");
        exit(1);
    }
    memcpy(copy, array_pkt, (size_t) pkt_len);

    // Remove and free the old entry: first the packet, then the list node.
    // In the code visible in this file, this function is the only place
    // that inserts into ack_array_list, always with malloc'd copies, so
    // free() is the matching release.
    if (found_item != NULL) {
        free(found_item->data);
        state->ack_array_list
            = g_slist_delete_link(state->ack_array_list, found_item);
    }

    state->ack_array_list = g_slist_append(state->ack_array_list, copy);
}
// Validate a received sink-status packet and merge every entry it carries.
//
// lu:       link context; lu_data(lu) yields the module state.
// pkt:      received link packet; pkt->data holds a bundleserver_state_pkt_t
//           whose data is an array of bundleserver_state_sink_pkt_t.
// data_len: number of bytes available at pkt->data.
//
// Any validation failure is fatal: it is logged at LOG_CRIT and the
// process exits with status 1.
//
// The length is now checked before the type field is read, matching the
// ack-array handler, and the element count is computed in size_t instead
// of going through int and div().
//
// NOTE(review): the header size subtracted here is sizeof(state_pkt->type),
// while the ack-array handler subtracts sizeof(bundleserver_state_pkt_t).
// Kept as in the original; confirm the two agree for this struct layout.
static void bundleserver_state_sink_link_receive(lu_context_t *lu,
                                                 link_pkt_t *pkt,
                                                 ssize_t data_len)
{
    bundleserver_state_state_t *state;
    bundleserver_state_pkt_t *state_pkt;
    bundleserver_state_sink_pkt_t *sink_pkt;
    size_t sink_data_size;
    size_t sink_array_items;
    size_t i;

    elog(LOG_NOTICE, " ");

    state = (bundleserver_state_state_t *) lu_data(lu);

    // Dig out the data.
    // The data is an array of sink_state data structures.
    state_pkt = (bundleserver_state_pkt_t *) pkt->data;

    // sizeof does not evaluate its operand, so nothing is read yet.
    if (data_len < 0 || (size_t) data_len < sizeof(state_pkt->type)) {
        elog(LOG_CRIT,
             "link_pkt->data length is < sizeof(state_pkt->type)");
        exit(1);
    }

    if (state_pkt->type != BS_STATE_PKT_TYPE_SINK_STATUS) {
        elog(LOG_CRIT, "Type of packet is not a state_sink packet!");
        exit(1);
    }

    sink_pkt = (bundleserver_state_sink_pkt_t *) state_pkt->data;

    // Check that the length of the data makes sense
    sink_data_size = (size_t) data_len - sizeof(state_pkt->type);
    if (sink_data_size % sizeof(bundleserver_state_sink_pkt_t) != 0) {
        elog(LOG_CRIT,
             "Length of data is not a multiple "
             "of sizeof(bundleserver_state_sink_pkt_t)");
        exit(1);
    }
    sink_array_items
        = sink_data_size / sizeof(bundleserver_state_sink_pkt_t);

    // For each item in the array
    for (i = 0; i < sink_array_items; ++i) {
        bundleserver_state_sink_link_receive_for_each(state, &sink_pkt[i]);
    }
}
// Receive packets from other nodes regarding state info they have.
//
// lu:       link context, passed through to the per-type handlers.
// pkt:      received link packet; pkt->data starts with a
//           bundleserver_state_pkt_t whose type field selects the handler.
// data_len: number of bytes available at pkt->data.
//
// Sink-status and ack-array packets are dispatched to their handlers.
// Packets shorter than 4 bytes, neighbor-list packets and unknown types
// are fatal (logged at LOG_CRIT, then exit(1)).
//
// Returns EVENT_RENEW after freeing pkt.
// NOTE(review): free(pkt) assumes this callback owns the packet and that
// it is heap-allocated -- confirm against the link library's contract.
//
// The length is logged with %ld and a cast to long; the previous %d did
// not match the ssize_t argument.
static int bundleserver_state_link_receive(lu_context_t *lu,
                                           link_pkt_t *pkt,
                                           ssize_t data_len)
{
    bundleserver_state_pkt_t *state_pkt;

    elog(LOG_NOTICE, "Received pkt of length: %ld", (long) data_len);

    if (data_len < 4) {
        elog(LOG_CRIT, "Packet is less then 4 bytes?.");
        exit(1);
    }

    // Dig out the data.
    state_pkt = (bundleserver_state_pkt_t *) pkt->data;

    switch (state_pkt->type) {
    case BS_STATE_PKT_TYPE_SINK_STATUS:
        bundleserver_state_sink_link_receive(lu, pkt, data_len);
        break;
    case BS_STATE_PKT_TYPE_ACK_ARRAY:
        bundleserver_state_ack_array_link_receive(lu, pkt, data_len);
        break;
    case BS_STATE_PKT_TYPE_NEIGHBOR_LIST:
        // Not handled yet: treated like an unknown type.
        /* fallthrough */
    default:
        elog(LOG_CRIT, "Received a state_pkt of unknown type: %d",
             state_pkt->type);
        exit(1);
    }

    free(pkt);
    return EVENT_RENEW;
}
// Open the link used to exchange bundleserver state packets.
//
// uses:       name of the link to open.
// state:      module state, stored as the link's data pointer (the
//             receive handlers in this file read it back via lu_data).
// receive_cb: callback invoked for received packets.
//
// The link is opened for packet type PKT_TYPE_BUNDLESERVER_STATE. Failure
// to open is fatal: it is logged at LOG_CRIT and the process exits with
// status 1.
//
// Uses standard designated initializers (.field = value) instead of the
// obsolete GNU "field: value" spelling.
static void setup_link(char *uses,
                       bundleserver_state_state_t *state,
                       lu_pkt_receive_cb_t receive_cb)
{
    lu_opts_t lu_opts = {
        .opts = {
            .name = uses,
            .data = state,
            .pkt_type = PKT_TYPE_BUNDLESERVER_STATE
        },
        .receive = receive_cb
    };

    if (lu_open(&lu_opts, NULL) < 0) {
        elog(LOG_CRIT,
             "can't open %s: %m",
             link_name(&lu_opts.opts, NULL));
        exit(1);
    }
}
// Create a query device.
//
// device_name:       name of the device to create.
// state:             module state, stored as the device's device_info
//                    (the sink query handler in this file reads it back
//                    via qdev_data).
// process_cb:        callback that processes queries on the device.
// query_context_ref: receives the new query context.
//                    Assumes that *query_context_ref is NULL
//
// Failure to create the device is fatal: it is logged at LOG_CRIT and the
// process exits with status 1.
//
// Uses standard designated initializers (.field = value) instead of the
// obsolete GNU "field: value" spelling.
static void setup_query_dev(char *device_name,
                            bundleserver_state_state_t *state,
                            query_process_cb_t process_cb,
                            query_context_t **query_context_ref)
{
    query_dev_opts_t query_opts = {
        .device = {
            .devname = device_name,
            .device_info = state
        },
        .process = process_cb
    };

    if (query_dev_new(&query_opts, query_context_ref) < 0) {
        elog(LOG_CRIT, "Can't create query dev %s: %m",
             query_opts.device.devname);
        exit(1);
    }
}
// Program entry point.
//
// Logs a start-up message, runs misc_init, builds the module state, opens
// the state link and creates the sink query device, then hands control to
// g_main. Reaching the code after g_main is treated as an abnormal
// termination of the event system.
//
// Returns 1 if g_main ever returns.
int main(int argc, char *argv[])
{
    bundleserver_state_state_t state;
    char *link_uses;

    elog(LOG_INFO,"Starting up.");
    misc_init(&argc, argv, CVSTAG);

    state = bs_state_state_init();

    link_uses = get_uses_arg(argc, argv);
    setup_link(link_uses, &state, bundleserver_state_link_receive);

    setup_query_dev(BUNDLESERVER_STATE_SINK_QUERY_DEV,
                    &state,
                    bundleserver_state_sink_process,
                    &state.sink_query_dev_ref);

    // The ack-array query device is not wired up yet:
    /*
    setup_query_dev (BUNDLESERVER_STATE_ACK_ARRAY_QUERY_DEV,
    & state,
    bundleserver_state_ack_array_process,
    & (state.ack_array_query_dev_ref));
    */

    g_main();

    elog(LOG_ALERT, "Event system terminated abnormally.");
    return 1;
}
// See the project repository for the remaining files.