cs_self_cluster.c from EmStar at Krugle
Show cs_self_cluster.c syntax highlighted
/*
*
* Copyright (c) 2004 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "cs_i.h"
#include "link/neighbor.h"
#include "topo_ctl/cluster.h"
#include <math.h>
static int cluster_elect_timeout(void *data, int interval, g_event_t *ev);
static int cluster_refresh_expected(void *data, int interval, g_event_t *ev);
static void cluster_search_mode(cs_per_link_t *csp);
static void cluster_head_mode(cs_per_link_t *csp);
static void cluster_isolated_mode(cs_per_link_t *csp);
static int cluster_refresh_timeout(void *data, int interval, g_event_t *ev);
void cluster_set_election_timer(cs_per_link_t *csp, int elect, char *reason);
void cluster_kill_election_timer(cs_per_link_t *csp);
int cs_source_get_link_index(source_t *s)
{
return s->flow_id.src_if;
}
lu_multi_link_t *cs_source_get_link(source_t *s)
{
/* lookup link by flow_id index */
cs_state_t *cs = (cs_state_t *)ssync_source_get_global_data(s);
return lu_multi_index_link(cs->link_ref, cs_source_get_link_index(s));
}
cs_per_link_t *cs_source_get_per_link(source_t *s)
{
lu_multi_link_t *link = cs_source_get_link(s);
if (link) return (cs_per_link_t *)lu_multi_link_get_data(link);
return NULL;
}
int cs_timer(int avg, int var)
{
return random_range(1000*(avg - var), 1000*(avg + var));
}
int cs_refresh_timer(cluster_mode_t mode, int bits, int jitter)
{
/* $$$$ add mode dependencies.. CH mode, etc */
if (bits == 0)
return CLUSTER_FAST_REF_MIN + jitter;
else {
uint32_t mult = 1;
mult = mult << (bits-1);
return (CLUSTER_REFRESH_BASE_SECS * mult) + jitter;
}
}
source_t *cluster_lookup(cs_per_link_t *csp, flow_id_t *fid)
{
source_t *src = ssync_source_lookup(csp->parent->ref, fid);
/* if doesn't exist, create it but mark it hidden.. */
if (src == NULL) {
if (fid->src == csp->if_id && fid->src_if == csp->iface) {
elog(LOG_WARNING, "NOT creating our own source!!");
return NULL;
}
src = ssync_source_lookup_create(csp->parent->ref, fid, 0, 0);
ssync_source_set_hidden(src, 1);
}
return src;
}
int cluster_source_is_parent(cs_per_link_t *csp, source_t *ptr)
{
int i;
for (i=0; i<CS_CLUSTER_MAX; i++)
if (ptr == csp->heads[i]) return 1;
return 0;
}
int cluster_source_is_connected(cs_per_link_t *csp, source_t *src)
{
cs_per_flow_t *pf = cs_maybe_init_source(src);
if (src == csp->our_source) return 1;
if (csp->our_clustering_state.bits.cluster_head) {
if (pf->cluster_index.byte != 0 &&
pf->cluster_index.ci.head == 0)
return 1;
}
if (cluster_source_is_parent(csp, src))
return 1;
return 0;
}
void cluster_kill_timers(cs_per_link_t *csp)
{
g_event_destroy(csp->elect_timer);
g_event_destroy(csp->refresh_timer);
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (cs_source_get_link_index(ptr) == csp->iface &&
!cluster_source_is_parent(csp, ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
g_event_destroy(pf->expect_refresh_timer);
g_event_destroy(pf->join_complete_timer);
g_event_destroy(pf->rejection_refractory);
}
}
}
void cluster_reset_sources(cs_per_link_t *csp)
{
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (cs_source_get_link_index(ptr) == csp->iface &&
!cluster_source_is_parent(csp, ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* hide all sources but ours on this link */
if (ptr != csp->our_source)
ssync_source_set_hidden(ptr, 1);
/* clear the clustering state */
pf->outgoing_clustering_state.byte = 0;
pf->cluster_index.byte = 0;
pf->reject = 0;
pf->warn = 0;
}
}
}
void cluster_update_join_bit(cs_per_link_t *csp)
{
int joining = 0;
int i;
for (i=0; i<CS_CLUSTER_MAX; i++)
if (csp->heads[i]) {
cs_per_flow_t *pf = cs_maybe_init_source(csp->heads[i]);
if (pf->outgoing_clustering_state.bits.joining)
joining = 1;
}
cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
if (pf->outgoing_clustering_state.bits.joining)
joining = 1;
csp->our_clustering_state.bits.joining = joining;
}
void cluster_set_join_bit(cs_per_link_t *csp, int join)
{
cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
pf->outgoing_clustering_state.bits.joining = join;
cluster_update_join_bit(csp);
}
void cluster_set_member_join_bit(cs_per_link_t *csp, cs_per_flow_t *pf, int join)
{
pf->outgoing_clustering_state.bits.joining = join;
cluster_update_join_bit(csp);
}
/*
* INDEX ASSIGNMENT
*/
int cluster_assign_cluster_index(cs_per_link_t *csp)
{
cs_state_t *cs = csp->parent;
cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
pf->cluster_index.ci.head = 1;
/* choose random starting point */
int stop_at = (random() % 126) + 1;
pf->cluster_index.ci.index = stop_at;
retry:
/* inc */
pf->cluster_index.ci.index++;
if (pf->cluster_index.ci.index == 127)
pf->cluster_index.ci.index = 1;
/* time to stop? */
if (pf->cluster_index.ci.index == stop_at) {
elog(LOG_CRIT, "ran out of cluster indices");
pf->cluster_index.ci.index = 0;
return -1;
}
if (cs->latest_table && cs->latest_table_count > 0) {
int i;
for (i=0; i<cs->latest_table_count; i++) {
if (cs->latest_table[i].header.flow_id.src_if == csp->iface) {
if (cs->latest_table[i].cluster.index.byte ==
pf->cluster_index.byte) {
goto retry;
}
}
}
}
/* $$$ check to see if this is the same as other clusters of this node
* $$$ on other interfaces... */
return 0;
}
int cluster_assign_member_index(cs_per_link_t *csp, cs_per_flow_t *pf)
{
int count = 0;
retry:
if (count++ > 128) return -1;
/* increment */
csp->last_member_index++;
if ((csp->last_member_index == 0) || (csp->last_member_index >= 127))
csp->last_member_index = 1;
pf->cluster_index.ci.head = 0;
pf->cluster_index.ci.index = csp->last_member_index;
/* check */
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (ptr->flow_id.src_if == csp->iface) {
cs_per_flow_t *pf2 = cs_maybe_init_source(ptr);
if (pf == pf2) continue;
if (pf->cluster_index.byte == pf2->cluster_index.byte)
goto retry;
}
}
return 0;
}
/*
* REJECT
*/
void cluster_reject(cs_per_flow_t *pf, int report, char *reason)
{
cs_per_link_t *csp = cs_source_get_per_link(pf->parent);
if (csp->our_source == pf->parent) {
elog(LOG_CRIT, "rejecting ourselves??");
return;
}
elog(LOG_WARNING, "rejecting %s because %s",
print_if_id(pf->parent->flow_id.src), reason);
/* set reject bit */
pf->why_rejected = reason;
pf->warn = 0;
g_timer_add(CLUSTER_REJECT_REFRACT * 1000, NULL, NULL, NULL,
&(pf->rejection_refractory));
/* clear cluster index */
pf->cluster_index.byte = 0;
/* kill expect timer */
g_event_destroy(pf->expect_refresh_timer);
g_event_destroy(pf->join_complete_timer);
/* set hidden */
ssync_source_set_hidden(pf->parent, 1);
/* clear this head if we're a cluster member */
int i;
for (i=0; i<CS_CLUSTER_MAX; i++)
if (csp->heads[i] == pf->parent)
csp->heads[i] = NULL;
/* report this rejection? */
if (report) {
pf->reject = 1;
cluster_refresh_reset(csp, 0, 1, "rejecting node", NULL);
}
/* reeval? */
cluster_reeval(csp);
}
/*
* CLUSTER JOIN
*/
int cluster_join_timeout(void *data, int interval, g_event_t *ev)
{
cluster_reject((cs_per_flow_t *)data, 1, "join timed out");
return TIMER_DONE;
}
void cluster_maybe_set_join_timer(cs_per_flow_t *pf)
{
if (pf->join_complete_timer == NULL) {
g_timer_add(CLUSTER_JOIN_COMPLETE * 1000, cluster_join_timeout, pf,
NULL, &(pf->join_complete_timer));
}
}
void cluster_join(cs_per_link_t *csp, source_t *src)
{
cs_per_flow_t *pf = cs_maybe_init_source(src);
int i;
for (i=0; i<CS_CLUSTER_MAX; i++)
if (csp->heads[i] == NULL) {
csp->heads[i] = src;
goto set;
}
elog(LOG_CRIT, "can't join cluster.. no room in table");
return;
set:
/* set the outgoing cluster state */
pf->outgoing_clustering_state.byte = 0;
cluster_set_member_join_bit(csp, pf, 1);
/* enable this source */
ssync_source_set_hidden(src, 0);
/* set the join timer */
g_event_destroy(pf->join_complete_timer);
cluster_maybe_set_join_timer(pf);
/* reset the backoff */
cluster_refresh_reset(csp, 1, 1, "Trying to join", NULL);
elog(LOG_NOTICE, "trying to join cluster head %s",
print_if_id(src->flow_id.src));
}
/*
* REEVALUATE
*/
void cluster_reeval(cs_per_link_t *csp)
{
lu_multi_link_t *link = lu_multi_index_link(csp->parent->link_ref, csp->iface);
cs_state_t *cs = csp->parent;
cs_per_flow_t *our_pf = cs_maybe_init_source(csp->our_source);
static int in_reeval = 0;
if (in_reeval) return;
in_reeval = 1;
/*
* check cluster table:
* detect acked joiners and assign indices
* detect cluster index collisions,
* store cluster_info
*/
/*
* Preprocess neighbors:
* + clear mark bits,
* + update conn values,
* + count number of propects for a new cluster
*/
source_t *ptr;
csp->prospect_count = 0;
csp->connect_count = 0;
csp->max_reported_prospects = 0;
csp->competitors = 0;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* clear mark */
pf->mark = 0;
/* update connectivity unless we think this guy is dead */
if (!pf->dead) {
if (!cs->use_rssi)
pf->last_conn = lu_multi_link_get_conn(link, pf->parent->flow_id.src);
}
else
pf->last_conn = 0;
/* if source is on our interface and not ourselves... */
if (ptr->flow_id.src_if == csp->iface && !ssync_source_is_local(ptr)) {
/*
* if this source is a potential neighbor:
* + this node has the min required connectivity to join
* + if it is not isolated
*/
if (pf->last_conn >= CLUSTER_MIN_JOIN_CONN &&
!pf->reported_clustering_state.bits.isolated) {
/* find maxmimum reported prospect count among non-CHs, and
* count competitors */
if (pf->reported_clustering_state.bits.cluster_head == 0) {
if (pf->reported_clustering_state.bits.member_count >
csp->max_reported_prospects) {
csp->max_reported_prospects = pf->reported_clustering_state.bits.member_count;
csp->competitors = 1;
csp->for_instance = ptr->flow_id.src;
}
else if (pf->reported_clustering_state.bits.member_count ==
csp->max_reported_prospects) {
csp->competitors++;
}
}
/*
* count as a prospect if
* + if we are a CH, this source is also a CH
* + it does not appear in our tables
*/
if (csp->our_clustering_state.bits.cluster_head == 0 ||
(csp->our_clustering_state.bits.cluster_head == 1 &&
pf->reported_clustering_state.bits.cluster_head == 1)) {
/* search the table for this node */
int i;
for (i=0; i<cs->latest_table_count; i++) {
/* if this is on this interface and the if ids match */
if (cs->latest_table[i].header.flow_id.src_if == csp->iface &&
cs->latest_table[i].cluster.if_id == ptr->flow_id.src) {
/* count this as a connected node */
csp->connect_count++;
goto found;
}
}
/* dear mr. prospect, */
csp->prospect_count++;
pf->is_prospect = 1;
goto next_source_1;
}
found:
pf->is_prospect = 0;
}
/* no prospect if bad connectivity */
else
pf->is_prospect = 0;
}
next_source_1:
continue;
}
#if 0
/* metric weights prospects by total number of potential prospects */
/* $$$ and sqrt to give more dynamic range at the low end? */
csp->connect_metric = 16 *
(float)csp->prospect_count /
(float)(csp->connect_count + csp->prospect_count);
#else
csp->connect_metric = csp->prospect_count - (csp->connect_count / 2);
if (csp->connect_metric < 0)
csp->connect_metric = 0;
#endif
/*
* Pass 1: we run thru the table and
* + if we are a CH, check for index collisions with other cluster heads
* + save cluster info from possible members
* + if we are a CH, assign indices to new members
*/
int i;
for (i=0; i<cs->latest_table_count; i++) {
/* lookup the source of this record */
source_t *ptr = ssync_source_lookup(csp->parent->ref, &(cs->latest_table[i].header.flow_id));
if (ptr == NULL) {
elog(LOG_WARNING, "no source for this record??");
continue;
}
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* if we are a cluster head, check for an index collision */
if (csp->our_clustering_state.bits.cluster_head) {
if (cs->latest_table[i].cluster.node_id != my_node_id &&
cs->latest_table[i].cluster.index.ci.head &&
cs->latest_table[i].cluster.index.byte &&
cs->latest_table[i].cluster.index.byte == our_pf->cluster_index.byte &&
cs->latest_table[i].cluster.node_id < my_node_id) {
cluster_assign_cluster_index(csp);
}
}
/* if this record is ABOUT the source, and not our source */
if (ptr->flow_id.src == cs->latest_table[i].cluster.if_id &&
!ssync_source_is_local(ptr)) {
/* copy over the cluster info */
memmove(&(pf->cluster_info), &(cs->latest_table[i].cluster), sizeof(pf->cluster_info));
/* is this our parent? */
int is_parent = cluster_source_is_parent(csp, ptr);
/* if we are a cluster head, and
* if this node is reporting as a cluster member,
* or if it's a head and not one of our parents,
* assign a new index to it if needed */
if (csp->our_clustering_state.bits.cluster_head) {
if ((pf->reported_clustering_state.bits.cluster_head == 0) || !is_parent) {
if (pf->cluster_index.byte == 0 || pf->cluster_index.ci.head)
cluster_assign_member_index(csp, pf);
}
}
/* if we are not a cluster head, or it's our parent, then
* if it reported being a clusterhead then copy, else zero */
if (csp->our_clustering_state.bits.cluster_head == 0 || is_parent) {
if (pf->reported_clustering_state.bits.cluster_head &&
pf->cluster_info.index.byte != 0 && pf->cluster_info.index.ci.head)
pf->cluster_index.byte = pf->cluster_info.index.byte;
else
pf->cluster_index.byte = 0;
}
}
}
/*
* Pass 2: check for acked JOINs
* + if a cluster-head, when we receive an entry that describes US, and we
* have already assigned a cluster index to the sender, we finalize
* the join.
* + if a cluster-member, when we receive an entry that describes US,
* we know that we've been accepted to the cluster and assigned an
* address
*/
for (i=0; i<cs->latest_table_count; i++) {
/* lookup the source of this record */
source_t *ptr = ssync_source_lookup(csp->parent->ref, &(cs->latest_table[i].header.flow_id));
if (ptr == NULL) {
elog(LOG_WARNING, "no source for this record??");
continue;
}
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
elog(LOG_DEBUG(0), "got record data.. %s: %s %s",
ssync_flowid_to_str(&(cs->latest_table[i].header.flow_id)),
print_if_id(cs->latest_table[i].cluster.node_id),
print_if_id(cs->latest_table[i].cluster.if_id));
/* if this record is about US, but not FROM us,
* we mark the source it's FROM -- acked join */
if (csp->our_source != ptr &&
cs->latest_table[i].cluster.node_id == my_node_id &&
cs->latest_table[i].cluster.if_id == csp->if_id) {
/* if we're a clusterhead, but we haven't assigned this member
* an index already, skip it... */
if (csp->our_clustering_state.bits.cluster_head &&
pf->cluster_index.byte == 0)
goto next_entry;
/* otherwise mark, unset the join bit and kill the join timer */
pf->mark = 1;
if (pf->join_complete_timer) {
elog(LOG_NOTICE, "join complete: %s->%d",
print_if_id(ptr->flow_id.src), my_node_id);
g_event_destroy(pf->join_complete_timer);
}
cluster_set_member_join_bit(csp, pf, 0);
}
next_entry:
continue;
}
/*
* Now, check for members/ch's that are unmarked, and reject them
*/
/* if cluster head, check through sources */
if (csp->our_clustering_state.bits.cluster_head) {
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* if unmarked, and iface matches, and it's not ourselves,
* and it's not still joining, and we've assigned an index,
& and it's not one of our parents */
if (pf->mark == 0 &&
ptr->flow_id.src_if == csp->iface &&
ptr != csp->our_source &&
pf->join_complete_timer == NULL &&
pf->cluster_index.byte &&
!cluster_source_is_parent(csp, ptr)) {
/* drop it */
elog(LOG_NOTICE, "dropping member %s", print_if_id(ptr->flow_id.src));
cluster_reject(pf, 1, "not joining but missing from table");
}
}
}
/* if cluster member, just check the clusters array */
for (i=0; i<CS_CLUSTER_MAX; i++) {
if (csp->heads[i]) {
cs_per_flow_t *pf = cs_maybe_init_source(csp->heads[i]);
/* it's not marked and our join completed.. */
if (pf->mark == 0 &&
pf->join_complete_timer == NULL) {
cluster_reject(pf, 1, "join completed and not present in table");
}
}
}
/*
* NOW, see if we can join anything... or should drop anything...
*/
/*
* Assess our position as a cluster MEMBER
* + count current number of heads and min connectivity value
* + reject bad cluster heads
*/
int head_count=0;
int min_conn_value=0;
int min_conn_index=-1;
for (i=0; i<CS_CLUSTER_MAX; i++) {
if (csp->heads[i]) {
head_count++;
cs_per_flow_t *pf = cs_maybe_init_source(csp->heads[i]);
/* find min connectivity of current heads */
int conn = pf->last_conn;
#ifdef NEEDY_CLUSTERS
/* bump up the connectivity rating for "needy clusters" */
if (pf->reported_clustering_state.bits.member_count < CLUSTER_JOIN_MEM)
conn += CLUSTER_NEEDY_CONN_BUMP;
#endif
/* reject this parent if it's bad */
if (conn < CLUSTER_DROP_CONN) {
cluster_reject(pf, 1, "bad connectivity to cluster head");
}
/* otherwise count the min parent */
else {
if (min_conn_index < 0 || min_conn_value > conn) {
min_conn_index = i;
min_conn_value = conn;
}
}
}
}
csp->head_count = head_count;
/* if we are a cluster HEAD... */
if (csp->our_clustering_state.bits.cluster_head) {
/* update members count, drop if poor connectivity */
repeat:
csp->member_count = 0;
source_t *ptr;
source_t *cluster_min_conn = NULL;
int cluster_min_conn_value = -1;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (cs_source_get_link_index(ptr) == csp->iface &&
csp->our_source != ptr &&
!cluster_source_is_parent(csp, ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* if this is in our cluster.. */
if (pf->cluster_index.byte) {
int conn = pf->last_conn;
if (pf->cluster_index.ci.head)
elog(LOG_CRIT, "cluster head should be cluster member");
/* reject now if it's crap */
if (conn < CLUSTER_DROP_CONN)
cluster_reject(pf, 1, "bad connectivity to cluster head");
/* else count it and record lowest acceptable conn value */
else {
csp->member_count++;
if ((cluster_min_conn == NULL || cluster_min_conn_value > conn) &&
(pf->reported_clustering_state.bits.member_count > 1)) {
cluster_min_conn_value = conn;
cluster_min_conn = ptr;
}
}
}
}
}
/* drop lowest if over max */
if (csp->member_count > CLUSTER_MAX_MEM) {
if (cluster_min_conn) {
cluster_reject(cs_maybe_init_source(cluster_min_conn),
1, "over max cluster size");
goto repeat;
}
}
/* if we are joining:
* if above min, stabilize now */
if (csp->our_clustering_state.bits.searching) {
if (csp->member_count >= CLUSTER_JOIN_MEM) {
/* stabilize now */
cluster_set_join_bit(csp, 0);
cluster_kill_election_timer(csp);
}
}
/* if we are stable but we drop below min,
* transition to joining */
else {
if (csp->member_count < CLUSTER_DROP_MEM) {
/* destabilize now */
cluster_set_join_bit(csp, 1);
cluster_set_election_timer(csp, 0, "Head's member count below drop min");
}
}
}
/*
* Choose to join another cluster?
*/
/* find the best new clusterhead */
source_t *max_conn = NULL;
int max_conn_value = -1;
int max_conn_members = -1;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
/* try to join if
* + it's on our link and not ourselves
* + it's a cluster head, and it's a prospect
* + we're not already joining
* + it's not recently rejected
*/
if (cs_source_get_link_index(ptr) == csp->iface &&
ptr != csp->our_source &&
pf->reported_clustering_state.bits.cluster_head &&
pf->is_prospect &&
!pf->join_complete_timer &&
!pf->rejection_refractory) {
/* skip if already selected? */
if (cluster_source_is_parent(csp, ptr))
goto next_source_2;
/* check connectivity */
int conn = pf->last_conn;
/* record highest conn value */
if ((conn > CLUSTER_MIN_JOIN_CONN) &&
((max_conn == NULL || conn > max_conn_value))) {
max_conn_value = conn;
max_conn_members = pf->reported_clustering_state.bits.member_count;
max_conn = ptr;
}
}
next_source_2:
continue;
}
/* if we found an acceptable CH */
if (max_conn) {
elog(LOG_DEBUG(0), "found possible CH %s to join: %d",
print_if_id(max_conn->flow_id.src), max_conn_value);
/* if the new one is better, or if we need cluster heads,
* or if this CH needs us */
if ((((max_conn_value - min_conn_value) > CLUSTER_CHANGE_CONN) &&
min_conn_value < CLUSTER_MIN_JOIN_CONN) ||
head_count < (csp->parent->mobile_node ? 1 : CLUSTER_MIN_HEADS) ||
max_conn_members < CLUSTER_JOIN_MEM) {
/* if we're maxed out, reject the crappiest one */
if (head_count >= (csp->parent->mobile_node ? 1 : CS_CLUSTER_MAX)) {
cluster_reject(cs_maybe_init_source(csp->heads[min_conn_index]),
1, "rejecting to make room");
csp->head_count--;
}
/* now join the new one */
cluster_join(csp, max_conn);
csp->head_count++;
}
}
/* if we are NOT a cluster head, update searching mode */
if (!csp->our_clustering_state.bits.cluster_head) {
/* maybe (re)start election timer if we have the most prospects */
if (csp->connect_metric > CLUSTER_MIN_OK_METRIC &&
csp->connect_metric >= csp->max_reported_prospects) {
/* if we were not searching */
if (!csp->our_clustering_state.bits.searching) {
cluster_set_election_timer(csp, 1, "we have max prospects, not searching");
}
/* or if we see an increase in competitors */
else if (csp->competitors > (csp->prev_competitors + CLUSTER_COMPETE_CHANGE))
cluster_set_election_timer(csp, 1, "competitor count change");
}
/* otherwise, stop election timer */
else {
cluster_kill_election_timer(csp);
}
}
/*
* check for isolation (no neighbors with valid connectivity)
* if we are isolated, keep resetting the election timer
* $$$ might make sense to give up after a while
*/
int isolated = 0;
if (csp->member_count == 0 && csp->head_count == 0) {
source_t *ptr;
isolated = 1;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
if (pf->last_conn > CLUSTER_MIN_JOIN_CONN) {
isolated = 0;
break;
}
}
}
/* if we're isolated, no election timer.. */
if (isolated) {
cluster_isolated_mode(csp);
}
else {
/* if we had been isolated enter search mode now */
if (csp->our_clustering_state.bits.isolated) {
cluster_search_mode(csp);
}
}
/* update clustering state */
/* $$$ ugly:
* setting bits all over the place
* two copies, one in csp one in our_source
*/
/* update the member count entry and out source's 'outgoing' entry */
int membership_value =
csp->our_clustering_state.bits.cluster_head ?
csp->member_count : csp->connect_metric;
csp->our_clustering_state.bits.member_count =
(membership_value < 16) ? membership_value : 15;
/* update the table */
cs_table_schedule_repub(csp->parent);
/* update the status device */
cluster_status_update(csp);
in_reeval = 0;
}
/*
* PROCESS CLUSTER MSG
*/
void cluster_process_msg(cs_per_link_t *csp, source_t *src,
char *msg, int length, int rssi)
{
cs_per_flow_t *pf = cs_maybe_init_source(src);
if (length < sizeof(cluster_hdr_t)) {
elog(LOG_WARNING, "cluster message too short");
goto done;
}
cluster_hdr_t *hdr = (cluster_hdr_t *)msg;
/* store the new state */
pf->reported_clustering_state = hdr->mode;
/* reset warn counter */
if (pf->warn_count > 0)
elog(LOG_DEBUG(0), "Resetting warn counter for %d", src->flow_id.src);
pf->warn_count = 0;
pf->warn = 0;
/* undead.. */
pf->dead = 0;
/* save rssi if we're using it for link est */
if (csp->parent->use_rssi) {
pf->last_conn = rssi;
}
/* process joins directed at us */
int remain = length - sizeof(cluster_hdr_t);
char *buf = msg + sizeof(cluster_hdr_t);
while (remain > 0) {
int ctrl_byte = buf[0];
buf++;
remain--;
if (remain >= sizeof(if_id_t)) {
if_id_t id;
memmove(&id, buf, sizeof(id));
buf += sizeof(id);
remain -= sizeof(id);
/* is it us? */
if (id == csp->if_id) {
switch (ctrl_byte) {
case CLUSTER_MSG_JOIN:
/* process only if we're a clusterhead */
if (csp->our_clustering_state.bits.cluster_head) {
elog(LOG_WARNING, "processing JOIN request from %s, total members %d",
print_if_id(src->flow_id.src), csp->member_count);
/* reject on join if below min conn level */
int conn = pf->last_conn;
if (conn < CLUSTER_MIN_JOIN_CONN)
cluster_reject(pf, 1, "bad connectivity to cluster member");
else {
/* unhide this source */
ssync_source_set_hidden(src, 0);
cluster_maybe_set_join_timer(pf);
}
}
else {
elog(LOG_WARNING, "JOIN request, but we're not a clusterhead");
}
break;
case CLUSTER_MSG_WARN:
elog(LOG_WARNING, "processing WARN request from %s",
print_if_id(src->flow_id.src));
cluster_refresh_reset(csp, 0, 1, "replying warning", NULL);
break;
case CLUSTER_MSG_REJECT:
elog(LOG_WARNING, "processing REJECT request from %s",
print_if_id(src->flow_id.src));
cluster_reject(pf, 0, "rejected by peer");
break;
default:
elog(LOG_WARNING, "processing UNKNOWN request from %s",
print_if_id(src->flow_id.src));
}
}
}
}
/* clear/reset expected refresh timer */
if (hdr->refresh_backoff != 0 || hdr->refresh_jitter != 0) {
int next_refresh =
cs_refresh_timer(hdr->mode, hdr->refresh_backoff, hdr->refresh_jitter) + 2;
elog(LOG_DEBUG(0), "resetting src %d refresh timer for %d",
src->flow_id.src, next_refresh);
g_event_destroy(pf->expect_refresh_timer);
g_timer_add(next_refresh * 1000,
cluster_refresh_expected, pf, NULL, &(pf->expect_refresh_timer));
}
else {
elog(LOG_DEBUG(0), "no new refresh time defined");
}
cluster_reeval(csp);
done:
return;
}
/*
* REFRESH TIMER
*/
static
int cluster_refresh_expected(void *data, int interval, g_event_t *ev)
{
cs_per_flow_t *pf = (cs_per_flow_t *)data;
cs_per_link_t *csp = cs_source_get_per_link(pf->parent);
int in_cluster = cluster_source_is_connected(csp, pf->parent);
/* we expected a refresh but didn't get it.. */
if (pf->cluster_index.byte)
elog(LOG_WARNING, "expected a refresh for %s but didn't get one",
print_if_id(pf->parent->flow_id.src));
/* increment warning count, and if it's in our cluster request warning */
if (in_cluster) pf->warn = 1;
pf->warn_count++;
/* too many warnings? */
if (pf->warn_count >= CLUSTER_MAX_WARNINGS) {
/* hide this neighbor and mark as dead */
ssync_source_set_hidden(pf->parent, 1);
pf->dead = 1;
/* if this was actually a member, reject it */
if (in_cluster) {
cluster_reject(pf, 1, "too many warnings");
}
return TIMER_DONE;
}
else {
/* if this is in our cluster, trigger a fast refresh sending a warning */
if (in_cluster) {
cluster_refresh_reset(csp, 0, 1, "issuing warning", NULL);
}
}
return TIMER_RENEW;
}
int cluster_refresh_reset(cs_per_link_t *csp, int reset_back,
int fast, char *reason, cluster_hdr_t *set_header)
{
if (reset_back) {
elog(LOG_NOTICE, "Resetting backoff, because %s", reason);
csp->refresh_bits = 0;
}
if (set_header) {
set_header->refresh_backoff = 0;
set_header->refresh_jitter = 0;
}
int delay = 0;
/* fast mode.. go immediately */
if (fast) {
elog(LOG_NOTICE, "Fast refresh!");
}
else {
int base_delay;
int jitter;
/* do we back off? */
if ((csp->our_clustering_state.bits.joining == 0) &&
(csp->our_clustering_state.bits.isolated == 0) &&
(csp->our_clustering_state.bits.searching == 0)) {
if (set_header) {
set_header->refresh_backoff = csp->refresh_bits;
}
base_delay =
cs_refresh_timer(csp->our_clustering_state, csp->refresh_bits, 0);
jitter = random_range(0, CLUSTER_FAST_REF_VAR * 1000);
/* increment the backoff exponent */
int refresh_max =
((csp->our_clustering_state.bits.cluster_head == 0) ?
CLUSTER_REFRESH_MAX_EXP : CLUSTER_REFRESH_MAX_HEAD_EXP);
if (csp->refresh_bits < refresh_max)
csp->refresh_bits++;
else
csp->refresh_bits = refresh_max;
}
/* otherwise use this standard timing */
else {
/* in isolated mode, ultra-fast beacons */
if (csp->our_clustering_state.bits.isolated) {
base_delay = CLUSTER_ISOLATED_REF;
jitter = random_range(0, 1000);
}
/* in other cases, fast beacons */
else {
base_delay = CLUSTER_FAST_REF_MIN;
jitter = random_range(0, CLUSTER_FAST_REF_VAR * 1000);
}
}
if (set_header) {
set_header->refresh_jitter = (jitter / 1000) + 1;
}
delay = base_delay*1000 + jitter;
}
/* if the timer is already set, don't set the refresh for further in future */
if (csp->refresh_timer) {
int curr_timer = g_timer_time_remaining(csp->refresh_timer);
if (delay > curr_timer) {
/* blank header entry means "same as before" */
if (set_header) {
set_header->refresh_backoff = 0;
set_header->refresh_jitter = 0;
}
return 0;
}
}
/* otherwise, reset refresh timer */
g_event_destroy(csp->refresh_timer);
g_timer_add(delay, cluster_refresh_timeout, csp, NULL, &(csp->refresh_timer));
return 0;
}
static
int cluster_refresh_timeout(void *data, int interval, g_event_t *ev)
{
cs_per_link_t *csp = (cs_per_link_t *)data;
/* set refresh bit */
csp->our_source->log.refresh_pending = 1;
return TIMER_DONE;
}
void cluster_clear_cluster_msg(cs_per_link_t *csp)
{
if (csp->outgoing_cluster_msg)
buf_free(csp->outgoing_cluster_msg);
csp->outgoing_cluster_msg = NULL;
/* send rejections, warnings and reset expected timers */
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
if (pf->warn) {
pf->warn = 0;
g_event_destroy(pf->expect_refresh_timer);
g_timer_add(CLUSTER_REF_POST_WARN * 1000,
cluster_refresh_expected, pf, NULL, &(pf->expect_refresh_timer));
}
if (pf->reject) {
pf->reject = 0;
}
}
}
void cluster_update_cluster_msg(cs_per_link_t *csp)
{
/* check latest */
cluster_reeval(csp);
/* construct a clustering message */
if (csp->outgoing_cluster_msg)
buf_free(csp->outgoing_cluster_msg);
csp->outgoing_cluster_msg = buf_new();
/* construct cluster header */
cluster_hdr_t hdr = {
mode: csp->our_clustering_state,
};
/* reset the refresh timer */
cluster_refresh_reset(csp, 0, 0, "", &hdr);
/* push the clustering message header */
bufcpy(csp->outgoing_cluster_msg, &hdr, sizeof(hdr));
elog(LOG_INFO, "sending clustering message: %d,%d",
hdr.refresh_backoff, hdr.refresh_jitter);
/* push any join addresses */
int i;
cs_per_flow_t *pf;
for (i=0; i<CS_CLUSTER_MAX; i++) {
if (csp->heads[i]) {
pf = cs_maybe_init_source(csp->heads[i]);
if (pf->outgoing_clustering_state.bits.joining) {
elog(LOG_WARNING, "emitting JOIN request");
uint8_t tmp = CLUSTER_MSG_JOIN;
bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src),
sizeof(if_id_t));
}
}
}
/* send rejections, warnings and reset expected timers */
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
pf = cs_maybe_init_source(ptr);
/* warning needed? */
if (pf->warn) {
elog(LOG_WARNING, "emitting WARNING to %s",
print_if_id(pf->parent->flow_id.src));
uint8_t tmp = CLUSTER_MSG_WARN;
bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src),
sizeof(if_id_t));
}
/* rejection needed? */
if (pf->reject) {
elog(LOG_WARNING, "emitting REJECT to %s",
print_if_id(pf->parent->flow_id.src));
uint8_t tmp = CLUSTER_MSG_REJECT;
bufcpy(csp->outgoing_cluster_msg, &tmp, sizeof(tmp));
bufcpy(csp->outgoing_cluster_msg, &(pf->parent->flow_id.src),
sizeof(if_id_t));
}
}
}
/*
* ELECTION TIMER
*/
/* when election timer expires, switch between searching and cluster head */
static
int cluster_elect_timeout(void *data, int interval, g_event_t *ev)
{
cs_per_link_t *csp = (cs_per_link_t *)data;
if (csp->our_clustering_state.bits.searching) {
if (csp->our_clustering_state.bits.cluster_head || csp->parent->mobile_node) {
elog(LOG_NOTICE, "Un-elect timer expired: going back to searching");
cluster_search_mode(csp);
}
else {
elog(LOG_NOTICE, "Election timer expired: becoming cluster head");
cluster_head_mode(csp);
}
}
else {
elog(LOG_NOTICE, "Election timer expired: ignoring -- NOT SEARCHING");
}
return TIMER_DONE;
}
void cluster_kill_election_timer(cs_per_link_t *csp)
{
/* force fast refresh on election timer change */
if (csp->our_clustering_state.bits.searching) {
cluster_refresh_reset(csp, 0, 1, "killed election timer", NULL);
}
elog(LOG_DEBUG(0), "Killing election timer");
csp->our_clustering_state.bits.searching = 0;
g_event_destroy(csp->elect_timer);
}
void cluster_set_election_timer(cs_per_link_t *csp, int elect, char *reason)
{
int delay = CLUSTER_UNELECT * 1000;
if (elect) {
/* min delay of PER_COMP to allow for message arrival */
/* $$$ ought to be exponential distribution */
delay = cs_timer((csp->competitors + 3) * CLUSTER_ELECT_PER_COMP,
(csp->competitors + 1) * CLUSTER_ELECT_PER_COMP);
}
elog(LOG_NOTICE, "Resetting election timer for %.1f sec, because %s",
(float)delay / 1000.0, reason);
/* force fast refresh if we are starting to search */
if (!csp->our_clustering_state.bits.searching) {
cluster_refresh_reset(csp, 0, 1, "set election timer", NULL);
}
/* set searching bit and election timer */
csp->our_clustering_state.bits.searching = 1;
csp->prev_competitors = csp->competitors;
g_event_destroy(csp->elect_timer);
g_timer_add(delay, cluster_elect_timeout, csp, NULL, &(csp->elect_timer));
}
/*
* CLUSTER HEAD MODE
*/
static
void cluster_head_mode(cs_per_link_t *csp)
{
/* set mode variable */
csp->our_clustering_state.byte = 0;
csp->our_clustering_state.bits.cluster_head = 1;
cluster_set_join_bit(csp, 1);
/* kill all timers, hide sources */
cluster_kill_timers(csp);
cluster_reset_sources(csp);
/* assign random index */
cluster_assign_cluster_index(csp);
/* set election timer */
cluster_set_election_timer(csp, 0, "Entering cluster head mode");
/* set refresh timer */
cluster_refresh_reset(csp, 1, 1, "Entered cluster head mode", NULL);
cluster_status_update(csp);
}
/*
* ISOLATED MODE
*/
static
void cluster_isolated_mode(cs_per_link_t *csp)
{
if (!csp->our_clustering_state.bits.isolated) {
/* set mode variable */
csp->our_clustering_state.byte = 0;
csp->our_clustering_state.bits.isolated = 1;
csp->our_clustering_state.bits.searching = 1;
/* kill all timers, hide sources */
cluster_kill_timers(csp);
cluster_reset_sources(csp);
/* set refresh timer */
cluster_refresh_reset(csp, 1, 0, "Entered isolated mode", NULL);
cluster_status_update(csp);
}
}
/*
* SEARCH MODE
*/
static
void cluster_search_mode(cs_per_link_t *csp)
{
/* set mode variable */
csp->our_clustering_state.byte = 0;
/* kill all timers, hide sources */
cluster_kill_timers(csp);
cluster_reset_sources(csp);
/* set refresh timer */
cluster_refresh_reset(csp, 1, 0, "Entered search mode", NULL);
cluster_status_update(csp);
}
/*
* cluster status device
*/
char *cluster_mode_to_str(cluster_mode_t mode)
{
DECLARE_STATIC_BUF_RING(buf, 10, 32);
if (mode.byte == 0)
sprintf(buf, "None");
else
sprintf(buf, "%s%s%s%s(%d)",
mode.bits.cluster_head ? "Head" : "Member",
mode.bits.isolated ? ",Isolated" : "",
mode.bits.searching ? ",Searching" : "",
mode.bits.joining ? ",Joining:" : "",
mode.bits.member_count
);
return buf;
}
static
int cluster_mode_status_print(status_context_t *info, buf_t *buf)
{
cs_per_link_t *csp = (cs_per_link_t *) sd_data(info);
cs_per_flow_t *our_pf = cs_maybe_init_source(csp->our_source);
cluster_mode_t mode = {
byte: csp->our_clustering_state.byte
};
if (mode.bits.isolated) {
bufprintf(buf, "I(%.1f)",
g_timer_time_remaining(csp->refresh_timer) / 1000.0);
}
else {
if (mode.bits.cluster_head) {
bufprintf(buf, "H[%s]", cluster_map_index_to_str(our_pf->cluster_index));
}
else {
bufprintf(buf, "M:%d/%d",
csp->prospect_count, csp->connect_count);
}
bufprintf(buf, "(%d)",
mode.bits.member_count);
if (mode.bits.joining)
bufprintf(buf, "J");
if (mode.bits.searching) {
bufprintf(buf, "S(%.1f)",
g_timer_time_remaining(csp->elect_timer) / 1000.0);
}
else {
if (csp->elect_timer) {
bufprintf(buf, "S***(%.1f)",
g_timer_time_remaining(csp->elect_timer) / 1000.0);
}
else
if (mode.bits.cluster_head == 0 &&
csp->connect_metric > CLUSTER_MIN_OK_METRIC &&
csp->competitors > 0) {
bufprintf(buf, "W(%s)", print_if_id(csp->for_instance));
}
}
}
bufprintf(buf, "\n");
return STATUS_MSG_COMPLETE;
}
static
int cluster_status_print(status_context_t *info, buf_t *buf)
{
cs_per_link_t *csp = (cs_per_link_t *) sd_data(info);
cs_state_t *cs = csp->parent;
bufprintf(buf, "Cluster Sync Clustering Status, node %s\n",
print_if_id(my_node_id));
bufprintf(buf, "[if_class=%s, neighbors=%s]\n",
cs->if_class, cs->neighbors);
lu_multi_link_t *l;
for (l=lu_multi_links_top(cs->link_ref); l; l=lu_multi_links_next(l)) {
cs_per_link_t *csp = (cs_per_link_t *)lu_multi_link_get_data(l);
bufprintf(buf, "iface=%d, if_id=%s\n", csp->iface, print_if_id(csp->if_id));
bufprintf(buf, "=========================\n");
cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
bufprintf(buf, "Mode: %s, index %s",
cluster_mode_to_str(csp->our_clustering_state),
cluster_map_index_to_str(pf->cluster_index));
if (csp->elect_timer)
bufprintf(buf, ", elect in %.3fs",
g_timer_time_remaining(csp->elect_timer) / 1000.0);
if (csp->refresh_timer)
bufprintf(buf, ", refresh in %.3fs",
g_timer_time_remaining(csp->refresh_timer) / 1000.0);
bufprintf(buf, "\n\n");
int pass;
for (pass=0; pass<2; pass++) {
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (pass == 0 && ptr->hidden) continue;
if (pass == 1 && !ptr->hidden) continue;
if (cs_source_get_link_index(ptr) == csp->iface) {
pf = cs_maybe_init_source(ptr);
char *type = "Neighbor";
if (ptr == csp->our_source) {
type = "SELF";
}
else {
if (cluster_source_is_parent(csp, ptr)) {
type = "Parent";
}
}
bufprintf(buf, " %s %s/%d[%s]: Conn=%d, Mode=%s, Reported=%s%s,\n",
type,
print_if_id(ptr->flow_id.src), ptr->flow_id.src_if,
cluster_map_index_to_str(pf->cluster_index),
pf->last_conn,
cluster_mode_to_str(pf->outgoing_clustering_state),
cluster_mode_to_str(pf->reported_clustering_state),
ptr->hidden ? ", Hidden" : "");
bufprintf(buf, " %s%s%swcount=%d%s%s, info: %s/%s",
pf->is_prospect ? "PROSPECT, " : "",
pf->dead ? "DEAD, " : "",
pf->warn ? "warn, " : "",
pf->warn_count,
pf->reject ? ", reject" : "",
pf->rejection_refractory ? ", pr" : "",
print_if_id(pf->cluster_info.node_id), print_if_id(pf->cluster_info.if_id)
);
if (pf->expect_refresh_timer)
bufprintf(buf, ", expect refresh in %.3fs",
g_timer_time_remaining(pf->expect_refresh_timer) / 1000.0);
if (pf->join_complete_timer)
bufprintf(buf, ", join in %.3fs",
g_timer_time_remaining(pf->join_complete_timer) / 1000.0);
bufprintf(buf, "\n");
if (pf->why_rejected)
bufprintf(buf, " Rejected because %s\n",
pf->why_rejected);
}
}
bufprintf(buf, "\n");
}
}
return STATUS_MSG_COMPLETE;
}
static
int cluster_status_bin(status_context_t *info, buf_t *buf)
{
cs_per_link_t *csp = (cs_per_link_t *) sd_data(info);
source_t *ptr;
for (ptr=ssync_sources_top(&(csp->parent->ref->sources)); ptr;
ptr=ssync_sources_next(ptr)) {
if (cs_source_get_link_index(ptr) == csp->iface) {
cs_per_flow_t *pf = cs_maybe_init_source(ptr);
if (!cluster_source_is_connected(csp, pf->parent))
continue;
/* $$$$ DO SPECIAL EMVIEW MODULE!!! */
neighbor_t ne = {
node_id: LINK_BROADCAST,
if_id: ptr->flow_id.src,
interface: ptr->flow_id.src_if,
state: pf->join_complete_timer ? ASYMMETRIC : ACTIVE,
conn_from: pf->join_complete_timer ?
g_timer_time_remaining(pf->join_complete_timer) / 1000 : pf->last_conn,
conn_to: 100
};
bufcpy(buf, (char*)&ne, sizeof(ne));
}
}
neighbor_t ne = {};
bufcpy(buf, (char*)&ne, sizeof(ne));
return STATUS_MSG_COMPLETE;
}
int cluster_status_update(cs_per_link_t *csp)
{
g_status_dev_notify(csp->cluster_stat);
g_status_dev_notify(csp->cluster_mode_stat);
return 0;
}
/*
* INITIALIZATION
*/
void cluster_link_init(lu_multi_link_t *link)
{
cs_per_link_t *csp = lu_multi_link_get_data(link);
status_dev_opts_t t_opts = {
device: {
devname: link_name_s(lu_multi_link_get_name(link), "cluster"),
device_info: csp
},
printable: cluster_status_print,
binary: cluster_status_bin
};
if (g_status_dev(&t_opts, &(csp->cluster_stat)) < 0) {
elog(LOG_CRIT, "Unable to create status device %s: %m",
t_opts.device.devname);
exit(1);
}
status_dev_opts_t t_opts2 = {
device: {
devname: link_name_s(lu_multi_link_get_name(link), "cluster_mode"),
device_info: csp
},
printable: cluster_mode_status_print,
};
if (g_status_dev(&t_opts2, &(csp->cluster_mode_stat)) < 0) {
elog(LOG_CRIT, "Unable to create status device %s: %m",
t_opts.device.devname);
exit(1);
}
/* initialize */
/* create our outbound bcast stream */
flow_id_t fid = {
src: csp->if_id,
dst: LINK_BROADCAST,
dst_if: csp->iface,
src_if: csp->iface,
max_hops: 1
};
csp->our_source = ssync_source_lookup_create(csp->parent->ref, &fid, 0, 1);
/* initialize our source */
cs_per_flow_t *pf = cs_maybe_init_source(csp->our_source);
pf->cluster_info.node_id = my_node_id;
pf->cluster_info.if_id = csp->if_id;
/* start in isolated mode */
cluster_isolated_mode(csp);
}
See more files for this project here