Code Search for Developers
 
 
  

broadcastblock.c from EmStar at Krugle


Show broadcastblock.c syntax highlighted

char broadcastblock_c_cvsid[] =
  "$Id: broadcastblock.c,v 1.14 2005/04/07 05:09:03 adparker Exp $";

#include "libdev/glib_dev.h"
#include "libmisc/misc.h"
#include "emrun/emrun.h"
#include "devel/block/block.h"
#include "devel/block/haveblock.h"
#include "devel/block/needblock.h"
#include "devel/state/ssync.h"
#include "link/link.h"
#include "libdev/status_client.h"
#include "devel/block/rddhop.h"
#include "devel/state/cluster_map.h"

int use_mhsync = 0;
char * use_prefix = SSYNC_CLUSTER_PREFIX;

typedef struct broadcastblock_state_s
{
  haveblock_table_t *all_haveblocks;
  int all_haveblocks_count;

  needblock_table_t *all_needblocks;
  int all_needblocks_count;

  ssync_sub_t *haveblocks_sub;
  ssync_sub_t *needblocks_sub;
  lu_context_t *link_context;
  link_pkt_t hdr;
  char *blocks_dir;
  rddhop_t * downstream;
  int downstream_count;
  status_client_context_t * status_context;
} broadcastblock_state_t;

cluster_map_t * cluster_map = NULL;


static int
needblock_sub_handler (ssync_sub_t * sub, needblock_table_t * table,
                       int count, void *data)
{
  elog (LOG_NOTICE, " ");
  broadcastblock_state_t *state = (broadcastblock_state_t *) data;
  if (!use_mhsync)
    needblock_table_convert_src_to_id(table, count, cluster_map);
    
  needblock_table_t * filtered = NULL;
  int filtered_count = 0;

  rddhop_filter_needblock_table(state->downstream,
				state->downstream_count,
				table,
				count,
				&filtered,
				&filtered_count);
  
  if (state->all_needblocks != NULL)
    free (state->all_needblocks);
  state->all_needblocks = filtered;
  state->all_needblocks_count = filtered_count;

  {
    buf_t *buf = buf_new ();
    needblock_table_unparse (buf, filtered, filtered_count);
    elog (LOG_NOTICE, "%s", buf->buf);
    buf_free (buf);
  }

  free(table);
  return EVENT_RENEW;
}


static int
haveblock_sub_handler (ssync_sub_t * sub, haveblock_table_t * table,
                       int count, void *data)
{
  elog (LOG_NOTICE, " ");
  broadcastblock_state_t *state = (broadcastblock_state_t *) data;

  if (!use_mhsync)
    haveblock_table_convert_src_to_id(table, count, cluster_map);
  
  if (state->all_haveblocks != NULL)
    free (state->all_haveblocks);
  state->all_haveblocks = table;
  state->all_haveblocks_count = count;

  {
    buf_t *buf = buf_new ();
    haveblock_table_unparse (buf, table, count);
    elog (LOG_NOTICE, "%s", buf->buf);
    buf_free (buf);
  }

  return EVENT_RENEW;
}


static broadcastblock_state_t
init_state ()
{

  buf_t *buf = buf_new ();
  bufprintf (buf, BLOCK_DIR_PATTERN, my_node_id);

  broadcastblock_state_t state = {
  all_haveblocks:NULL,
  all_haveblocks_count:0,
  all_needblocks:NULL,
  all_needblocks_count:0,
  haveblocks_sub:NULL,
  needblocks_sub:NULL,
  link_context:NULL,
  blocks_dir:strdup (buf->buf),
  hdr:{
    dst:{
    id:LINK_BROADCAST}
     ,
    type:PKT_TYPE_RDD_BLOCK},
  status_context:NULL,
  downstream:NULL,
  downstream_count:0
  };

  return state;
}


static void
usage (char *name)
{
  misc_print_usage
    (name, "-U <device>", "  --uses <device>: Specify link device to use.");
  exit (1);
}


static void
init_link (char *uses, char **argv, broadcastblock_state_t * state)
{
  if (uses == NULL)
    {
      elog (LOG_CRIT, "Please specify a link to use!");
      usage (argv[0]);
    }

  lu_opts_t lu_opts = {
  opts:{
    name:uses,
  data:state}
    ,
  };
  if (lu_open (&lu_opts, &state->link_context) < 0)
    {
      elog (LOG_CRIT, "Can't open %s: %m", link_name (&lu_opts.opts, NULL));
      exit (1);
    }
  return;
}

static void
haveblock_sub (broadcastblock_state_t * state)
{
  ssync_sub_opts_t opts = {
    reread_period: REREAD_PERIOD,
    read_refractory: REFRACTORY
  };
  
  if (haveblock_sub_open_full
      (use_prefix, haveblock_sub_handler, state, &opts, &state->haveblocks_sub) < 0)
    {
      elog (LOG_ERR, "Unable to subscribe to haveblock channel: %m");
      exit (1);
    }
  return;
}

static void
needblock_sub (broadcastblock_state_t * state)
{
  ssync_sub_opts_t opts = {
    reread_period:REREAD_PERIOD,
    read_refractory: REFRACTORY
  };
  if (needblock_sub_open_full
      (use_prefix, needblock_sub_handler, state, &opts, &state->needblocks_sub) < 0)
    {
      elog (LOG_ERR, "Unable to subscribe to needblock channel: %m");
      exit (1);
    }
  return;
}


static void
broadcastblock_send_random (broadcastblock_state_t * state,
                            GTree * candidate_tree)
{
  elog (LOG_NOTICE, " ");
  elog (LOG_NOTICE, "Number of candidates: %d",
        g_tree_nnodes (candidate_tree));
  if (g_tree_nnodes (candidate_tree) == 0)
    {
      return;
    }
  block_t *data_block = NULL;
  buf_t *packet = buf_new ();
  GArray *array = block_tree_to_array (candidate_tree);
  int count = array->len;
  // randomly select an index between [0, count-1]
  int index = (int) (1.0 * count * random () / (RAND_MAX + 1.0));
  elog (LOG_NOTICE, "Randomly selected index %d of %d", index, count - 1);
  // retrieve random block 
  block_t block = g_array_index (array, block_t, index);
  // read the file into memory.
  data_block = block_get_from_disk (state->blocks_dir, &block);
  if (data_block == NULL)
    goto done;
  bufcpy (packet, &state->hdr, sizeof (state->hdr));
  bufcpy (packet, data_block, sizeof (block_t) + data_block->length);
  // send off the packet
  {
    buf_t *buf = buf_new ();
    block_unparse (buf, data_block, "");
    elog (LOG_NOTICE, "BROADCASTING block: %s", buf->buf);
    buf_free (buf);
  }
  if (lu_send (state->link_context, (link_pkt_t *) packet->buf,
               packet->len - sizeof (state->hdr)) < 0)
    {
      elog (LOG_CRIT, "Unable to broadcast message! : %m");
    }
  elog (LOG_NOTICE, "SUCCESSFUL BROADCAST");
done:
  if (data_block)
    free (data_block);
  buf_free (packet);
  g_array_free (array, 1);
  return;
}


static int
broadcastblock_timer_cb (gpointer data, int interval, g_event_t * ev)
{
  elog (LOG_NOTICE, " ");
  broadcastblock_state_t *state = (broadcastblock_state_t *) data;
  if (state->all_needblocks_count == 0)
    {
      return EVENT_RENEW;
    }
  
  GTree *haveblock_local_tree =
    haveblock_table_to_GTreeFull (state->all_haveblocks,
                                  state->all_haveblocks_count,
                                  ONLY_LOCAL);
  GTree *haveblock_local_notneeded_tree =
    haveblock_table_to_GTreeFull (state->all_haveblocks,
                                  state->all_haveblocks_count,
                                  ONLY_LOCAL);
  GArray *needblock_nonlocal_array =
    needblock_table_to_GArray (state->all_needblocks,
                               state->all_needblocks_count,
                               ONLY_NONLOCAL);
  block_remove_array_from_tree (needblock_nonlocal_array,
                                haveblock_local_notneeded_tree);
  // haveblock_local_notneeded now contains blocks that no one needs.
  GArray *haveblock_notneeded_array =
    block_tree_to_array (haveblock_local_notneeded_tree);
  block_remove_array_from_tree (haveblock_notneeded_array,
                                haveblock_local_tree);
  // now haveblock_local_tree contains the blocks that people need.
  // just need to remove empty block...
  block_remove_empty_from_tree(haveblock_local_tree);
  broadcastblock_send_random (state, haveblock_local_tree);
  g_tree_destroy (haveblock_local_tree);
  g_tree_destroy (haveblock_local_notneeded_tree);
  g_array_free (needblock_nonlocal_array, 1);
  g_array_free (haveblock_notneeded_array, 1);
  return EVENT_RENEW;
}

static void
init_timer (broadcastblock_state_t * state)
{
  //  set up timer for broadcasting
  if (g_timer_add (BROADCAST_TIMER, broadcastblock_timer_cb, state, NULL, NULL) < 0)
    {
      elog (LOG_CRIT, "Unable to create timer event.");
      exit (1);
    }
}


static int
status_handler(void*new_buf, size_t size, void * data)
{
  elog(LOG_INFO," ");
  broadcastblock_state_t * state = (broadcastblock_state_t*)data;
  if (state->downstream)
    free(state->downstream);
  state->downstream = new_buf;
  state->downstream_count = size / (sizeof(rddhop_t));

  if ((size > 0) && (state->downstream_count == 0))
  {
    elog (LOG_ERR, "Somekind of division problem...count:%d = size:%d / rddhop_size: %d",state->downstream_count,  size, sizeof(rddhop_t));
    exit(1);
  }
  return EVENT_RENEW;
}


static void
status_init (broadcastblock_state_t * state)
{
  status_client_opts_t opts = {
    devname:sim_path("/dev/block/down_bin"),
    handler:status_handler,
    private_data:state
  };
  if (g_status_client_full(&opts, &state->status_context) < 0)
    {
      elog (LOG_ERR,"Unable to open status client to %s", opts.devname);
      exit(1);
    }
}


int
main (int argc, char **argv)
{

  misc_init (&argc, argv, CVSTAG);
  {                             // EmRun will trigger this callback on shutdown
    emrun_opts_t emrun_opts = {
    shutdown:block_generic_shutdown,
                                // shutdown handler
    data:"broadcastblock"           // pointer passed to shutdown handler
    };
    emrun_init (&emrun_opts);
  }
  char *uses = link_parse_uses (&argc, argv, NULL);
  elog (LOG_NOTICE, " ");
  if (argc > 2)
    {
      usage (argv[0]);
    }
  use_mhsync = misc_parse_out_switch(&argc, argv, "use_mhsync", '\0');
  if (use_mhsync)
    {
      elog (LOG_CRIT, "Using mhsync");
      use_prefix = SSYNC_MULTIHOP_PREFIX;
    }
  
  broadcastblock_state_t state = init_state ();
  haveblock_sub (&state);
  needblock_sub (&state);
  init_link (uses, argv, &state);
  init_timer (&state);
  status_init(&state);

    
  /* cluster map event */
  cluster_map_opts_t cl_opts = {
    private_data: &state
   };
  if (cluster_map_new(&cl_opts, &cluster_map) < 0) {
    elog(LOG_CRIT, "Can't connect to cluster map: %m");
    exit(1);
  }

  g_main ();
  return 1;
}




See more files for this project here

EmStar

EmStar is a software system for developing and deploying wireless sensor networks involving Linux-based platforms. As the wireless sensor network community has attempted to deploy more complex designs---large-scale, long-lived systems that need self-organization and adaptivity---a number of difficult software design issues have arisen. Advances in software design have not kept pace with the capabilities of hardware. This is because designing for an adaptive, efficient, and useful sensor network has turned out to be surprisingly complex and difficult. EmStar is a Linux-based software framework, whose goal is to dramatically reduce this complexity, enabling work to be shared and reused, and simplifying and speeding the design of new sensor network applications.

Project homepage: http://cvs.cens.ucla.edu/emstar/
Programming language(s): C,Shell Script
License: other

  broadcastblock.c