/* wl_main.c */
/*
* Copyright (c) 2004 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "wl_i.h"
#include <sim/libsim.h>
#include <emrun/emrun.h>
#include <link/link.h>
/*
 * Simulator-configuration callback.  The simulator framework invokes
 * this every time a new simulator configuration becomes active; `data`
 * is the wl_state_t that was registered along with the callback.
 *
 * Every node id in 1..conf->num_nodes that is not yet present in
 * wl->nodes gets a newly registered simulated "motenic".  Nothing is
 * removed here when the node count shrinks.
 */
static void wl_new_config(sim_config_t *conf, void *data)
{
  wl_state_t *wl = (wl_state_t *)data;
  int node_id;

  elog(LOG_INFO, "got new simulator configuration");

  /* node ids are 1-based */
  node_id = 1;
  while (node_id <= conf->num_nodes) {
    if (wl_node_lookup(&(wl->nodes), node_id) == NULL) {
      wl_node_register(wl, node_id);
    }
    node_id++;
  }
}
/*
 * Append one record to stats->stat_logfile.
 *
 * The record is a single space-separated line holding, in order: the
 * node id, the timestamp, counts[0..5] of `stat`, bytes[0..5] of `stat`,
 * and finally total_pushed and total_pushed_bytes taken from `stats`.
 */
void wl_print_stats(wl_stats_t *stats, ssync_stats_t *stat, int node, float time)
{
  FILE *out = stats->stat_logfile;

  fprintf(out,
          "%d %f %d %d %d %d %d %d %d %d %d %d %d %d %d %d\n",
          node, time,
          /* per-field event counts */
          stat->counts[0], stat->counts[1], stat->counts[2],
          stat->counts[3], stat->counts[4], stat->counts[5],
          /* per-field byte counts */
          stat->bytes[0], stat->bytes[1], stat->bytes[2],
          stat->bytes[3], stat->bytes[4], stat->bytes[5],
          /* workload-wide push totals */
          stats->total_pushed, stats->total_pushed_bytes);
}
/*
 * Log the accumulated totals of one stats group, twice.
 *
 * The first record (logged under node id 0) is stats->totals as
 * accumulated.  The bytes[] entries from index 1 upward are then turned
 * into a running sum in place, so that each bytes[i] with i >= 2 also
 * contains every entry from index 1 up to i-1, and the result is logged
 * again under node id -1.  bytes[0] and counts[] are not modified.
 *
 * `n` is not used by this function.
 */
void wl_totalize_node_stats(wl_node_t *n, wl_stats_t *stats, float time)
{
  int running;
  int idx;

  /* raw totals */
  wl_print_stats(stats, &(stats->totals), 0, time);

  /* cumulative byte totals, seeded from field 1 */
  running = stats->totals.bytes[1];
  for (idx = 2; idx < SSYNC_STAT_FIELD_COUNT; idx++) {
    stats->totals.bytes[idx] += running;
    running = stats->totals.bytes[idx];
  }

  wl_print_stats(stats, &(stats->totals), -1, time);
}
/*
 * Collect and log one node's stats for one stats group.
 *
 * If stats->prefix is set, the stats record is read from the status
 * device named by ssync_path(prefix, SSYNC_SUFFIX_STATS) between
 * wl_node_become(n) / wl_node_unbecome(), and copied into stats->latest.
 * If the prefix is NULL nothing is read and stats->latest is used as it
 * stands (wl_do_stats fills it in for the combined group).
 *
 * stats->latest is then added field-by-field into stats->totals and
 * printed under the node's id.
 *
 * Returns 0 on success, or -1 if the stats device could not be read; in
 * that case the totals and the logfile are left untouched.
 */
int wl_do_node_stats(wl_node_t *n, wl_stats_t *stats, float time)
{
  int field;

  if (stats->prefix != NULL) {
    buf_t *reply;

    wl_node_become(n);
    reply = g_status_client_read_once(ssync_path(stats->prefix, SSYNC_SUFFIX_STATS),
                                      1, sizeof(ssync_stats_t));
    wl_node_unbecome();

    if (reply == NULL) {
      elog(LOG_DEBUG(0), "Unable to get stats for prefix %s, node %d",
           stats->prefix, n->id);
      return -1;
    }

    /* NOTE(review): `reply` is never released here and its length is not
     * compared against sizeof(ssync_stats_t) before copying -- confirm
     * the ownership and size guarantees of g_status_client_read_once. */
    memmove(&(stats->latest), reply->buf, sizeof(ssync_stats_t));
  }

  /* fold this node's numbers into the running totals */
  for (field = 0; field < SSYNC_STAT_FIELD_COUNT; field++) {
    stats->totals.bytes[field] += stats->latest.bytes[field];
    stats->totals.counts[field] += stats->latest.counts[field];
  }

  /* and log the per-node record */
  wl_print_stats(stats, &(stats->latest), n->id, time);
  return 0;
}
/*
 * Write one traffic record per node to wl->traf_logfile.
 *
 * For each registered node the link status of wl->base_link is read
 * (while "become"-ing that node) and logged as: elapsed time divided by
 * MILLION_F, node id, MTU, packets/bytes received, packets/bytes
 * transmitted, transmit errors, receive errors.  All nodes share the
 * timestamp sampled once on entry.  Nodes whose status cannot be read
 * are skipped with a debug log message.
 */
void wl_dump_traf(wl_state_t *wl)
{
  uint64_t uptime = misc_time_elapsed_sys();
  wl_node_t *node = wl_nodes_top(&(wl->nodes));

  while (node != NULL) {
    buf_t *reply;

    wl_node_become(node);
    reply = g_status_client_read_once(link_name_s(wl->base_link, LINK_STATUS_SUBDEV),
                                      1, sizeof(link_status_t));
    if (reply == NULL) {
      elog(LOG_DEBUG(0), "Unable to get status for link %s, node %d %s: %m",
           wl->base_link, node->id, link_name_s(wl->base_link, LINK_STATUS_SUBDEV));
    }
    else {
      /* NOTE(review): `reply` is not released and its length is not
       * checked against sizeof(link_status_t) -- confirm the contract
       * of g_status_client_read_once. */
      link_status_t *stat = (link_status_t *)reply->buf;

      fprintf(wl->traf_logfile, "%.6f %d %d %d %d %d %d %d %d\n",
              uptime / MILLION_F,
              node->id, stat->MTU,
              stat->packets_rx, stat->bytes_rx,
              stat->packets_tx, stat->bytes_tx,
              stat->errors_tx, stat->errors_rx);
    }
    wl_node_unbecome();

    node = wl_nodes_next(node);
  }
}
/*
 * Periodic stats sweep.  Registered as a timer callback in main() and
 * also invoked directly from wl_shutdown(); `data` is the wl_state_t.
 * `interval` and `ev` are not used.
 *
 * For every node the main stats are collected and logged.  When
 * do_csync_stats is set, the csync stats are collected as well and a
 * "combined" record (field-wise sum of main and csync, with the push
 * totals copied from main) is logged.  A node whose main or csync stats
 * cannot be read is skipped for the remainder of its processing.
 * Afterwards the per-group totals are logged and the link traffic
 * counters are dumped.
 *
 * Always returns EVENT_RENEW.
 */
int wl_do_stats(void *data, int interval, g_event_t *ev)
{
  wl_state_t *wl = (wl_state_t *)data;
  wl_stats_t *main_st = &(wl->main_stats);
  wl_stats_t *csync_st = &(wl->csync_stats);
  wl_stats_t *comb_st = &(wl->comb_stats);
  wl_node_t *n;
  float now = misc_time_elapsed_sys() / MILLION_F;

  /* totals are per sweep: start every group from zero */
  memset(&(main_st->totals), 0, sizeof(main_st->totals));
  memset(&(csync_st->totals), 0, sizeof(csync_st->totals));
  memset(&(comb_st->totals), 0, sizeof(comb_st->totals));

  for (n = wl_nodes_top(&(wl->nodes)); n; n = wl_nodes_next(n)) {
    int f;

    if (wl_do_node_stats(n, main_st, now) < 0 || !wl->do_csync_stats)
      continue;
    if (wl_do_node_stats(n, csync_st, now) < 0)
      continue;

    /* combined = main + csync, field by field */
    for (f = 0; f < SSYNC_STAT_FIELD_COUNT; f++) {
      comb_st->latest.bytes[f] =
        main_st->latest.bytes[f] + csync_st->latest.bytes[f];
      comb_st->latest.counts[f] =
        main_st->latest.counts[f] + csync_st->latest.counts[f];
    }
    comb_st->total_pushed = main_st->total_pushed;
    comb_st->total_pushed_bytes = main_st->total_pushed_bytes;

    /* comb_st has no prefix, so this only accumulates and prints */
    wl_do_node_stats(n, comb_st, now);
  }

  /* `n` is NULL once the loop has finished; the callee ignores it */
  wl_totalize_node_stats(n, main_st, now);
  if (wl->do_csync_stats) {
    wl_totalize_node_stats(n, csync_st, now);
    wl_totalize_node_stats(n, comb_st, now);
  }

  wl_dump_traf(wl);

  return EVENT_RENEW;
}
/*
 * Prepare one stats group for logging.
 *
 * Records `prefix` in stats->prefix, opens "<log_name><name>-stat" for
 * writing as stats->stat_logfile and writes the column-header line.
 *
 * The file name is built in a fixed-size buffer with snprintf so that an
 * overly long log_name (it comes from the command line) cannot overrun
 * the stack.  If the name does not fit, or the file cannot be opened,
 * the error is logged at LOG_CRIT and the process exits with status 1.
 */
void wl_stats_init(wl_stats_t *stats, char *prefix, char *log_name, char *name)
{
  char path[256];
  int len;

  stats->prefix = prefix;

  len = snprintf(path, sizeof(path), "%s%s-stat", log_name, name);
  if (len < 0 || (size_t)len >= sizeof(path)) {
    /* the name would have been truncated: refuse rather than write to
     * some other file than the one that was asked for */
    elog(LOG_CRIT, "Stats logfile name with suffix '%s-stat' is too long (max %d characters)",
         name, (int)sizeof(path) - 1);
    exit(1);
  }

  stats->stat_logfile = fopen(path, "w");
  if (stats->stat_logfile == NULL) {
    elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", path);
    exit(1);
  }

  /* column header; must stay in step with wl_print_stats() */
  fprintf(stats->stat_logfile,
          "#h id time "
          "actc txc retxc nackc refrc packc "
          "act tx retx nack refr pack "
          "tpush tbytes\n");
}
/*
 * Shutdown handler.  Installed as the emrun shutdown callback in main()
 * and also called from wl_force_shutdown(); `data` is the wl_state_t.
 *
 * Takes one final stats sample, closes every logfile that was opened
 * for the current mode, optionally asks the simulator to halt, and then
 * terminates the process with exit status 0.  This function does not
 * return.
 */
void wl_shutdown(void *data)
{
  wl_state_t *wl = (wl_state_t *)data;

  elog(LOG_NOTICE, "Workload generator service closing logfiles");

  /* final sample, taken while the logfiles are still open */
  wl_do_stats(wl, 0, NULL);

  /* stats logfiles */
  fclose(wl->main_stats.stat_logfile);
  if (wl->do_csync_stats) {
    fclose(wl->csync_stats.stat_logfile);
    fclose(wl->comb_stats.stat_logfile);
  }

  /* traffic logfile */
  if (wl->traf_logfile != NULL) {
    fclose(wl->traf_logfile);
  }

  /* workload logfiles exist only when a workload plugin was running */
  if (!wl->stats_only) {
    wl_scan_for_complete(wl, 1);
    fclose(wl->logfile);
    fclose(wl->cdf_logfile);
  }

  elog(LOG_NOTICE, "Workload generator service shutting down...");

  /* halt simulation */
  if (wl->halt_sim) {
    printf_to_file(EMSIM_EMRUN_COMMAND_DEVNAME, "halt workload complete!");
  }

  exit(0);
}
/*
 * Process-wide workload generator state, pre-loaded with the defaults
 * that the command-line options parsed in main() may override.  Members
 * not listed here are zero-initialized.
 *
 * Uses standard C99 designated initializers (".field = value") instead
 * of the obsolete GNU "field: value" extension.
 */
static wl_state_t state = {
  .interval   = DEFAULT_INTERVAL,
  .max_keys   = DEFAULT_MAX_KEYS,
  .base_link  = "mote0",
  .prefix     = SSYNC_DEFAULT_PREFIX,
  .typename   = WL_TYPENAME,
  .hops       = DEFAULT_HOPS,
  .refresh    = DEFAULT_REFRESH,
  .entry_size = DEFAULT_VALUE_LEN,
};
/*
 * Shut the workload generator down immediately, using the file-level
 * `state`.  Called by the control device when it receives "halt".
 */
void wl_force_shutdown()
{
  wl_shutdown(&state);

  /* wl_shutdown() ends in exit(0) itself; this is only a backstop */
  exit(0);
}
/*
 * control device
 */

/*
 * "printable" handler of the control device: fills `buf` with a
 * one-line help text and reports the message as complete.  `info` is
 * not used.
 */
static int wl_control_usage(status_context_t *info, buf_t *buf)
{
  bufprintf(buf, "Workload control device.. echo halt to stop workload\n");

  return STATUS_MSG_COMPLETE;
}
/*
 * "write" handler of the control device.  Any command whose first four
 * characters are "halt" triggers an immediate shutdown (which does not
 * return); every other command is ignored.  Returns buf_size in all
 * cases.  `ctx` is not used.
 */
static int wl_control_cmd(status_context_t *ctx, char *command, size_t buf_size)
{
  /* prefix match only: "halt", "halt\n", "halting", ... all qualify */
  int is_halt = (strncmp(command, "halt", 4) == 0);

  if (is_halt) {
    wl_force_shutdown();
  }

  return buf_size;
}
/*
 * Open "<log_name><suffix>" for writing and return the stream.
 *
 * The name is assembled with snprintf in a fixed-size buffer so that a
 * long --logfile argument cannot overrun the stack.  If the name does
 * not fit, or the file cannot be opened, the error is logged at
 * LOG_CRIT and the process exits with status 1.
 */
static FILE *wl_main_open_log(const char *log_name, const char *suffix)
{
  char path[256];
  FILE *fp;
  int len;

  len = snprintf(path, sizeof(path), "%s%s", log_name, suffix);
  if (len < 0 || (size_t)len >= sizeof(path)) {
    elog(LOG_CRIT, "Logfile name with suffix '%s' is too long (max %d characters)",
         suffix, (int)sizeof(path) - 1);
    exit(1);
  }

  fp = fopen(path, "w");
  if (fp == NULL) {
    elog(LOG_CRIT, "Unable to open file '%s' for writing: %m", path);
    exit(1);
  }
  return fp;
}

/*
 * Workload generator entry point.
 *
 * Parses the command line into the file-level `state`, lets the
 * workload plugins initialize (unless --stats-only), opens the
 * logfiles, creates the control device, registers the nodes (the local
 * node with --local, otherwise whatever the simulator configuration
 * reports), arms the periodic stats timer and enters the event loop.
 *
 * Exits with status 1 on a missing --logfile, on unparsed extra
 * arguments, on any logfile that cannot be opened, or when the
 * simulator configuration cannot be obtained.  Returns 1 only if the
 * event loop terminates, which is not expected to happen.
 */
int main(int argc, char **argv)
{
  char *log_name = NULL;
  char *load = NULL;
  char *base_link;
  char *prefix;
  char *typename;

  /* emrun will trigger this callback to run on shutdown */
  emrun_opts_t emrun_opts = {
    .shutdown = wl_shutdown, /* this function implements our shutdown handler */
    .data = &state           /* this pointer will be passed to our shutdown handler */
  };

  /* generic init */
  misc_init(&argc, argv, CVSTAG);

  /* init state */
  state.usage_buf = buf_new();

  /* build the usage text; the plugins are handed `state` further down
   * NOTE(review): --entry-size, --iface and --typename are parsed below
   * but are not listed here. */
  bufprintf(state.usage_buf,
            " --logfile <path>: [REQUIRED] specifies the path for the logfile\n"
            " --interval <ms>: average interval of new data generation, per node, default %d\n"
            " --hops <count>: number of hops to send, default %d\n"
            " --max-keys <max>: maximum number of keys pushed into system, default %d\n"
            " --refresh <ms>: refresh to use for items \n"
            " --halt: halt sim after workload completes \n"
            " --link <link>: base link to use for traffic counting (default '%s')\n"
            " --prefix <prefix>: base prefix to use for ssync access (default '%s')\n"
            " --delay <secs startup delay>\n"
            " --csync-stats: record clustersync stats as well\n"
            " --stats-only: no plugin.. just record usage stats\n"
            " --local: hook to a local device not a simulator\n"
            " --load <plugin>: [REQUIRED] specifies the workload module to use:\n",
            DEFAULT_INTERVAL, DEFAULT_HOPS, DEFAULT_MAX_KEYS, state.base_link, state.prefix);

  /* parse standard options */
  log_name = misc_parse_out_option(&argc, argv, "logfile", 0);
  misc_parse_option_as_int(&argc, argv, "interval", 0, &(state.interval));
  misc_parse_option_as_int(&argc, argv, "max-keys", 0, &(state.max_keys));
  misc_parse_option_as_int(&argc, argv, "entry-size", 0, &(state.entry_size));
  misc_parse_option_as_int(&argc, argv, "refresh", 0, &(state.refresh));
  misc_parse_option_as_int(&argc, argv, "delay", 0, &(state.delay));
  misc_parse_option_as_int(&argc, argv, "hops", 0, &(state.hops));
  misc_parse_option_as_int(&argc, argv, "iface", 0, &(state.iface));
  state.do_csync_stats = misc_parse_out_switch(&argc, argv, "csync-stats", 0);
  state.stats_only = misc_parse_out_switch(&argc, argv, "stats-only", 0);
  state.local = misc_parse_out_switch(&argc, argv, "local", 0);

  /* NOTE(review): the usage text advertises --halt, but the switch is
   * never parsed; halting is unconditionally enabled here, and a literal
   * "--halt" on the command line would presumably end up in the "Extra
   * arguments" check below -- confirm the intended behavior. */
  state.halt_sim = 1;

  if ((base_link = misc_parse_out_option(&argc, argv, "link", 0)))
    state.base_link = base_link;
  if ((prefix = misc_parse_out_option(&argc, argv, "prefix", 0)))
    state.prefix = prefix;
  if ((typename = misc_parse_out_option(&argc, argv, "typename", 0)))
    state.typename = typename;

  if (!state.stats_only) {
    /* select the workload */
    load = misc_parse_out_option(&argc, argv, "load", 0);

    /* add new plugin inits here */
    wl_plugin_random_rate(&state, load, &argc, argv);
    wl_plugin_scenario_file(&state, load, &argc, argv);
  }

  if (log_name == NULL) {
    elog(LOG_CRIT, "--logfile argument required!");
    fprintf(stderr, "%s\n", state.usage_buf->buf);
    exit(1);
  }

  /* workload logfiles are only needed when a plugin is running */
  if (!state.stats_only) {
    state.logfile = wl_main_open_log(log_name, "");
    state.cdf_logfile = wl_main_open_log(log_name, "-cdf");
  }

  /* traffic logfile plus its column header */
  state.traf_logfile = wl_main_open_log(log_name, "-traf");
  fprintf(state.traf_logfile, "#h time ID MTU prx brx ptx btx etx erx\n");

  /* stats logfiles; wl_stats_init builds the "-stat" names itself */
  wl_stats_init(&(state.main_stats), state.prefix, log_name, "");
  if (state.do_csync_stats) {
    wl_stats_init(&(state.csync_stats), "csync", log_name, "-csync");
    wl_stats_init(&(state.comb_stats), NULL, log_name, "-comb");
  }

  /* anything still left on the command line was not understood */
  if (argc > 1) {
    int i;

    fprintf(stderr, "Extra arguments: ");
    for (i = 1; i < argc; i++) {
      fprintf(stderr, "\"%s\" ", argv[i]);
    }
    fprintf(stderr, "\n\n");
    fprintf(stderr, "%s\n", state.usage_buf->buf);
    exit(1);
  }

  /* control device; failing to create it is not fatal */
  status_dev_opts_t t_opts = {
    .device = {
      .devname = sim_path("/dev/workload_control"),
      .device_info = &state
    },
    .printable = wl_control_usage,
    .write = wl_control_cmd
  };
  if (g_status_dev(&t_opts, NULL) < 0) {
    elog(LOG_WARNING, "Unable to create control device %s: %m",
         t_opts.device.devname);
  }

  if (state.local) {
    if (wl_node_lookup(&(state.nodes), my_node_id) == NULL)
      wl_node_register(&state, my_node_id);
  }
  else {
    /* Set up event to read the simulator configuration. This calls the
     * callback immediately with the initial simulator configuration. */
    sim_opts_t sim_opts = {
      .new_config = wl_new_config,
      .data = &state
    };
    if (g_sim_config(&sim_opts) < 0) {
      elog(LOG_ALERT, "can't get simulator configuration: %m");
      exit(1);
    }
  }

  /* set stats timer */
  g_timer_add(WL_STATS_INTERVAL, wl_do_stats, &state, NULL, NULL);

  /* we're ready to serve */
  emrun_init(&emrun_opts);
  g_main();

  /* this should never be reached */
  elog(LOG_ALERT, "event system terminated abnormally");
  return 1;
}
/* end of wl_main.c */