Show dts_exec.c syntax highlighted
char dts_exec_c_cvsid[] = "$Id: dts_exec.c,v 1.7 2007/01/08 01:06:24 mlukac Exp $";
#include "dts_i.h"
/*
TODO:
garbage collect commands - need to add timer to dts_main
*/
static
int dts_exec_toss_it(void *data, int fd, int cond, g_event_t *ev)
{
char buf[4096];
if (cond == FUSD_NOTIFY_INPUT) {
int status = read(fd, buf, sizeof(buf));
if (status < 0) {
if (errno == EAGAIN) goto out;
elog(LOG_WARNING, "unusual error reading from shell pipe: %m");
goto done;
}
else if (status == 0)
goto done;
else
goto out;
}
else if (cond) {
elog(LOG_WARNING, "exception on shell pipe");
goto done;
}
else {
elog(LOG_WARNING, "event handler triggered with no event flag?");
goto done;
}
done:
close(fd);
return EVENT_DONE;
out:
return EVENT_RENEW;
}
void dts_exec_command_cancel(dts_command_el_t *dtsce, exec_command_t *cmd)
{
elog(LOG_DEBUG(15), "command cancel");
if (cmd == NULL) return;
// if (cmd->parent) emproxy_command_remove(cmd->parent, cmd);
if (cmd->command) free(cmd->command);
if (cmd->response) buf_free(cmd->response);
g_event_destroy(cmd->response_event);
if (cmd->output_fd >= 0) {
if (g_event_add(cmd->output_fd, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
dts_exec_toss_it, NULL, NULL, NULL) < 0) {
elog(LOG_WARNING, "Can't create event %m");
}
}
free(cmd);
dtsce->exec_cmd = NULL;
}
static
void dts_exec_command_reply(dts_t *dts, exec_command_t *cmd, int return_value)
{
elog(LOG_DEBUG(15), "command reply");
/* record completion time */
cmd->time_ended = time(0);
dts_response_t *response =
dts_response_create_response(dts, cmd->dtsce, cmd->response,
return_value);
dts_response_el_t *dtsre =
dts_response_find_or_create(dts, response,
sizeof(dts_response_t) +
buf_len(cmd->response) + 1,
NULL);
dts_ondisk_store_new_response(dts, dtsre);
dts_response_push_responses(dts);
free(response);
if (dts->command_exec_counter > 0) {
dts_command_trigger_exec(dts);
}
}
static
int dts_exec_command_handle_response(void *data, int fd, int cond, g_event_t *ev)
{
elog(LOG_DEBUG(15), "handel response");
exec_command_t *cmd = (exec_command_t *) data;
char buf[4096];
memset(buf, 0, 4096);
if (cond == FUSD_NOTIFY_INPUT) {
int status = read(fd, buf, sizeof(buf));
elog(LOG_DEBUG(5), "got some result: %i: %s", status, buf);
if (status < 0) {
if (errno == EAGAIN) goto out;
if (errno == EPIPE) goto done;
elog(LOG_WARNING, "unusual error reading from shell pipe: %m");
goto done;
}
else if (status == 0)
goto done;
else {
if (cmd->response->len < EXEC_COMMAND_MAXRESP) {
bufcpy(cmd->response, buf, status);
if (cmd->response->len >= EXEC_COMMAND_MAXRESP) {
bufcpy(cmd->response, "- DTS: Max len", 14);
dts_exec_command_reply(cmd->dts, cmd, -255);
// dts_exec_command_cancel(cmd->dtsce, cmd);
}
}
goto out;
}
}
else if (cond) {
elog(LOG_WARNING, "exception on shell pipe: %m");
goto done;
}
else {
elog(LOG_WARNING, "event handler triggered with no event flag?");
goto done;
}
done:
// dts_exec_command_reply(cmd->dts, cmd, -253);
// dts_exec_command_cancel(cmd->dtsce, cmd);
close(fd);
cmd->output_fd = -1;
return EVENT_DONE;
out:
return EVENT_RENEW;
}
int dts_exec_process_command(dts_t *dts, dts_command_el_t *dtsce)
{
exec_command_t *cmd = NULL;
int req_len = dtsce->length - sizeof(dts_command_t);
dtsce->executed = 1;
/* create new command struct */
cmd = g_new0(exec_command_t, 1);
cmd->child_pid = -1;
cmd->output_fd = -1;
// cmd->msg = *msg;
cmd->dts = dts;
cmd->dtsce = dtsce;
dtsce->exec_cmd = cmd;
cmd->command = malloc(req_len + 1);
memmove(cmd->command, dtsce->command->data, req_len);
cmd->command[req_len] = 0;
cmd->command_length = req_len;
cmd->time_started = time(0);
/* spawn the process.. */
/* create a pipe */
int in_fds[2];
int out_fds[2];
if (pipe(in_fds) < 0) {
elog(LOG_WARNING, "Can't create pipe: %m");
goto fail;
}
if (pipe(out_fds) < 0) {
elog(LOG_WARNING, "Can't create pipe: %m");
goto fail;
}
/* fork */
int pid = fork();
/* parent.. */
if (pid > 0) {
int in = in_fds[0];
int out = out_fds[1];
/* close other side of pipe */
close(in_fds[1]);
close(out_fds[0]);
cmd->output_fd = in;
cmd->child_pid = pid;
set_nonblock(in, 1);
/* write the commands */
if (write_to_fd(out, cmd->command, cmd->command_length) < 0) {
elog(LOG_WARNING, "Can't write command strings: %m");
goto fail;
}
close(out);
/* create response buffer */
cmd->response = buf_new();
/* set up event to read back.. */
if (g_event_add(in, FUSD_NOTIFY_INPUT | FUSD_NOTIFY_EXCEPT,
dts_exec_command_handle_response,
cmd, NULL, &(cmd->response_event)) < 0) {
elog(LOG_WARNING, "Can't create event: %m");
goto fail;
}
}
/* child... */
else if (pid == 0) {
struct rlimit limit;
int in = out_fds[0];
int out = in_fds[1];
/* reinstate default signal handlers */
signal(SIGCHLD, SIG_DFL);
signal(SIGTERM, SIG_DFL);
signal(SIGHUP, SIG_DFL);
signal(SIGINT, SIG_DFL);
/* clear the signal mask - may cause a sigint to be delivered */
sigset_t sigset;
sigemptyset(&sigset);
sigprocmask(SIG_SETMASK, &sigset, NULL);
/* detach from parent's signals */
setpgrp();
/* close all file descriptors. If rlimit doesn't work, guess
* there are 256 of them. */
if (getrlimit(RLIMIT_NOFILE, &limit) < 0)
limit.rlim_cur = 256;
/* close everything except pipes */
int i;
for (i = 0; i < limit.rlim_cur; i++)
if (i != in && i != out) close(i);
/* connect stdout and stderr to pipe */
if (out != STDOUT_FILENO && dup2(out, STDOUT_FILENO) != STDOUT_FILENO)
exit(253);
if (out != STDERR_FILENO && dup2(out, STDERR_FILENO) != STDERR_FILENO)
exit(252);
if (in != STDIN_FILENO && dup2(in, STDIN_FILENO) != STDIN_FILENO)
exit(251);
/* close orig.. */
if (out != STDOUT_FILENO && out != STDERR_FILENO && out != STDIN_FILENO)
close(out);
if (in != STDOUT_FILENO && in != STDERR_FILENO && in != STDIN_FILENO)
close(in);
/* ok, exec the shell.. */
execl("/bin/sh", "/bin/sh", NULL);
/* failed?? */
exit(250);
}
/* fork failed */
else {
elog(LOG_WARNING, "Can't fork!!!");
goto fail;
}
dts_ondisk_store_exec_command(dts, dtsce);
return 0;
fail:
close(in_fds[0]);
close(in_fds[1]);
close(out_fds[0]);
close(out_fds[1]);
bufcpy(cmd->response, "- DTS: Failed", 13);
dts_exec_command_reply(dts, cmd, -253);
dts_exec_command_cancel(dtsce, cmd);
return 1;
}
static
int dts_exec_command_gc_now(dts_t *dts, dts_command_el_t *dtsce)
{
int now = time(0);
exec_command_t *tmp = dtsce->exec_cmd;
/* if (tmp->time_ended &&
((now - tmp->time_ended) > EXEC_COMMAND_LINGER)) {
dts_exec_command_cancel(tmp);
}
else */
if ((tmp->time_ended == 0) && tmp->time_started &&
((now - tmp->time_started) > EXEC_COMMAND_MAXWAIT)) {
elog(LOG_WARNING, "timing out on command");
bufcpy(tmp->response, "- DTS: Timeout", 14);
dts_exec_command_reply(dts, tmp, -254);
// dts_exec_command_cancel(dtsce, tmp);
}
return 0;
}
int dts_exec_command_gc_timer(void *data, int interval, g_event_t *event)
{
dts_t *dts = (dts_t *) data;
dts_command_el_t *dtsce = NULL;
for (dtsce = dts_commands_top(dts); dtsce;
dtsce = dts_commands_next(dtsce)) {
if (dtsce->executed && dtsce->exec_cmd) {
dts_exec_command_gc_now(dts, dtsce);
}
}
return EVENT_RENEW;
}
static
int dts_exec_command_handle_sigchild(void *data, int signo, g_signal_context_t *sig_event)
{
elog(LOG_DEBUG(15), "command handle sigchild");
pid_t pid;
int status;
dts_t *dts = (dts_t *)data;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
/* search for command that matches this PID */
dts_command_el_t *dtsce = NULL;
exec_command_t *cmd = NULL;
for (dtsce = dts_commands_top(dts);
dtsce; dtsce = dts_commands_next(dtsce)) {
if (dtsce->exec_cmd)
if (dtsce->exec_cmd->child_pid == pid) {
cmd = dtsce->exec_cmd;
goto found;
}
}
/* not found.. */
elog(LOG_ERR, "error: process with unknown pid %d exited!", pid);
continue;
found:
elog(LOG_DEBUG(0), "caught %s (pid %d), exit status=%d, signal=%d",
//elog(LOG_WARNING, "caught %s (pid %d), exit status=%d, signal=%d",
signal_name(signo), pid,
WEXITSTATUS(status), WTERMSIG(status));
cmd->exit_status = status;
cmd->exited = 1;
/* read any remaining data.. */
if (cmd->output_fd >= 0)
dts_exec_command_handle_response(cmd, cmd->output_fd, FUSD_NOTIFY_INPUT, NULL);
/* reply */
dts_exec_command_reply(dts, cmd, status);
// dts_exec_command_cancel(dtsce, cmd);
}
if (pid < 0 && errno != ECHILD)
elog(LOG_ERR, "calling waitpid: %m");
return EVENT_RENEW;
}
void dts_exec_command_handler_init(dts_t *dts)
{
g_signal_opts_t opts = {
signo: SIGCHLD,
callback: dts_exec_command_handle_sigchild,
data: dts
};
if (g_signal_handler(&opts, NULL) < 0) {
elog(LOG_CRIT, "can't attach to sigchld signal: %m");
exit(1);
}
if (g_timer_add(2000, dts_exec_command_gc_timer,
dts, NULL, &(dts->command_gc_timer)) < 0) {
elog(LOG_CRIT, "Unable to create command gc timer: %m");
exit(1);
}
}
int sort_dts_commands(const void *a, const void *b)
{
return ( ((dts_command_el_t *) b)->command->sequence_number -
((dts_command_el_t *) a)->command->sequence_number );
}
int dts_command_exec_timer_fired(void *data, int interval, g_event_t *event)
{
dts_t * dts = (dts_t *) data;
elog(LOG_DEBUG(15), "exec timer fire");
int num_cmds = dts_commands_qlen(dts);
uint32_t oldest_seqno = 0;
dts_command_el_t *exec_me = NULL;
dts_command_el_t *(command_view[num_cmds]);
if (num_cmds == 0) {
goto done;
}
dts_command_fill_array(dts, command_view);
qsort(command_view, num_cmds, sizeof(dts_command_el_t *),
sort_dts_commands);
/* test print */
int k = 0;
for (k = 0; k < num_cmds; ++k) {
if (!command_view[k]->executed)
elog(LOG_WARNING, "To exec: Cmd %i: src: %i seq: %i",
k, command_view[k]->command->command_src_node,
command_view[k]->command->sequence_number);
}
/* NOT HAPPY WITH THIS!!! */
int i = 0;
for (i = 0; i < num_cmds; ++i) {
if (command_view[i]->executed == 0) {
oldest_seqno = dts_sequence_numbers_find_for_node(dts,
command_view[i]->command->command_src_node);
if (oldest_seqno == 0)
continue;
if (oldest_seqno == command_view[i]->command->sequence_number) {
exec_me = command_view[i];
break;
}
/* find if everything between oldest_seqno and this commands
sequence number are executed */
int j = 0;
int needed = command_view[i]->command->sequence_number - oldest_seqno;
for (j = 0; j < num_cmds; ++j) {
if ( (command_view[j]->command->command_src_node
== command_view[i]->command->command_src_node)
&&
(command_view[j]->command->sequence_number
< command_view[i]->command->sequence_number)
&&
(command_view[j]->command->sequence_number
>= oldest_seqno) ) {
needed -= 1;
}
}
if (needed == 0) {
exec_me = command_view[i];
break;
}
}
}
if (exec_me == NULL) {
goto done;
}
dts->command_exec_counter -= 1;
elog(LOG_DEBUG(4), "csn: %i el: %i cdst: %i me: %i",
exec_me->command->command_src_node,
exec_me->dont_exec_localy,
exec_me->command->command_dst_node,
my_node_id
);
/* final check... make sure we do this locally */
if (exec_me->command->command_src_node == my_node_id &&
exec_me->dont_exec_localy) {
exec_me->executed = 1;
dts_response_do_response_ack(dts, exec_me);
elog(LOG_WARNING, "No local set, not executing");
goto again;
}
if (exec_me->command->command_dst_node != 0 &&
exec_me->command->command_dst_node != my_node_id) {
exec_me->executed = 1;
dts_response_do_response_ack(dts, exec_me);
elog(LOG_WARNING, "Not the dts, not executing");
goto again;
}
int res = 0;
switch(exec_me->command->command_type) {
case DTS_COMMAND_TYPE_DEFAULT:
elog(LOG_WARNING, "Executing %i:%i",
exec_me->command->command_src_node,
exec_me->command->sequence_number);
res = dts_exec_process_command(dts, exec_me);
break;
case DTS_COMMAND_TYPE_TRANSFER:
res = dts_exec_xfer_process_xfer(dts, exec_me);
break;
case DTS_COMMAND_TYPE_STATUS:
res = dts_exec_status_process_status(dts, exec_me);
break;
default:
elog(LOG_WARNING, "Unknown command type %x",
exec_me->command->command_type);
res = 1;
}
/* this means something failed so we cannot rely on
someone else to startup the timer again */
if (res == 0) {
goto done;
}
again:
if (dts->command_exec_counter > 0) {
return EVENT_RENEW;
}
done:
elog(LOG_DEBUG(10), "Done exec for now... resched for 15 seconds");
g_timer_resched(dts->command_exec_timer, 15000);
// g_event_destroy(event);
// dts->command_exec_timer = NULL;
return EVENT_RENEW;
// return TIMER_RENEW_MS(15000);
}
See more files for this project here