/* resdisc_query.c from EmStar at Krugle
 * Show resdisc_query.c syntax highlighted */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <math.h>
#include "emrun/emrun.h"
#include "link/link.h"
#include <link/link_headers.h>
#include <libmisc/misc.h>
#include <libdev/status_dev.h>
#include <libdev/command_dev.h>
#include <libdev/query_dev.h>
#include <libdev/packet_client.h>
#include <libdev/packet_dev.h>
#include <libdev/em_sched.h>
//#include <../centroute/centroute_i.h>
#include <resdisc_i.h>
#include <routing/routing_table.h>
#include <libdev/command_dev.h>
#include <devel/state/ssync.h>
// Fallback value, used only when no header included above defines SECOND.
// NOTE(review): presumably the number of milliseconds per second - confirm.
#ifndef SECOND
#define SECOND 1000
#endif
// Fallback value, used only when no header included above defines
// TOS_BCAST_ADDR.
// NOTE(review): presumably the link broadcast address - confirm.
#ifndef TOS_BCAST_ADDR
#define TOS_BCAST_ADDR 0xFFFF
#endif
// Forward declarations for the functions defined below in this file.
//
// Per-attribute query handlers: the value each one returns is used by
// handle_query_request() as the number of nodes placed in *nodelist.
int handle_exists_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist);
int handle_sensor_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist);
int handle_area_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist);
// Builds and sends a RESQ_REPLY packet carrying num_nodes entries of nodelist
// to dst; nodelist is free()d by this function.
int generate_and_send_query_reply(rq_pkt_hdr_t *req_hdr, node_id_t dst,
rd_state_t *rdstate, int32_t num_nodes, rd_node_t *nodelist);
// Sends a RESQ_REPLY packet with retcode ERESQ_NOT_FOUND and no nodes to dst.
int send_query_fail(node_id_t dst, rq_pkt_hdr_t *req_hdr, rd_state_t *rdstate);
/*
 * handle_exists_query - answer an EXISTS query with every node in the list.
 *
 * The EXISTS query ignores the VALUE and OPERATOR part of the av, so the
 * result is one rd_node_t per element of the node list held in rdstate,
 * carrying the node id and its shepherd id.
 *
 * av       - attribute/value from the request (not examined here)
 * rdstate  - state whose node list is walked
 * nodelist - out parameter; *nodelist must be NULL on entry (the process
 *            exits otherwise).  On a positive return it points to a heap
 *            array that the caller must free().  It is left NULL whenever
 *            0 is returned.
 *
 * Returns the number of entries stored in *nodelist, or 0 when the node
 * list is empty or the array could not be allocated.
 */
int handle_exists_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist)
{
    int num_nodes = 0;
    int filled = 0;
    node_element_t *node = NULL;
    rd_node_t *list = NULL;

    (void)av;

    if (*nodelist != NULL) {
        elog(LOG_ERR, "Nodelist is NOT NULL!\n");
        exit(1);
    }

    num_nodes = nodes_list_qlen(rdstate);
    if (num_nodes <= 0) {
        // nothing to report: do not allocate, so the caller has nothing to free
        return 0;
    }

    // calloc zero-fills the array and checks the size multiplication
    list = calloc((size_t)num_nodes, sizeof *list);
    if (list == NULL) {
        elog(LOG_ERR, "Can't allocate nodelist for %d nodes\n", num_nodes);
        return 0;
    }

    // never write past the num_nodes entries that were allocated
    for (node = nodes_list_top(rdstate); node && filled < num_nodes;
         node = nodes_list_next(node)) {
        list[filled].node_id = node->resentry.node_id;
        // always include the shepherd id for now (don't make it 0 if auth)
        list[filled].authoritative = node->resentry.shepherd_id;
        filled++;
    }

    if (filled == 0) {
        // queue length and list walk disagreed; keep the "NULL on 0" contract
        free(list);
        return 0;
    }

    *nodelist = list;
    return filled;
}
/*
 * handle_sensor_query - placeholder for SENSOR queries.
 *
 * Not implemented: no argument is examined, *nodelist is left untouched and
 * zero matching nodes are always reported.
 */
int handle_sensor_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist)
{
    const int matches = 0;

    (void)av;
    (void)rdstate;
    (void)nodelist;

    return matches;
}
/*
 * handle_area_query - placeholder for AREA queries.
 *
 * Not implemented: no argument is examined, *nodelist is left untouched and
 * zero matching nodes are always reported.
 */
int handle_area_query(rq_av_t *av, rd_state_t *rdstate, rd_node_t **nodelist)
{
    const int matches = 0;

    (void)av;
    (void)rdstate;
    (void)nodelist;

    return matches;
}
/*
 * handle_query_request - dispatch a query request and send back the answer.
 *
 * The attribute/value found in hdr->data selects the handler (EXISTS, SENSOR
 * or AREA).  If the handler reports at least one node, a reply carrying the
 * node list is sent to src; otherwise (including unsupported attributes) a
 * failure reply is sent.
 *
 * src     - node the request came from; the reply is addressed to it
 * hdr     - request header, followed by the rq_av_t in hdr->data
 * len     - length supplied by the caller (currently not used)
 * rdstate - state handed to the handlers and the send routines
 *
 * Always returns 0; the result of the send routines is not propagated.
 */
int handle_query_request(node_id_t src,
                         rq_pkt_hdr_t *hdr, int len, rd_state_t *rdstate)
{
    rq_av_t *av = (rq_av_t *)(hdr->data);
    int32_t num_nodes = 0;
    rd_node_t *nodelist = NULL;

    // NOTE(review): len is never compared against the size of the header
    // plus rq_av_t before hdr->data is read - confirm what len covers and
    // add a bounds check.
    (void)len;

    switch (av->attribute) {
    case EXISTS:
        num_nodes = handle_exists_query(av, rdstate, &nodelist);
        break;
    case SENSOR:
        num_nodes = handle_sensor_query(av, rdstate, &nodelist);
        break;
    case AREA:
        num_nodes = handle_area_query(av, rdstate, &nodelist);
        break;
    default:
        elog(LOG_ERR, "Got unsupported ATTRIBUTE %d\n",
             av->attribute);
        break;
    }

    if (num_nodes > 0) {
        // generate_and_send_query_reply() frees nodelist itself
        generate_and_send_query_reply(hdr, src, rdstate, num_nodes, nodelist);
    } else {
        // a handler may have allocated a list yet reported no nodes;
        // release it here so it is not leaked (free(NULL) is a no-op)
        free(nodelist);
        send_query_fail(src, hdr, rdstate);
    }

    return 0;
}
/*
 * generate_and_send_query_reply - build and send a successful query reply.
 *
 * The packet is laid out as link_pkt_t, rq_pkt_hdr_t, rq_rply_pkt_t and then
 * the node list.  The reply echoes the transaction id and the attribute/value
 * of the request and is sent on rdstate->query_data_link.
 *
 * req_hdr   - header of the request being answered (tid and av are copied)
 * dst       - destination node id
 * rdstate   - state providing query_data_link
 * num_nodes - number of entries in nodelist
 * nodelist  - heap array of num_nodes entries; this function takes ownership
 *             and free()s it on every path, including errors
 *
 * Returns 0 when the packet was sent, -1 on invalid arguments, allocation
 * failure or send failure.
 */
int generate_and_send_query_reply(rq_pkt_hdr_t *req_hdr, node_id_t dst,
    rd_state_t *rdstate, int32_t num_nodes, rd_node_t *nodelist)
{
    int retval = -1;
    char *buf = NULL;
    rq_av_t *rq_av = (rq_av_t *)(req_hdr->data);
    link_pkt_t *pkt = NULL;
    rq_pkt_hdr_t *rply_hdr = NULL;
    rq_rply_pkt_t *rply = NULL;
    int nodelist_len = 0;
    int payload_len = 0;
    int total_len = 0;
    int n = 0;

    // reject inputs that would make the length computation or memcpy invalid
    if (num_nodes < 0 || (num_nodes > 0 && nodelist == NULL)) {
        elog(LOG_ERR, "Invalid node list for query reply (%d nodes)\n",
             (int)num_nodes);
        goto done;
    }

    nodelist_len = num_nodes * sizeof(rd_node_t);
    payload_len = sizeof(rq_pkt_hdr_t) + sizeof(rq_rply_pkt_t) + nodelist_len;
    total_len = payload_len + sizeof(link_pkt_t);

    // zero-filled so every field not set below is 0
    buf = calloc(1, (size_t)total_len);
    if (buf == NULL) {
        elog(LOG_ERR, "Can't allocate %d bytes for query reply\n", total_len);
        goto done;
    }

    // set link pkt headers
    pkt = (link_pkt_t *)buf;
    pkt->src.id = my_node_id;
    pkt->dst.id = dst;
    pkt->type = PKT_TYPE_USER(RESDISC_QUERY);

    rply_hdr = (rq_pkt_hdr_t *)(pkt->data);
    rply = (rq_rply_pkt_t *)(rply_hdr->data);

    // set reply hdr
    rply_hdr->tid = req_hdr->tid;
    rply_hdr->type = RESQ_REPLY;

    // set reply
    rply->retcode = RESQ_SUCCESS;
    rply->num_nodes = num_nodes;

    // copy request av
    memcpy(&rply->request, rq_av, sizeof(rq_av_t));

    // copy node list in rply pkt
    if (nodelist_len > 0) {
        memcpy(rply->nodelist, nodelist, nodelist_len);
    }

    // ready to go!
    n = lu_send(rdstate->query_data_link, pkt, payload_len);
    if (n < 0) {
        elog(LOG_ERR, "Can't send query reply pkt to %s: %m",
             lu_name(rdstate->query_data_link, NULL));
    } else {
        elog(LOG_NOTICE, "Query reply to node %u containing %d nodes sent!\n",
             dst, num_nodes);
        retval = 0;
    }

done:
    // free pointers (free(NULL) is a no-op)
    free(nodelist);
    free(buf);
    return retval;
}
/*
 * send_query_fail - send a "not found" reply for a query request.
 *
 * Builds a packet of link_pkt_t, rq_pkt_hdr_t and rq_rply_pkt_t with no node
 * list, echoing the transaction id of the request, with retcode
 * ERESQ_NOT_FOUND and num_nodes 0, and sends it on rdstate->query_data_link.
 *
 * dst     - destination node id
 * req_hdr - header of the request being answered (only tid is read)
 * rdstate - state providing query_data_link
 *
 * Returns 0 when the packet was sent, -1 on allocation or send failure.
 */
int send_query_fail(node_id_t dst, rq_pkt_hdr_t *req_hdr, rd_state_t *rdstate)
{
    int retval = -1;
    char *buf = NULL;
    link_pkt_t *pkt = NULL;
    rq_pkt_hdr_t *rply_hdr = NULL;
    rq_rply_pkt_t *rply = NULL;
    int payload_len = sizeof(rq_pkt_hdr_t) + sizeof(rq_rply_pkt_t);
    int total_len = payload_len + sizeof(link_pkt_t);
    int n = 0;

    // zero-filled so every field not set below is 0
    buf = calloc(1, (size_t)total_len);
    if (buf == NULL) {
        elog(LOG_ERR, "Can't allocate %d bytes for query reply\n", total_len);
        goto done;
    }

    // set link pkt headers
    pkt = (link_pkt_t *)buf;
    pkt->src.id = my_node_id;
    pkt->dst.id = dst;
    pkt->type = PKT_TYPE_USER(RESDISC_QUERY);

    rply_hdr = (rq_pkt_hdr_t *)(pkt->data);
    rply = (rq_rply_pkt_t *)(rply_hdr->data);

    // set reply hdr
    rply_hdr->tid = req_hdr->tid;
    rply_hdr->type = RESQ_REPLY;

    // set reply
    rply->retcode = ERESQ_NOT_FOUND;
    rply->num_nodes = 0;

    n = lu_send(rdstate->query_data_link, pkt, payload_len);
    if (n < 0) {
        elog(LOG_ERR, "Can't send query reply pkt to %s: %m",
             lu_name(rdstate->query_data_link, NULL));
    } else {
        elog(LOG_NOTICE, "Query reply to node %u sent (retcode=%d) \n",
             dst, rply->retcode);
        retval = 0;
    }

done:
    free(buf);
    // report the real outcome instead of an unconditional 0
    return retval;
}
/* See more files for this project here */