/* bundleserver.c */
char bundlerserver_c_cvsid[] = "$Id:";
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
//#include <gtk.h>
#include "link/neighbor.h"
#include "emrun/emrun.h"
#include "libmisc/misc.h"
#include "libmisc/misc_init.h"
#include "libmisc/queue.h"
#include "libdev/query_dev.h"
#include "libdev/status_dev.h"
#include "fusd/fusd.h"
#include "libdev/query_client.h"
#include "routing/routing_table.h"
#include "devel/filemover/filesender.h"
#include "bundleserver.h"
#include "bundleserver_i.h"
/* Destination address installed by main() at start-up; bundleserver_timer_dest_ip()
 * replaces it with whatever BUNDLESERVER_DEST_IP_FILE contains. */
#define BUNDLESERVER_DEST "131.179.144.42"
/* Routes file scanned by bundleserver_timer_routes() to find the next hop. */
#define BUNDLESERVER_ROUTES "/click/srcr_lt/routes"
/* Self address installed by main() at start-up; bundleserver_set_self_ip()
 * later overwrites it with an address derived from my_node_id. */
#define BUNDLESERVER_SELFIP "127.0.0.1"
/* File whose first whitespace-delimited token is read as the destination address. */
#define BUNDLESERVER_DEST_IP_FILE "/tmp/destination_ip"
/* Presumably emits the bundleserver_job_queue_* helpers used in this file
 * (…_init and …_push are called below); the macro itself is not visible
 * here -- TODO confirm against libmisc/queue.h. */
QUEUE_FUNCTION_INSTANTIATIONS(bundleserver_job_queue, bundleserver_ptrs, bundleserver_list,
bundleserver_job_elem_t, bundleserver_job_queue_t);
/* emrun shutdown hook (registered in main). */
static void bundleserver_shutdown (void *data);
/* Query-device callbacks (registered in main through query_dev_opts_t). */
static int bundleserver_query_close (query_context_t *q);
static void bundleserver_query_usage (query_context_t *q, buf_t *buf);
static int bundleserver_query_enqueue (query_context_t *q, char *command, size_t buf_size);
static int bundleserver_query_process (query_context_t *q, char *command, size_t buf_size,
buf_t *print, buf_t *bin);
/* Status-device callbacks (registered in main through status_dev_opts_t). */
static int bundleserver_status_print (status_context_t * info, buf_t *buf);
static int bundleserver_status_binary (status_context_t *info, buf_t *buf);
/* Queues a job for `filename` and asks the filesender to transmit it. */
static void bundleserver_submit_file (bundleserver_state_t * state,
const char * filename,
bundleserver_header_t header);
/* Timer callbacks (registered in main through g_timer_add). */
static int bundleserver_timer_dir(gpointer data, int interval, g_event_t *ev);
static int bundleserver_timer_routes(gpointer data, int interval, g_event_t *ev);
/* Replaces the heap string *charstar with a copy of msg. */
static void update_charstar(char ** charstar, const char * msg);
static int bundleserver_timer_dest_ip(gpointer data, int interval, g_event_t *ev);
/*
 * Neighbor-list callback handed to g_neighbors() in main().
 * The list itself is ignored; the callback only reports EVENT_RENEW.
 */
static int neighbor_list_new(neighbor_t *nb_list, int count, void *data)
{
    (void) nb_list;
    (void) count;
    (void) data;

    return EVENT_RENEW;
}
/*
 * Derive this node's address from my_node_id and store it in state->self_ip.
 *
 * The id is reduced by 1000 repeatedly while it is above 1000, and the
 * result becomes the last component of "5.0.0.<id>".
 */
static void bundleserver_set_self_ip(bundleserver_state_t * state)
{
    char addr_text[128];
    uint32_t roof_id;

    elog(LOG_INFO,"My ID is %d",my_node_id);

    /* Peel off whole thousands until the id is no larger than 1000. */
    for (roof_id = my_node_id; roof_id > 1000; roof_id -= 1000)
        ;
    elog(LOG_INFO,"My roof_id is %d",roof_id);

    snprintf(addr_text, sizeof(addr_text), "5.0.0.%d", roof_id);
    update_charstar(&state->self_ip, addr_text);
    elog(LOG_INFO,"Setting selfip to %s",state->self_ip);
}
/*****************************************
 * Binary status callback.
 * Passes the raw bytes of the state's route_entry to bufcpy() so that
 * readers of the status device receive the current routing entry.
 */
static int bundleserver_status_binary(status_context_t *info, buf_t *buf)
{
    bundleserver_state_t *srv = (bundleserver_state_t *) sd_data(info);

    bufcpy(buf, &srv->route_entry, sizeof(route_entry_t));

    return STATUS_MSG_COMPLETE;
}
/*****************************************
 * Printable status callback.
 * Intended to report job status; at present it writes nothing to buf
 * and simply reports the message as complete.
 */
static int bundleserver_status_print(status_context_t * info, buf_t *buf)
{
    (void) info;
    (void) buf;

    return STATUS_MSG_COMPLETE;
}
/*****************************************
 * Shutdown hook registered with emrun in main().
 * Logs the shutdown and terminates the process with status 0.
 * TODO: close all connections, clean up jobs, etc.
 */
static void bundleserver_shutdown(void * data)
{
    (void) data;    /* the state pointer is not needed yet */

    elog(LOG_NOTICE, "bundleserver module shutting down.");
    exit(0);
}
/*****************************************
 * Print the usage text via misc_print_usage() and exit with status 1.
 * `name` is the program name to show (main passes argv[0]).
 */
void usage(char * name)
{
    static const int failure_status = 1;

    misc_print_usage (name, "", "");
    exit(failure_status);
}
/*****************************************
* MAIN
*/
int main(int argc, char * argv[])
{
bundleserver_state_t state = {
status_dev_ref: NULL,
query_dev_ref: NULL,
// TODO change this:
};
char * uses = NULL;
elog(LOG_INFO,"Starting up.");
// Init job queue
state.job_queue = (bundleserver_job_queue_t *)malloc(sizeof(bundleserver_job_queue_t));
bundleserver_job_queue_init(state.job_queue);
update_charstar(&state.dest_ip, BUNDLESERVER_DEST);
// TODO get self ip from somewhere else. maybe roofnet?
update_charstar(&state.self_ip, BUNDLESERVER_SELFIP);
// Get destination from IP file
// bundleserver_get_dest_ip(state);
elog(LOG_INFO,"Setting destination IP: %s",state.dest_ip);
misc_init(&argc, argv, CVSTAG);
// get the --uses arg
uses = link_parse_uses(&argc, argv, NULL);
if (uses == NULL) {
elog(LOG_CRIT, "Please specify a link to use!");
usage(argv[0]);
}
{ // EmRun will trigger this callback on shutdown
emrun_opts_t emrun_opts = {
shutdown: bundleserver_shutdown, // shutdown handler
data: &state // pointer passed to shutdown handler
};
emrun_init(&emrun_opts);
}
// Setup link for sending / receiving messages.
{
// if_id_t temp_id;
lu_opts_t lu_opts = {
opts: {
name: uses,
data: &state,
//pkt_type: PKT_TYPE_MDIFF
},
//receive: Mdiff_receiver
};
// if (lu_open(&lu_opts, &state.link_ref) < 0) {
if (lu_open(&lu_opts, NULL) < 0) {
elog(LOG_CRIT, "can't open %s: %m", link_name(&lu_opts.opts, NULL));
exit(1);
}
/* if (!lu_get_if_id(state.link_ref, &temp_id)) {
state.MD_LOCAL_ADDRESS = (uint16_t)temp_id;
state.addr_base = 0xFFFF0000 & temp_id;
}
else {
elog(LOG_CRIT, "can't get interface id for %s.", link_name(&lu_opts.opts,NULL));
exit(1);
}
*/
}
// Setup neighbor list
{
neighbor_opts_t n_opts = {
link_name: uses,
new_list: neighbor_list_new,
data: &state
};
if (g_neighbors(&n_opts, NULL) < 0) {
elog(LOG_CRIT, "Failed to open neighborlist");
exit(1);
}
}
{ // Setup status device
status_dev_opts_t status_opts = {
device: {
devname: BUNDLESERVER_STATUS_DEV,
device_info: &state
},
printable: bundleserver_status_print,
binary: bundleserver_status_binary,
};
if (g_status_dev(&status_opts, &(state.status_dev_ref)) < 0) {
elog(LOG_CRIT, "Unable to create status device: %m");
exit(1);
}
}
{ // Setup query device
query_dev_opts_t query_opts = {
device: {
devname: BUNDLESERVER_QUERY_DEV,
device_info: &state
},
close: bundleserver_query_close,
usage: bundleserver_query_usage,
enqueue: bundleserver_query_enqueue,
process: bundleserver_query_process,
};
if (query_dev_new(&query_opts, &(state.query_dev_ref)) < 0) {
elog(LOG_CRIT, "Unable to create query device");
exit(1);
}
}
{ // Setup query client
query_client_opts_t q_opts = {
device: FILESENDER_QUERY_DEV,
// response: bundleserver_queryclient_response_handler
};
if (query_client_new(&q_opts, &(state.query_client_ref)) < 0) {
elog(LOG_CRIT, "Unable to create query client");
exit(1);
}
}
{ // Set up dir checker timer
if ( (g_timer_add(5000, // I think this is ms
bundleserver_timer_dir,
&state,
NULL,
NULL)) < 0)
{
elog(LOG_CRIT, "Unable to create timer event");
exit(1);
}
}
{ // Set up routes checker timer
if ( (g_timer_add(5000, // I think this is ms
bundleserver_timer_routes,
&state,
NULL,
NULL)) < 0)
{
elog(LOG_CRIT, "Unable to create timer event");
exit(1);
}
}
{ // Set up destination checker timer
if ( (g_timer_add(5000, // I think this is ms
bundleserver_timer_dest_ip,
&state,
NULL,
NULL)) < 0)
{
elog(LOG_CRIT, "Unable to create timer event");
exit(1);
}
}
bundleserver_timer_dest_ip(&state,0,NULL);
bundleserver_timer_routes(&state,0,NULL);
bundleserver_set_self_ip(&state);
g_main();
elog(LOG_ALERT, "Event system terminated abnormally.");
return 1;
}
/*****************************************
 * Query-device "close" callback.
 * Only logs that it was invoked and reports 0.
 */
static int bundleserver_query_close (query_context_t *q)
{
    const int result = 0; // ??? How is the return value interpreted?

    (void) q;
    elog(LOG_NOTICE, "Got query close callback.");

    return result;
}
/*****************************************
 * Query-device "usage" callback.
 * Logs the call and writes a fixed placeholder line into buf.
 */
static void bundleserver_query_usage (query_context_t *q, buf_t *buf)
{
    (void) q;

    elog(LOG_NOTICE, "Got query usage callback.");
    bufprintf(buf, "I have nothing to say right now.\n");
}
/*****************************************
 * Query-device "enqueue" callback.
 * Should eventually validate the request (e.g. that the file name is
 * valid and readable); for now every request is accepted.
 */
static int bundleserver_query_enqueue (query_context_t *q, char *command, size_t buf_size)
{
    (void) q;
    (void) command;
    (void) buf_size;

    elog(LOG_NOTICE, "Got query enqueue callback.");

    // Check to see if the filename is valid and readable.
    // ??? Is there anything else that can be returned to indicate an error?
    return QUERY_ENQUEUE;
}
/*****************************************
* Shutdown, close, free as necessary.
static void free_bundleserver_job(bundleserver_job_t * job)
{
// int close_result = 0;
// int errsv = 0;
elog(LOG_INFO,"freeing job.");
if(job) {
if (job->file_name) free(job->file_name);
if (job->dest_ip) free(job->dest_ip);
if (job->next_ip) free(job->next_ip);
if (job->src_ip) free(job->src_ip);
if (job->status) free(job->status);
}
return;
}
*/
/*****************************************
 * Replace the heap string *charstar with a private copy of msg.
 *
 * charstar  address of an owning char pointer; *charstar must be NULL or
 *           a pointer obtained from malloc/strdup. NULL charstar is a no-op.
 * msg       new contents. It may point into the string currently held in
 *           *charstar. NULL clears the string (*charstar becomes NULL).
 *
 * The copy is made BEFORE the old string is released, so aliasing is safe
 * and an allocation failure leaves the previous value untouched.
 *
 * This may become something more interesting if the job object ever
 * supports a "log" instead of a single status message.
 */
static void update_charstar(char ** charstar, const char * msg)
{
    char *copy = NULL;

    if (charstar == NULL)
        return;

    if (msg != NULL) {
        copy = strdup(msg);
        if (copy == NULL)
            return;     /* out of memory: keep what we had */
    }

    free(*charstar);
    *charstar = copy;
}
// returns null if there's no basename: ends in /
/*static char * getbasename(const char * name)
{
char * firstslash = NULL;
firstslash = strrchr(name, '/');
if (firstslash == NULL) {
return name;
}
if (++firstslash == '\0') {
return NULL;
}
return firstslash;
}
*/
/*****************************************
 * Query-device "process" callback: this is where a bundle gets created.
 *
 * `command` is taken to be exactly the path of the file to send and must
 * contain at least one '/'. For "<dir>/<name>" the function creates
 * "<dir>/BUNDLE__<name>", laid out as
 *
 *     [ bundleserver_header_t ][ contents of <dir>/<name> ]
 *
 * where the header carries version 1, type 1, the current self and
 * destination addresses from the server state, and the total length
 * (header + payload). The finished bundle is passed to
 * bundleserver_submit_file().
 *
 * Every failure is logged and abandons the request; all resources that
 * were acquired up to that point are released. The function always
 * returns QUERY_DONE. buf_size, print and bin are not used.
 */
static int bundleserver_query_process (query_context_t *q, char *command, size_t buf_size,
                                       buf_t *print, buf_t *bin)
{
    static const char prepend[] = "BUNDLE__";
    bundleserver_state_t * state = (bundleserver_state_t *) qdev_data(q);
    bundleserver_header_t header;
    struct stat statbuf;
    char * dir_part = NULL;         /* writable copy of command, cut at last '/' */
    const char * base_part = NULL;  /* points into dir_part, just past the cut   */
    char * firstslash = NULL;
    char * dst = NULL;              /* path of the bundle file                   */
    size_t dst_name_size = 0;
    size_t src_size = 0;            /* payload bytes                             */
    size_t bundle_size = 0;         /* header + payload bytes                    */
    int fdin = -1;
    int fdout = -1;
    void * src_mmap = MAP_FAILED;
    void * dst_mmap = MAP_FAILED;

    (void) buf_size;
    (void) print;
    (void) bin;

    elog(LOG_INFO," ");
    memset(&statbuf, 0, sizeof(statbuf));
    memset(&header, 0, sizeof(header));

    if (state == NULL || command == NULL
        || state->self_ip == NULL || state->dest_ip == NULL) {
        elog(LOG_CRIT, "Missing state, command or IP addresses; ignoring query");
        goto done;
    }

    // create destination name: split command into directory and base name
    elog(LOG_INFO, "Parsing file name");
    dir_part = strdup(command);
    if (dir_part == NULL) {
        elog(LOG_CRIT, "Unable to copy file name: %m");
        goto done;
    }
    firstslash = strrchr(dir_part, '/');
    if (firstslash == NULL) {
        elog(LOG_CRIT,"Oops. Need full name. Sorry");
        goto done;
    }
    base_part = firstslash + 1;
    *firstslash = '\0';
    elog(LOG_INFO,"dirname: %s, basename: %s", dir_part, base_part);

    // "<dir>/<prefix><base>" is exactly strlen(prepend) longer than command
    dst_name_size = strlen(command) + strlen(prepend) + 1;
    dst = malloc(dst_name_size);
    if (dst == NULL) {
        elog(LOG_CRIT, "Unable to allocate destination name: %m");
        goto done;
    }
    snprintf(dst, dst_name_size, "%s/%s%s", dir_part, prepend, base_part);
    elog(LOG_INFO, "destination name is: %s", dst);

    // command is exactly the file name for now:
    elog(LOG_INFO, "Opening file %s", command);
    if ( (fdin = open(command, O_RDONLY)) < 0) {
        elog(LOG_CRIT,"Unable to open file: %s: %m", command);
        goto done;
    }

    elog(LOG_INFO, "fstat(%s)", command);
    if (fstat(fdin, &statbuf) < 0) {
        elog(LOG_CRIT,"Unable to fstat file:%s :%m", command);
        goto done;
    }
    if (!S_ISREG(statbuf.st_mode)) {
        elog(LOG_CRIT, "Not a regular file: %s", command);
        goto done;
    }
    src_size = (size_t) statbuf.st_size;
    bundle_size = sizeof(header) + src_size;

    elog(LOG_INFO, "Opening file %s", dst);
    if ( (fdout = open(dst, O_RDWR | O_CREAT | O_TRUNC,
                       S_IRWXU | S_IRWXG | S_IRWXO)) < 0)
    {
        elog(LOG_CRIT,"Unable to open file: %s: %m", dst);
        goto done;
    }

    // set size of output file so the whole bundle can be mapped
    elog(LOG_INFO, "setting size of file %s", dst);
    if (ftruncate(fdout, (off_t) bundle_size) < 0) {
        elog(LOG_CRIT, "Unable to set size of file: %s: %m", dst);
        goto done;
    }

    // set up header (already zeroed above, so the strings stay terminated)
    elog(LOG_INFO, "Creating bundle header");
    header.version = 1;
    header.type = 1;
    snprintf(header.src, sizeof(header.src), "%s", state->self_ip);
    snprintf(header.dest, sizeof(header.dest), "%s", state->dest_ip);
    header.length = bundle_size;

    // mmap the files; a zero-length mapping is invalid, so an empty
    // source file is simply not mapped
    if (src_size > 0) {
        elog(LOG_INFO, "mmaping source file %s", command);
        src_mmap = mmap(NULL, src_size, PROT_READ,
                        MAP_FILE | MAP_SHARED, fdin, 0);
        if (src_mmap == MAP_FAILED) {
            elog(LOG_CRIT, "Unable to create mmap for file: %s: %m", command);
            goto done;
        }
    }

    elog(LOG_INFO, "mmaping dest file %s", dst);
    dst_mmap = mmap(NULL, bundle_size, PROT_READ | PROT_WRITE,
                    MAP_FILE | MAP_SHARED, fdout, 0);
    if (dst_mmap == MAP_FAILED) {
        elog(LOG_CRIT, "Unable to create mmap for file: %s: %m", dst);
        goto done;
    }

    // Alright, ready to copy: header first, payload right behind it
    elog(LOG_INFO, "Writing header to %s", dst);
    memcpy(dst_mmap, &header, sizeof(header));
    elog(LOG_INFO, "Writing file to %s", dst);
    if (src_size > 0) {
        memcpy((char *) dst_mmap + sizeof(header), src_mmap, src_size);
    }
    elog(LOG_INFO, "Done");

    // OK, now submit the file.
    bundleserver_submit_file(state, dst, header);

done:
    if (dst_mmap != MAP_FAILED) munmap(dst_mmap, bundle_size);
    if (src_mmap != MAP_FAILED) munmap(src_mmap, src_size);
    if (fdout >= 0) close(fdout);
    if (fdin >= 0) close(fdin);
    free(dst);
    free(dir_part);
    return QUERY_DONE;
}
/*
 * Create a job for `filename`, push it on the state's job queue and ask
 * the filesender (FILESENDER_QUERY_DEV) to transmit the file to the
 * current next hop.
 *
 * state     server state; supplies next_ip and the job queue.
 * filename  path of the bundle file to send.
 * header    bundle header (by value); its src/dest strings are copied
 *           into the job. They are force-terminated first because the
 *           header may have been read straight from disk.
 *
 * Nothing is queued or sent when no next hop is known yet or when memory
 * runs out; both cases are logged.
 *
 * NOTE(review): payload_size adds sizeof(header) to header.length, while
 * bundleserver_query_process already stores header + payload in
 * header.length -- confirm which is intended.
 * NOTE(review): the return value of query_synchronous_s and the
 * `response` buffer are ignored, as before; ownership of `response` is
 * not visible from this file.
 */
static void bundleserver_submit_file (bundleserver_state_t * state,
                                      const char * filename,
                                      bundleserver_header_t header)
{
    // TODO fix this next hop
    size_t name_len = 0;
    size_t command_size = 0;
    buf_t * response = NULL;
    bundleserver_job_t * job = NULL;
    bundleserver_job_elem_t * job_elem = NULL;
    filesender_query_command_t * command = NULL;

    if (state == NULL || filename == NULL) {
        elog(LOG_CRIT, "submit_file called without state or file name");
        return;
    }
    if (state->next_ip == NULL) {
        // The routes timer has not found a next hop (yet).
        elog(LOG_CRIT, "No next hop known; not submitting %s", filename);
        return;
    }

    header.src[sizeof(header.src) - 1] = '\0';
    header.dest[sizeof(header.dest) - 1] = '\0';

    name_len = strlen(filename);
    command_size = sizeof(filesender_query_command_t) + name_len + 1;

    job = calloc(1, sizeof(*job));
    job_elem = calloc(1, sizeof(*job_elem));
    command = calloc(1, command_size);
    if (job == NULL || job_elem == NULL || command == NULL) {
        goto nomem;
    }

    // set up job in queue
    elog(LOG_INFO, "Creating and queueing job.");
    job->state = state;
    job->file_name = strdup(filename);
    job->dest_ip = strdup(header.dest);
    job->next_ip = strdup(state->next_ip);
    job->src_ip = strdup(header.src);
    job->status = strdup("new");
    if (job->file_name == NULL || job->dest_ip == NULL || job->next_ip == NULL
        || job->src_ip == NULL || job->status == NULL) {
        goto nomem;
    }
    elog(LOG_INFO,"job->next_ip is %s",job->next_ip);
    job->payload_size = header.length + sizeof(bundleserver_header_t);
    job_elem->job = job;
    bundleserver_job_queue_push(state->job_queue, job_elem);

    // craft filesender_command data structure
    // (command was zero-filled, so both strings end up NUL-terminated)
    elog(LOG_INFO, "Crafting query request");
    strncpy(command->ip, job->next_ip, sizeof(command->ip) - 1);
    elog(LOG_INFO,"command->ip:%s",command->ip);
    memcpy(command->filename, filename, name_len);

    // submit query
    elog(LOG_INFO, "Submitting request");
    query_synchronous_s(FILESENDER_QUERY_DEV, (char *)(command), command_size, &response);
    elog(LOG_INFO, "Done.");
    free(command);
    return;

nomem:
    elog(LOG_CRIT, "Out of memory while creating job for %s", filename);
    if (job != NULL) {
        free(job->file_name);
        free(job->dest_ip);
        free(job->next_ip);
        free(job->src_ip);
        free(job->status);
    }
    free(job);
    free(job_elem);
    free(command);
}
/*
 * Read up to n bytes from fd into vptr, calling read() again after a
 * short read or an EINTR interruption.
 *
 * Returns the number of bytes actually stored, which is smaller than n
 * only when end-of-file was reached first, or -1 when read() fails with
 * an error other than EINTR.
 */
ssize_t readn(int fd, void *vptr, size_t n)
{
    char *cursor = vptr;
    size_t remaining = n;

    while (remaining > 0) {
        ssize_t got = read(fd, cursor, remaining);

        if (got < 0) {
            if (errno != EINTR)
                return -1;
            continue;           /* interrupted: just call read() again */
        }
        if (got == 0)
            break;              /* EOF */

        remaining -= (size_t) got;
        cursor += got;
    }

    return (ssize_t) (n - remaining);   /* always >= 0 here */
}
/* end readn */
/*******************************************************/
/*
 * Wrapper around readn() that treats a read error as fatal: the error
 * is logged and the process exits with status 1. Otherwise the byte
 * count from readn() is returned unchanged (it may be short at EOF).
 */
ssize_t Readn(int fd, void *ptr, size_t nbytes)
{
    ssize_t got = readn(fd, ptr, nbytes);

    if (got < 0) {
        //printf("readn error\n");
        elog(LOG_CRIT,"Read error: %m");
        exit(1);
    }

    return got;
}
/*******************************************************/
/*
 * Timer callback: refresh the next hop towards state->dest_ip.
 *
 * Reads BUNDLESERVER_ROUTES as whitespace-separated records whose first
 * field is a destination and whose fourth field is the next hop; the two
 * fields in between are skipped, as is a trailing run of digits, dots and
 * blanks. For every record whose destination equals state->dest_ip the
 * next hop is stored in state->next_ip, and the last component of a
 * dotted-quad next hop is stored in state->route_entry.next_hop.
 *
 * Field widths are bounded so an oversized token cannot overrun the
 * local buffers. Always returns TIMER_RENEW.
 */
static int bundleserver_timer_routes(gpointer data, int interval, g_event_t *ev)
{
    bundleserver_state_t * state = (bundleserver_state_t *)data;
    char dest[128];
    char nexthop[128];
    char fourth[4];
    FILE * fp = NULL;
    int found_match = 0;

    (void) interval;
    (void) ev;

    elog(LOG_INFO," ");
    elog(LOG_INFO,"Checking routes:%s", BUNDLESERVER_ROUTES);

    if (!state->dest_ip) {
        return TIMER_RENEW;
    }

    fp = fopen(BUNDLESERVER_ROUTES, "r");
    if (fp == NULL) {
        elog(LOG_CRIT,"Unable to open routes file:%s :%m", BUNDLESERVER_ROUTES);
        fflush(stdout);
        return TIMER_RENEW;
    }
    elog(LOG_INFO,"Found route file.");
    fflush(stdout);

    // fields: dest, (skipped), (skipped), nexthop, then trailing numbers
    while (fscanf(fp, "%127s%*s%*s%127s%*[ .0-9]", dest, nexthop) == 2)
    {
        if (strcmp(dest, state->dest_ip) != 0) {
            continue;
        }
        found_match = 1;
        elog(LOG_INFO,"Roofnet has next_hop: %s",nexthop);
        update_charstar(&state->next_ip, nexthop);

        // fill out route entry: only the last byte of the address is used
        if (sscanf(nexthop, "%*[0-9].%*[0-9].%*[0-9].%3[0-9]", fourth) == 1) {
            elog(LOG_INFO,"Next-hop Node ID: %d", atoi(fourth));
            state->route_entry.next_hop = atoi(fourth);
        } else {
            elog(LOG_NOTICE, "Next hop %s is not a dotted-quad address", nexthop);
        }
    }

    if (found_match == 0) {
        elog(LOG_INFO, "Didn't find destination %s in routes",
             state->dest_ip);
        // roofnet doesn't have a next hop; this affects the routing tree
        // that is displayed in emview.
        //state->route_entry.next_hop = my_node_id;
    }

    fclose(fp);
    return TIMER_RENEW;
}
/*******************************************************/
/*
 * Timer callback: pick up files that have arrived in /tmp/fr_completed.
 *
 * For every entry other than "." and "..":
 *   - if this node is the destination (state->self_ip equals
 *     state->dest_ip) the file is moved to /tmp/bundle_inbox;
 *   - otherwise it is moved to /tmp/bundle_tmp, its bundle header is read
 *     from the start of the file, and the file is handed to
 *     bundleserver_submit_file() for forwarding.
 *
 * A file whose header cannot be read completely, or whose path does not
 * fit the local buffers, is logged and skipped. Failure to open the
 * directory, to rename a file or to open a moved file ends the process
 * with status 1, as before. Returns TIMER_RENEW.
 */
static int bundleserver_timer_dir(gpointer data, int interval, g_event_t *ev)
{
    enum { BUNDLE_PATH_LEN = 512 };  /* fits the prefix plus a 255-byte d_name */
    bundleserver_state_t * state = (bundleserver_state_t *)data;
    const char * dir = "/tmp/fr_completed";
    DIR * dp;
    struct dirent *dirp;

    (void) interval;
    (void) ev;

    elog(LOG_INFO," ");

    if (state->self_ip == NULL || state->dest_ip == NULL) {
        elog(LOG_NOTICE, "Self or destination IP not set; skipping directory scan");
        return TIMER_RENEW;
    }

    // open directory
    if ( (dp = opendir(dir)) == NULL) {
        elog(LOG_CRIT,"Unable to open dir: %s: %m", dir);
        exit(1);
    }

    while ( (dirp = readdir(dp)) != NULL) {
        char old_name[BUNDLE_PATH_LEN];
        char new_name[BUNDLE_PATH_LEN];
        const char * target_dir;
        bundleserver_header_t header;
        ssize_t got;
        int is_final;
        int len;
        int fd;

        if (strcmp(dirp->d_name, ".") == 0
            || strcmp(dirp->d_name, "..") == 0)
            continue;

        // ok. we have a new file.
        len = snprintf(old_name, sizeof(old_name), "%s/%s", dir, dirp->d_name);
        if (len < 0 || (size_t) len >= sizeof(old_name)) {
            elog(LOG_CRIT, "File name too long, skipping: %s", dirp->d_name);
            continue;
        }

        // if we're the destination, then move to inbox; else stage for forwarding
        is_final = (strcmp(state->self_ip, state->dest_ip) == 0);
        if (is_final) {
            elog(LOG_INFO,"I'm the final destination...");
            target_dir = "/tmp/bundle_inbox";
        } else {
            elog(LOG_INFO,"I have to forward. But first move to /tmp/bundle_tmp");
            target_dir = "/tmp/bundle_tmp";
        }

        len = snprintf(new_name, sizeof(new_name), "%s/%s", target_dir, dirp->d_name);
        if (len < 0 || (size_t) len >= sizeof(new_name)) {
            elog(LOG_CRIT, "File name too long, skipping: %s", dirp->d_name);
            continue;
        }

        elog(LOG_INFO, "Renaming %s -> %s",old_name, new_name);
        if (rename(old_name, new_name) < 0) {
            elog(LOG_CRIT,"Error renaming %s -> %s: %m",old_name,new_name);
            exit(1);
        }
        if (is_final) {
            continue;
        }

        // We have to forward: read the bundle header back from the file
        elog(LOG_INFO, "Copying bundle header from %s", new_name);
        if ( (fd = open(new_name, O_RDONLY)) < 0) {
            elog(LOG_CRIT,"Unable to open file: %s: %m", new_name);
            exit(1);
        }
        memset(&header, 0, sizeof(header));
        got = Readn(fd, &header, sizeof(header));
        close(fd);
        if (got != (ssize_t) sizeof(header)) {
            // Do not forward a file whose header is incomplete.
            elog(LOG_CRIT,"Weird. Unable to read header from File: %s; not forwarding",
                 new_name);
            continue;
        }
        // The header came from disk: make sure its strings are terminated.
        header.src[sizeof(header.src) - 1] = '\0';
        header.dest[sizeof(header.dest) - 1] = '\0';

        elog(LOG_INFO,"Submitting file.");
        bundleserver_submit_file(state, new_name, header);
    }

    closedir(dp);
    return TIMER_RENEW;
}
/*
 * Timer callback: refresh the destination address.
 *
 * Reads the first whitespace-delimited token of BUNDLESERVER_DEST_IP_FILE
 * and stores it in state->dest_ip. When the token is a dotted quad, its
 * last component is also stored in state->route_entry.dst (the routing
 * entry only needs the last byte as an int).
 *
 * Token lengths are bounded so file contents cannot overrun the local
 * buffers. A missing file is logged and otherwise ignored.
 * Always returns TIMER_RENEW.
 */
static int bundleserver_timer_dest_ip(gpointer data, int interval, g_event_t *ev)
{
    bundleserver_state_t * state = (bundleserver_state_t *)data;
    FILE * fp = NULL;
    char dest[128];
    char fourth[4];

    (void) interval;
    (void) ev;

    fp = fopen(BUNDLESERVER_DEST_IP_FILE, "r");
    if (fp == NULL) {
        elog(LOG_CRIT,"Unable to open destination file%s :%m",
             BUNDLESERVER_DEST_IP_FILE);
        fflush(stdout);
        return TIMER_RENEW;
    }
    elog(LOG_INFO,"Found destination file.");
    fflush(stdout);

    if (fscanf(fp, "%127s", dest) == 1) {
        // update state
        update_charstar(&state->dest_ip, dest);
        elog(LOG_INFO,"Destination IP: %s",dest);

        // update routing entry (need only last byte as int)
        if (sscanf(dest, "%*[0-9].%*[0-9].%*[0-9].%3[0-9]", fourth) == 1) {
            elog(LOG_INFO,"Destination Node ID: %d", atoi(fourth));
            state->route_entry.dst = atoi(fourth);
        } else {
            elog(LOG_NOTICE, "Destination %s is not a dotted-quad address", dest);
        }
    }

    fclose(fp);
    return TIMER_RENEW;
}
/* End of bundleserver.c */