/*
 * ssync_pure_soft.c from EmStar at Krugle
 * (Show ssync_pure_soft.c syntax highlighted)
 */
/*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "libstatedev.h"
#include "ssync_pure_soft.h"
/*
* pure soft-state implementation over flooded medium
*
*/
/*
 * ssync_net_input - receive callback for state-sync packets coming from
 * the flood link.
 *
 * lu:       link-user context; its private data is the ssync_ps_state_t
 *           that was registered when the flood link was opened.
 * pkt:      received link packet; it is freed here on every path.
 * data_len: number of bytes available at pkt->data (protocol header plus
 *           log payload).
 *
 * The packet is dropped (with a warning) when it is too short to hold a
 * ssync_protocol_t header, when its protocol version differs from ours,
 * or when the type/source/element lookup fails.  Otherwise the element's
 * seqno/rcv_time are refreshed and the payload following the header is
 * published.
 *
 * Always returns EVENT_RENEW.
 */
static
int ssync_net_input(lu_context_t *lu, link_pkt_t *pkt, ssize_t data_len)
{
    ssync_ps_state_t *ps = (ssync_ps_state_t *)lu_data(lu);
    ssync_protocol_t *ss_pkt;
    size_t payload_len;

    if (pkt == NULL || ps == NULL) {
        goto out;
    }

    /*
     * The header must be complete before any of its fields are read;
     * this also keeps the payload length below from wrapping around.
     */
    if (data_len < 0 || (size_t)data_len < sizeof(*ss_pkt)) {
        elog(LOG_WARNING, "Received truncated state sync packet (%ld bytes)",
             (long)data_len);
        goto out;
    }
    ss_pkt = (ssync_protocol_t *)(pkt->data);
    payload_len = (size_t)data_len - sizeof(*ss_pkt);

    /* verify protocol version number */
    if (ss_pkt->hdr.version != ps->ref->opts.version) {
        elog(LOG_WARNING, "Received wrong protocol version %d, we are %d",
             ss_pkt->hdr.version, ps->ref->opts.version);
        goto out;
    }

    /* accept the data: look up the type, source and element */
    {
        flow_id_t flow_id = {
            .src = ss_pkt->source,
            .dst = LINK_BROADCAST,
            .max_hops = ss_pkt->hops
        };
        type_t *t;
        source_t *s;
        state_element_t *elt;

        t = ssync_type_lookup(ps->ref, ss_pkt->type.type,
                              ss_pkt->type.fixed_len,
                              ss_pkt->type.key_len, 1);
        s = ssync_source_lookup_create(ps->ref, &flow_id, 0, 0);
        if (t == NULL || s == NULL) {
            elog(LOG_WARNING, "Unable to look up type or source for received state");
            goto out;
        }

        /* hop budget the sender started with minus what is left on arrival */
        s->hops_away = ss_pkt->hops - pkt->max_hops;

        elt = ssync_element_lookup(s, t, 1);
        if (elt == NULL) {
            elog(LOG_WARNING, "Unable to look up element for received state");
            goto out;
        }

        /* the receive time is only refreshed when the seqno changes */
        if (elt->seqno != ss_pkt->seqno) {
            elt->seqno = ss_pkt->seqno;
            elt->rcv_time = pkt->rcv_time;
        }

        /* publish, no logs */
        ssync_log_poor_mans_publish(elt, (char *)ss_pkt->data, payload_len);
    }

out:
    free(pkt);
    return EVENT_RENEW;
}
/*
 * ssync_send - broadcast the current state of one element over the flood
 * link.
 *
 * The outgoing buffer is laid out as: link_pkt_t header, ssync_protocol_t
 * header (version taken from the owning state_sync_t's options), then
 * whatever ssync_log_get_curr_bin() appends for the element.
 *
 * elt: element to send; elt->source and elt->type are dereferenced.
 *
 * Returns 0 on success, -1 if the send buffer could not be obtained or
 * the write to the flood link failed (a warning is logged either way).
 */
static
int ssync_send(state_element_t *elt)
{
    int retval = 0;
    state_sync_t *ss = elt->source->parent;
    ssync_ps_state_t *ps = (ssync_ps_state_t *)ssync_dev_get_data(ss);
    buf_t *send_buf = buf_new();
    link_pkt_t hdr = {
        .type = PKT_TYPE_STATESYNC,
        .dst = {
            .id = LINK_BROADCAST
        },
        .max_hops = elt->source->flow_id.max_hops
    };
    ssync_protocol_t proto = {
        .hdr = {
            .version = ss->opts.version,
        },
        .hops = elt->source->flow_id.max_hops,
        .source = elt->source->flow_id.src,
        .seqno = elt->seqno
    };

    /* without a buffer there is nothing to build or send */
    if (send_buf == NULL) {
        elog(LOG_WARNING, "Unable to allocate send buffer");
        return -1;
    }

    memmove(&(proto.type), &(elt->type->type), sizeof(proto.type));

    /* build packet */
    bufcpy(send_buf, &hdr, sizeof(hdr));
    bufcpy(send_buf, &proto, sizeof(proto));
    ssync_log_get_curr_bin(elt, send_buf);

    /* the length handed to lu_send excludes the link header */
    if (lu_send(ps->flood_ref, (link_pkt_t *)send_buf->buf,
                send_buf->len - sizeof(hdr)) < 0) {
        elog(LOG_WARNING, "Write to flood failed: %m");
        retval = -1;
    }

    buf_free(send_buf);
    return retval;
}
/*
 * Locally published data is re-sent periodically; the functions below
 * compute the refresh interval and drive the refresh timer.
 */
/*
 * ssync_next_refresh - pick the delay until the next refresh of elt.
 *
 * The delay is drawn with random_range() from
 *   [refresh - variance, refresh + variance]
 * where refresh is elt->source->refresh and variance is that value scaled
 * by the configured ps->refresh_variance.  Callers in this file feed the
 * result to g_timer_add() and TIMER_RENEW_MS().
 *
 * If any pointer needed for the computation is missing, a warning is
 * logged and a fixed fallback of 10000 is returned instead.
 *
 * NOTE(review): a refresh_variance outside [0, 1] would make the lower
 * bound negative or invert the range -- confirm how random_range() and
 * the timer code handle that.
 */
static int ssync_next_refresh(state_element_t *elt)
{
    const int fallback = 10000;
    ssync_ps_state_t *ps;
    int refresh;
    int variance;

    /* elt->source is read below, so it is validated with the rest */
    if (!elt || !(elt->type) || !(elt->type->parent) || !(elt->source)) {
        elog(LOG_WARNING, "invalid pointer somewhere!");
        return fallback;
    }

    ps = (ssync_ps_state_t *)ssync_dev_get_data(elt->type->parent);
    if (!ps) {
        elog(LOG_WARNING, "invalid pointer somewhere!");
        return fallback;
    }

    refresh = elt->source->refresh;
    variance = ps->refresh_variance * refresh;
    return random_range(refresh - variance, refresh + variance);
}
/*
 * ssync_refresh - timer callback: re-send the element passed as data and
 * re-arm the timer with a freshly drawn refresh interval.
 *
 * interval and event are not used.
 */
static
int ssync_refresh(void *data, int interval, g_event_t *event)
{
    state_element_t *element = data;

    (void)interval;
    (void)event;

    /* the send result is not examined; ssync_send logs its own failures */
    ssync_send(element);

    return TIMER_RENEW_MS(ssync_next_refresh(element));
}
/*
 * ssync_pure_soft_publish - publish callback for locally changed state.
 *
 * Replaces the element's refresh timer with a new one, then sends the
 * element immediately.  Always returns 0.
 */
static
int ssync_pure_soft_publish(state_element_t *elt)
{
    int first_delay;

    /* drop the previous timer before installing its replacement */
    g_event_destroy(elt->refresh_timer);

    first_delay = ssync_next_refresh(elt);
    g_timer_add(first_delay, ssync_refresh, elt, NULL, &(elt->refresh_timer));

    ssync_send(elt);
    return 0;
}
/*
 * ssync_pure_soft_init - set up the pure soft-state backend.
 *
 * ps:         backend state to fill in (ref, flood_ref, refresh_variance).
 * argc, argv: command line; consulted for the "flood" link name and the
 *             "variance" option.
 * opts:       state-sync device options; publish_cb and cb_data are
 *             overwritten here.
 *
 * Returns 1 when opts->version is 1 and the backend was started, 0 when
 * the version is anything else (nothing is touched in that case).  The
 * process exits with status 1 if the flood link cannot be opened.
 */
int ssync_pure_soft_init(ssync_ps_state_t *ps, int *argc, char **argv, state_sync_dev_opts_t *opts)
{
    char *flood_name;

    /* only protocol version 1 is handled by this backend */
    if (opts->version != 1) {
        return 0;
    }

    flood_name = link_parse_uses(argc, argv, "flood");

    /* configure callback */
    opts->publish_cb = ssync_pure_soft_publish;
    opts->cb_data = ps;

    /* start up devices */
    ssync_dev_new(opts, &(ps->ref));

    {
        /* options struct for opening a link user connection */
        lu_opts_t flood_opts = {
            .opts = {
                .name = flood_name,
                .pkt_type = PKT_TYPE_STATESYNC,
                .data = ps
            },
            .receive = ssync_net_input
        };

        /* open flooding adaptor */
        if (lu_open(&flood_opts, &(ps->flood_ref)) < 0) {
            elog(LOG_CRIT, "Can't open flooding adpator %s: %m",
                 link_name(&(flood_opts.opts), "data"));
            exit(1);
        }
    }

    /* start from the default variance, then let the command line override it */
    ps->refresh_variance = SSYNC_DEFAULT_VARIANCE;
    misc_parse_option_as_float(argc, argv, "variance", 0, &(ps->refresh_variance));

    return 1;
}
/* See more files for this project here */