/* File: wl_structs.c */
/*
* Copyright (c) 2004 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "wl_i.h"
#include <emrun/emsim.h>
/*
 * Instantiate the queue helper functions used throughout this file.
 * NOTE(review): QUEUE_FUNCTION_INSTANTIATIONS is defined outside this file;
 * the per-line notes below describe only how the resulting names are used
 * here -- confirm argument meanings against the macro definition.
 */
/* wl_nodes_*: called below on wl->nodes (top/next/push/qlen over struct wl_node) */
QUEUE_FUNCTION_INSTANTIATIONS(wl_nodes,_,list,struct wl_node,struct wl_node_list);
/* wl_pending_*: called below on wl->pending (top/next/push/remove over struct pending_update) */
QUEUE_FUNCTION_INSTANTIATIONS(wl_pending,pending,list,struct pending_update,struct pending_list);
/* wl_pending_node_*: called below on a wl_node to walk the pending updates pushed for that node */
QUEUE_FUNCTION_INSTANTIATIONS(wl_pending_node,per_node,updates,struct pending_update,struct wl_node);
/*
* transaction records
*/
/*
 * Allocate a zero-filled pending-update record, give it one arrival slot
 * for every node currently in wl->nodes, and push it onto wl->pending.
 *
 * The arrivals array is allocated one element longer than the node count;
 * that final zeroed element has a NULL `node` pointer and terminates the
 * array for every loop that walks it.
 *
 * Returns the new record.
 */
pending_update_t *wl_pending_create(wl_state_t *wl)
{
    pending_update_t *rec = g_new0(pending_update_t, 1);
    wl_node_t *node;
    int slot = 0;

    rec->arrivals = g_new0(arrival_t, wl_nodes_qlen(&(wl->nodes)) + 1);

    node = wl_nodes_top(&(wl->nodes));
    while (node) {
        rec->arrivals[slot].node = node;
        slot++;
        node = wl_nodes_next(node);
    }

    wl_pending_push(&(wl->pending), rec);
    return rec;
}
/*
 * Return 1 when every arrival slot of p has a non-zero latency, 0 as soon
 * as a slot with latency == 0 is found.  The slot array is walked until the
 * entry whose `node` pointer is NULL.
 */
int wl_pending_completed(pending_update_t *p)
{
    int slot = 0;

    while (p->arrivals[slot].node) {
        if (p->arrivals[slot].latency == 0) {
            return 0;
        }
        slot++;
    }
    return 1;
}
/*
 * Find the arrival slot of p that belongs to the node with the given id.
 *
 * Returns the index of the first matching slot, or -1 when no slot before
 * the NULL-node terminator matches.
 */
int wl_pending_lookup_arrival(pending_update_t *p, node_id_t id)
{
    int found = -1;
    int slot;

    for (slot = 0; found < 0 && p->arrivals[slot].node; slot++) {
        if (p->arrivals[slot].node->id == id) {
            found = slot;
        }
    }
    return found;
}
/*
* node records
*/
/*
 * Allocate a zero-filled node record for `id`, point it back at its owning
 * wl state, and push it onto wl->nodes.
 *
 * Returns the new node.
 */
wl_node_t *wl_node_create(wl_state_t *wl, node_id_t id)
{
    wl_node_t *node = g_new0(wl_node_t, 1);

    node->parent = wl;
    node->id = id;

    wl_nodes_push(&(wl->nodes), node);
    return node;
}
/*
 * Walk `list` from its top and return the first node whose id equals `id`,
 * or NULL when the end of the list is reached without a match.
 */
wl_node_t *wl_node_lookup(wl_node_list_t *list, node_id_t id)
{
    wl_node_t *cur = wl_nodes_top(list);

    /* stop either on a match or when the list runs out (cur becomes NULL) */
    while (cur && cur->id != id) {
        cur = wl_nodes_next(cur);
    }
    return cur;
}
/*
 * Pass n->id to sim_become_node().  Callers in this file pair it with
 * wl_node_unbecome() around ssync calls made on behalf of node n.
 * NOTE(review): presumably this switches the simulator's "current node"
 * context -- confirm against the emsim API.
 */
void wl_node_become(wl_node_t *n)
{
sim_become_node(n->id);
}
/*
 * Counterpart of wl_node_become(): calls sim_restore_node().
 * NOTE(review): presumably this restores the simulator context that was
 * active before the matching wl_node_become() -- confirm against the emsim
 * API.
 *
 * Declared with an explicit (void) parameter list so the definition acts as
 * a real prototype and calls with stray arguments are rejected.
 */
void wl_node_unbecome(void)
{
    sim_restore_node();
}
/*
 * Scan wl->pending for completed records, write each one to the log files,
 * then unlink and free it.
 *
 * A record is complete when every arrival slot has a non-zero
 * removal_latency.  When `shutdown` is non-zero every record is dumped
 * regardless; slots that are still zero are first filled with negative
 * codes:
 *   latency         = -2  slot never got a latency
 *   removal_latency = -3  slot never got one and the record has no removal_time
 *   removal_latency = -2  slot never got one but the record has a removal_time
 * and a zero removal_time is replaced by the current elapsed time.
 *
 * Output to wl->logfile, one line per record: admission and removal time
 * (each divided by MILLION_F), source node id, app_seq, then for every node
 * in wl->nodes a latency / removal-latency pair ("-4 -4" when the record
 * has no slot for that node).  Negative values are printed as-is;
 * non-negative ones are printed divided by 1000 and are also written to
 * wl->cdf_logfile together with the slot's hops_away.
 */
void wl_scan_for_complete(wl_state_t *wl, int shutdown)
{
    pending_update_t *ptr;
    int64_t elapsed64 = misc_time_elapsed_sys();

    for (ptr = wl_pending_top(&(wl->pending)); ptr; ) {
        int i;
        wl_node_t *n_ptr;
        pending_update_t *tmp = ptr;

        /* advance before touching tmp: tmp may be unlinked and freed below */
        ptr = wl_pending_next(ptr);

        if (!shutdown) {
            /* any slot still waiting for its removal keeps the record pending */
            for (i = 0; tmp->arrivals[i].node; i++) {
                if (tmp->arrivals[i].removal_latency == 0)
                    goto next_complete;
            }
        }
        else {
            /* shutting down: fill unfinished slots with the negative codes */
            for (i = 0; tmp->arrivals[i].node; i++) {
                if (tmp->arrivals[i].removal_latency == 0) {
                    if (tmp->removal_time == 0)
                        tmp->arrivals[i].removal_latency = -3;
                    else
                        tmp->arrivals[i].removal_latency = -2;
                }
                if (tmp->arrivals[i].latency == 0)
                    tmp->arrivals[i].latency = -2;
            }
            if (tmp->removal_time == 0)
                tmp->removal_time = elapsed64;
        }

        /* ok, dump it */
        fprintf(wl->logfile, "%.3f %.3f %d %d ",
                tmp->admission_time / MILLION_F,
                tmp->removal_time / MILLION_F,
                tmp->source->id, tmp->app_seq);

        for (n_ptr = wl_nodes_top(&(wl->nodes)); n_ptr;
             n_ptr = wl_nodes_next(n_ptr)) {
            int index = wl_pending_lookup_arrival(tmp, n_ptr->id);

            if (index < 0) {
                fprintf(wl->logfile, "-4 -4 ");
                continue;
            }

            if (tmp->arrivals[index].latency < 0)
                fprintf(wl->logfile, "%d ", tmp->arrivals[index].latency);
            else {
                fprintf(wl->logfile, "%.3f ",
                        tmp->arrivals[index].latency / 1000.0);
                fprintf(wl->cdf_logfile, "%d %f\n", tmp->arrivals[index].hops_away,
                        tmp->arrivals[index].latency / 1000.0);
            }

            if (tmp->arrivals[index].removal_latency < 0)
                fprintf(wl->logfile, "%d ", tmp->arrivals[index].removal_latency);
            else {
                fprintf(wl->logfile, "%.3f ",
                        tmp->arrivals[index].removal_latency / 1000.0);
                fprintf(wl->cdf_logfile, "%d %f\n", tmp->arrivals[index].hops_away,
                        tmp->arrivals[index].removal_latency / 1000.0);
            }
        }
        fprintf(wl->logfile, "\n");
        fflush(wl->logfile);

        /* %lld requires long long; int64_t is not long long on every ABI */
        elog(LOG_WARNING, "admission time was %lld, removal time %lld, now %lld",
             (long long)tmp->admission_time, (long long)tmp->removal_time,
             (long long)elapsed64);

        /* and delete it */
        wl_pending_remove(&(wl->pending), tmp);
        wl_pending_node_remove(tmp->source, tmp);

        /*
         * The record and its arrivals array come from g_new0() in
         * wl_pending_create(), so they go back through g_free().  The entry
         * buffer is handed in by the caller of wl_admit_data()
         * (wl_admit_random_data() uses malloc), so it is released with free().
         * Both calls accept NULL.
         */
        g_free(tmp->arrivals);
        free(tmp->entry);
        g_free(tmp);

    next_complete:
        continue;
    }
}
/*
* more protocol oriented stuff
*/
/*
 * Read the 16-bit sequence number stored WL_KEY_LEN bytes into `entry`.
 * The bytes are copied out rather than read through a cast pointer, so the
 * field does not need to be aligned.
 */
static
uint16_t wl_extract_seqno(char *entry)
{
    const char *seq_field = entry + WL_KEY_LEN;
    uint16_t seqno;

    memcpy(&seqno, seq_field, sizeof(seqno));
    return seqno;
}
/*
 * Subscription callback: process the table of `count` elements delivered
 * to the node attached to `sub` (via ssync_sub_data()).
 *
 * Three passes:
 *  1. Reset the `mark` flag of every pending record to !deleted, so only
 *     records already flagged deleted start out unmarked.
 *  2. For each received element, find the pending record of its source node
 *     with the same app_seq, mark it as present, compare the payload with
 *     what was sent, and -- the first time this receiver sees it -- store
 *     the arrival latency ((now - admission_time) / 1000, forced to 1 when
 *     it would be 0 so that 0 keeps meaning "not yet arrived") and
 *     hops_away.
 *  3. Every record still unmarked is a deleted record missing from this
 *     receiver's table; store the removal latency for this receiver the
 *     first time that is observed (same 0 -> 1 adjustment).
 *
 * Finally completed records are flushed with wl_scan_for_complete() and the
 * element iterator is destroyed.  Always returns EVENT_RENEW.
 */
int wl_node_update(ssync_sub_t *sub, ssync_elt_t *table, int count)
{
    wl_node_t *recv_n = (wl_node_t *)ssync_sub_data(sub);
    wl_state_t *wl = recv_n->parent;
    pending_update_t *ptr;
    int64_t elapsed64 = misc_time_elapsed_sys();
    ssync_elt_iter_t *iter = ssync_elt_iter_new(table, count);

    /* clear all the marks for deletion detection */
    for (ptr = wl_pending_top(&(wl->pending)); ptr; ptr = wl_pending_next(ptr))
        ptr->mark = !ptr->deleted;

    elog(LOG_DEBUG(10), "got %d records at node %d", count, recv_n->id);

    /* check all the elements */
    for (ssync_elt_iter_top(iter); ssync_elt_iter_valid(iter);
         ssync_elt_iter_next(iter)) {
        ssync_elt_t *elt = ssync_elt_iter_get_elt(iter);
        uint16_t app_seqno = wl_extract_seqno(elt->data);
        wl_node_t *src_n = wl_node_lookup(&(wl->nodes), elt->flow_id.src);
        pending_update_t *sent;   /* distinct name: no longer shadows outer ptr */

        elog(LOG_DEBUG(10), "got record: src=%d, rcv=%d ",
             elt->flow_id.src, elt->rcv_time);

        if (src_n == NULL) {
            elog(LOG_WARNING, "unable to locate source node %d??", elt->flow_id.src);
            continue;
        }

        for (sent = wl_pending_node_top(src_n); sent; sent = wl_pending_node_next(sent)) {
            int i;

            if (sent->app_seq != app_seqno)
                continue;

            /* mark as present */
            sent->mark = 1;

            /* verify data consistency */
            if (sent->entry_len != elt->data_length) {
                elog(LOG_WARNING, "sent and received are different lengths: %d %d",
                     sent->entry_len, elt->data_length);
            }
            else if (memcmp(sent->entry, elt->data, elt->data_length) != 0) {
                elog(LOG_WARNING, "sent and received are different data!");
                elog_raw(LOG_WARNING, elt->data, elt->data_length);
                elog_raw(LOG_WARNING, sent->entry, elt->data_length);
            }

            /* check arrivals vector */
            for (i = 0; sent->arrivals[i].node; i++) {
                if (recv_n != sent->arrivals[i].node)
                    continue;

                /* store the latency, first sighting only */
                if (sent->arrivals[i].latency == 0) {
                    elog(LOG_DEBUG(10), "new arrival seq %d", app_seqno);
                    sent->arrivals[i].latency = (elapsed64 - sent->admission_time) / 1000;
                    sent->arrivals[i].hops_away = elt->hops_away;
                    if (sent->arrivals[i].latency < 0)
                        elog(LOG_WARNING, "Negative latency %d??? %lld %lld",
                             sent->arrivals[i].latency,
                             (long long)elapsed64, (long long)sent->admission_time);
                    /* 0 is reserved for "not arrived yet" */
                    if (sent->arrivals[i].latency == 0)
                        sent->arrivals[i].latency = 1;
                }
                /* receiver slot handled: done with this element */
                goto next;
            }
        }
    next:
        continue;
    }

    /* OK, now let's scan for any new deletions */
    for (ptr = wl_pending_top(&(wl->pending)); ptr; ptr = wl_pending_next(ptr)) {
        int i;

        if (ptr->mark != 0)
            continue;

        elog(LOG_DEBUG(10), "deletion found");

        /* check arrivals vector */
        for (i = 0; ptr->arrivals[i].node; i++) {
            if (recv_n != ptr->arrivals[i].node)
                continue;

            if (ptr->arrivals[i].removal_latency == 0) {
                elog(LOG_DEBUG(10), "new deletion found, appseq=%d", ptr->app_seq);
                if (ptr->removal_time == 0) {
                    elog(LOG_WARNING, "deletion detected, not deleted yet?? src=%d, appseq=%d",
                         ptr->source->id, ptr->app_seq);
                }
                ptr->arrivals[i].removal_latency = (elapsed64 - ptr->removal_time) / 1000;
                if (ptr->arrivals[i].removal_latency < 0) {
                    elog(LOG_WARNING, "Negative removal latency %d??? %lld %lld",
                         ptr->arrivals[i].removal_latency,
                         (long long)elapsed64, (long long)ptr->removal_time);
                }
                /* 0 is reserved for "removal not seen yet" */
                if (ptr->arrivals[i].removal_latency == 0)
                    ptr->arrivals[i].removal_latency = 1;
            }
            /*
             * wl_pending_create() gives each list node exactly one slot, so
             * no later slot can match this receiver: stop scanning.
             */
            break;
        }
    }

    wl_scan_for_complete(wl, 0);
    ssync_elt_iter_destroy(iter, 1);
    return EVENT_RENEW;
}
/*
 * Create the node record for `id` and open its state-sync subscription,
 * with wl_node_update() as the callback and the node itself as callback
 * data.  The subscription is opened between wl_node_become() and
 * wl_node_unbecome().
 *
 * Returns the new node.  If ssync_sub_open() fails the error is logged and
 * the process exits with status 1.
 */
wl_node_t *wl_node_register(wl_state_t *wl, node_id_t id)
{
    wl_node_t *n = wl_node_create(wl, id);

    /* C99 designated initializers replace the obsolete GNU "field:" form */
    ssync_sub_opts_t opts = {
        .prefix_name = wl->prefix,
        .type_name = wl->typename,
        .key_len = WL_KEY_LEN,
        .cb = wl_node_update,
        .data = n
    };

    /* connect to device */
    wl_node_become(n);
    if (ssync_sub_open(&opts, &(n->ssync_sub)) < 0) {
        elog(LOG_WARNING, "Unable to connect to state sync data device: %m");
        exit(1);
    }
    wl_node_unbecome();

    return n;
}
/*
 * Count the pending updates attached to `node` whose `deleted` flag is not
 * set.  The per-node list is walked from its last element backwards.
 */
int wl_count_entries(wl_node_t *node)
{
    pending_update_t *rec = wl_pending_node_last(node);
    int live = 0;

    while (rec) {
        if (!(rec->deleted)) {
            live++;
        }
        rec = wl_pending_node_prev(rec);
    }
    return live;
}
/*
 * Build an entry of `size` bytes filled with random upper-case letters and
 * admit it through wl_admit_data(), which takes ownership of the buffer.
 *
 * size < 0 selects wl->entry_size; any size below 4 is raised to 4.
 * NOTE(review): the minimum of 4 presumably leaves room for the key and
 * sequence number that wl_admit_data() writes over the start of the entry
 * -- confirm it matches the sizes of those fields.
 *
 * Returns the status of wl_admit_data(), or -1 when the buffer cannot be
 * allocated.
 */
int wl_admit_random_data(wl_state_t *wl, wl_node_t *node,
                         int key, int size)
{
    char *entry;
    int i;

    if (size < 0) size = wl->entry_size;
    if (size < 4) size = 4;

    entry = malloc(size);
    if (entry == NULL)
        return -1;   /* previously the fill loop wrote through NULL */

    /* fill it with letters 'A'..'Z' */
    for (i = 0; i < size; i++)
        entry[i] = random() % 26 + 65;

    return wl_admit_data(wl, node, key, entry, size);
}
/*
 * Admit a new entry on `node` as a pending update.
 *
 * Ownership of `entry` passes to this function (steals entry memory!): on
 * success the buffer is stored in the pending record, on rejection it is
 * freed here.  The first sizeof(key) + sizeof(app_seq) bytes of the buffer
 * are overwritten with the record's key and a fresh sequence number.
 *
 * key < 0 selects the node's next_key, which then advances and wraps to 0
 * at wl->max_keys.
 *
 * Any other non-deleted record on the same node with the same key is
 * flagged deleted and queued for timestamping; its arrival slots that never
 * got a latency are cancelled (latency and removal_latency set to -1), and
 * if any were, completed records are flushed.
 *
 * Returns 0 on success, -1 when `entry` is NULL or too small to hold the
 * key and sequence number.
 */
int wl_admit_data(wl_state_t *wl, wl_node_t *node, int key,
                  char *entry, int size)
{
    pending_update_t *p = NULL;
    pending_update_t *ptr;
    static int app_seqno = 0;
    int maybe_completed = 0;

    /*
     * The header copy below writes sizeof(p->key) + sizeof(p->app_seq)
     * bytes into entry; refuse buffers that cannot hold it instead of
     * writing past the end.  (sizeof does not evaluate p.)
     */
    if (entry == NULL || size < 0 ||
        (size_t)size < sizeof(p->key) + sizeof(p->app_seq)) {
        elog(LOG_WARNING, "entry of %d bytes cannot hold key and sequence number",
             size);
        free(entry);   /* we own it; free(NULL) is a no-op */
        return -1;
    }

    /* create a new pending transaction record */
    p = wl_pending_create(wl);
    wl_pending_node_push(node, p);

    if (key < 0) {
        key = node->next_key++;
        if (node->next_key >= wl->max_keys) node->next_key = 0;
    }

    p->entry = entry;
    p->entry_len = size;
    p->key = key;
    p->app_seq = ++app_seqno;
    wl->main_stats.total_pushed++;
    wl->main_stats.total_pushed_bytes += size;

    /* copy key and seq into the entry */
    memmove(p->entry, &(p->key), sizeof(p->key));
    memmove(p->entry + sizeof(p->key), &(p->app_seq), sizeof(p->app_seq));

    p->to_stamp = 1;
    p->source = node;   /* must be set before the loop below logs ptr->source->id */

    elog(LOG_DEBUG(5), "Appending pending data to node %d", node->id);

    for (ptr = wl_pending_node_top(node); ptr; ptr = wl_pending_node_next(ptr)) {
        elog(LOG_DEBUG(5), "existing: seqno=%d, key=%d, src=%d, flags=%d,%d,%d ",
             ptr->app_seq, ptr->key, ptr->source->id, ptr->deleted, ptr->to_stamp, ptr->mark);

        /* an older live record with the same key is superseded by p */
        if (!ptr->deleted && (ptr != p) && (key == ptr->key)) {
            int i;

            ptr->deleted = 1;
            ptr->to_stamp = 1;

            /* cancel any still waiting for arrival */
            for (i = 0; ptr->arrivals[i].node; i++) {
                if (ptr->arrivals[i].latency == 0) {
                    ptr->arrivals[i].latency = -1;
                    ptr->arrivals[i].removal_latency = -1;
                    maybe_completed = 1;
                }
            }
        }
    }

    if (maybe_completed)
        wl_scan_for_complete(wl, 0);

    return 0;
}
/*
 * Publish the current set of non-deleted entries of `node` and timestamp
 * the records that changed.
 *
 * The entries are packed into a buffer and published through a temporary
 * ssync publisher created between wl_node_become() and wl_node_unbecome().
 * After a successful publish, every record with to_stamp set has the flag
 * cleared and receives the time sampled at function entry: as
 * admission_time if it has none yet, otherwise as removal_time.
 *
 * If the publisher cannot be created a warning is logged and no record is
 * timestamped.
 *
 * Returns 0.  NOTE(review): a publish failure is only logged, not reported
 * through the return value; kept as-is because callers are not visible here.
 */
int wl_push_data(wl_state_t *wl, wl_node_t *node)
{
    buf_t *data = buf_new();
    buf_iter_t *iter = buf_item_iter_new(data);
    int64_t elapsed64 = misc_time_elapsed_sys();
    pending_update_t *ptr;
    ssync_pub_t *pub;
    int retval = 0;

    /* C99 designated initializers replace the obsolete GNU "field:" form */
    flow_id_t fid = {
        .src_if = wl->iface,
        .dst_if = wl->iface,
        .max_hops = wl->hops
    };

    /* construct the data to push */
    for (ptr = wl_pending_node_top(node); ptr; ptr = wl_pending_node_next(ptr)) {
        if (!(ptr->deleted))
            buf_item_iter_append_str(iter, ptr->entry, ptr->entry_len);
    }

    wl_node_become(node);
    pub = ssync_pub_new(wl->prefix, wl->typename, 0, WL_KEY_LEN, &fid);
    if (pub == NULL) {
        /* log first so %m still sees the errno left by ssync_pub_new() */
        elog(LOG_WARNING, "Unable to push data to system: %m");
        /* leave the node context on the failure path too */
        wl_node_unbecome();
        goto out;
    }
    ssync_pub_push_vector(pub, data->buf, data->len);
    ssync_pub_issue(pub);
    ssync_pub_free(pub);
    wl_node_unbecome();

    /* timestamp any mods.. */
    for (ptr = wl_pending_node_top(node); ptr; ptr = wl_pending_node_next(ptr)) {
        if (ptr->to_stamp) {
            ptr->to_stamp = 0;
            if (ptr->admission_time == 0)
                ptr->admission_time = elapsed64;
            else
                ptr->removal_time = elapsed64;
        }
    }

out:
    /*
     * Destroy the iterator before the buffer it was created on.
     * NOTE(review): the 0 argument presumably means "leave the underlying
     * buffer alone" -- confirm against the buf API.
     */
    if (iter) buf_item_iter_destroy(iter, 0);
    if (data) buf_free(data);
    return retval;
}
/* (Web listing footer: see more files for this project.) */