Show emproxy_cmd.c syntax highlighted
/*
*
* Copyright (c) 2005 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
char emproxy_cmd_c_id[] = "$Id: emproxy_cmd.c,v 1.7 2005/09/22 20:06:57 girod Exp $";
#include "emproxy_i.h"
#include "libdev/g_signals.h"
#include <time.h>
#include <sys/wait.h>
#include <limits.h>
#include <sys/resource.h>
static
int emproxy_command_handle_response(void *data, int fd, int cond, g_event_t *ev);
static
int emproxy_toss_it(void *data, int fd, int cond, g_event_t *ev)
{
char buf[4096];
if (cond == FUSD_NOTIFY_INPUT) {
int status = read(fd, buf, sizeof(buf));
if (status < 0) {
if (errno == EAGAIN) goto out;
elog(LOG_WARNING, "unusual error reading from shell pipe: %m");
goto done;
}
else if (status == 0)
goto done;
else
goto out;
}
else if (cond) {
elog(LOG_WARNING, "exception on shell pipe");
goto done;
}
else {
elog(LOG_WARNING, "event handler triggered with no event flag?");
goto done;
}
done:
close(fd);
return EVENT_DONE;
out:
return EVENT_RENEW;
}
void emproxy_command_cancel(struct emp_command *cmd)
{
if (cmd->parent) emproxy_command_remove(cmd->parent, cmd);
if (cmd->command) free(cmd->command);
if (cmd->response) buf_free(cmd->response);
g_event_destroy(cmd->response_event);
if (cmd->output_fd >= 0) {
if (g_event_add(cmd->output_fd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
emproxy_toss_it, NULL, NULL, NULL) < 0) {
elog(LOG_WARNING, "Can't create event: %m");
}
}
free(cmd);
}
static
void emproxy_command_reply(struct emp_client *client, struct emp_command *cmd)
{
int repeat = cmd->time_ended != 0;
/* record completion time */
cmd->time_ended = time(0);
/* send the message back */
buf_t *msg = buf_new();
int length = repeat ? 0 :
i_min(EMPROXY_COMMAND_MAXRESP, cmd->response ? cmd->response->len : 0);
emproxy_reply_hdr_t hdr = {
node_id: my_node_id,
data_id: -(cmd->msg.seq_no),
report_time: {
tv_sec: cmd->time_ended
},
data_length: length + sizeof(emproxy_cmd_reply_hdr_t)
};
bufcpy(msg, &hdr, sizeof(hdr));
emproxy_cmd_reply_hdr_t cmd_hdr = {
exit_status: cmd->exit_status,
exited: cmd->exited
};
bufcpy(msg, &cmd_hdr, sizeof(cmd_hdr));
if (cmd->response && !repeat) bufcpy(msg, cmd->response->buf, length);
emproxy_send_data(client, msg->buf, msg->len);
buf_free(msg);
}
static
int emproxy_command_handle_sigchild(void *data, int signo, g_signal_context_t *sig_event)
{
pid_t pid;
int status;
struct emp_state *e = (struct emp_state *)data;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
/* search for command that matches this PID */
struct emp_client *ptr;
struct emp_command *cmd;
for (ptr = emproxy_client_top(e); ptr; ptr = emproxy_client_next(ptr)) {
for (cmd = emproxy_command_top(ptr); cmd;
cmd = emproxy_command_next(cmd)) {
if (cmd->child_pid == pid)
goto found;
}
}
/* not found.. */
elog(LOG_ERR, "error: process with unknown pid %d exited!", pid);
continue;
found:
elog(LOG_DEBUG(0), "caught %s (pid %d), exit status=%d, signal=%d", signal_name(signo), pid,
WEXITSTATUS(status), WTERMSIG(status));
cmd->exit_status = status;
cmd->exited = 1;
/* read any remaining data.. */
if (cmd->output_fd >= 0)
emproxy_command_handle_response(cmd, cmd->output_fd, FUSD_NOTIFY_INPUT, NULL);
/* reply */
emproxy_command_reply(cmd->parent, cmd);
}
if (pid < 0 && errno != ECHILD)
elog(LOG_ERR, "calling waitpid: %m");
return EVENT_RENEW;
}
static
int emproxy_command_handle_response(void *data, int fd, int cond, g_event_t *ev)
{
struct emp_command *cmd = (struct emp_command *)data;
char buf[4096];
if (cond == FUSD_NOTIFY_INPUT) {
int status = read(fd, buf, sizeof(buf));
if (status < 0) {
if (errno == EAGAIN) goto out;
if (errno == EPIPE) goto done;
elog(LOG_WARNING, "unusual error reading from shell pipe: %m");
goto done;
}
else if (status == 0)
goto done;
else {
if (cmd->response->len < EMPROXY_COMMAND_MAXRESP) {
bufcpy(cmd->response, buf, status);
if (cmd->response->len >= EMPROXY_COMMAND_MAXRESP)
emproxy_command_reply(cmd->parent, cmd);
}
goto out;
}
}
else if (cond) {
elog(LOG_WARNING, "exception on shell pipe");
goto done;
}
else {
elog(LOG_WARNING, "event handler triggered with no event flag?");
goto done;
}
done:
close(fd);
cmd->output_fd = -1;
return EVENT_DONE;
out:
return EVENT_RENEW;
}
void emproxy_process_command(struct emp_state *es, struct emp_client *client,
struct client_message *msg, char *request, int req_len)
{
/* check for seqno already present */
struct emp_command *cmd;
for (cmd = emproxy_command_top(client); cmd; cmd = emproxy_command_next(cmd)) {
/* drop duplicate? */
if (cmd->msg.seq_no == msg->seq_no) {
return;
}
}
/* create new command struct */
cmd = g_new0(struct emp_command, 1);
cmd->child_pid = -1;
cmd->output_fd = -1;
cmd->msg = *msg;
cmd->parent = client;
cmd->command = malloc(req_len+1);
memmove(cmd->command, request, req_len);
cmd->command[req_len] = 0;
cmd->command_length = req_len;
cmd->time_started = time(0);
emproxy_command_push(client, cmd);
/* spawn the process.. */
/* create a pipe */
int in_fds[2];
int out_fds[2];
if (pipe(in_fds) < 0) {
elog(LOG_WARNING, "Can't create pipe: %m");
goto fail;
}
if (pipe(out_fds) < 0) {
elog(LOG_WARNING, "Can't create pipe: %m");
goto fail;
}
/* fork */
int pid = fork();
/* parent.. */
if (pid > 0) {
int in = in_fds[0];
int out = out_fds[1];
/* close other side of pipe */
close(in_fds[1]);
close(out_fds[0]);
cmd->output_fd = in;
cmd->child_pid = pid;
set_nonblock(in, 1);
/* write the commands */
if (write_to_fd(out, cmd->command, cmd->command_length) < 0) {
elog(LOG_WARNING, "Can't write command strings: %m");
goto fail;
}
close(out);
/* create response buffer */
cmd->response = buf_new();
/* set up event to read back.. */
if (g_event_add(in, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
emproxy_command_handle_response,
cmd, NULL, &(cmd->response_event)) < 0) {
elog(LOG_WARNING, "Can't create event: %m");
goto fail;
}
}
/* child... */
else if (pid == 0) {
struct rlimit limit;
int in = out_fds[0];
int out = in_fds[1];
/* reinstate default signal handlers */
signal(SIGCHLD, SIG_DFL);
signal(SIGTERM, SIG_DFL);
signal(SIGHUP, SIG_DFL);
signal(SIGINT, SIG_DFL);
/* clear the signal mask - may cause a sigint to be delivered */
sigset_t sigset;
sigemptyset(&sigset);
sigprocmask(SIG_SETMASK, &sigset, NULL);
/* detach from parent's signals */
setpgrp();
/* close all file descriptors. If rlimit doesn't work, guess
* there are 256 of them. */
if (getrlimit(RLIMIT_NOFILE, &limit) < 0)
limit.rlim_cur = 256;
/* close everything except pipes */
int i;
for (i = 0; i < limit.rlim_cur; i++)
if (i != in && i != out) close(i);
/* connect stdout and stderr to pipe */
if (out != STDOUT_FILENO && dup2(out, STDOUT_FILENO) != STDOUT_FILENO)
exit(253);
if (out != STDERR_FILENO && dup2(out, STDERR_FILENO) != STDERR_FILENO)
exit(252);
if (in != STDIN_FILENO && dup2(in, STDIN_FILENO) != STDIN_FILENO)
exit(251);
/* close orig.. */
if (out != STDOUT_FILENO && out != STDERR_FILENO && out != STDIN_FILENO)
close(out);
if (in != STDOUT_FILENO && in != STDERR_FILENO && in != STDIN_FILENO)
close(in);
/* ok, exec the shell.. */
execl("/bin/sh", "/bin/sh", NULL);
/* failed?? */
exit(250);
}
/* fork failed */
else {
elog(LOG_WARNING, "Can't fork!!!");
goto fail;
}
return;
fail:
close(in_fds[0]);
close(in_fds[1]);
close(out_fds[0]);
close(out_fds[1]);
emproxy_command_cancel(cmd);
}
void emproxy_command_gc_now(struct emp_client *client)
{
int now = time(0);
/* check to gc commands */
struct emp_command *cmd;
for (cmd = emproxy_command_top(client); cmd; ) {
struct emp_command *tmp = cmd;
cmd = emproxy_command_next(cmd);
if (tmp->time_ended &&
((now - tmp->time_ended) > EMPROXY_COMMAND_LINGER)) {
emproxy_command_cancel(tmp);
}
else if ((tmp->time_ended == 0) && tmp->time_started &&
((now - tmp->time_started) > EMPROXY_COMMAND_MAXWAIT)) {
emproxy_command_reply(client, tmp);
}
}
}
void emproxy_command_handler_init(struct emp_state *e)
{
g_signal_opts_t opts = {
signo: SIGCHLD,
callback: emproxy_command_handle_sigchild,
data: e
};
if (g_signal_handler(&opts, NULL) < 0) {
elog(LOG_CRIT, "can't attach to sigchld signal: %m");
exit(1);
}
}
See more files for this project here