/* socket_server.cc -- from the EmStar project (listing retrieved from Krugle,
 * "Show socket_server.cc syntax highlighted"). */
/* ex: set tabstop=4 expandtab shiftwidth=4 softtabstop=4: */
#include "tcp_socket.h"
//#define SERVER_IP "131.179.144.16"
//#define SERVER_IP "164.67.194.193"
#include "devel/as-emstar/ds.h"
__BEGIN_DECLS
#include <libmisc/misc.h>
#include <libdev/status_client.h>
__END_DECLS
#define BACKLOG 100 // how many pending connections queue will hold

/* Forward declarations of the file-local helpers defined further down. */

/* Event callbacks handed to g_event_add(). */
static int tcp_server_handle_accept(void *data, int fd, int cond, g_event_t *event);
static int as_tcp_socket_handle_read(void *data, int fd, int cond, g_event_t *event);

/* Per-connection setup and teardown. */
static int as_tcp_socket_config(int sd, Adaptive_Sampling *as);
static void close_top(SocketIdQueue *sid_queue);

/* Small utilities. */
static char *addr_to_host(struct sockaddr_in *saddr);
static void print_to_as_msg(void *buf);
/*
 * Create the listening TCP socket for the adaptive-sampling server.
 *
 * Allocates a fresh SocketIdQueue and stores it in as->sid_queue, binds a
 * stream socket to INADDR_ANY:server_port, starts listening with a backlog
 * of BACKLOG, switches the socket to non-blocking mode and registers
 * tcp_server_handle_accept() for it.
 *
 * server_port: TCP port to listen on (host byte order).
 * as:          state handed to the accept callback; dereferenced, so it
 *              must not be NULL.
 *
 * Returns the listening socket descriptor. Every failure is fatal: a
 * message is written to stderr and the process exits with status 1, so no
 * error value is ever returned to the caller.
 */
int init_tcp_server_connection(int server_port, Adaptive_Sampling *as)
{
    int sd, rc;
    int val = 1;
    int saved_errno;
    struct sockaddr_in servAddr;

    as->sid_queue = new SocketIdQueue;

    /* Clear the whole address so no field or padding is left indeterminate. */
    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = INADDR_ANY;
    //use matlab on wren.
    //servAddr.sin_addr.s_addr = inet_addr(SERVER_IP);
    servAddr.sin_port = htons(server_port);

    /* create socket */
    sd = socket(AF_INET, SOCK_STREAM, 0);
    if (sd < 0) {
        fprintf(stderr, "socket server: cannot open socket\n");
        exit(1);
    }

    /* Request SO_REUSEADDR on the listening socket before binding it. */
    if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val) < 0) {
        fprintf(stderr, "fusdnet: Unable to set SO_REUSEADDR on fusdnet server socket! %m\n");
        exit(1);
    }

    rc = bind(sd, (struct sockaddr *) &servAddr, sizeof(servAddr));
    if (rc < 0) {
        /* fprintf() may overwrite errno; keep bind()'s value for perror(). */
        saved_errno = errno;
        fprintf(stderr, "cannot bind port TCP %d\n", server_port);
        errno = saved_errno;
        perror("error");
        exit(1);
    }

    if (listen(sd, BACKLOG) < 0) {
        fprintf(stderr, "fusdnet: Unable to listen on fusdnet server socket! %m\n");
        exit(1);
    }

    /* The accept callback runs from the event loop, so never block in it. */
    set_nonblock(sd, 1);

    /* register an event.. */
    if (g_event_add(sd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
                    tcp_server_handle_accept, as, NULL, NULL) < 0) {
        fprintf(stderr, "fusdnet: Failed to create event for accept socket: %m\n");
        exit(1);
    }

    return sd;
}
static
int tcp_server_handle_accept(void *data, int fd, int cond, g_event_t *event)
{
Adaptive_Sampling *as = (Adaptive_Sampling *) data;
SocketIdQueue * sid_queue = as->sid_queue;
struct sockaddr_in saddr;
size_t size = sizeof(saddr);
socket_id_t *sid;
/* accept new connection */
int status = accept(fd, (struct sockaddr *)&saddr, &size);
if (status < 0) {
elog(LOG_CRIT, "handle_accept: Accept returned error: %m");
goto out;
}
/* save this new client connection*/
sid = new socket_id_t;
memcpy( &(sid->from_addr), &saddr, sizeof(sockaddr_in));
sid->sd = status;
sid_queue->push( sid );
set_nonblock(status, 1);
as_tcp_socket_config(status, as);
out:
return EVENT_RENEW;
}
/*
 * Format the IPv4 address held in `saddr` as dotted-decimal text.
 *
 * The result is whatever inet_ntoa() returns, i.e. a buffer owned by the
 * library that the next inet_ntoa() call overwrites; copy it if it has to
 * outlive that.
 */
static
char * addr_to_host(struct sockaddr_in * saddr )
{
    struct in_addr ip = saddr->sin_addr;

    return inet_ntoa(ip);
}
/*
 * Event callback for a client socket: read and act on one command message.
 *
 * data:        the Adaptive_Sampling state registered with the event.
 * fd:          the client socket that became readable.
 * cond, event: unused.
 *
 * A read error other than EINTR/EAGAIN, or end-of-file, closes the
 * connection at the front of as->sid_queue. A read whose length is not
 * exactly sizeof(to_as_msg_t) is logged and ignored. An AS_EXIT message
 * terminates the process with status 0. Any other message starts a new run
 * through as_restart_prep() when the sampler is AS_IDLE, and is refused
 * with a log entry otherwise.
 *
 * NOTE(review): the error and close paths always act on the queue's front
 * entry, not on the entry that owns `fd` -- this looks like it assumes a
 * single active client; confirm.
 *
 * Always returns EVENT_RENEW.
 */
static
int as_tcp_socket_handle_read(void *data, int fd, int cond, g_event_t *event)
{
    Adaptive_Sampling *as = (Adaptive_Sampling *) data;
    SocketIdQueue *sid_queue = as->sid_queue;
    char buf[4096];
    int status;
    to_as_msg_t msg;
    loc_world cur_pos, upper_l, lower_r;

    status = read(fd, buf, sizeof(buf));

    /* handle errors */
    if (status < 0) {
        if (!((errno == EINTR) || (errno == EAGAIN))) {
            socket_id_t *sid = sid_queue->front();
            elog(LOG_WARNING, "Unexpected read error on socket %s: %m", addr_to_host(&(sid->from_addr)) );
            close_top(sid_queue);
        }
        return EVENT_RENEW;
    }

    /* handle close */
    if (status == 0) {
        close_top(sid_queue);
        return EVENT_RENEW;
    }

    /*
     * handle new data
     *
     * The byte count is decided by the peer and by TCP segmentation, so it
     * is checked at run time: an assert() here would abort the server on a
     * short or coalesced read, or vanish entirely under NDEBUG.
     */
    if ((size_t) status != sizeof(to_as_msg_t)) {
        elog(LOG_WARNING, "Ignoring message of %d bytes, expected %d\n",
             status, (int) sizeof(to_as_msg_t));
        return EVENT_RENEW;
    }

    /* Copy out of the char buffer; casting it could misalign the struct. */
    memcpy(&msg, buf, sizeof(msg));

    if (msg.msg_type == AS_EXIT)
        exit(0);

    //init AS
    cur_pos.x = msg.curx;
    cur_pos.y = msg.cury;
    upper_l.x = msg.xmin;
    upper_l.y = msg.ymin;
    lower_r.x = msg.xmax;
    lower_r.y = msg.ymax;

    elog( LOG_ERR, "REV %d bytes\n", status );

    if (as->state == AS_IDLE) {
        as->state = AS_BUSY;
        as_restart_prep(as, cur_pos, upper_l, lower_r);
    } else {
        elog( LOG_ERR, "Sorry, I am busy, send me command later.\n" );
    }

    print_to_as_msg( (void *)buf );

    return EVENT_RENEW;
}
/*
 * Debug hook for dumping a received to_as_msg_t.
 *
 * The dump is currently disabled, so the function does nothing with `buf`.
 */
static
void print_to_as_msg( void * buf )
{
    /* Disabled debug output, kept for reference: */
    // to_as_msg_t * msg = (to_as_msg_t * )buf;
    // fprintf( stderr, "Msg type: %d:", msg->msg_type);
    // fprintf( stderr, "cur: %d, %d; X-%d, %d; Y-%d, %d\n", msg->curx, msg->cury, msg->xmin, msg->xmax, msg->ymin, msg->ymax);
    (void) buf;
}
/*
 * Tear down the connection at the front of `sid_queue`: remove its record
 * from the queue, close its descriptor, destroy its read event and free the
 * record.
 *
 * front() is called unconditionally, so presumably callers guarantee the
 * queue is not empty -- verify against callers.
 */
static
void close_top(SocketIdQueue * sid_queue)
{
    socket_id_t *oldest = sid_queue->front();

    sid_queue->pop();

    close(oldest->sd);
    g_event_destroy(oldest->read_event);

    delete oldest;
}
static
int as_tcp_socket_config(int sd, Adaptive_Sampling *as)
{
SocketIdQueue * sid_queue = as->sid_queue;
socket_id_t * sid = sid_queue->front();
sid->read_event = NULL;
if (g_event_add(sd, FUSD_NOTIFY_INPUT, as_tcp_socket_handle_read,
as, NULL, &(sid->read_event)) < 0) {
elog(LOG_CRIT, "Can't create event for new socket: %m");
return -1;
}
return 0;
}
/*
 * Report the end of a sampling frame to the client at the front of
 * as->sid_queue and mark the sampler idle again.
 *
 * Sends one to_TA_msg_t of type AS_TO_TA carrying the current position
 * (as->pos.x, as->pos.y). The message is built on the stack and zeroed
 * first, so no uninitialised bytes are written to the socket.
 *
 * as->state is set to AS_IDLE before the send, as in the original ordering.
 * The result of tcp_send() is not inspected.
 */
void as_one_frame_done(Adaptive_Sampling *as)
{
    socket_id_t *sid = as->sid_queue->front();
    to_TA_msg_t msg_out;

    memset(&msg_out, 0, sizeof(msg_out));
    msg_out.msg_type = AS_TO_TA;
    msg_out.curx = as->pos.x;
    msg_out.cury = as->pos.y;

    as->state = AS_IDLE;
    tcp_send(sid->sd, &msg_out, sizeof(msg_out));
}
// See more files for this project here