Show SwAgent.c syntax highlighted
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* Copyright (c) 2001 The Regents of the University of California.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* 3. All advertising materials mentioning features or use of this
* software must display the following acknowledgement:
* This product includes software developed by Networked &
* Embedded Systems Lab at UCLA
* 4. Neither the name of the University nor that of the Laboratory
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* Author: Simon Han (simonhan@ee.ucla.edu)
*/
#include "sensorware.h"
/*
#ifndef __USE_GNU
#define __USE_GNU
#endif
#ifndef __USE_ISOC99
#define __USE_ISOC99
#endif
*/
#ifdef COMPILE_MEASURE
extern int maxfunc(Tcl_Interp *interp);
#endif
pthread_key_t agent_key;
static unsigned int termInstance = 2;
int swAgentApiInit(Tcl_Interp *interp);
int swTermApiInit(Tcl_Interp *interp);
// this routine initializes agent
// the job here is to initialize common apis,
// allocate space for agent, open mailbox
// NOTE: this routine is used in terminal and each agents
// terminal specific initialization is handled separately.
SwAgent* swAgentInit(Tcl_Interp *interp, SwAddress *addr, char *code)
// interp : tcl interpreter
// family : agent family id
// code : tcl script
{
SwAgent *agent = NULL;
register int i;
// allocate memory for storing per agent data structure
if((agent = (SwAgent*)ckalloc(sizeof(SwAgent))) == NULL){
//fprintf(stderr, "no memory for agent\n");
return NULL;
}
agent->code = NULL;
// assign agent address
agent->mailbox.addr.user = addr->user;
agent->mailbox.addr.family = addr->family;
agent->mailbox.addr.instance = addr->instance;
// open agent mailbox
if(swMailOpen(&(agent->mailbox)) < 0){
//fprintf(stderr, "Could not open a mail box\n");
goto agent_init_failed;
}
// allocate space for storing tcl script
if(code != NULL){
agent->code = ckalloc(strlen(code) + 1);
if(agent->code == NULL){
swMailClose(&(agent->mailbox));
goto agent_init_failed;
}
strcpy(agent->code, code);
}
else{
agent->code = NULL;
}
// store tcl interpreter
agent->interp = interp;
if(pthread_setspecific(agent_key, agent) != 0){
fprintf(stderr, "setspecific failed\n");
swMailClose(&(agent->mailbox));
goto agent_init_failed;
}
// initialize interest table related variables
Tcl_InitHashTable(&(agent->intTable), TCL_STRING_KEYS);
agent->waitHistory = 0;
agent->emptyIndex = 0;
agent->numInt = 0;
for(i = 0; i < MAXNUMBERINTEREST; i++){
swInterestInit(&(agent->interest[i]), i, &(agent->mailbox.addr));
}
swNetInterestInit(interp);
return agent;
agent_init_failed:
if(agent->code) {ckfree(agent->code);}
if(agent) {ckfree((void*)agent);}
return NULL;
}
// this routing is called when pthread_exit() is called
// the main job is to free memory allocation and free
// tcl interpreter. Also, it calls device specific termination
// routines to free up any resource stored in device struct
void swAgentFinal(void *arg)
// arg : pointer to SwAgent
{
SwAgent *agent = (SwAgent *)arg;
DEBUG("agent final\n");
// free memory space for storing tcl script
if(agent->code != NULL){
ckfree(agent->code);
}
// delete tcl interpreter
Tcl_DeleteInterp(agent->interp);
{
// delete interests from hash table
Tcl_HashSearch s;
Tcl_HashEntry *intEntry;
while((intEntry = Tcl_FirstHashEntry(&(agent->intTable), &s)) != NULL){
SwInterest *interest = (SwInterest *)Tcl_GetHashValue(intEntry);
swInterestDispose(interest);
Tcl_DeleteHashEntry(intEntry);
}
Tcl_DeleteHashTable(&agent->intTable);
}
swMailClose(&(agent->mailbox));
ckfree((void*)agent);
}
inline void freeAgentMessage(SwMessage *msg){
if(msg->data){ckfree(msg->data);}
ckfree((void*)msg);
}
// the agent thread
// there are two places where the thread will be invoked
// 1. in Tcl_SwSpawn_Cmd()
// 2. in swNetRecv() from SwNet.c
void *swAgentThread(void *arg)
// arg : pointer to SwMessage
{
// create tcl interpreter
Tcl_Interp *interp = Tcl_CreateInterp();
// agent struct
SwAgent *agent;
// the message stored with tcl script
SwMessage *msgPtr = (SwMessage*)arg;
// script size in bytes
int code_size = strlen(msgPtr->data) + 1;
// parsed size.
int parsed_size = code_size;
//struct timeval mT;
if(interp == NULL || msgPtr == NULL) {
pthread_exit(NULL);
}
// initialize apis
if(msgPtr == NULL ||
(agent = swAgentInit(interp, &(msgPtr->hdr.dst), msgPtr->data))
== NULL){
// initialization failed
freeAgentMessage(msgPtr);
Tcl_DeleteInterp(interp);
pthread_exit(NULL);
return NULL;
}
// parse optional agent variables
while(parsed_size < msgPtr->hdr.size){
// initialize variables
int name_size = strlen(msgPtr->data + parsed_size) + 1;
int val_size = strlen(msgPtr->data + parsed_size + name_size) + 1;
Tcl_SetVar(interp, msgPtr->data + parsed_size,
msgPtr->data + parsed_size + name_size,
TCL_LEAVE_ERR_MSG);
parsed_size += name_size + val_size;
}
assert(parsed_size == msgPtr->hdr.size);
freeAgentMessage(msgPtr);
// run tcl engine
//swMeasureCtime(&mT);
Tcl_Eval(interp, agent->code, 0, (char **)NULL);
//swMeasurePrintTimeDiff(&mT);
// *** will be called automatically in pthread_exit()
// swAgentFinal();
pthread_exit(NULL);
return NULL;
}
// tcl interface for spawn, replicate, and migrate
int Tcl_SwSpawn_Cmd(ClientData clientData, Tcl_Interp *interp,
int argc, char *argv[])
{
// get agent structure
SwAgent *agent = pthread_getspecific(agent_key);
// get node id
int id = agent->mailbox.addr.node;
// target node id
int node = 0;
// target family id
int family_id = 0;
// buffer for storing script and variables
char *buf;
// target address
SwAddress dst;
// script size
int code_size;
// total packet size
int pkt_size;
register int i = 0;
int curr_size = 0;
// script
char *code;
// index in argv for agent variables
int varIdx;
if(argc < 2){
Tcl_SetResult(interp, "wrong number of arguements", TCL_VOLATILE);
return TCL_ERROR;
}
if((argc >= 2) && (sscanf(argv[1], "%d", &node) != 1)){
Tcl_SetResult(interp, "invalid node id", TCL_VOLATILE);
return TCL_ERROR;
}
if(argv[0][0] == 's' && argc >= 3 && sscanf(argv[2], "# family %d", &family_id) == 1){
// spawning
code = argv[2];
varIdx = 3;
}
else if(argv[0][0] != 's' && (agent->code != NULL)){
// replication or migration
code = agent->code;
family_id = agent->mailbox.addr.family;
varIdx = 2;
}
else{
Tcl_SetResult(interp, "invalid format", TCL_VOLATILE);
return TCL_ERROR;
}
if(family_id == 1){
Tcl_SetResult(interp, "invalid family id", TCL_VOLATILE);
return TCL_ERROR;
}
// construct agent structure...
// compute total pkt size
// compute code size
code_size = strlen(code) + 1;
pkt_size = code_size;
// compute variable sizes
for(i = varIdx; i < argc; i++){
char *var_value;
if((var_value = Tcl_GetVar(interp, argv[i], 0)) == NULL){
continue;
}
pkt_size += strlen(argv[i]) + 1 + strlen(var_value) + 1;
}
// allocate buffer for script + variables
if((buf = (char *) ckalloc(pkt_size)) == NULL){
Tcl_SetIntResult(interp, -ENOMEM);
return TCL_OK;
}
// XXX need to check upper size characters
strcpy(buf, code);
curr_size = code_size;
for(i = varIdx; i < argc; i++){
// copy variables
char *var_value;
if((var_value = Tcl_GetVar(interp, argv[i], 0)) != NULL){
int name_size = strlen(argv[i]) + 1;
int val_size = strlen(var_value) + 1;
strcpy(&buf[curr_size], argv[i]);
strcpy(&buf[curr_size + name_size], var_value);
curr_size += name_size + val_size;
}
}
assert(curr_size == pkt_size);
// create new agent...
dst.node = node;
dst.user = agent->mailbox.addr.user;
dst.family = (unsigned char)family_id;
if(agent->mailbox.addr.family == 1){
dst.instance = termInstance++;
//if(termInstance == 255){ termInstance++; }
}
else{
dst.instance = agent->mailbox.addr.instance;
}
if(node == id){
// loop back
pthread_t pid;
// allocate msg, will be freed in agent thread
SwMessage *msgPtr = (SwMessage *)ckalloc(sizeof(SwMessage));
if(msgPtr == NULL){
ckfree(buf);
Tcl_SetIntResult(interp, -ENOMEM);
return TCL_OK;
}
swMailConstructMsgHdr(&(msgPtr->hdr), NET_MSG_AGENT, &dst, &(agent->mailbox.addr),
pkt_size);
msgPtr->data = buf;
if(swMailReserve(&(msgPtr->hdr.dst)) == 0){
// create new thread and pass msg as arguement
if(pthread_create(&pid, &thread_attr, swAgentThread, msgPtr) != 0){
//fprintf(stderr, "unable to create thread\n");
ckfree(buf);
ckfree((void*)msgPtr);
Tcl_SetIntResult(interp, -ENOMEM);
return TCL_OK;
}
Tcl_SetIntResult(interp, pkt_size);
}
else{
Tcl_SetIntResult(interp, 0);
return TCL_OK;
}
}
else{
// send to the network
SwNetPacketHead pkt;
int return_val;
swMailConstructMsgHdr(&(pkt.msghdr),
NET_MSG_AGENT, &dst, &(agent->mailbox.addr),
pkt_size);
return_val = swNetSendNet(node, &pkt, buf, pkt_size, MAC_MSG_DATA);
ckfree(buf);
//if(return_val <= 0){
Tcl_SetIntResult(interp, return_val);
//return TCL_OK;
//}
}
if(argv[0][0] == 'm'){
// if migration, kill the agent by return TCL_ERROR
return TCL_ERROR;
}
else{
return TCL_OK;
}
}
// helper functions
// They are all specific to the calling thread.
// get agent's own address
inline SwAddress swAgentGetAddress(){
SwAgent *agent = pthread_getspecific(agent_key);
return agent->mailbox.addr;
}
// mailbox helper function
inline SwMailbox* swAgentGetMailboxHandle(){
SwAgent *agent = pthread_getspecific(agent_key);
return &(agent->mailbox);
}
// interest table helper functions
// create a space for interest
// first, we search to see if interest with name exist,
// if so, we dispose such interest for the new interest
// if not, we search for a space
SwInterest* swAgentCreateInterest(char *name){
SwAgent *agent = pthread_getspecific(agent_key);
int createSucceed;
Tcl_HashEntry *intEntry;
if(agent->numInt == MAXNUMBERINTEREST){ return NULL; }
intEntry = Tcl_CreateHashEntry(&(agent->intTable), name, &createSucceed);
if(!createSucceed){
// entry exist
SwInterest *interest = (SwInterest*)Tcl_GetHashValue(intEntry);
swInterestDispose(interest);
swInterestCreate(interest,
Tcl_GetHashKey(&agent->intTable, intEntry));
agent->waitHistory &= (~(1 << interest->idx));
return interest;
}
// otherwise, we find a slot for this interest
{
register int i;
for(i = agent->emptyIndex;
agent->interest[i % MAXNUMBERINTEREST].state != DISPOSED;
i++);
agent->numInt++;
// we point the emptyIndex to be the next slot since
// it is most likely place
agent->emptyIndex = (i + 1) % MAXNUMBERINTEREST;
i = i % MAXNUMBERINTEREST;
Tcl_SetHashValue(intEntry, &(agent->interest[i]));
swInterestCreate(&agent->interest[i],
Tcl_GetHashKey(&agent->intTable, intEntry));
agent->waitHistory &= (~(1 << (agent->interest[i].idx)));
return &(agent->interest[i]);
}
}
void swAgentDestroyInterest(char *name){
SwAgent *agent = pthread_getspecific(agent_key);
Tcl_HashEntry *intEntry;
SwInterest *interest;
if((intEntry = Tcl_FindHashEntry(&agent->intTable, name)) == NULL){
return;
}
interest = (SwInterest*)Tcl_GetHashValue(intEntry);
swInterestDispose(interest);
agent->numInt--;
// point emptyIndex to here
agent->emptyIndex = interest->idx;
Tcl_DeleteHashEntry(intEntry);
}
SwInterest* swAgentGetInterest(char *name){
SwAgent *agent = pthread_getspecific(agent_key);
Tcl_HashEntry *intEntry;
SwInterest *interest;
if((intEntry = Tcl_FindHashEntry(&agent->intTable, name)) == NULL){
return NULL;
}
interest = (SwInterest*)Tcl_GetHashValue(intEntry);
return interest;
}
inline int swAgentGetInterestIndex(char *name){
SwInterest *interest = swAgentGetInterest(name);
if(interest == NULL){
return -1;
}
return interest->idx;
}
// based on the interest names provided,
// we xor with history, and then toggle interest
// XXX not efficient...
int swAgentToggleInterest(int argc, char *argv[]){
SwAgent *agent = pthread_getspecific(agent_key);
int history;
int current = 0;
register int i = 0;
history = agent->waitHistory;
// compute current vector
for(i = 0; i < argc; i++){
Tcl_HashEntry *intEntry;
SwInterest *interest;
int idx;
if((intEntry = Tcl_FindHashEntry(&agent->intTable, argv[i])) == NULL){
return i;
}
interest = (SwInterest*)Tcl_GetHashValue(intEntry);
idx = swInterestGetIdx(interest);
current |= (1 << idx);
}
agent->waitHistory = current;
current ^= history;
for(i = 0; i < MAXNUMBERINTEREST; i++, current >>= 1){
if(current & 1){
swInterestToggle(&(agent->interest[i]));
}
}
return -1;
}
See more files for this project here