MdOppGradient.c from EmStar at Krugle
Show MdOppGradient.c syntax highlighted
////////////////////////////////////////////////////////////////////////////
//
// CENS
//
// Copyright (c) 2003 The Regents of the University of California. All
// rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// - Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// - Neither the name of the University nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Contents: This file contains the implementaion of the One-Phase-Pull
// routing algorithim and gradient logic for Micro Diffusion.
//
// Purpose: The purpose of this functionality is to create routing trees
// and forward data that matches the subscriptions of local and
// remote applications.
//
////////////////////////////////////////////////////////////////////////////
//
// $Id: MdOppGradient.c,v 1.11 2004/05/10 22:23:33 adparker Exp $
#include <assert.h>
#include <link/link.h>
#include "DiffTypes.h"
#include "MdForwarder.h"
#include "MdOppGradient.h"
#include "Mdiff.h"
#include "MdMatch.h"
QUEUE_FUNCTION_INSTANTIATIONS(IntDiffMsg_queue, idm_ptrs, idm_list,
IntDiffMsg_elem_t, IntDiffMsg_queue_t);
QUEUE_FUNCTION_INSTANTIATIONS(DiffGradient_queue, dg_ptrs, dg_list,
DiffGradient_elem_t, DiffGradient_queue_t);
static void int2ip(uint32_t number, char * ip)
{
snprintf(ip,16,"%u.%u.%u.%u",
(number & 0xff000000) >> 24,
(number & 0x00ff0000) >> 16,
(number & 0x0000ff00) >> 8,
(number & 0x000000ff));
}
// This constant is the size needed for a gradient buffer entry.
// ??? define the things below in DiffTypes.h
// TD_OPP_LOCAL_BUFF_SIZE = sizeof( DiffGradient_t )
// + ( TD_GRADIENT_MAX_ATTR * sizeof( DiffAttr_t ) )
//
// This is the number of attributes that each interest must have to be valid.
// TD_NUM_STANDARD_INTEREST_ATTRS = 2
////////////////////////////////////////////////////////////////////////////
//
// Command: StdControl.init( )
//
// Purpose: The purpose of this command is to init this component.
//
////////////////////////////////////////////////////////////////////////////
//
// ??? come back to this after i figure out which variable i'm keeping.
//
result_t MdOppGradient_init(Mdiff_state_t * state )
{
// Init interests queue
state->MdOppGradient->interest_queue = (IntDiffMsg_queue_t *)malloc(sizeof(IntDiffMsg_queue_t));
IntDiffMsg_queue_init(state->MdOppGradient->interest_queue);
// Init gradients queue
state->MdOppGradient->gradient_queue = (DiffGradient_queue_t *)malloc(sizeof(DiffGradient_queue_t));
DiffGradient_queue_init(state->MdOppGradient->gradient_queue);
// Used to initialize new interests when local subscriptions are
// made.
state->MdOppGradient->m_pStdAttr[ 0 ].m_uiAttr = TD_ATTR_CLASS;
state->MdOppGradient->m_pStdAttr[ 0 ].m_uiOp = TD_IS;
state->MdOppGradient->m_pStdAttr[ 0 ].m_iValue = TD_ATTR_INTEREST;
state->MdOppGradient->m_pStdAttr[ 1 ].m_uiAttr = TD_ATTR_OPP_ROUND;
state->MdOppGradient->m_pStdAttr[ 1 ].m_uiOp = TD_IS;
state->MdOppGradient->m_pStdAttr[ 1 ].m_iValue = 0;
state->MdOppGradient->m_iHandleSeq = 0;
// Used to check if a message is an interest.
state->MdOppGradient->m_uiNumInterestAttrs = 2;
state->MdOppGradient->m_pInterestAttr[ 0 ].m_uiAttr = TD_ATTR_CLASS;
state->MdOppGradient->m_pInterestAttr[ 0 ].m_uiOp = TD_EQ;
state->MdOppGradient->m_pInterestAttr[ 0 ].m_iValue = TD_ATTR_INTEREST;
state->MdOppGradient->m_pInterestAttr[ 1 ].m_uiAttr = TD_ATTR_OPP_ROUND;
state->MdOppGradient->m_pInterestAttr[ 1 ].m_uiOp = TD_ANY;
state->MdOppGradient->m_pInterestAttr[ 1 ].m_iValue = 0;
// used to check if a message is data
state->MdOppGradient->m_uiNumDataAttrs = 1;
state->MdOppGradient->m_pDataAttr[ 0 ].m_uiAttr = TD_ATTR_CLASS;
state->MdOppGradient->m_pDataAttr[ 0 ].m_uiOp = TD_EQ;
state->MdOppGradient->m_pDataAttr[ 0 ].m_iValue = TD_ATTR_DATA;
return SUCCESS;
}
////////////////////////////////////////////////////////////////////////////
//
// Command: StdControl.start( )
//
// Purpose: The purpose of this command is to start this component.
//
////////////////////////////////////////////////////////////////////////////
//
// This is run after the initialization.
//
result_t MdOppGradient_start( Mdiff_state_t * state )
{
result_t tReturn = SUCCESS;
elog(LOG_INFO," ");
// Timer to broadcast interests. TD_GRADIENT_PERIOD
g_event_destroy(state->MdOppGradient->interest_timer);
g_timer_add(TD_GRADIENT_PERIOD,
MdOppGradient_sendInterests,
state,
NULL,
&(state->MdOppGradient->interest_timer));
// Timer to run gradient timeouts. TD_GRADIENT_PERIOD
g_event_destroy(state->MdOppGradient->gradient_timer);
g_timer_add(TD_GRADIENT_PERIOD,
MdOppGradient_timeoutGradients,
state,
NULL,
&(state->MdOppGradient->gradient_timer));
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Command: StdControl.stop( )
//
// Purpose: The purpose of this command is to stop this component.
//
////////////////////////////////////////////////////////////////////////////
result_t MdOppGradient_stop( Mdiff_state_t * state)
{
// ??? Destroy timers, devices, and clients.
//return call Timer.stop( );
return SUCCESS;
}
////////////////////////////////////////////////////////////////////////////
//
// Event: TdFilterI.recv( )
//
// Purpose: This event is used to signal the user filter that a message
// matching its criteria has arrived.
//
////////////////////////////////////////////////////////////////////////////
//
// p_pMsg: owned by function.
// ownership passed to MdOppGradient_matchGradient.
result_t MdOppGradient_recv( Mdiff_state_t * state, IntDiffMsg_t *p_pMsg )
{
result_t tReturn = SUCCESS;
elog(LOG_INFO," ");
assert(p_pMsg != NULL);
MdOppGradient_matchGradient(state, p_pMsg);
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Event: TdFilterI.getMatchAttrs( )
//
// Purpose: This event is used as a pseduo-virtual method to allow the
// implementor of a filter to return their own attribute array
// for their filter during matching.
//
////////////////////////////////////////////////////////////////////////////
//
// ??? Maybe expose this through a status device. But not needed by Mdiff.
//
////////////////////////////////////////////////////////////////////////////
//
// Command: TdPubSubI.publish( )
//
// Purpose: This command allows users to inform the gradient filter that
// this node will act as a source for data that is described in
// the attribute parameters of this call. This call allows users
// to send data using this publish handle without having to reset
// all of the publish attributes every time.
//
////////////////////////////////////////////////////////////////////////////
//
// SKIP
//
////////////////////////////////////////////////////////////////////////////
//
// Command: TdPubSubI.unPublish( )
//
// Purpose: The purpose of this command
//
////////////////////////////////////////////////////////////////////////////
//
// SKIP
//
////////////////////////////////////////////////////////////////////////////
//
// Command: TdPubSubI.subscribe( )
//
// Purpose: This command allows users to inform the gradient filter that
// this node will act as a sink for the data specified by the
// attribute parameters of this call. This call informs the
// gradient to deliver messages to this application whenever the
// message content matches this subscription.
//
////////////////////////////////////////////////////////////////////////////
//
// p_pAttributeArray: owned by func.
// must be freed.
// REQUIRES that p_pAttr be non-null and p_uiNumAttr be non-zero.
int8_t MdOppGradient_subscribe( Mdiff_state_t * state,
DiffAttr_t *p_pAttributeArray,
uint8_t p_uiNumAttr )
{
int8_t returnHandle = 0;
DiffAttr_t * p_pAttrCpy = NULL;
elog(LOG_INFO," ");
assert(p_pAttributeArray != NULL);
assert(p_uiNumAttr != 0);
// Add a new gradient based on the attributes passed.
p_pAttrCpy = (DiffAttr_t *)malloc(sizeof(DiffAttr_t) * p_uiNumAttr);
memcpy(p_pAttrCpy, p_pAttributeArray, sizeof(DiffAttr_t) * p_uiNumAttr);
returnHandle = MdOppGradient_addGradient(state, TD_LOCAL_ADDR, TD_LOCAL_ADDR,
p_pAttrCpy, p_uiNumAttr, 0);
// Add a new interest based on the attributes passed.
// Ownership of p_pAttributeArray passed to addInterest
MdOppGradient_addInterest(state, p_pAttributeArray, p_uiNumAttr);
return returnHandle;
}
result_t MdOppGradient_addInterest(Mdiff_state_t * state, DiffAttr_t * p_pAttributeArray,
uint8_t p_uiNumAttr)
{
IntDiffMsg_t * p_interest = (IntDiffMsg_t *)malloc(sizeof(IntDiffMsg_t));
IntDiffMsg_elem_t * p_elem = (IntDiffMsg_elem_t *)malloc(sizeof(IntDiffMsg_elem_t));
size_t len = 0;
elog(LOG_INFO," ");
state->MdOppGradient->m_pStdAttr[1].m_uiAttr = 1;
memset(p_interest,0,sizeof(IntDiffMsg_t));
memset(p_elem,0,sizeof(IntDiffMsg_elem_t));
p_elem->tMsg = p_interest;
// Copy the attributes
len = p_uiNumAttr * sizeof(DiffAttr_t);
memcpy(p_interest->m_tMsg.m_pData, p_pAttributeArray, len);
// Copy Interest attrs.
memcpy(p_interest->m_tMsg.m_pData + len,
state->MdOppGradient->m_pStdAttr,
2 * sizeof(DiffAttr_t));
//p_interest->m_tMsg.m_pData[2].m_uiAttr = 1;
// ((DiffAttr_t*)(buf - len))->m_uiAttr = 1;
// Set the next hop ??? (last hop?) of this message to be wherever the gradient is pointing
// (which should be here).
p_interest->m_uiLastHop = TD_LOCAL_ADDR;
// Set the total number of attributes
p_interest->m_uiNumAttrs = p_uiNumAttr + TD_NUM_STANDARD_INTEREST_ATTRS;
// Set the data length.
p_interest->m_tMsg.m_uiDataLen = p_interest->m_uiNumAttrs * sizeof( DiffAttr_t );
// Init the origin of this message (this node).
// p_interest->m_tMsg.m_uiSrcAddr = TOS_LOCAL_ADDRESS;
p_interest->m_tMsg.m_uiSrcAddr = state->MD_LOCAL_ADDRESS;
IntDiffMsg_queue_push(state->MdOppGradient->interest_queue, p_elem);
free(p_pAttributeArray);
return SUCCESS;
}
////////////////////////////////////////////////////////////////////////////
//
// Command: TdPubSubI.unSubscribe( )
//
// Purpose: This command removes an entry from the gradient that would
// direct it to send a matching message to the application.
//
////////////////////////////////////////////////////////////////////////////
result_t MdOppGradient_unSubscribe( Mdiff_state_t * state, int8_t p_iHandle)
{
// The interest message, in the interest queue, will remove itself
// when it comes ready to be sent; So there is no need to deal with it
// here.
elog(LOG_INFO," ");
return MdOppGradient_remGradient( state, p_iHandle );
}
////////////////////////////////////////////////////////////////////////////
//
// Command: TdGradientI.getNumGradients( )
//
// Purpose: This command returns the number of gradient entries that are
// currently active.
//
////////////////////////////////////////////////////////////////////////////
uint8_t MdOppGradient_getNumGradients( Mdiff_state_t * state )
{
//state->MdOppGradient->m_uiNumGradients;
// The queue already does this for us.
elog(LOG_INFO," ");
return DiffGradient_queue_qlen(state->MdOppGradient->gradient_queue);
}
////////////////////////////////////////////////////////////////////////////
//
// Command: TdGradientI.getGradient( )
//
// Purpose: This command copies a gradient entry into an output variable
// for a user.
//
////////////////////////////////////////////////////////////////////////////
//
// Returns SUCCESS if it successfully found the gradient by index.
//
// NOTE THAT if p_uiOutputLen is too small, the gradient is still
// copied, but only as much as the caller allowed.
//
// You can check for this situation by examining the claimed length of
// the copied gradient.
result_t MdOppGradient_getGradientByIndex( Mdiff_state_t * state,
uint8_t p_uiLogicalIndex,
DiffGradient_t *p_pOutputGradient,
uint8_t p_uiOutputLen )
{
uint8_t uiGrad = 0;
result_t tReturn = FAIL;
DiffGradient_elem_t * p_elem = DiffGradient_queue_top(state->MdOppGradient->gradient_queue);
DiffGradient_t *pGrad = NULL;
elog(LOG_INFO," ");
// Check output variable
assert(p_pOutputGradient != NULL);
assert(p_uiOutputLen > 0);
// Zero out caller's buffer.
memset(p_pOutputGradient, 0, p_uiOutputLen);
// Check index
if ( p_uiLogicalIndex < DiffGradient_queue_qlen(state->MdOppGradient->gradient_queue) )
{
// Go to the indexed gradient.
for ( uiGrad = 0; uiGrad < p_uiLogicalIndex; ++uiGrad)
{
p_elem = DiffGradient_queue_next(p_elem);
}
pGrad = p_elem->tMsg;
// Copy the gradient over.
memcpy(p_pOutputGradient,
pGrad,
p_uiOutputLen <= pGrad->m_uiLength ? p_uiOutputLen : pGrad->m_uiLength);
tReturn = SUCCESS;
}
else
{
//??? Log that the index is out of bounds
}
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Command: TdGradientI.getGradientByHandle( )
//
// Purpose: This command locates a gradient using its handle, and then
// copies it into an output variable for a user.
//
////////////////////////////////////////////////////////////////////////////
//
// This is just like MdGradient_getGradientByIndex
//
result_t MdOppGradient_getGradientByHandle( Mdiff_state_t * state,
int8_t p_iHandle,
DiffGradient_t *p_pOutputGradient,
uint8_t p_uiOutputLen )
{
uint8_t uiGrad = 0;
result_t tReturn = FAIL;
DiffGradient_elem_t *p_elem = DiffGradient_queue_top(state->MdOppGradient->gradient_queue);
uint8_t qlen = DiffGradient_queue_qlen(state->MdOppGradient->gradient_queue);
DiffGradient_t * pGrad = NULL;
elog(LOG_INFO," ");
// Check output variable
assert(p_pOutputGradient != NULL);
assert(p_uiOutputLen > 0);
// Zero out caller's buffer.
memset(p_pOutputGradient, 0, p_uiOutputLen);
// Search out gradient by handle.
for (uiGrad = 0; uiGrad < qlen; ++uiGrad)
{
pGrad = p_elem->tMsg;
// Check if we have a matching handle.
if (pGrad->m_iHandle == p_iHandle)
{
memcpy(p_pOutputGradient,
pGrad,
p_uiOutputLen <= pGrad->m_uiLength ? p_uiOutputLen: pGrad->m_uiLength);
tReturn = SUCCESS;
break;
}
}
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Event: Timer.fired( )
//
// Purpose: This event fires periodically and posts the timeoutGradients( )
// task and send interest from our interest queue.
//
////////////////////////////////////////////////////////////////////////////
//
// Not needed
////////////////////////////////////////////////////////////////////////////
//
// Task: matchGradient( )
//
// Purpose: This function takes a Tiny Diffusion message as a parameter
// and finds all gradient entries that match it. As a gradient is
// found, the message is sent to it and interests are used to
// update gradients and forwarded if new, and dropped if old.
//
////////////////////////////////////////////////////////////////////////////
//
// split this in two: one for data, one for interests.
//
// m_tMsg OWNED by matchGradient.
//
void MdOppGradient_matchGradient(Mdiff_state_t * state, IntDiffMsg_t * m_tMsg)
{
DiffGradient_t *pGrad = NULL;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_elem = DiffGradient_queue_top(gradient_queue);
//m_uiSentinel = 0;
//m_uiCount = 0;
int16_t m_iRoundIndex = -1;
uint16_t m_uiRound = 0;
int16_t m_iClassIndex = 0;
//m_uiIndex = 0;
elog(LOG_INFO," ");
// If the message is a data message
if (MdMatch_isClass(state,
TD_ATTR_DATA,
(DiffAttr_t *) m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs))
// if (MdMatch_oneWayMatch(state,
// state->MdOppGradient->m_pDataAttr,
// state->MdOppGradient->m_uiNumDataAttrs,
// (DiffAttr_t *) m_tMsg->m_tMsg.m_pData,
// m_tMsg->m_uiNumAttrs))
{
char ip2[16];
int2ip(m_tMsg->m_uiLastHop,ip2);
elog(LOG_INFO,"Received data from: %s",ip2);
// Loop over the gradients to see if it matches anything.
// Get a pointer to the head and then keep following next pointer.
for ( ; p_elem != NULL; p_elem = DiffGradient_queue_next(p_elem) )
{
pGrad = p_elem->tMsg;
// Check this gradient against the message
if (MdMatch_oneWayMatch(state,
(DiffAttr_t *) pGrad->m_pAttrs,
( pGrad->m_uiLength - sizeof(DiffGradient_t) ) / sizeof(DiffAttr_t),
( DiffAttr_t *) m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs))
{
// This gradient matches.
// Check if this gradient is a local match.
if ( (uint16_t) TD_LOCAL_ADDR == pGrad->m_uiNextHop )
{
// Send this to the application
// I still own pGrad and m_tMsg
MdData_recvMsg( state, &(m_tMsg->m_tMsg) );
}
// Else, this is a remote subscription.
// - Make sure we don't forward to original source.
// - Make sure we don't forward to last hop
else if ( (pGrad->m_uiNextHop != m_tMsg->m_tMsg.m_uiSrcAddr)
&& (pGrad->m_uiNextHop != m_tMsg->m_uiLastHop) )
{
// send the message
// We still own the message.
char ip[16];
int2ip(pGrad->m_uiNextHop,ip);
elog(LOG_INFO, "Sending data to: %s",ip);
MdComm_send(state, pGrad->m_uiNextHop, &(m_tMsg->m_tMsg) );
}
}
}
}
// Otherwise, it must be an interest.
// else if (MdMatch_oneWayMatch(state,
// state->MdOppGradient->m_pInterestAttr,
// state->MdOppGradient->m_uiNumInterestAttrs,
// (DiffAttr_t *) m_tMsg->m_tMsg.m_pData,
// m_tMsg->m_uiNumAttrs))
else if (MdMatch_isClass(state,
TD_ATTR_INTEREST,
(DiffAttr_t *) m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs))
{
// if it's local, then skip.
if ( (uint16_t) TD_LOCAL_ADDR == m_tMsg->m_uiLastHop
|| (uint16_t) state->MD_LOCAL_ADDRESS == m_tMsg->m_tMsg.m_uiSrcAddr )
{
// ??? Log local, so skipping.
}
// Otherwise
else
{
// Ensure the message has a CLASS attribute
if (-1 == ( m_iClassIndex = MdMatch_getAttrIndex(state,
TD_ATTR_CLASS,0,
(DiffAttr_t*)m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs)))
{
// ??? Log doesn't have class attribute.
}
// Ensure the message has a ROUND attribute
else if (-1 == (m_iRoundIndex = MdMatch_getAttrIndex(state,
TD_ATTR_OPP_ROUND, 0,
(DiffAttr_t*)m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs)))
{
// ??? Log doesn't have ROUND attribute
}
// We found the required attributes
else
{
int matchFound = 0;
// copy the round value.
m_uiRound = ((DiffAttr_t*)&( m_tMsg->m_tMsg.m_pData[ m_iRoundIndex ] ))->m_iValue;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iClassIndex ]))->m_uiAttr = TD_ATTR_NOOP;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iClassIndex ]))->m_iValue = 100;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iRoundIndex ]))->m_uiAttr = TD_ATTR_NOOP;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iRoundIndex ]))->m_iValue = 100;
// Look for gradients that match this interest.
// Gradients are held in a queue.
// Loop over the gradients to see if it matches anything.
// Get a pointer to the head and then keep following next pointer.
for ( ; p_elem != NULL; p_elem = DiffGradient_queue_next(p_elem) )
{
pGrad = p_elem->tMsg;
// Be sure we turn the CLASS and ROUND attributes to no-ops on each iteration.
// ? Why does thi sneed to be set every iteration? Does something modify it?
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iClassIndex ]))->m_uiAttr=TD_ATTR_NOOP;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iClassIndex ]))->m_iValue=100;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iRoundIndex ]))->m_uiAttr=TD_ATTR_NOOP;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[ m_iRoundIndex ]))->m_iValue=100;
// If this is a local subscription, then the interest
// message won't be pertaining to it.
if ((uint16_t)TD_LOCAL_ADDR == pGrad->m_uiNextHop)
{
// ??? Log local subscription found, skipping.
}
// If this message came from the same node that set
// this gradient up, and the interst message is
// equal to the gradient...
else if ((m_tMsg->m_tMsg.m_uiSrcAddr == pGrad->m_uiSrcAddr)
&& (SUCCESS == MdMatch_equal(state,
(DiffAttr_t*)m_tMsg->m_tMsg.m_pData,
m_tMsg->m_uiNumAttrs,
0,
(DiffAttr_t*)pGrad->m_pAttrs,
(pGrad->m_uiLength-sizeof(DiffGradient_t))
/ sizeof(DiffAttr_t),
0)))
{
matchFound = 1;
// Reset the CLASS and ROUND attributes.
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iClassIndex]))->m_uiAttr=TD_ATTR_CLASS;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iClassIndex]))->m_iValue=TD_ATTR_INTEREST;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iRoundIndex]))->m_uiAttr=TD_ATTR_OPP_ROUND;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iRoundIndex]))->m_iValue=m_uiRound;
// Update this interest so that it doesn't
// timeout, and we have the most current
// information stored (route, round, timeout).
// If this interest has an outdated round
// number, we've seen it before, or anything,
// this update call will fail.
if (SUCCESS != MdOppGradient_updateGradient(state,
pGrad->m_iHandle,
m_tMsg->m_uiLastHop,
TD_GRADIENT_TIMEOUT,
m_uiRound))
{
// ??? Log that we were unable to update the gradient
}
// If we were able to update the gradient, then
// this is a valid interest update, so we should
// re-transmit it to our neighbors.
else if (SUCCESS != MdComm_send(state, state->MD_BCAST_ADDR, &(m_tMsg->m_tMsg)))
{
// ??? Log that we were unable to send.
}
}
}
// Reset class and round before adding as gradient.
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iClassIndex]))->m_uiAttr=TD_ATTR_CLASS;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iClassIndex]))->m_iValue=TD_ATTR_INTEREST;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iRoundIndex]))->m_uiAttr=TD_ATTR_OPP_ROUND;
((DiffAttr_t*)&(m_tMsg->m_tMsg.m_pData[m_iRoundIndex]))->m_iValue=m_uiRound;
// If we did not find a matching gradient, and this is
// not a message that we originally sent...
if ((matchFound == 0)
&& state->MD_LOCAL_ADDRESS != m_tMsg->m_tMsg.m_uiSrcAddr)
{
// ??? Log that we didn't find a matching gradient
// and will create one.
// Also, MdOppGradient_addGradient needs to own DiffAttr_t *.
DiffAttr_t * p_pAttrCpy = (DiffAttr_t *)malloc(sizeof(DiffAttr_t) * m_tMsg->m_uiNumAttrs);
memcpy(p_pAttrCpy, (DiffAttr_t*)m_tMsg->m_tMsg.m_pData, sizeof(DiffAttr_t) * m_tMsg->m_uiNumAttrs);
if (-1 == MdOppGradient_addGradient(state,
m_tMsg->m_tMsg.m_uiSrcAddr,
m_tMsg->m_uiLastHop,
p_pAttrCpy,
m_tMsg->m_uiNumAttrs,
m_uiRound))
{
// ??? Log unable to add gradient
}
// We now forward this interest to our neighbors.
if ( SUCCESS != MdComm_send( state, state->MD_BCAST_ADDR, &(m_tMsg->m_tMsg)) )
{
// ??? Log unable to send
}
}
}
}
}
//++m_tMsg->m_uiPriority;
// Pass the message (and ownership) of this message on to the
// forwarder.
if (SUCCESS != MdForwarder_recvFromFilter(state, m_tMsg))
{
// Log unable to call forwarder
}
}
////////////////////////////////////////////////////////////////////////////
//
// Task: timeoutGradients( )
//
// Purpose: This function is called periodically on a timer and evaluates
// all current gradients so that they can be removed if they are
// too old.
//
////////////////////////////////////////////////////////////////////////////
gint MdOppGradient_timeoutGradients( gpointer data, int interval, g_event_t * event)
{
Mdiff_state_t * state = (Mdiff_state_t *)data;
DiffGradient_t *pGrad = NULL;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_elem = DiffGradient_queue_top(gradient_queue);
DiffGradient_elem_t * p_elem_next = NULL;
elog(LOG_INFO," ");
// For each gradient...
// We have this funny structure because we are deleting elements
// from the structure we're looping over.
while (1) {
if (p_elem == NULL)
break;
p_elem_next = DiffGradient_queue_next(p_elem);
pGrad = p_elem->tMsg;
if ((uint16_t) TD_LOCAL_ADDR == pGrad->m_uiNextHop) {
// do nothing
}
else if (0 == pGrad->m_uiLastSeen) {
if (SUCCESS != MdOppGradient_remGradient(state, pGrad->m_iHandle)) {
// ??? Log Unable to remove gradient
}
}
else {
--pGrad->m_uiLastSeen;
}
p_elem = p_elem_next;
}
return TIMER_RENEW;
}
////////////////////////////////////////////////////////////////////////////
//
// Task: sendInterests( )
//
// Purpose: This task dequeues one interest message from the internal
// interest message queue. It then checks to make sure that this
// interest message still has a corresponding gradient (i.e. it
// hasn't been un-subscribed). If so, the interest message is
// sent to the network. This task reposts itself until all interest
// messages are sent once per period.
//
////////////////////////////////////////////////////////////////////////////
gint MdOppGradient_sendInterests( gpointer data,
int interval,
g_event_t * event)
{
Mdiff_state_t * state = (Mdiff_state_t *)data;
uint8_t foundMatch = 0;
int16_t iRoundIndex = 0;
DiffGradient_t *pGrad = NULL;
IntDiffMsg_t *pInt = NULL;
// For each interest, find the corresponding gradient.
IntDiffMsg_queue_t * interest_queue = state->MdOppGradient->interest_queue;
IntDiffMsg_elem_t * p_int_elem = IntDiffMsg_queue_top(interest_queue);
IntDiffMsg_elem_t * p_int_elem_next = NULL;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(gradient_queue);
elog(LOG_INFO," ");
// We use a while instead of a for to loop over the interests
// because we are deleting elements.
while(1) {
if (p_int_elem == NULL)
break;
p_int_elem_next = IntDiffMsg_queue_next(p_int_elem);
pInt = p_int_elem->tMsg;
// Find a matching gradient
for ( p_grad_elem = DiffGradient_queue_top(gradient_queue);
p_grad_elem != NULL;
p_grad_elem = DiffGradient_queue_next(p_grad_elem)) {
pGrad = p_grad_elem->tMsg;
// Skip non-local gradients.
if ( (uint16_t) TD_LOCAL_ADDR != pGrad->m_uiNextHop ) {
// do nothing
}
// if the gradient matches.
else if ( SUCCESS == MdMatch_equal(state,
(DiffAttr_t *)pGrad->m_pAttrs,
(pGrad->m_uiLength - sizeof(DiffGradient_t))
/ sizeof(DiffAttr_t),
0,
(DiffAttr_t *) pInt->m_tMsg.m_pData,
pInt->m_uiNumAttrs-2,
0 )) {
foundMatch = 1;
break;
}
}
if (1 == foundMatch) { // If we found a matching gradient
// Find the ROUND attribute.
if (-1 == (iRoundIndex = MdMatch_getAttrIndex(state,
TD_ATTR_OPP_ROUND,
0,
(DiffAttr_t*)pInt->m_tMsg.m_pData,
pInt->m_uiNumAttrs))) {
// Log Isn't a round required?
}
else { // We got ROUND. Increment it and broadcast
++((DiffAttr_t*)&(pInt->m_tMsg.m_pData[iRoundIndex]))->m_iValue;
// Set to zero if it rolls over.
if (((DiffAttr_t*)&(pInt->m_tMsg.m_pData[iRoundIndex]))->m_iValue < 0) {
((DiffAttr_t*)&(pInt->m_tMsg.m_pData[iRoundIndex]))->m_iValue = 0;
}
if (SUCCESS != MdComm_send(state, state->MD_BCAST_ADDR, &(pInt->m_tMsg))) {
// LOG unable to broadcast the interest.
}
}
}
else { // No matching gradient found. Delete interest.
IntDiffMsg_queue_remove(interest_queue, p_int_elem);
free(pInt);
free(p_int_elem);
}
p_int_elem = p_int_elem_next;
}
return TIMER_RENEW;
}
////////////////////////////////////////////////////////////////////////////
//
// Function: addGradient( )
//
// Purpose: This function creates an entry for a new gradient according to
// the attribute parameters specified.
//
////////////////////////////////////////////////////////////////////////////
//
// Assume that we own p_pAttributeArray.
//
// We don't support blobs!!!!!!!
int8_t MdOppGradient_addGradient( Mdiff_state_t * state,
uint16_t p_uiSrcAddr,
uint16_t p_uiNextHop,
DiffAttr_t *p_pAttributeArray,
uint8_t p_uiNumAttr,
uint8_t p_uiRound )
{
int8_t iReturn = -1;
size_t attr_length = (size_t)p_uiNumAttr * sizeof(DiffAttr_t);
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_grad_elem = (DiffGradient_elem_t*)malloc(sizeof(DiffGradient_elem_t));
DiffGradient_t * pGrad = NULL;
elog(LOG_INFO," ");
memset(p_grad_elem, 0, sizeof(DiffGradient_elem_t));
p_grad_elem->tMsg = (DiffGradient_t *)malloc(sizeof(DiffGradient_t) + attr_length);
pGrad = p_grad_elem->tMsg;
pGrad->m_uiLastSeen = ( uint8_t ) TD_GRADIENT_TIMEOUT;
pGrad->m_uiRound = p_uiRound;
pGrad->m_uiLength = sizeof(DiffGradient_t) + attr_length;
pGrad->m_uiNextHop = p_uiNextHop;
pGrad->m_uiSrcAddr = p_uiSrcAddr;
pGrad->m_iHandle = iReturn = state->MdOppGradient->m_iHandleSeq++;
assert(p_pAttributeArray != NULL);
memcpy(pGrad->m_pAttrs, p_pAttributeArray, attr_length);
DiffGradient_queue_push(gradient_queue, p_grad_elem);
free(p_pAttributeArray);
return iReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Function: getGradient( )
//
// Purpose: This function locates a gradient entry and copies its contents
// into the user's output variable.
//
////////////////////////////////////////////////////////////////////////////
result_t MdOppGradient_getGradient( Mdiff_state_t * state,
uint16_t p_uiIndex,
DiffGradient_t *p_pOutputGradient,
uint8_t p_uiOutputLen )
{
result_t tReturn = FAIL;
uint16_t i = 0;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(gradient_queue);
elog(LOG_INFO," ");
assert(p_pOutputGradient != NULL);
for (i = 1; i < p_uiIndex; ++i) {
if (p_grad_elem == NULL) {
break;
}
p_grad_elem = DiffGradient_queue_next(p_grad_elem);
}
if ((p_grad_elem != NULL) && (p_grad_elem->tMsg != NULL)) {
memset(p_pOutputGradient, 0, p_uiOutputLen);
if (p_grad_elem->tMsg->m_uiLength > p_uiOutputLen) {
memcpy(p_pOutputGradient, p_grad_elem->tMsg, p_uiOutputLen);
}
else {
memcpy(p_pOutputGradient, p_grad_elem->tMsg, p_grad_elem->tMsg->m_uiLength);
}
tReturn = SUCCESS;
}
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Function: updateGradient( )
//
// Purpose: This function updates the information about a gradient such as
// the neighbor, the TTL, the attributes, etc.
//
////////////////////////////////////////////////////////////////////////////
result_t MdOppGradient_updateGradient( Mdiff_state_t * state,
int8_t p_iHandle,
uint16_t p_uiNextHop,
uint8_t p_uiLastSeen,
uint8_t p_uiRound )
{
result_t tReturn = FAIL;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(gradient_queue);
DiffGradient_t * p_grad = NULL;
elog(LOG_INFO," ");
// Find the matching gradient
for ( ; p_grad_elem != NULL; p_grad_elem = DiffGradient_queue_next(p_grad_elem)) {
p_grad = p_grad_elem->tMsg;
if ((p_grad != NULL) && (p_grad->m_iHandle == p_iHandle)) {
break;
}
}
if (p_grad_elem != NULL) {
if ( (p_uiRound > p_grad->m_uiRound)
|| (p_uiRound < 10 && p_grad->m_uiRound > 100)) {
p_grad->m_uiNextHop = p_uiNextHop;
p_grad->m_uiLastSeen = p_uiLastSeen;
p_grad->m_uiRound = p_uiRound;
tReturn = SUCCESS;
}
}
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Function: remGradient( )
//
// Purpose: This function removes a gradient from the cache.
//
////////////////////////////////////////////////////////////////////////////
result_t MdOppGradient_remGradient( Mdiff_state_t * state,
int8_t p_iHandle )
{
result_t tReturn = FAIL;
DiffGradient_queue_t * gradient_queue = state->MdOppGradient->gradient_queue;
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(gradient_queue);
elog(LOG_INFO," ");
// Find the matching gradient
for ( ; p_grad_elem != NULL; p_grad_elem = DiffGradient_queue_next(p_grad_elem)) {
if ((p_grad_elem->tMsg != NULL)
&& (p_grad_elem->tMsg->m_iHandle == p_iHandle)) {
DiffGradient_queue_remove(gradient_queue, p_grad_elem);
free(p_grad_elem->tMsg);
free(p_grad_elem);
tReturn = SUCCESS;
break;
}
}
return tReturn;
}
////////////////////////////////////////////////////////////////////////////
//
// Event: TdCommI.sendDone( )
//
// Purpose: This event is the callback from the comms layer.
//
////////////////////////////////////////////////////////////////////////////
//
//
//
// We do not own tMsg.
result_t MdData_recvMsg( Mdiff_state_t * state,
DiffMsg_t * tMsg)
{
// send message to all application clients
MdMsg_t * msg = (MdMsg_t*)malloc(sizeof(MdMsg_t) + sizeof(DiffMsg_t));
elog(LOG_INFO," ");
memset(msg,0,sizeof(MdMsg_t) + sizeof(DiffMsg_t));
msg->type = MdRecvMsg;
msg->length = sizeof(DiffMsg_t);
memcpy(msg->data, tMsg, sizeof(DiffMsg_t));
pd_receive(state->app_pd_ref, msg, sizeof(DiffMsg_t));
free(msg);
return SUCCESS;
}
result_t MdComm_send( Mdiff_state_t * state,
uint16_t ui_nextHop,
DiffMsg_t * tMsg)
{
ssize_t result = FAIL;
link_pkt_t hdr = {
dst: {
id: ui_nextHop
},
type: PKT_TYPE_MDIFF,
};
buf_t *packet = buf_new();
elog(LOG_INFO," ");
if (hdr.dst.id == state->MD_BCAST_ADDR) {
hdr.dst.id = LINK_BROADCAST;
}
else {
hdr.dst.id += state->addr_base;
}
bufcpy(packet, &hdr, sizeof(hdr));
bufcpy(packet, tMsg, sizeof(DiffMsg_t));
if (lu_send(state->link_ref, (link_pkt_t*)packet->buf,
packet->len - sizeof(hdr)) < 0) {
result = FAIL;
elog(LOG_CRIT,"unable to send message! lu_send returned %d:",result);
}
else {
result = SUCCESS;
}
buf_free(packet);
return result;
}
See more files for this project here