Show Mdiff_main.c syntax highlighted
char Mdiff_main_c_cvsid[] = "$Id: Mdiff_main.c,v 1.10 2004/05/10 22:23:33 adparker Exp $";
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include "link/link.h"
#include "link/neighbor.h"
#include "emrun/emrun.h"
#include "libmisc/misc_init.h"
#include "Mdiff.h"
#include "MdMatch.h"
#include "MdOppGradient.h"
#include "MdForwarder.h"
#include "routing/routing_table.h"
static void Mdiff_shutdown(void *data);
static int Mdiff_status_print(status_context_t *info, buf_t *buf);
static int Mdiff_status_binary(status_context_t *info, buf_t *buf);
static int Mdiff_grad_binary(status_context_t *info, buf_t *buf);
// For link
static int Mdiff_receiver(lu_context_t * link,
link_pkt_t *link_pkt,
ssize_t data_len);
static int Mdiff_filter_pd_send_cb(pd_context_t * pd, const void *packet,
int packetlen, int loop_needed);
result_t Mdiff_recvFromFilter_helper( pd_context_t * pd, const void *packet,
int packetlen );
result_t Mdiff_setPriority(pd_context_t * pd, const void *packet, int packetlen);
static int Mdiff_app_pd_send_cb(pd_context_t * pd, const void *packet,
int packetlen, int loop_needed);
static result_t Mdiff_sendmsg_helper(pd_context_t * pd, const void *packet, int packetlen);
static result_t Mdiff_subscribe_helper(pd_context_t * pd, const void *packet, int packetlen);
static result_t Mdiff_unsubscribe_helper(pd_context_t * pd, const void *packet, int packetlen);
static int neighbor_list_new(neighbor_t *nb_list, int count, void *data);
static void Mdiff_return_handle(pd_context_t *pd, int8_t iHandle);
static int Mdiff_command_start(status_context_t * info, char * command, size_t buf_size);
static void Mdiff_print_gradients(Mdiff_state_t * state, buf_t *buf);
static void Mdiff_print_gradient(DiffGradient_t * pgrad, buf_t *buf);
static void Mdiff_print_diffattr(DiffAttr_t * i, buf_t *buf);
static void Mdiff_print_interests(Mdiff_state_t * state, buf_t* buf);
static void Mdiff_print_interest(IntDiffMsg_t * pint, buf_t* buf);
static void Mdiff_print_diffmsg(DiffMsg_t *pdiff, uint8_t num, buf_t *buf);
static void int2ip(uint32_t number, char * ip);
void usage(char * name)
{
misc_print_usage
(name, "-U <dvice> [-f]",
" --uses <device>: Specify device to use.\n");
exit(1);
}
int main(int argc, char * argv[])
{
char * uses = NULL;
MdOppGradient_state_t MdOppGradient_state = {};
MdForwarder_state_t MdForwarder_state = {};
Mdiff_state_t state = {
MD_BCAST_ADDR: (uint16_t)LINK_BROADCAST,
nb_list: NULL,
MdOppGradient: &MdOppGradient_state,
MdForwarder: &MdForwarder_state
};
//sleep(5);
MdOppGradient_init( &state );
MdForwarder_init( &state);
misc_init(&argc, argv, CVSTAG);
// get the --uses arg
uses = link_parse_uses(&argc, argv, NULL);
if (uses == NULL) {
elog(LOG_CRIT, "Please specify a link to use!");
usage(argv[0]);
}
// additional args?
if (misc_args_remain(&argc, argv)) {
elog(LOG_CRIT, "Additional unparsed arguments!");
usage(argv[0]);
}
// Setup status device server.
{
status_dev_opts_t status_opts = {
device: {
devname: MDIFF_STATUS_DEV,
device_info: &state
},
printable: Mdiff_status_print,
binary: Mdiff_status_binary,
write: Mdiff_command_start
};
if (g_status_dev(&status_opts, &(state.status_ref)) < 0) {
elog(LOG_CRIT, "unable to create status device: %m");
exit(1);
}
}
// Setup status device server for gradients.
{
status_dev_opts_t status_opts = {
device: {
devname: MDIFF_GRAD_STATUS_DEV,
device_info: &state
},
binary: Mdiff_grad_binary,
printable: Mdiff_grad_binary
};
if (g_status_dev(&status_opts, &(state.grad_ref)) < 0) {
elog(LOG_CRIT, "unable to create status device: %m");
exit(1);
}
}
// Setup packet device server for filters.
{
packet_dev_opts_t filter_packet_opts = {
device: {
devname: MDIFF_FILTER_PACKET_DEV,
device_info: &state
},
send: Mdiff_filter_pd_send_cb
};
if (g_packet_dev(&filter_packet_opts, &(state.filter_pd_ref)) < 0) {
elog(LOG_CRIT, "unable to create packet device: %m");
exit(1);
}
}
// Setup packet device server for applications
{
packet_dev_opts_t app_packet_opts = {
device: {
devname: MDIFF_APP_PACKET_DEV,
device_info: &state
},
send: Mdiff_app_pd_send_cb
};
if (g_packet_dev(&app_packet_opts, &(state.app_pd_ref)) < 0) {
elog(LOG_CRIT, "unable to create packet device: %m");
exit(1);
}
}
// Setup link for sending / receiving messages.
{
if_id_t temp_id;
lu_opts_t lu_opts = {
opts: {
name: uses,
data: &state,
pkt_type: PKT_TYPE_MDIFF
},
receive: Mdiff_receiver
};
if (lu_open(&lu_opts, &state.link_ref) < 0) {
elog(LOG_CRIT, "can't open %s: %m", link_name(&lu_opts.opts, NULL));
exit(1);
}
if (!lu_get_if_id(state.link_ref, &temp_id)) {
state.MD_LOCAL_ADDRESS = (uint16_t)temp_id;
state.addr_base = 0xFFFF0000 & temp_id;
}
else {
elog(LOG_CRIT, "can't get interface id for %s.", link_name(&lu_opts.opts,NULL));
exit(1);
}
}
// Setup neighbor list
{
neighbor_opts_t n_opts = {
link_name: uses,
new_list: neighbor_list_new,
data: &state
};
if (g_neighbors(&n_opts, NULL) < 0) {
elog(LOG_CRIT, "Failed to open neighborlist");
exit(1);
}
}
// Setup emrun
{
emrun_opts_t emrun_opts = {
shutdown: Mdiff_shutdown,
data: &state
};
emrun_init(&emrun_opts);
}
g_status_dev_notify(state.status_ref);
g_main();
elog(LOG_ALERT, "Event system terminated abnormally.");
return 1;
}
static void Mdiff_shutdown(void *data)
{
elog(LOG_NOTICE, "Mdiff module shutting down.");
exit(0);
}
static int Mdiff_status_print(status_context_t * info, buf_t *buf)
{
Mdiff_state_t * state = (Mdiff_state_t *)sd_data(info);
bufprintf(buf, "Hello.\n");
// Print list of gradients and interests.
bufprintf(buf, "GRADIENTS********************************\n");
Mdiff_print_gradients(state, buf);
bufprintf(buf, "INTERESTS********************************\n");
Mdiff_print_interests(state, buf);
return STATUS_MSG_COMPLETE;
}
static void Mdiff_print_interests(Mdiff_state_t * state, buf_t *buf)
{
IntDiffMsg_elem_t * p_int_elem = IntDiffMsg_queue_top(state->MdOppGradient->interest_queue);
for (; p_int_elem != NULL; p_int_elem = IntDiffMsg_queue_next(p_int_elem)) {
Mdiff_print_interest(p_int_elem->tMsg, buf);
}
}
static void Mdiff_print_interest(IntDiffMsg_t * pint, buf_t*buf)
{
char ip[16];
DiffMsg_t * pdiff = &pint->m_tMsg;
int2ip(pint->m_uiLastHop,ip);
bufprintf(buf,"INTEREST***********\n");
bufprintf(buf,"m_uiPriority: %u\n",pint->m_uiPriority);
bufprintf(buf,"m_uiNumAttrs: %u\n",pint->m_uiNumAttrs);
bufprintf(buf,"m_uiLastHop: %s\n",ip);
Mdiff_print_diffmsg(pdiff, pint->m_uiNumAttrs, buf);
}
static void Mdiff_print_diffmsg(DiffMsg_t *pdiff, uint8_t num, buf_t *buf)
{
char ip[16];
uint8_t i;
DiffAttr_t * pattr = (DiffAttr_t*)pdiff->m_pData;
int2ip(pdiff->m_uiSrcAddr,ip);
bufprintf(buf,"DiffMsg**********\n");
bufprintf(buf,"m_uiSrcAddr: %s\n",ip);
bufprintf(buf,"m_uiTtl: %u\n",pdiff->m_uiTtl);
bufprintf(buf,"m_uiSeq: %u\n",pdiff->m_uiSeq);
bufprintf(buf,"m_uiDataLen: %u\n",pdiff->m_uiDataLen);
for ( i = 0; i < num; ++i) {
Mdiff_print_diffattr( (DiffAttr_t*) (pattr + i * sizeof(DiffAttr_t)), buf);
}
}
static void Mdiff_print_gradients(Mdiff_state_t * state, buf_t *buf)
{
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(state->MdOppGradient->gradient_queue);
for (; p_grad_elem != NULL; p_grad_elem = DiffGradient_queue_next(p_grad_elem)) {
Mdiff_print_gradient(p_grad_elem->tMsg, buf);
}
}
static void Mdiff_print_gradient(DiffGradient_t * pgrad, buf_t *buf)
{
char ip[16];
int num = (pgrad->m_uiLength - sizeof(DiffGradient_t)) / sizeof(DiffAttr_t);
int index = 0;
bufprintf(buf, "DiffGradient********\n");
bufprintf(buf, "m_iHandle: %d\n", pgrad->m_iHandle);
bufprintf(buf, "m_uiLastSeen: %u\n",pgrad->m_uiLastSeen);
int2ip(pgrad->m_uiSrcAddr,ip);
bufprintf(buf, "m_uiSrcAddr: %s\n",ip);
int2ip(pgrad->m_uiNextHop,ip);
bufprintf(buf, "m_uiNextHop: %s\n",ip);
bufprintf(buf, "m_uiLength: %u\n",pgrad->m_uiLength);
bufprintf(buf, "m_uiRound: %u\n",pgrad->m_uiRound);
for (index = 0; index < num; ++index) {
Mdiff_print_diffattr((DiffAttr_t*)(pgrad->m_pAttrs + index * sizeof(DiffAttr_t)) , buf);
}
}
static void Mdiff_print_diffattr(DiffAttr_t * i, buf_t *buf)
{
bufprintf(buf, "DiffAttr**********\n");
bufprintf(buf, "m_uiAttr: %u\t",i->m_uiAttr);
bufprintf(buf, "m_uiOp: %u\t",i->m_uiOp);
bufprintf(buf, "m_iValue: %d\t\n",i->m_iValue);
}
static int Mdiff_status_binary(status_context_t *info, buf_t *buf)
{
Mdiff_state_t * state = (Mdiff_state_t*)sd_data(info);
//route_entry_t route = {
// next_hop:
//};
bufcpy(buf, state, sizeof(Mdiff_state_t));
return STATUS_MSG_COMPLETE;
}
static int Mdiff_grad_binary(status_context_t *info, buf_t *buf)
{
Mdiff_state_t * state = (Mdiff_state_t*)sd_data(info);
DiffGradient_elem_t * p_grad_elem = DiffGradient_queue_top(state->MdOppGradient->gradient_queue);
for (; p_grad_elem != NULL; p_grad_elem = DiffGradient_queue_next(p_grad_elem)) {
route_entry_t e_r = {
dst: p_grad_elem->tMsg->m_uiSrcAddr,
next_hop: p_grad_elem->tMsg->m_uiNextHop,
seqno: p_grad_elem->tMsg->m_uiRound,
};
bufcpy(buf, &e_r, sizeof(e_r));
}
return STATUS_MSG_COMPLETE;
}
static int Mdiff_receiver(lu_context_t * link, link_pkt_t *link_pkt, ssize_t data_len)
{
Mdiff_state_t * state = (Mdiff_state_t *)lu_data(link);
neighbor_t * nb_list = state->nb_list;
int nb_count = state->nb_count;
int i = 0;
if_id_t last_hop = link_pkt->src.id;
DiffMsg_t * p_pMsg = NULL;
char ip[16];
int found_neighbor = 0;
elog(LOG_INFO," ");
assert(sizeof(DiffMsg_t) == data_len);
// cycle through nb until you find neighbor.
for (i = 0; i < nb_count; ++i) {
if (nb_list[i].if_id == last_hop) {
found_neighbor = 1;
int2ip(nb_list[i].if_id,ip);
if (nb_list[i].state == ACTIVE) {
elog(LOG_INFO,"ACCEPTING message from node: %u, if:%s",nb_list[i].node_id, ip);
p_pMsg = (DiffMsg_t*)malloc(sizeof(DiffMsg_t));
memcpy(p_pMsg, (DiffMsg_t*)link_pkt->data, sizeof(DiffMsg_t));
// passing ownership of p_pMsg.
MdForwarder_sendMsg(state, (uint16_t)last_hop, p_pMsg);
}
else {
elog(LOG_INFO,"REJECTING message from node: %u, if:%s",nb_list[i].node_id, ip);
}
break;
}
}
if (found_neighbor == 0) {
int2ip(last_hop, ip);
elog(LOG_CRIT,"REJECTING message from non-neighbor node: %s",ip);
}
free(link_pkt);
return EVENT_RENEW;
}
static int Mdiff_filter_pd_send_cb(pd_context_t * pd, const void *packet,
int packetlen, int loop_needed)
{
MdMsg_t * msg = (MdMsg_t *)packet;
// switch on packet type.
elog(LOG_INFO," ");
switch(msg->type)
{
case MdRecvFromFilter:
Mdiff_recvFromFilter_helper(pd, packet, packetlen);
break;
case MdSetPriority:
Mdiff_setPriority(pd, packet, packetlen);
break;
default:
elog(LOG_CRIT,"Unable to handle filter pd message type: %u",msg->type);
exit(1);
}
if (loop_needed) {
pd_loop_receive(pd, packet, packetlen);
}
return 0;
}
result_t Mdiff_recvFromFilter_helper( pd_context_t * pd, const void *packet,
int packetlen )
{
Mdiff_state_t * state = (Mdiff_state_t*)pd_data(pd);
MdMsg_t * msg = (MdMsg_t *)packet;
IntDiffMsg_t * idmsg = (IntDiffMsg_t*)malloc(sizeof(IntDiffMsg_t));
elog(LOG_INFO," ");
memcpy(idmsg, (IntDiffMsg_t*)msg->data, sizeof(IntDiffMsg_t));
MdForwarder_recvFromFilter(state, idmsg);
return SUCCESS;
}
result_t Mdiff_setPriority(pd_context_t * pd, const void *packet, int packetlen)
{
pd_client_t * current = pd_curr_client(pd);
MdMsg_t *msg = (MdMsg_t*)packet;
uint8_t * priority = (uint8_t*)malloc(sizeof(uint8_t));
elog(LOG_INFO," ");
memcpy(priority, (uint8_t*)msg->data, sizeof(uint8_t));
pd_set_client_data(current, priority);
return SUCCESS;
}
static int Mdiff_app_pd_send_cb(pd_context_t * pd, const void *packet,
int packetlen, int loop_needed)
{
MdMsg_t * msg = (MdMsg_t *)packet;
elog(LOG_INFO," ");
// switch on packet type.
switch(msg->type)
{
case MdSendMsg:
elog(LOG_INFO,"msg length: %d", packetlen);
Mdiff_sendmsg_helper(pd, packet, packetlen);
break;
case MdSubscribe:
Mdiff_subscribe_helper(pd, packet, packetlen);
break;
case MdUnsubscribe:
Mdiff_unsubscribe_helper(pd, packet, packetlen);
break;
default:
elog(LOG_CRIT,"unable to handle app pd message type: %u",msg->type);
exit(1);
}
if (loop_needed) {
pd_loop_receive(pd, packet, packetlen);
}
return 0;
}
static result_t Mdiff_sendmsg_helper(pd_context_t * pd, const void *packet, int packetlen)
{
Mdiff_state_t * state = (Mdiff_state_t*)pd_data(pd);
MdMsg_t * msg = (MdMsg_t*)packet;
DiffMsg_t * dmsg = (DiffMsg_t*)malloc(msg->length);
elog(LOG_INFO," ");
memcpy(dmsg, (DiffMsg_t*)msg->data, msg->length);
MdForwarder_sendMsg(state, TD_LOCAL_ADDR, dmsg);
return SUCCESS;
}
static result_t Mdiff_subscribe_helper(pd_context_t * pd, const void *packet, int packetlen)
{
Mdiff_state_t * state = (Mdiff_state_t*)pd_data(pd);
MdMsg_t * msg = (MdMsg_t*)packet;
DiffAttr_t * p_pAttributeArray = (DiffAttr_t*)malloc(msg->length);
uint8_t p_uiNumAttr = msg->length / sizeof(DiffAttr_t);
int8_t iHandle = 0;
elog(LOG_INFO," ");
memcpy(p_pAttributeArray, (DiffAttr_t*)msg->data, msg->length);
iHandle = MdOppGradient_subscribe(state, p_pAttributeArray, p_uiNumAttr);
elog(LOG_INFO, "Sending handle %d", iHandle);
Mdiff_return_handle(pd, iHandle);
return SUCCESS;
}
static void Mdiff_return_handle(pd_context_t *pd, int8_t iHandle)
{
// Get current pd client
pd_client_t * current = pd_curr_client(pd);
// create MdMsg_t containing iHandle.
MdMsg_t * msg = (MdMsg_t*)malloc(sizeof(MdMsg_t) + sizeof(int8_t));
msg->type = MdHandle;
msg->length = sizeof(int8_t);
memcpy(msg->data, &iHandle, sizeof(int8_t));
elog(LOG_INFO,"Sending handle: %d", *((int8_t*)msg->data));
// send msg.
pd_client_receive(pd, current, msg, sizeof(MdMsg_t) + sizeof(int8_t), NULL);
// free msg.
free(msg);
}
static result_t Mdiff_unsubscribe_helper(pd_context_t * pd, const void *packet, int packetlen)
{
Mdiff_state_t * state = (Mdiff_state_t*)pd_data(pd);
MdMsg_t * msg = (MdMsg_t*)packet;
int8_t iHandle = *((int8_t*)msg->data);
elog(LOG_INFO," ");
MdOppGradient_unSubscribe(state, iHandle);
return SUCCESS;
}
static int neighbor_list_new(neighbor_t *nb_list, int count, void *data)
{
Mdiff_state_t * state = (Mdiff_state_t*)data;
// elog(LOG_INFO," ");
if (state->nb_list != NULL) {
free(state->nb_list);
}
state->nb_list = nb_list;
state->nb_count = count;
return EVENT_RENEW;
}
static int Mdiff_command_start(status_context_t * info, char * command, size_t buf_size)
{
int start = 0;
Mdiff_state_t * state = (Mdiff_state_t *)sd_data(info);
parser_state_t p_state = {
input: command,
input_len: buf_size
};
elog(LOG_INFO," ");
while(misc_parse_next_kvp(&p_state) >= 0) {
if (strcmp("start",p_state.key) == 0) {
start = 1;
}
}
if (start) {
MdOppGradient_start(state);
MdForwarder_start(state);
}
return EVENT_RENEW;
}
static void int2ip(uint32_t number, char * ip)
{
snprintf(ip,16,"%u.%u.%u.%u",
(number & 0xff000000) >> 24,
(number & 0x00ff0000) >> 16,
(number & 0x0000ff00) >> 8,
(number & 0x000000ff));
}
See more files for this project here