Show emproxy.c syntax highlighted
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* emProxy
*
* This program provides a UDP interface that enables a remote client
* to watch device interfaces on an em* system.
*
* $Id: emproxy.c,v 1.28 2006/07/16 10:05:50 girod Exp $
*/
char emproxy_c_id[] = "$Id: emproxy.c,v 1.28 2006/07/16 10:05:50 girod Exp $";
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <libmisc/misc.h>
#include "emproxy_i.h"
#include <emrun/emrun.h>
#define _GNU_SOURCE
#include <getopt.h>
#undef _GNU_SOURCE
/* prototypes */
static void emproxy_status_update();
static int emproxy_handle_device(void *new_buffer, size_t size, void *data);
/* queue functions */
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_client, _, clients, struct emp_client, struct emp_state);
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_request, _, requests, struct emp_request, struct emp_client);
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_command, _, commands, struct emp_command, struct emp_client);
QUEUE_FUNCTION_INSTANTIATIONS(emproxy_cached_node, _, nodes, struct cached_emp_node, struct emp_state);
/*
* global emProxy state var
*/
struct emp_state emproxy_state = {
server_fd: -1
};
/* default parameters for requests */
struct emp_request template = {
client_opts: {
read_refractory: 1000,
handler: emproxy_handle_device
},
data_id: -1
};
/*
* Client lookup / creation code
*/
struct emp_client *
emproxy_lookup_client_by_addr(struct emp_state *es, struct sockaddr *addr, int addr_len, int src_proto, int src_iface)
{
struct emp_client *ptr;
/* search current clients */
for (ptr = emproxy_client_top(es); ptr; ptr = emproxy_client_next(ptr)) {
/* compare return addresses */
if ((ptr->addr_len == addr_len) &&
(ptr->src_proto == src_proto) &&
(ptr->src_iface == src_iface) &&
(memcmp(&(ptr->src_addr), addr, addr_len) == 0)) {
/* got a match! */
return ptr;
}
}
/* create a new client */
ptr = g_new0(struct emp_client, 1);
ptr->addr_len = addr_len;
ptr->src_proto = src_proto;
ptr->src_iface = src_iface;
ptr->parent = es;
if (ptr->addr_len > sizeof(ptr->src_addr)) {
elog(LOG_WARNING, "Address length mismatch!");
ptr->addr_len = sizeof(ptr->src_addr);
}
memmove(&(ptr->src_addr), addr, ptr->addr_len);
ptr->last_heard = time(0);
/* add to list */
emproxy_client_push(es, ptr);
emproxy_gw_update_client(ptr, es);
return ptr;
}
char *emproxy_unparse_client(struct emp_client *client)
{
DECLARE_STATIC_BUF_RING(str, 5, 80);
switch (client->src_proto) {
case EMPROXY_PROTO_LINK:
sprintf(str, "%s:link", print_if_id(client->src_addr.sin_addr.s_addr));
break;
case EMPROXY_PROTO_UDP:
return net_unparse_address(&(client->src_addr));
default:
sprintf(str, "%s:unknown", print_if_id(client->src_addr.sin_addr.s_addr));
}
return str;
}
/*
* request comparison
*/
static
int emproxy_request_cmp(struct emp_request *r1, struct emp_request *r2)
{
struct emp_request R1 = *r1;
struct emp_request R2 = *r2;
/* clear string pointers */
R1.client_opts.argument = NULL;
R2.client_opts.argument = NULL;
R1.client_opts.devname = NULL;
R2.client_opts.devname = NULL;
R1.client_opts.private_data = NULL;
R2.client_opts.private_data = NULL;
/* memcmp */
if ((R1.dst_node != R2.dst_node) ||
(R1.data_id != R2.data_id) ||
memcmp(&(R1.client_opts), &(R2.client_opts), sizeof(R1.client_opts)) ||
misc_safe_strcmp(r1->client_opts.devname, r2->client_opts.devname) ||
misc_safe_strcmp(r1->client_opts.argument, r2->client_opts.argument))
return 1;
/* same */
return 0;
}
/*
* destroy request destroys sub allocations, does not free request itself
*/
static
void emproxy_request_destroy(struct emp_request *req)
{
/* free the request subelements */
if (req->client_opts.argument) {
free(req->client_opts.argument);
req->client_opts.argument = NULL;
}
if (req->client_opts.devname) {
free(req->client_opts.devname);
req->client_opts.devname = NULL;
}
}
/*
* request and client cancelation
*/
static
void emproxy_cancel_request(struct emp_client *client, struct emp_request *req)
{
/* cancel event */
g_status_client_destroy(req->event);
/* dequeue and destroy */
emproxy_request_remove(client, req);
emproxy_request_destroy(req);
free(req);
/* log message */
elog(LOG_DEBUG(0), "Client: %s, canceled request id %d",
emproxy_unparse_client(client), req->data_id);
}
static
void emproxy_cancel_all_pending(struct emp_client *client)
{
/* cancel all the requests */
struct emp_request *er;
while ((er = emproxy_request_top(client)))
emproxy_cancel_request(client, er);
/* cancel all the commands */
struct emp_command *cmd;
while ((cmd = emproxy_command_top(client)))
emproxy_command_cancel(cmd);
}
static
void emproxy_cancel_client(struct emp_client *client)
{
emproxy_cancel_all_pending(client);
/* kill timer */
g_event_destroy(client->send_timer);
/* remove from queue and free */
emproxy_client_remove(&emproxy_state, client);
free(client);
}
/*
* packing and sending to network
*/
int emproxy_send_data(struct emp_client *client, char *buf, size_t len)
{
int status = -1;
switch (client->src_proto) {
case EMPROXY_PROTO_LINK: {
buf_t *pkt_buf = buf_new();
int status;
link_pkt_t hdr = {
dst: { id: client->src_addr.sin_addr.s_addr },
type: PKT_TYPE_EMPROXY,
max_hops: 10
};
bufcpy(pkt_buf, &hdr, sizeof(hdr));
char reply = EMPROXY_LINK_REPLY_TYPE;
bufcpy(pkt_buf, &reply, sizeof(reply));
bufcpy(pkt_buf, buf, len);
status = lu_multi_send(client->parent->link_ref, client->src_iface,
(link_pkt_t*)pkt_buf->buf, pkt_buf->len - sizeof(link_pkt_t));
buf_free(pkt_buf);
break;
}
case EMPROXY_PROTO_UDP:
status = sendto(emproxy_state.server_fd, buf, len, 0, (struct sockaddr *)&(client->src_addr), client->addr_len);
break;
default:
elog(LOG_CRIT, "unknown protocol");
}
if (status < 0) {
elog(LOG_WARNING, "Client: %s, error sending message len %d: %m",
emproxy_unparse_client(client), len);
}
return status;
}
/* flush the current packet from the client's aggregation buffer */
static
int emproxy_flush_data(struct emp_client *client)
{
int status = 0;
if (client->udp_len > 0)
status = emproxy_send_data(client, client->udp_buf, client->udp_len);
client->udp_len = 0;
g_event_destroy(client->send_timer);
return status;
}
/* timeout to flush after a delay */
static
int emproxy_do_send(void *data, int interval, g_event_t *event)
{
emproxy_flush_data((struct emp_client *)data);
return TIMER_DONE;
}
int emproxy_push_data(struct emp_client *client, struct emproxy_reply_hdr *reply)
{
/* determine outgoing MTU */
uint16_t outbound_mtu = 0;
switch (client->src_proto) {
case EMPROXY_PROTO_UDP:
outbound_mtu = EMPROXY_UDP_THRESHOLD;
break;
case EMPROXY_PROTO_LINK: {
lu_multi_link_t *lu = lu_multi_index_link(client->parent->link_ref, client->src_iface);
if (lu == NULL || lu_get_mtu(lu_multi_link_get_lu(lu), &outbound_mtu) < 0) {
elog(LOG_WARNING, "unable to get MTU for link: %m");
return -1;
}
if (outbound_mtu <= 0) {
elog(LOG_WARNING, "MTU for link too small %d: %m", outbound_mtu);
return -1;
}
/* room for link type code */
outbound_mtu--;
break;
}
default:
elog(LOG_WARNING, "unknown protocol");
}
/* compute the data length */
int data_len = reply->data_length + sizeof(struct emproxy_reply_hdr);
/* if the new data would put the message over threshold, send the current packet now */
if ((data_len + client->udp_len) > outbound_mtu)
emproxy_flush_data(client);
/* append the new data if it is under threshold */
if (data_len < outbound_mtu) {
/* if this is the first thing queued, set timer to guarantee send */
if (client->udp_len == 0) {
g_timer_add(EMPROXY_AGGREGATION_DELAY, emproxy_do_send, client, NULL, &(client->send_timer));
}
/* copy the new data in */
memmove(client->udp_buf + client->udp_len, (char*)reply, data_len);
client->udp_len += data_len;
}
/* otherwise send it now */
else
emproxy_send_data(client, (char*)reply, data_len);
return 0;
}
/*
* process device data
*/
static
int emproxy_handle_device(void *new_buffer, size_t size, void *data)
{
struct emp_request *req = (struct emp_request *)data;
int out_buf[65536/sizeof(int)];
struct emproxy_reply_hdr *hdr = (struct emproxy_reply_hdr *)out_buf;
int max_size = sizeof(out_buf) - sizeof(struct emproxy_reply_hdr);
elog(LOG_DEBUG(0), "Client: %s, got data on ID %d",
emproxy_unparse_client(req->parent), req->data_id);
/*
* construct a return buffer and send it
*/
hdr->node_id = my_node_id;
hdr->data_id = req->data_id;
gettimeofday(&(hdr->report_time), NULL);
req->last_heard = hdr->report_time.tv_sec;
/* max size.. */
if (size > max_size) {
elog(LOG_WARNING, "Truncating very large response from device %s (%d bytes)",
req->client_opts.devname, size);
size = max_size;
}
hdr->data_length = size;
/* copy the data */
memmove(hdr->data, (char*)new_buffer, size);
/* push the update */
emproxy_push_data(req->parent, hdr);
/* free the status report! */
if (new_buffer)
free(new_buffer);
return EVENT_RENEW;
}
/*
* process requests
*/
static
void emproxy_add_request(struct emp_client *client, struct emp_request *req)
{
struct emp_request *er;
/*
* first check to make sure the request is valid
*/
/* verify ID present */
if (req->data_id < 0) {
if (req->client_opts.devname)
elog(LOG_WARNING, "Client: %s, ignoring request with no ID",
emproxy_unparse_client(client));
goto free;
}
/* verify file name present */
if (req->client_opts.devname == NULL) {
elog(LOG_WARNING, "Client: %s, ignoring request with no device specified",
emproxy_unparse_client(client));
goto free;
}
/*
* check for redundant request
*/
for (er = emproxy_request_top(client); er; er = emproxy_request_next(er)) {
if (emproxy_request_cmp(req, er) == 0) {
elog(LOG_DEBUG(0), "Request already present.. ignoring");
er->mark = 0;
goto free;
}
}
/*
* add new request: copy and don't free
*/
er = g_copy(struct emp_request, req);
er->mark = 0;
er->parent = client;
er->client_opts.private_data = er;
emproxy_request_push(client, er);
/*
* spawn the necessary event.
* test node id if dst node is set
*/
if ((!er->dst_node || er->dst_node == my_node_id) &&
((my_node_id != SIM_COMPONENT_ID) || (er->dst_node == my_node_id)) &&
!client->parent->gw_only)
g_status_client_full(&(er->client_opts), &(er->event));
return;
free:
emproxy_request_destroy(req);
}
#define CASE(str) \
if (strncmp(key, (str), strlen(str)) == 0)
static
void emproxy_parse_pair(struct emp_request *req, char *key, char *value)
{
CASE("ascii") {
req->client_opts.read_as_ascii = 1;
}
if (value) {
CASE("dev") {
req->client_opts.devname = strdup(sim_path(value));
}
CASE("arg") {
req->client_opts.argument = strdup(value);
req->client_opts.arg_len = strlen(value);
}
CASE("reread") {
req->client_opts.reread_period = atoi(value);
}
CASE("ratelimit") {
req->client_opts.read_refractory = atoi(value);
}
CASE("node") {
req->dst_node = atoi(value);
if (req->dst_node == -1)
req->dst_node = SIM_COMPONENT_ID;
}
CASE("id") {
req->data_id = atoi(value);
}
}
}
static
void emproxy_parse_header_pair(struct emp_client *client, struct client_message *msg,
char *key, char *value)
{
if (value == NULL) {
elog(LOG_WARNING, "Parse error in header form client %s: no value for key %s",
emproxy_unparse_client(client), key);
return;
}
/* check the session nonces */
CASE("session") {
int session = atoi(value);
/* clear all requests if nonces mismatch */
if (session != client->session) {
if (client->session)
elog(LOG_NOTICE, "Session nonce mismatch.. clearing old requests for client %s",
emproxy_unparse_client(client));
emproxy_cancel_all_pending(client);
}
client->session = session;
}
/* parse the type key */
CASE("type") {
key = value;
CASE("request") {
msg->request_type=EMPROXY_REQ_REQUEST;
}
CASE("command") {
msg->request_type=EMPROXY_REQ_COMMAND;
}
return;
}
CASE("seq") {
msg->seq_no = atoi(value);
}
CASE("src") {
msg->src = atoi(value);
}
CASE("dest") {
msg->dest_node = atoi(value);
}
}
static
void emproxy_process_request(struct emp_state *es, char *request, int req_len,
struct sockaddr *src_addr, int addr_len, int src_proto, int src_iface)
{
parser_state_t *ps = misc_parse_init(request, MISC_PARSE_COLON_SCHEME);
struct emp_request new_req = template;
struct emp_request *er;
struct client_message msg = {
request_type: EMPROXY_REQ_REQUEST
};
/* lookup the client */
struct emp_client *client;
client = emproxy_lookup_client_by_addr(es, src_addr, addr_len, src_proto, src_iface);
/* update client heard time */
client->last_heard = time(0);
/* parse the request header
* format: session=4534534:type=request\n */
while (misc_parse_next_kvp(ps) >= 0) {
emproxy_parse_header_pair(client, &msg, ps->key, ps->value);
if ((ps->pair_delimit == '\n') || (ps->pair_delimit == 0))
break;
}
/* filter on destination node */
if (msg.dest_node && (my_node_id != msg.dest_node))
goto out;
/* handle request type... */
switch (msg.request_type) {
case EMPROXY_REQ_REQUEST:
/* mark all requests */
for (er = emproxy_request_top(client); er; er = emproxy_request_next(er))
er->mark = 1;
/* parse the request buffer
* format: key=val:...\n */
while (misc_parse_next_kvp(ps) >= 0) {
emproxy_parse_pair(&new_req, ps->key, ps->value);
if ((ps->pair_delimit == '\n') || (ps->pair_delimit == 0)) {
emproxy_add_request(client, &new_req);
memmove(&new_req, &template, sizeof(new_req));
}
}
emproxy_request_destroy(&new_req);
/* gc any marked requests */
for (er = emproxy_request_top(client); er; ) {
struct emp_request *tmp = er;
er = emproxy_request_next(er);
if (tmp->mark) {
elog(LOG_DEBUG(0), "Removing old request %s", tmp->client_opts.devname);
emproxy_cancel_request(client, tmp);
}
}
break;
case EMPROXY_REQ_COMMAND:
/* ignore if from us and local */
if (es->ignore_local_cmds && my_node_id == msg.src) {
goto out;
}
while (misc_parse_next_kvp(ps) >= 0) {
if (strcmp(ps->key, "cmd") == 0) {
int length;
char *raw = misc_parse_get_raw_value(ps, &length);
emproxy_process_command(es, client, &msg, raw, length);
break;
}
}
break;
default:
elog(LOG_WARNING, "Unhandled request type %d", msg.request_type);
break;
}
/* trigger client status update */
emproxy_status_update();
out:
misc_parse_cleanup(ps);
}
static
int emproxy_request_handler(void *data, int fd, int cond, g_event_t *event)
{
if (cond == FUSD_NOTIFY_INPUT) {
char request[65536];
int msg_len;
uint addr_len;
struct sockaddr_in src_addr;
/* receive message */
addr_len = sizeof(src_addr);
msg_len = recvfrom(fd, request, sizeof(request)-1, 0, (struct sockaddr *)&src_addr, &addr_len);
elog(LOG_DEBUG(0), "Got request: %s", request);
/* read error? */
if (msg_len < 0) {
elog(LOG_CRIT, "Read error from server socket: %m");
exit(1);
}
/* forward if we are demuxing */
if (emproxy_state.proxy_demux) {
buf_t *buf = buf_new();
bufcpy(buf, (char*)&src_addr, sizeof(struct sockaddr_in));
bufcpy(buf, request, msg_len);
pd_receive(emproxy_state.proxy_demux, buf->buf, buf->len);
buf_free(buf);
}
/* null-term */
request[msg_len] = 0;
/* process request */
emproxy_process_request(&emproxy_state, request, msg_len+1,
(struct sockaddr *)&src_addr, addr_len, EMPROXY_PROTO_UDP, 0);
}
else {
elog(LOG_CRIT, "Exception from udp server socket: %m");
exit(1);
}
return EVENT_RENEW;
}
static
int emproxy_link_handler(lu_multi_link_t *lu_l, link_pkt_t *pkt, ssize_t data_len)
{
struct sockaddr_in src_addr;
src_addr.sin_addr.s_addr = pkt->src.id;
/* null term */
char *buf = (char*)pkt;
if (data_len > 0 && pkt->data[0] == EMPROXY_LINK_REQUEST_TYPE) {
memmove(buf, pkt->data + 1, data_len - 1);
buf[data_len-1] = 0;
/* process request */
emproxy_process_request(&emproxy_state, buf, data_len,
(struct sockaddr *)&src_addr, sizeof(src_addr),
EMPROXY_PROTO_LINK, lu_multi_link_get_index(lu_l));
}
else
elog(LOG_DEBUG(0), "dropping message, len=%d, first char is 0x%x",
data_len, data_len > 0 ? pkt->data[0] : 0);
free(pkt);
return EVENT_RENEW;
}
static
int emproxy_demux_handler(void *buffer, ssize_t size, pd_client_context_t *client)
{
struct emp_state *state = (struct emp_state *)(pd_client_opts(client)->data);
char bufcpy[65536];
struct sockaddr_in addr;
/* check length */
if (size < sizeof(struct sockaddr_in)) {
elog(LOG_WARNING, "Received bogus message from emproxy demux, len=%d", size);
goto out;
}
/* parse out the sockaddr */
memmove(&addr, buffer, sizeof(struct sockaddr_in));
/* parse out data and null-term */
size -= sizeof(struct sockaddr_in);
if (size > sizeof(bufcpy)-1) {
elog(LOG_WARNING, "Received msg too large from demux, %d", size);
goto out;
}
memmove(bufcpy, buffer+sizeof(struct sockaddr_in), size);
bufcpy[size] = 0;
elog(LOG_DEBUG(0), "Got request: %s", bufcpy);
/* process it */
emproxy_process_request(state, bufcpy, size+1, (struct sockaddr *)&addr,
sizeof(struct sockaddr_in), EMPROXY_PROTO_UDP, 0);
out:
free(buffer);
return EVENT_RENEW;
}
static
int emproxy_link_configure(lu_context_t *lu)
{
int arg=1;
elog(LOG_DEBUG(0), "configuring link for loop mode");
if (lu_ioctl(lu, PD_SET_LOOP_MODE, &arg) < 0) {
elog(LOG_WARNING, "failed to configure link for loop mode: %m");
}
return EVENT_RENEW;
}
static
int emproxy_open_link(struct emp_state *emp)
{
lu_multi_opts_t opts = {
lu_opts: {
opts: {
pkt_type: PKT_TYPE_EMPROXY
},
configure: emproxy_link_configure
},
if_class: LINK_CLASS_NETWORK,
receive: emproxy_link_handler,
withhold_warning: 1
};
if (lu_open_multi(&opts, &(emp->link_ref)) < 0) {
elog(LOG_WARNING, "Unable to open network link class: %m");
return -1;
}
return 0;
}
static
int emproxy_config_port()
{
/* get a socket */
emproxy_state.server_fd = socket(AF_INET, SOCK_DGRAM,0);
if (emproxy_state.server_fd < 0) {
elog(LOG_CRIT, "Socket call failed: %m");
exit(1);
}
/* if we are in_sim, and we are NOT the master,
* then we open the master emproxy device and
* watch that */
if (in_sim && !sim_is_component()) {
pd_client_opts_t opts = {
devname: EMPROXY_DEMUX_DEV,
receive: emproxy_demux_handler,
data: &emproxy_state
};
if (pd_client_open(&opts, NULL) < 0) {
elog(LOG_CRIT, "Unable to open demux device: %m");
exit(1);
}
}
/* if we are not sim (or we ARE the master), we bind the socket
* and watch it */
else {
g_event_opts_t opts = {
event_name: "udp server",
close_on_destroy: 1
};
/* configure address */
struct sockaddr_in serv_addr = {};
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(emproxy_get_port());
/* bind the socket */
if (bind(emproxy_state.server_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
elog(LOG_CRIT, "Can't bind socket: %m");
elog(LOG_CRIT, "... perhaps another emproxy is running? I'll just hang out then");
return -1;
}
/* Install event to handle UDP */
if (g_event_add(emproxy_state.server_fd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
emproxy_request_handler, NULL, &opts, NULL) < 0) {
elog(LOG_CRIT, "Can't install udp server event: %m");
exit(1);
}
}
return 0;
}
/*
* garbage collection
*/
static
int emproxy_gc_timeout(void *data, int interval, g_event_t *event)
{
int now = time(0);
struct emp_client *ptr;
int change = 0;
/* check for clients that are aging out */
for (ptr = emproxy_client_top(&emproxy_state); ptr; ) {
struct emp_client *tmp = ptr;
ptr = emproxy_client_next(ptr);
/* gc the old commands */
emproxy_command_gc_now(tmp);
if ((now - tmp->last_heard) > EMPROXY_CLIENT_TIMEOUT) {
elog(LOG_DEBUG(0), "Client: %s, timing out!",
emproxy_unparse_client(tmp));
emproxy_cancel_client(tmp);
change = 1;
}
}
/* trigger update */
if (change)
emproxy_status_update();
return TIMER_RENEW;
}
/*
* emproxy status device
*/
static
int emproxy_status_printable(status_context_t *info, buf_t *buf)
{
struct emp_client *ptr;
struct emp_request *er;
int now = time(0);
bufprintf(buf, "Active Emproxy clients:\n");
for (ptr = emproxy_client_top(&emproxy_state); ptr;
ptr = emproxy_client_next(ptr)) {
bufprintf(buf, " %s: (req %d sec ago) [session nonce=%x]\n",
emproxy_unparse_client(ptr),
now - ptr->last_heard, ptr->session);
if (emproxy_request_top(ptr)) {
bufprintf(buf, "Requests:\n");
for (er = emproxy_request_top(ptr); er;
er = emproxy_request_next(er)) {
bufprintf(buf, " %d: %s, ", er->data_id, er->client_opts.devname);
if (er->client_opts.argument)
bufprintf(buf, "arg=%s, ", er->client_opts.argument);
if (er->client_opts.read_as_ascii)
bufprintf(buf, "ascii, ");
if (er->client_opts.reread_period)
bufprintf(buf, "reread=%d, ", er->client_opts.reread_period);
if (er->client_opts.read_refractory)
bufprintf(buf, "ratelimit=%d, ", er->client_opts.read_refractory);
if (er->dst_node) {
if (er->dst_node == my_node_id)
bufprintf(buf, "dst=US, ");
else if (er->dst_node == SIM_COMPONENT_ID)
bufprintf(buf, "dst=SIM, ");
else
bufprintf(buf, "dst=%d, ", er->dst_node);
}
if (er->last_heard)
bufprintf(buf, "last read %d sec ago\n", now - er->last_heard);
else
bufprintf(buf, "never read\n");
}
}
if (emproxy_command_top(ptr)) {
struct emp_command *cmd;
bufprintf(buf, "Commands:\n");
int now = time(0);
for (cmd = emproxy_command_top(ptr); cmd; cmd = emproxy_command_next(cmd)) {
bufprintf(buf, " \"%s\" (%d bytes)\n", cmd->command, cmd->command_length);
bufprintf(buf, " seqno=%d/dst=%d: ", cmd->msg.seq_no, cmd->msg.dest_node);
if (cmd->time_started) bufprintf(buf, "Started %d secs ago, ", now - cmd->time_started);
if (cmd->time_ended) bufprintf(buf, "Completed %d secs ago, ", now - cmd->time_ended);
bufprintf(buf, "pid=%d ", cmd->child_pid);
if (cmd->exited) bufprintf(buf, "exited, status=%x", cmd->exit_status);
bufprintf(buf, "\n");
}
}
bufprintf(buf, "\n");
}
return STATUS_MSG_COMPLETE;
}
static
void emproxy_status_init()
{
status_dev_opts_t opts = {
device: {
devname: EMPROXY_STATUS_DEV
},
printable: emproxy_status_printable
};
g_status_dev(&opts, &(emproxy_state.proxy_status));
}
static
void emproxy_status_update()
{
g_status_dev_notify(emproxy_state.proxy_status);
}
/*
* proxy demux device for emsim
*/
void emproxy_demux_init()
{
packet_dev_opts_t opts = {
device: {
devname: EMPROXY_DEMUX_DEV
}
};
if (g_packet_dev(&opts, &(emproxy_state.proxy_demux)) < 0) {
elog(LOG_CRIT, "Unable to create proxy demux device: %m");
exit(1);
}
}
/*
* Main Program and Initialization
*/
void usage(char *s)
{
fprintf(stderr, "Usage: %s [-h] [--gw-only] [--ignore-local-cmds]\n"
" Use ignore-local-cmds to disable local execution of commands \n"
" (e.g. on control laptop) \n", s);
exit(1);
}
void emproxy_shutdown(void *data)
{
elog(LOG_NOTICE, "EmProxy shutting down..");
exit(0);
}
int main(int argc, char *argv[])
{
emrun_opts_t emrun_opts = {
shutdown: emproxy_shutdown,
};
int dormant = 0;
/* emstar generic init */
misc_init(&argc, argv, CVSTAG);
/* enable demux if we are a sim component */
if (sim_is_component())
emproxy_state.sim_demux = 1;
emproxy_state.gw_only = misc_parse_out_switch(&argc, argv, "gw-only", 0);
emproxy_state.ignore_local_cmds = misc_parse_out_switch(&argc, argv, "ignore-local-cmds", 0);
/* Bind to UDP port, install event handler */
dormant = (emproxy_config_port() < 0);
if (misc_args_remain(&argc, argv)) {
usage(argv[0]);
}
/* do rest of setup only if we're not "dormant" */
if (!dormant) {
/* open the network class */
emproxy_open_link(&emproxy_state);
/* Create client list device */
emproxy_status_init();
/* Create request summary device */
emproxy_gw_init(&emproxy_state);
/* Create demux device if requested */
if (emproxy_state.sim_demux)
emproxy_demux_init();
/* create gc timer */
g_timer_add(EMPROXY_GC_INTERVAL, emproxy_gc_timeout, NULL,
NULL, NULL);
/* set signal handler */
emproxy_command_handler_init(&emproxy_state);
}
/* Register with emrun and start the event loop */
emrun_init(&emrun_opts);
elog(LOG_NOTICE, "Emproxy service starting%s",
emproxy_state.sim_demux ? ", sim_demux mode" : "");
g_main();
elog(LOG_CRIT, "event loop exited unexpectedly!");
return 1;
}
See more files for this project here