Show mhf_net.c syntax highlighted
/*
*
* Copyright (c) 2004 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "mhf_i.h"
/* Forward declaration: mhf_net_input (below) calls this when a route changes;
 * the definition appears later in this file. */
void mhf_routes_update(mhf_state_t *mh);
/*
 * retx callback: begin building a new outgoing packet for one link.
 *
 * The packet starts with a broadcast link header using the module's packet
 * type.  The per-link "current target" is reset, and a message iterator sized
 * to the interface MTU (200 if the MTU can't be read) is returned.  The
 * iterator is the opaque msg_state handed back to the other retx callbacks.
 */
static
void *mhf_net_start_msg(retx_context_t *r)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  lu_context_t *lu =
    lu_multi_link_get_lu(lu_multi_index_link(link_state->parent->link_ref,
                                             link_state->iface));

  /* fallback size used when the interface won't report its MTU */
  uint16_t mtu = 200;
  if (lu_get_mtu(lu, &mtu) < 0)
    elog(LOG_WARNING, "can't get MTU for interface %d", link_state->iface);

  /* every message begins with a broadcast link-layer header */
  link_pkt_t header = {
    .dst.id = LINK_BROADCAST,
    .type = link_state->parent->pkt_type,
  };
  buf_t *out = buf_new();
  bufcpy(out, &header, sizeof(header));

  link_state->curr_target.byte = 0;

  ssync_msg_iter_t *it = ssync_msg_iter_new(NULL, 0, mtu);
  ssync_msg_iter_init_msg(it, 0, out);
  return it;
}
/*
 * retx callback: append one log entry for `src` to the message being built.
 *
 * The source's flow id is mapped to a per-message flow index first.  For NOP
 * entries the caller's payload is replaced by the source's current refresh
 * seqno (sent as a 16-bit value).  Returns whatever
 * ssync_msg_iter_append_msg returns.
 */
static
int mhf_net_append_msg
(retx_context_t *r, void *msg_state, source_t *src,
 uint8_t list_index, log_seqno_t seqno,
 uint8_t entry_type, char *entry_data, uint8_t entry_len)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *it = (ssync_msg_iter_t *)msg_state;
  char *payload = entry_data;
  uint8_t payload_len = entry_len;
  uint8_t fidx = 0;

  ssync_msg_iter_maybe_flow_map(it, &fidx, &src->flow_id, src->hops_away + 1);

  /* NOP entries piggy-back the refresh sequence number */
  if (entry_type == RETX2_CTRL_NOP) {
    per_flow_state_t *flow = mhf_maybe_init_source(src);
    payload = (char *)&flow->refresh_seqno;
    payload_len = sizeof(int16_t);
  }

  return ssync_msg_iter_append_msg(it, entry_type, fidx,
                                   link_state->curr_target,
                                   list_index, seqno, 0, 0,
                                   payload, payload_len);
}
/*
 * retx callback: append a NACK for `src` to the message being built.
 *
 * On a first attempt (retry == 0), if an upstream neighbour is known, the
 * NACK is addressed to it.  Otherwise the target index stays at byte 0, which
 * the receive path treats as "no specific target".  The chosen target is
 * remembered in the per-link state for subsequent entries.
 */
static
int mhf_net_append_nack
(retx_context_t *r, void *msg_state, source_t *src, int is_init,
 uint8_t list_index, log_seqno_t seqno, uint16_t nack_count, int retry)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *it = (ssync_msg_iter_t *)msg_state;
  per_flow_state_t *flow = mhf_maybe_init_source(src);

  cl_index_t who = { .byte = 0 };
  if (!retry && flow->upstream.if_id)
    ssync_msg_iter_maybe_addr_map(it, &who, flow->upstream.if_id);

  uint8_t fidx = 0;
  ssync_msg_iter_maybe_flow_map(it, &fidx, &src->flow_id, src->hops_away + 1);

  link_state->curr_target = who;
  return ssync_msg_iter_append_msg(it, RETX2_CTRL_NACK, fidx, who,
                                   list_index, seqno,
                                   is_init, nack_count, NULL, 0);
}
/*
 * retx callback: finalize the message under construction and send it.
 *
 * Nothing is sent unless the packet holds more than a bare link header plus
 * statesync header; in that case -1 is returned.  Otherwise the return value
 * of lu_multi_send is returned.  It is negative on failure.  When no_throttle
 * is set and the send succeeds, RETX_NODELAY is returned instead.  The
 * iterator and the packet buffer are released in every case.
 */
static
int mhf_net_complete_msg(retx_context_t *r, void *msg_state)
{
  mhf_per_link_t *link_state = (mhf_per_link_t *)retx_get_data(r);
  ssync_msg_iter_t *it = (ssync_msg_iter_t *)msg_state;
  buf_t *out = ssync_msg_iter_finalize_msg(it);
  const size_t empty_len = sizeof(link_pkt_t) + sizeof(ssync_hdr_t);
  int rc = -1;

  if (out->len > empty_len) {
    link_pkt_t *header = (link_pkt_t *)out->buf;
    rc = lu_multi_send(link_state->parent->link_ref, link_state->iface,
                       header, out->len - sizeof(link_pkt_t));
    if (rc < 0)
      elog(LOG_WARNING, "unable to send message to interface %d: %m",
           link_state->iface);
    else if (link_state->parent->no_throttle)
      rc = RETX_NODELAY;
  }

  ssync_msg_iter_destroy(it);
  buf_free(out);
  return rc;
}
/*
 * retx callback: report the current length of the message being built.
 * All needed state lives in the iterator, so the retx context is unused.
 */
static
int mhf_net_get_msg_len(retx_context_t *r, void *msg_state)
{
  (void)r;
  return ssync_msg_iter_get_len((ssync_msg_iter_t *)msg_state);
}
/*
 * lu_multi callback: a new link has appeared.
 *
 * A zeroed per-link record is allocated.  It takes over the link's user data
 * (which held the parent mhf_state_t) and keeps a back pointer to that parent.
 * The record stores the link index and interface id, and a retx protocol
 * instance is started for the link with this file's callbacks.  Failure to
 * fetch the interface id or to start retx is fatal (exit(1)).
 */
static
void mhf_net_new_link(lu_multi_context_t *lu_m, lu_multi_link_t *l)
{
  mhf_per_link_t *link_state = g_new0(mhf_per_link_t, 1);

  /* read the parent before set_data replaces it with the per-link record */
  link_state->parent = (mhf_state_t *)lu_multi_link_get_data(l);
  lu_multi_link_set_data(l, link_state);
  link_state->iface = lu_multi_link_get_index(l);

  if (lu_multi_get_if_id(lu_m, link_state->iface, &link_state->if_id) < 0) {
    elog(LOG_CRIT, "failed to get if id: %m");
    exit(1);
  }

  mhf_state_t *mh = link_state->parent;
  retx_opts_t ro = {
    .start_msg = mhf_net_start_msg,
    .append = mhf_net_append_msg,
    .append_nack = mhf_net_append_nack,
    .complete_msg = mhf_net_complete_msg,
    .get_msg_len = mhf_net_get_msg_len,
    .iface = link_state->iface,
    .private_data = link_state,
    .ssync = mh->ref,
    .refresh_msg_mode = 1,
    .fast_flood = mh->no_throttle,
    .fast_retx = mh->fast_refresh,
    .rate = mh->rate,
    .variance = mh->rate * 0.025,
  };

  if (retx_new(&ro, &link_state->retx) < 0) {
    elog(LOG_CRIT, "unable to start retx protocol");
    exit(1);
  }
}
/*
 * Timer callback: no refresh arrived for this source within the expected
 * window.  The timer event is destroyed, and the flow is marked timed out and
 * hidden.  Returns TIMER_DONE so the timer does not fire again.
 */
static
int mhf_timed_out(void *data, int interval, g_event_t *ev)
{
  source_t *stale = (source_t *)data;
  per_flow_state_t *flow = mhf_maybe_init_source(stale);
  (void)interval;

  elog(LOG_NOTICE, "Source timed out!");
  g_event_destroy(ev);

  flow->timed_out = 1;
  ssync_source_set_hidden(stale, 1);
  return TIMER_DONE;
}
static
int mhf_net_input(lu_multi_link_t *lu_l, link_pkt_t *pkt, ssize_t data_len)
{
mhf_per_link_t *mhp = (mhf_per_link_t *)lu_multi_link_get_data(lu_l);
mhf_state_t *mh = mhp->parent;
/* check for looped packets */
if (pkt->src.id == mhp->if_id) {
elog(LOG_WARNING, "dropping looped packet");
goto out;
}
/* handle statesync packets */
if ((pkt->type == mh->pkt_type) &&
(data_len >= sizeof(ssync_hdr_t))) {
ssync_hdr_t *msg = (ssync_hdr_t *)(pkt->data);
if (msg->version == SSYNC_RETRANS_V2) {
ssync_msg_iter_t *iter =
ssync_msg_iter_new((char *)msg->data, data_len - sizeof(*msg), 0);
for (ssync_msg_iter_top(iter); ssync_msg_iter_valid(iter);
ssync_msg_iter_next(iter)) {
/* we ignore some commands: */
switch (iter->command) {
case RETX2_CTRL_SEQNO:
case RETX2_CTRL_LIST:
case RETX2_CTRL_ADDR:
case RETX2_CTRL_FLOW:
goto skip;
case 0xD:
case 0xE:
elog(LOG_WARNING, "unrecognized command %d", iter->command);
goto skip;
default:
break;
}
/* skip flow index 0.. that's cluster sync */
if (iter->flow_index == 0)
goto skip;
if (iter->curr_flow.id.src == 0) {
elog(LOG_WARNING, "no flow id set");
goto skip;
}
/* lookup or create source */
source_t *target_src =
ssync_source_lookup(mh->ref, &(iter->curr_flow.id));
if (target_src == NULL) {
if (iter->curr_flow.id.src != my_node_id)
target_src = ssync_source_create(mh->ref, &(iter->curr_flow.id), 0, 0);
else {
elog(LOG_WARNING, "not creating local source from data from net!");
goto skip;
}
}
per_flow_state_t *pf = mhf_maybe_init_source(target_src);
int maybe_refresh = 0;
/* $$$ need to filter hopcount? */
/* set hops and upstream neighbor */
if (!ssync_source_is_local(target_src)) {
/* if it's a NOP.. */
if (iter->command == RETX2_CTRL_NOP) {
/* verify length */
if (iter->length == sizeof(int16_t)) {
int16_t seqno;
memmove(&seqno, iter->msg, sizeof(int16_t));
/* if this seqno is newer, forward it, and reset our min hopcount */
if ((seqno - pf->refresh_seqno) > 0 ||
(seqno - pf->refresh_seqno) < -3) {
pf->refresh_seqno = seqno;
maybe_refresh = 1;
target_src->hops_away = -1;
elog(LOG_DEBUG(0), "forwarding refresh, hop %d/%d, from %d",
iter->curr_flow.hops, target_src->flow_id.max_hops,
target_src->flow_id.src);
}
}
else
elog(LOG_WARNING, "refresh message, wrong length");
}
if (target_src->hops_away < 0 || target_src->hops_away > iter->curr_flow.hops) {
target_src->hops_away = iter->curr_flow.hops;
pf->upstream.if_id = pkt->src.id;
pf->upstream.interface = mhp->iface;
mhf_routes_update(mh);
}
}
/* set forwarding based on hopcount */
int in_hopcount = (target_src->hops_away <= target_src->flow_id.max_hops);
int fwd_hopcount = (target_src->hops_away < target_src->flow_id.max_hops);
memset(pf->forward_vector, boolify(fwd_hopcount), sizeof(pf->forward_vector));
/* set pending if we got new data, and either we have overflood set
* or we are still in the fwd hopcount region */
if ((mh->overflood || fwd_hopcount) && maybe_refresh)
target_src->log.refresh_pending = 1;
elog(LOG_DEBUG(0), "msg %d from %d to %d, fwd %d in %d",
iter->command, target_src->flow_id.src, my_node_id,
fwd_hopcount, in_hopcount);
/* update the expect refresh timer */
if (!ssync_source_is_local(target_src)) {
pf->timed_out = 0;
g_event_destroy(pf->expect_refresh_timer);
int expire = MHF_SEQNO_REFRESH * 2.5;
if (mh->slow_refresh) expire *= MHF_SLOW_REFRESH_FACTOR;
g_timer_add(expire, mhf_timed_out, target_src, NULL,
&(pf->expect_refresh_timer));
}
/* based on hopcount, (un)hide */
ssync_source_set_hidden(target_src, pf->timed_out || !in_hopcount);
/* if no target, or our if id targeted, set ourselves as the target */
cl_id_t src_id = {};
cl_id_t target_id = {};
if ((iter->target.byte == 0) ||
(mhp->if_id == iter->if_addr))
target_id.node_id = my_node_id;
/* process this into retx protocol */
if (iter->command == RETX2_CTRL_NACK)
retx_process_nack(mhp->retx, &src_id, &target_id,
target_src, iter->nack_init,
iter->nack_list_index, iter->nack_seqno,
iter->nack_count);
else {
if (!ssync_source_is_local(target_src)) {
retx_process_msg(mhp->retx, &src_id, &target_id,
target_src, iter->command,
iter->list_index, iter->seqno,
iter->msg, iter->length, 0);
}
}
skip:
continue;
}
ssync_msg_iter_destroy(iter);
}
else
elog(LOG_DEBUG(0), "dropping non-sync packet");
}
out:
free(pkt);
return EVENT_RENEW;
}
/*
 * Status-device handler: render the routing table as text.
 *
 * For each known source this prints:
 * - its flow id and HIDDEN/SOURCED markers;
 * - the next hop, for visible non-local sources with a known upstream;
 * - the interfaces it is forwarded to;
 * - its retx state.
 */
static
int mhf_routes_print(status_context_t *info, buf_t *buf)
{
  mhf_state_t *mh = (mhf_state_t *)sd_data(info);

  bufprintf(buf, "Routing Table, node %d\n", my_node_id);
  bufprintf(buf, "-----------------------\n");

  source_t *src = ssync_sources_top(&(mh->ref->sources));
  while (src != NULL) {
    per_flow_state_t *flow = mhf_maybe_init_source(src);

    bufprintf(buf, "%s: ", ssync_flowid_to_str(&(src->flow_id)));
    bufprintf(buf, "%s%s",
              src->hidden ? "**HIDDEN** " : "",
              ssync_source_is_local(src) ? "**SOURCED** " : "");
    bufprintf(buf, "\n");

    if (!src->hidden && !ssync_source_is_local(src) && flow->upstream.if_id)
      bufprintf(buf, " NextHop: %s, iface %d, %d hops away\n",
                print_if_id(flow->upstream.if_id), flow->upstream.interface,
                src->hops_away);

    /* list the interfaces this flow is forwarded on, if any */
    int printed_any = 0;
    for (int iface = 0; iface < MAX_IFACES; iface++) {
      if (!flow->forward_vector[iface])
        continue;
      if (!printed_any) {
        bufprintf(buf, " Forward to ifaces: ");
        printed_any = 1;
      }
      bufprintf(buf, "%d, ", iface);
    }
    if (printed_any)
      bufprintf(buf, "\n");

    retx_state_to_buf(buf, src, " ");
    src = ssync_sources_next(src);
  }

  bufprintf(buf, "\n");
  return STATUS_MSG_COMPLETE;
}
/*
 * Status-device handler: emit the routing table as packed route_entry_t
 * records.  Only visible, non-local, non-timed-out sources with a known
 * upstream are included.  An all-zero record is appended last
 * (presumably an end-of-table marker for readers).
 */
static
int mhf_routes_bin(status_context_t *info, buf_t *buf)
{
  mhf_state_t *mh = (mhf_state_t *)sd_data(info);
  source_t *src = ssync_sources_top(&(mh->ref->sources));

  while (src != NULL) {
    if (!src->hidden && !ssync_source_is_local(src)) {
      per_flow_state_t *flow = mhf_maybe_init_source(src);
      if (flow->upstream.if_id && !flow->timed_out) {
        route_entry_t route = {
          .dst = src->flow_id.src,
          .next_hop = flow->upstream.if_id,
          .interface = flow->upstream.interface,
          .hops_metric = src->hops_away,
        };
        bufcpy(buf, &route, sizeof(route));
      }
    }
    src = ssync_sources_next(src);
  }

  route_entry_t terminator = {};
  bufcpy(buf, &terminator, sizeof(terminator));
  return STATUS_MSG_COMPLETE;
}
/*
 * Notify the routes status device that its contents changed.
 * Called from mhf_net_input when a flow adopts a new upstream neighbour.
 */
void mhf_routes_update(mhf_state_t *mh)
{
g_status_dev_notify(mh->routes_status);
}
/*
 * Initialize the network side of the module.
 *
 * Parses the no_throttle, pkt_type, send_rate, if_class and neighbors options
 * from argv.  It then opens the multi-link connection that delivers packets to
 * mhf_net_input (with mhf_net_new_link for new links) and creates the routes
 * status device.  An unparseable pkt_type, a link-open failure or a
 * status-device failure is logged at LOG_CRIT and terminates the process.
 */
void mhf_net_init(mhf_state_t *mh, int *argc, char **argv)
{
  /* options */
  mh->no_throttle = misc_parse_out_switch(argc, argv, "no_throttle", 0);

  /* parse optional packet type (default PKT_TYPE_STATESYNC) */
  mh->pkt_type = PKT_TYPE_STATESYNC;
  char *type = misc_parse_out_option(argc, argv, "pkt_type", 0);
  if (type) {
    /* Check the parser's result in an int before storing it: if pkt_type
     * is an unsigned field, a negative failure code would wrap and a
     * "< 0" test on the field could never fire. */
    int parsed = link_parse_pkt_type(type);
    if (parsed < 0) {
      elog(LOG_CRIT, "Unable to parse packet type '%s'", type);
      /* Previously execution continued and opened the link with an invalid
       * packet type; fail hard like every other LOG_CRIT path here. */
      exit(1);
    }
    mh->pkt_type = parsed;
    elog(LOG_NOTICE, "Using packet type %s (%d)", type, mh->pkt_type);
  }

  /* explicit send_rate wins; otherwise fast_refresh implies a rate of 50 */
  if (misc_parse_option_as_int(argc, argv, "send_rate", 0, &(mh->rate)) < 0) {
    if (mh->fast_refresh) {
      mh->rate = 50;
    }
  }

  /* parse link config flags */
  mh->if_class = misc_parse_out_option(argc, argv, "if_class", 0);
  mh->neighbors = misc_parse_out_option(argc, argv, "neighbors", 0);

  /* options struct for opening a link user connection; `data: mh` is what
   * mhf_net_new_link later reads back as the per-link parent */
  lu_multi_opts_t opts = {
    lu_opts: {
      opts: {
        pkt_type: mh->pkt_type,
        data: mh
      },
    },
    if_class: mh->if_class ? mh->if_class : LINK_CLASS_LINK,
    neighbors_list: mh->neighbors,
    receive: mhf_net_input,
    new_link: mhf_net_new_link
  };
  if (lu_open_multi(&opts, &(mh->link_ref)) < 0) {
    elog(LOG_CRIT, "Can't open link class %s: %m", opts.if_class);
    exit(1);
  }

  /* create routing output */
  status_dev_opts_t t_opts = {
    device: {
      devname: ssync_path(mh->ref->opts.prefix, SSYNC_SUFFIX_ROUTES),
      device_info: mh
    },
    printable: mhf_routes_print,
    binary: mhf_routes_bin
  };
  if (g_status_dev(&t_opts, &(mh->routes_status)) < 0) {
    elog(LOG_CRIT, "Unable to create routes status device %s: %m",
         t_opts.device.devname);
    exit(1);
  }
}
See more files for this project here