Show emproxy_gw.c syntax highlighted
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "emproxy_i.h"
#include <time.h>
#include "emproxy/emproxy.h"
#include <emrun/emrun.h>
/*
* emproxy gateway interface
*
* note: this needs to save the last result so it can report it
* to better support emview coming up late
*/
int timer_set = 0;
static int update_configuration(void* data, int interval, g_event_t* event);
static
int emproxy_gw_push_reply(struct emp_client *ptr, emproxy_reply_hdr_t *hdr, char *devname, int freshen)
{
struct emp_request *er;
for (er = emproxy_request_top(ptr); er;
er = emproxy_request_next(er)) {
/* if destination set and it's not for us, skip it */
if (er->dst_node && (er->dst_node != hdr->node_id))
continue;
/* if the device names match, report it */
if (strcmp(er->client_opts.devname, devname) == 0) {
hdr->data_id = er->data_id;
if (freshen) er->last_heard = time(0);
if (emproxy_push_data(ptr, hdr) < 0) {
elog(LOG_WARNING, "push_data failed: %m");
}
}
}
return 0;
}
/* If set_timer is 1, then we will set a timer to try and re-send the
* configuration data to the emproxy gateway periodically - if the node
* needs to be cached */
cached_emp_node_t* find_cached_node(node_id_t node_id, struct emp_state* es,
int set_timer)
{
cached_emp_node_t* node = NULL;
for (node = emproxy_cached_node_top(es); node;
node = emproxy_cached_node_next(node)) {
if (node->node_id == node_id) {
break;
}
}
/* If no struct exists, create it */
if (!node) {
node = g_new0(cached_emp_node_t, 1);
node->node_id = node_id;
node->emview_cmd = buf_new();
emproxy_cached_node_push(es,node);
/* If we have to cache a node, then set the timer, if requested */
if ((!timer_set) && (set_timer)) {
g_timer_add(10000, update_configuration, es, NULL, NULL);
set_timer = 1;
}
}
return node;
}
static
void cache_configuration(emproxy_reply_hdr_t* hdr, int message_len,
struct emp_state* es)
{
cached_emp_node_t* node = NULL;
/* If that node does not exist (i.e. it is faked or it hasnt come
* up yet), then cache it for now, and try to send it later */
if ((node = find_cached_node(hdr->node_id, es, 1))) {
if (node->emview_cmd) {
buf_free(node->emview_cmd);
}
node->emview_cmd=buf_new();
bufcpy(node->emview_cmd,(char *)hdr, message_len);
}
else {
elog(LOG_WARNING,"WARN couldn't find/add struct for node %d\n",
hdr->node_id);
}
}
static
int send_cmd_to_gw(struct emp_state *es, emproxy_reply_hdr_t* hdr,
int message_len)
{
char* hdr_data = misc_string_realloc_null_term(hdr->data, hdr->data_length);
buf_t* cmd_buf = buf_new();
bufprintf(cmd_buf, "%s", hdr_data);
int retval = 1;
/* Become the node who i want to send this to */
if (hdr->node_id == my_node_id || !in_sim)
retval = 0;
else {
sim_become_node(hdr->node_id);
if (emproxy_emit_buf_to_gw(EMRUN_EMVIEW_CONFIG_DEVNAME, hdr->node_id, NULL, cmd_buf) < 0) {
retval = 0;
}
sim_restore_node();
}
buf_free(cmd_buf); free(hdr_data);
return retval;
}
static
void free_cached_configuration(cached_emp_node_t* node)
{
buf_free(node->emview_cmd);
node->emview_cmd=buf_new();
}
static
int update_configuration(void* data, int interval, g_event_t* event)
{
struct emp_state* es = (struct emp_state *)data;
cached_emp_node_t* node = NULL;
/* Go through all nodes, and see if we need to update the configuration */
for (node = emproxy_cached_node_top(es); node;
node = emproxy_cached_node_next(node)) {
/* If the emview_cmd string holds something, then we try to update */
if (node->emview_cmd->len > 0) {
/* If we are able to sucessfully send the command, then clear any cfg data
* that may have been cached for this node */
if (send_cmd_to_gw(es, (emproxy_reply_hdr_t*) node->emview_cmd->buf,
node->emview_cmd->len) > 0) {
free_cached_configuration(node);
}
}
/* NR todo See if the node has any data, and refresh that as well */
}
return TIMER_RENEW;
}
static
int emproxy_gw_submit(status_context_t *ctx, char *command, size_t buf_size)
{
struct emp_state *es = (struct emp_state *) sd_data(ctx);
int retval = EVENT_RENEW;
/* parse the preceding device name */
int command_len = strlen(command);
int message_len = buf_size - command_len - 1;
/* verify length */
if (message_len < sizeof(emproxy_reply_hdr_t)) {
elog(LOG_WARNING, "GW device received malformed packet: message length %d", message_len);
elog(LOG_WARNING, "GW device: '%s'", command);
retval |= EVENT_ERROR(EINVAL);
}
else {
struct emp_client *ptr;
char *devname = strdup(command);
/* shift down the message */
emproxy_reply_hdr_t *hdr = (emproxy_reply_hdr_t *)command;
memmove(command, command + command_len + 1, message_len);
/* correct length */
hdr->data_length = message_len - sizeof(emproxy_reply_hdr_t);
/* set report time if not set */
if (hdr->report_time.tv_sec == 0 && hdr->report_time.tv_usec == 0) {
gettimeofday(&(hdr->report_time), NULL);
}
/* CHeck if there is anything in the header */
if (message_len > sizeof(emproxy_reply_hdr_t)) {
/* Check if this is a configuration message */
if (strcmp(EMRUN_EMVIEW_CONFIG_DEVNAME, devname) == 0) {
/* Make sure hdr->data is null-terminated for ease of string
* handling */
/* If we are the source node, then write this to our emrun file */
if (hdr->node_id == my_node_id) {
int fd = open(EMRUN_COMMAND_DEVNAME, O_RDWR);
if (fd < 0) {
elog(LOG_WARNING, "Unable to open emrun command device %s: %m\n",
EMRUN_COMMAND_DEVNAME);
}
else {
buf_t* cmd_buf = buf_new();
int status;
char* hdr_data =
misc_string_realloc_null_term(hdr->data, hdr->data_length);
bufprintf(cmd_buf, "emview=%s\n", hdr_data);
free(hdr_data);
if ((status = write(fd, cmd_buf->buf, cmd_buf->len)) < 0) {
elog(LOG_WARNING, "Unable to write to fd %d: %m\n", fd);
}
close(fd);
buf_free(cmd_buf);
goto skip_send;
}
}
/* Otherwise attempt to write this to the correct node's gateway file */
else {
/* If we are unable to, then cache the configuration */
if (send_cmd_to_gw(es, hdr, message_len) == 0) {
cache_configuration(hdr, message_len, es);
} else {
goto skip_send;
}
}
}
/* NR Otherwise assume this is just data, and save it to emproxy_data */
}
/* now scan for clients that want this data.. */
for (ptr = emproxy_client_top(es); ptr;
ptr = emproxy_client_next(ptr)) {
/* push reply, freshen timestamp */
emproxy_gw_push_reply(ptr, hdr, devname, 1);
}
skip_send:
free(devname);
}
return retval;
}
void emproxy_gw_update_client(struct emp_client* ptr, struct emp_state *es)
{
struct cached_emp_node *node = NULL;
for (node = emproxy_cached_node_top(es); node;
node = emproxy_cached_node_next(node)) {
/* Only update configuration for nodes we have received config for */
if (node->emview_cmd->len > 0) {
emproxy_reply_hdr_t *hdr = (emproxy_reply_hdr_t *)(node->emview_cmd->buf);
emproxy_gw_push_reply(ptr, hdr, EMRUN_EMVIEW_CONFIG_DEVNAME, 0);
}
}
}
static
int emproxy_gw_usage(status_context_t *info, buf_t *buf)
{
bufprintf(buf,
"EmProxy Gateway Device.\n"
"Accepts messages from gateways to other protocols, forwards them.\n"
"See emproxy.h for information on the format.\n\n");
return STATUS_MSG_COMPLETE;
}
void emproxy_gw_init(struct emp_state *es)
{
status_dev_opts_t opts = {
device: {
devname: EMPROXY_GATEWAY_DEV,
device_info: es
},
printable: emproxy_gw_usage,
write: emproxy_gw_submit
};
g_status_dev(&opts, &(es->proxy_gw));
}
See more files for this project here