/* libusfusd_socket.c -- part of the EmStar project (source listing retrieved from Krugle). */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "libusfusd_i.h"
/*
* connection and thread releasing and destruction
*/
/*
 * usfusd_conn_destroyed()
 *   Reports whether a connection is unusable: returns 1 when the pointer is
 *   NULL or the connection's destroyed flag is set, 0 otherwise.
 */
int usfusd_conn_destroyed(usfusd_conn_t *c)
{
  if (c == NULL)
    return 1;

  return c->destroyed != 0;
}
/*
 * usfusd_conn_destroy()
 *   Marks a connection as destroyed and tears it down once nobody holds a
 *   reference to it any more.
 *
 *   On the first call for a given connection:
 *     - every thread whose curr_conn is this connection is handed a
 *       synthetic FUSD_CLOSE reply and woken, so it stops waiting;
 *     - the current reader thread (if any) is woken.
 *   A temporary reference is held while this happens.
 *
 *   On every call, if the refcount is <= 0 the connection is removed from
 *   the parent's conns[] table, its path is freed, its socket is closed and
 *   the structure itself is freed.
 *
 *   c may be NULL, in which case nothing happens.
 */
void usfusd_conn_destroy(usfusd_conn_t *c)
{
  usfusd_t *state;

  /* must test c before touching c->parent */
  if (c == NULL)
    return;
  state = c->parent;

  /* call the callbacks to trigger activity */
  if (!c->destroyed) {
    int i;

    c->destroyed = 1;
    c->refcount++;

    /* force close for any threads blocked on this */
    for (i = 0; i < state->thread_count; i++) {
      usfusd_thread_t *t = state->threads[i];
      fusd_msg_t close_msg = {
        .magic = FUSD_MSG_MAGIC,
        .cmd = FUSD_FOPS_REPLY,
        .subcmd = FUSD_CLOSE,
      };
      fusd_msg_t *reply;

      if (t == NULL || t->curr_conn != c)
        continue;

      /* set up close reply */
      reply = malloc(sizeof(*reply));
      if (reply == NULL) {
        elog(LOG_CRIT, "Can't alloc close reply! %m... aborting\n");
        exit(1);
      }
      *reply = close_msg;

      /*
       * NOTE(review): any reply already stored in t->reply_msg is
       * overwritten here without being freed, as in the original code;
       * confirm whether that can happen and who would own it.
       */
      t->reply_msg = reply;

      /* wake thread.. */
      usfusd_thread_wake(t);
    }

    /* wake reader */
    if (state->reader)
      usfusd_thread_wake(state->reader);

    c->refcount--;
  }

  /* last reference gone: release everything the connection owns */
  if (c->refcount <= 0) {
    state->conns[c->socket] = NULL;
    free(c->path);
    __close(c->socket);
    free(c);
  }
}
/*
 * usfusd_conn_release()
 *   Drops one reference on a connection.  If the connection has already
 *   been marked destroyed, usfusd_conn_destroy() is invoked again so the
 *   final teardown can run.  A NULL connection is ignored.
 */
void usfusd_conn_release(usfusd_conn_t *c)
{
  if (c == NULL)
    return;

  --c->refcount;

  if (!c->destroyed)
    return;

  usfusd_conn_destroy(c);
}
/*
 * usfusd_thread_release_conn()
 *   Detaches a thread from the connection it is currently working on.
 *
 *   If a request is pending, the matching per-connection call flag
 *   (read/write/ioctl) is cleared, waiters on the connection's call_block
 *   condition are woken, and the pending message is freed.  Any stored
 *   reply is freed as well.  Finally the thread's reference on the
 *   connection is released and curr_conn is cleared.
 */
void usfusd_thread_release_conn(usfusd_thread_t *t)
{
  usfusd_conn_t *held = t->curr_conn;
  fusd_msg_t *request = t->pending_msg;

  if (request != NULL) {
    if (held != NULL) {
      /* clear the in-progress flag for this kind of call */
      if (request->subcmd == FUSD_READ)
        held->read_call = 0;
      else if (request->subcmd == FUSD_WRITE)
        held->write_call = 0;
      else if (request->subcmd == FUSD_IOCTL)
        held->ioctl_call = 0;

      pthread_cond_broadcast(&held->call_block);
    }

    free(request);
    t->pending_msg = NULL;
  }

  /* free(NULL) is a no-op, so no guard is needed */
  free(t->reply_msg);
  t->reply_msg = NULL;

  if (held != NULL)
    usfusd_conn_release(held);
  t->curr_conn = NULL;
}
/*
 * usfusd_thread_release_read_mutex()
 *   If the given thread is the current reader, gives up that role:
 *   clears state->reader, drops the reference held on every connection
 *   flagged read_active (clearing the flag), and unlocks read_mutex.
 *   Does nothing when the thread is not the reader.
 */
void usfusd_thread_release_read_mutex(usfusd_thread_t *t)
{
  usfusd_t *owner = t->parent;
  int fd;

  if (owner->reader != t)
    return;

  owner->reader = NULL;

  /* unref sockets being read */
  for (fd = 0; fd < FD_SETSIZE; fd++) {
    usfusd_conn_t *conn = owner->conns[fd];

    if (conn == NULL || !conn->read_active)
      continue;

    conn->read_active = 0;
    usfusd_conn_release(conn);
  }

  pthread_mutex_unlock(&owner->read_mutex);
}
/*
* usfusd_thread_lookup()
* looks up a thread structure, creating one if needed
*/
/* Key under which each thread stores its usfusd_thread_t pointer */
static pthread_key_t usfusd_key;

/*
 * usfusd_thread_cleanup()
 *   Destructor for the per-thread state stored under usfusd_key.
 *
 *   With the parent's mutex held, looks the thread up in the parent's
 *   threads[] table.  If present: releases the read mutex if this thread is
 *   the reader, closes both ends of the wakeup pipe, frees the advertised
 *   poll array, releases any connection the thread was attached to, clears
 *   the table slot and frees the structure.  If the thread is not in the
 *   table, nothing is released.
 */
static void usfusd_thread_cleanup(void *data)
{
  usfusd_thread_t *self = (usfusd_thread_t *)data;
  usfusd_t *owner = self->parent;
  int slot = 0;

  pthread_mutex_lock(&(owner->mutex));

  /* locate this thread's slot in the table */
  while (slot < owner->thread_count && owner->threads[slot] != self)
    slot++;

  if (slot < owner->thread_count) {
    /* release read mutex if reader */
    usfusd_thread_release_read_mutex(self);

    /* close the notify channel */
    __close(self->pipe[0]);
    __close(self->pipe[1]);

    /* free pointers (free(NULL) is harmless) */
    free(self->sfds);

    /* release connection if blocked */
    usfusd_thread_release_conn(self);

    /* clear from thread list */
    owner->threads[slot] = NULL;
    free(self);
  }

  pthread_mutex_unlock(&(owner->mutex));
}
usfusd_thread_t *usfusd_thread_lookup(usfusd_t *state)
{
int pid = getpid();
usfusd_thread_t *c=NULL;
int i,j;
pthread_mutex_lock(&(state->mutex));
/* try fast lookup.. */
if ((c = pthread_getspecific(usfusd_key)))
goto found;
/* linear search */
for (i=0,j=-1; i<state->thread_count; i++)
if (state->threads[i]) {
if (state->threads[i]->pid == pid) {
c = state->threads[i];
elog(LOG_CRIT, "hmm, weird..");
goto found;
}
}
else j=i;
/* not found.. */
/* extend the thread state vector */
if (j < 0) {
state->threads = realloc(state->threads, sizeof(usfusd_thread_t *)*(state->thread_count+1));
if (state->threads == NULL) {
elog(LOG_CRIT, "Can't alloc new thread state! %m... aborting\n");
exit(1);
}
j = state->thread_count;
state->thread_count++;
}
/* create new client state */
c = malloc(sizeof(usfusd_thread_t));
if (c == NULL) {
elog(LOG_CRIT, "Can't alloc new thread state (2)! %m... aborting\n");
exit(1);
}
/* initialize new client. */
memset(c, 0, sizeof(usfusd_thread_t));
c->pid = pid;
c->parent = state;
if (pipe(c->pipe) < 0) {
elog(LOG_CRIT, "Failed to create wakeup pipe: %m");
exit(1);
}
/* add client to list */
state->threads[j] = c;
/* set cleanup handler */
pthread_key_create(&usfusd_key, usfusd_thread_cleanup);
pthread_setspecific(usfusd_key, c);
found:
pthread_mutex_unlock(&state->mutex);
return c;
}
/*
* Wakeup interface. A pipe is used to enable a thread's poll call to be interrupted.
*/
/*
 * usfusd_thread_wake()
 *   Writes a single zero byte to the write end of the thread's wakeup pipe.
 *   A warning is logged if exactly one byte was not written.
 */
void usfusd_thread_wake(usfusd_thread_t *thread)
{
  const char token = '\0';
  ssize_t written = write(thread->pipe[1], &token, 1);

  if (written == 1)
    return;

  elog(LOG_WARNING, "Failed to write to wakeup pipe! %m\n");
}
/*
 * usfusd_process_wakeup()
 *   Drains the thread's wakeup pipe after poll() reported it readable.
 *
 *   pfd must be the pollfd entry for the read end of the thread's pipe;
 *   otherwise a warning is logged and nothing is read.  One read of up to
 *   4096 bytes is performed when POLLIN is set; read errors other than
 *   EINTR are logged.
 */
void usfusd_process_wakeup(usfusd_thread_t *thread, struct pollfd *pfd)
{
  char drain[4096];

  if (pfd->fd != thread->pipe[0]) {
    elog(LOG_WARNING, "poll config problem\n");
    return;
  }

  if (!(pfd->revents & POLLIN))
    return;

  if (read(thread->pipe[0], drain, sizeof(drain)) >= 0)
    return;

  if (errno != EINTR)
    elog(LOG_WARNING, "Read error from wakeup pipe?? %m\n");
}
/*
* Poll state update functions
*
* usfusd_notify_poll_state() is used to record new poll state
* usfusd_update_poll_state() is used to reflect current state in poll_fd list.
*/
/*
 * usfusd_update_poll_state()
 *   Advertises the pollfd array a thread is about to wait on and checks it
 *   against the poll state already recorded for each connection.
 *
 *   thread    - the waiting thread; its sfds/sfd_count are set to fds/fd_count
 *   fds       - caller's pollfd array; revents is filled in for known fds
 *   fd_count  - number of entries in fds
 *   v_errno   - set to EBADF if any entry does not name a known connection
 *               (including descriptors outside 0..FD_SETSIZE-1, which
 *               cannot index the connection table); otherwise left alone
 *
 *   Returns the number of entries whose revents came out non-zero, i.e.
 *   non-zero means the caller has something to report immediately.
 *   Runs with the parent's mutex held.
 */
int usfusd_update_poll_state(usfusd_thread_t *thread, struct pollfd *fds, int fd_count, int *v_errno)
{
  usfusd_t *state = thread->parent;
  int i;
  int trigger = 0;

  pthread_mutex_lock(&state->mutex);

  /* advertise poll state */
  if (thread->sfds)
    elog(LOG_WARNING, "Warning.. unexpectedly replaced poll advertisement??\n");
  thread->sfds = fds;
  thread->sfd_count = fd_count;

  /* check right now.. */
  for (i = 0; i < fd_count; i++) {
    int fd = fds[i].fd;
    usfusd_conn_t *conn = NULL;

    /* never index conns[] with a descriptor outside the table */
    if (fd >= 0 && fd < FD_SETSIZE)
      conn = state->conns[fd];

    if (conn == NULL) {
      *v_errno = EBADF;
      continue;
    }

    fds[i].revents = fds[i].events & conn->poll_state;
    if (fds[i].revents)
      trigger++;
  }

  pthread_mutex_unlock(&state->mutex);
  return trigger;
}
/*
 * usfusd_cwrite_msg()
 *   Writes a message (and optional data buffer) to a connection's socket
 *   via usfusd_write_msg(), holding the connection's write_mutex for the
 *   duration of the write.
 *
 *   Returns whatever usfusd_write_msg() returns, or -1 when conn is NULL.
 */
int usfusd_cwrite_msg(usfusd_conn_t *conn, fusd_msg_t *msg, char *data_buf)
{
  int status;

  if (conn == NULL)
    return -1;

  pthread_mutex_lock(&conn->write_mutex);
  status = usfusd_write_msg(conn->socket, msg, data_buf);
  pthread_mutex_unlock(&conn->write_mutex);

  return status;
}
/*
 * usfusd_poll_request()
 *   Builds a FUSD_FOPS_CALL / FUSD_POLL_DIFF message carrying the
 *   connection's current poll_state, lets usfusd_fill_msg() complete it,
 *   and sends it with usfusd_cwrite_msg() (no data payload).
 *
 *   Returns the result of usfusd_cwrite_msg().
 */
int usfusd_poll_request(usfusd_conn_t *conn)
{
  fusd_msg_t request = {
    .magic = FUSD_MSG_MAGIC,
    .cmd = FUSD_FOPS_CALL,
    .subcmd = FUSD_POLL_DIFF,
    .parm = {
      .fops_msg = {
        .cmd = conn->poll_state
      }
    }
  };

  usfusd_fill_msg(&request, conn);

  return usfusd_cwrite_msg(conn, &request, NULL);
}
/*
 * usfusd_notify_poll_state()
 *   Records a new poll state for a connection and wakes any thread that has
 *   advertised interest in it.
 *
 *   With the parent's mutex held, conn->poll_state is set to new_state.
 *   Then, for each registered thread with an advertised pollfd array, every
 *   entry naming this connection's socket whose requested events intersect
 *   new_state gets its revents updated and the thread is woken.
 *
 *   Returns non-zero if the poll state differed from the previous value.
 */
static int usfusd_notify_poll_state(usfusd_conn_t *conn, int new_state)
{
  usfusd_t *owner = conn->parent;
  int changed;
  int ti;

  pthread_mutex_lock(&owner->mutex);

  /* set the new state */
  changed = (conn->poll_state != new_state);
  conn->poll_state = new_state;

  /* check each thread's advertised fds */
  for (ti = 0; ti < owner->thread_count; ti++) {
    usfusd_thread_t *th = owner->threads[ti];
    int k;

    if (th == NULL || th->sfds == NULL)
      continue;

    for (k = 0; k < th->sfd_count; k++) {
      struct pollfd *slot = &th->sfds[k];
      int ready;

      if (slot->fd != conn->socket)
        continue;

      ready = slot->events & new_state;
      if (!ready)
        continue;

      /* update the revents field, then trigger this thread to wake */
      slot->revents = ready;
      usfusd_thread_wake(th);
    }
  }

  pthread_mutex_unlock(&owner->mutex);
  return changed;
}
/*
 * usfusd_dispatch_incoming()
 *   Routes one complete message received on a connection.
 *
 *   - Anything that is not a FUSD_FOPS_REPLY is logged and dropped.
 *   - A FUSD_POLL_DIFF reply updates the connection's poll state and
 *     immediately issues the next poll request.
 *   - Any other reply is handed to the first registered thread that has a
 *     pending request of the same subcmd on this connection: msg is stored
 *     in that thread's reply_msg and the thread is woken.
 *
 *   Returns 1 when msg was handed to a thread (the pointer is now stored
 *   in that thread's reply_msg), 0 in every other case.
 */
int usfusd_dispatch_incoming(usfusd_conn_t *conn, fusd_msg_t *msg)
{
  usfusd_t *owner = conn->parent;
  int handed_off = 0;
  int idx;

  if (msg->cmd != FUSD_FOPS_REPLY) {
    elog(LOG_WARNING, "recd non reply from fusd??");
    return 0;
  }

  /* handle poll_diff */
  if (msg->subcmd == FUSD_POLL_DIFF) {
    usfusd_notify_poll_state(conn, msg->parm.fops_msg.arg);
    usfusd_poll_request(conn);
    return 0;
  }

  /* otherwise look for recipient */
  pthread_mutex_lock(&owner->mutex);

  for (idx = 0; idx < owner->thread_count; idx++) {
    usfusd_thread_t *waiter = owner->threads[idx];

    if (waiter == NULL || waiter->pending_msg == NULL)
      continue;
    if (waiter->curr_conn != conn)
      continue;
    if (waiter->pending_msg->subcmd != msg->subcmd)
      continue;

    /* ok, save the response and wake thread */
    waiter->reply_msg = msg;
    usfusd_thread_wake(waiter);
    handed_off = 1;
    break;
  }

  if (!handed_off)
    elog(LOG_WARNING, "no pending message waiting??");

  pthread_mutex_unlock(&owner->mutex);
  return handed_off;
}
/*
 * usfusd_process_sockets()
 *   Services every entry of fds that poll() flagged POLLIN.
 *
 *   For each such descriptor one read() of up to 4096 bytes is made and the
 *   data appended to the connection's incoming buffer.  Once the buffer
 *   holds a full message (header plus datalen bytes) the message is passed
 *   to usfusd_dispatch_incoming(); bytes beyond it are carried over into a
 *   fresh incoming buffer.
 *
 *   EAGAIN/EINTR on read are ignored.  Any other read error, or end of
 *   file, destroys the connection.  Descriptors with no connection state
 *   are logged and skipped.
 *
 *   NOTE(review): at most one message is dispatched per read; if one read
 *   delivers more than one complete message, the rest stays buffered until
 *   this descriptor becomes readable again.
 */
void usfusd_process_sockets(usfusd_thread_t *t, struct pollfd *fds, int fd_count)
{
  int i;

  for (i = 0; i < fd_count; i++) {
    char buf[4096];   /* byte buffer (was mistakenly an array of pointers) */
    int retval;
    usfusd_conn_t *c;
    fusd_msg_t *msg;

    if (!(fds[i].revents & POLLIN))
      continue;

    /* fusd state must be present */
    c = t->parent->conns[fds[i].fd];
    if (c == NULL) {
      elog(LOG_WARNING, "Trying to process non-fusd socket??");
      continue;
    }

    /* ok, attempt a read */
    retval = read(fds[i].fd, buf, sizeof(buf));

    /* read error */
    if (retval < 0) {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      elog(LOG_WARNING, "error reading from socket: %m\n");
      usfusd_conn_destroy(c);
      continue;
    }

    /* eof */
    if (retval == 0) {
      usfusd_conn_destroy(c);
      continue;
    }

    /* store new data */
    if (c->incoming == NULL)
      c->incoming = buf_new();
    bufcpy(c->incoming, buf, retval);
    msg = (fusd_msg_t *)(c->incoming->buf);

    /* got a message yet?  (header first, so datalen is valid to read) */
    if ((c->incoming->len >= sizeof(fusd_msg_t)) &&
        (c->incoming->len >= sizeof(fusd_msg_t) + msg->datalen)) {
      buf_t *new_buf = NULL;
      int msglen = sizeof(fusd_msg_t) + msg->datalen;
      int no_free;

      /* keep whatever follows this message for the next round */
      if (c->incoming->len > msglen) {
        new_buf = buf_new();
        bufcpy(new_buf, c->incoming->buf + msglen,
               c->incoming->len - msglen);
      }

      /* dispatch it */
      no_free = usfusd_dispatch_incoming(c, msg);

      /*
       * A non-zero result means a thread now holds msg, so detach the
       * storage before releasing the buffer wrapper.
       */
      if (no_free)
        c->incoming->buf = NULL;
      buf_free(c->incoming);
      c->incoming = new_buf;
    }
  }
}
/*
 * usfusd_lock_type()
 *   Attaches a thread to a connection for one request and acquires the
 *   per-connection slot for that kind of call.
 *
 *   The connection's refcount is incremented and the thread's pending_msg,
 *   reply_msg (cleared) and curr_conn are set.  Then, for FUSD_READ,
 *   FUSD_WRITE and FUSD_IOCTL, the caller waits on the connection's
 *   call_block condition until the matching *_call flag is clear and sets
 *   it.
 *
 *   Must be called with the parent's mutex (conn->parent->mutex) locked;
 *   it is the mutex used for the condition wait.
 *
 *   For any other subcmd a warning is logged and no call flag is taken;
 *   the thread is still attached to the connection, so
 *   usfusd_thread_release_conn() can undo it.
 */
void usfusd_lock_type(usfusd_thread_t *thread, usfusd_conn_t *conn, fusd_msg_t *msg)
{
  int *call = NULL;

  switch (msg->subcmd) {
  case FUSD_READ:
    call = &(conn->read_call); break;
  case FUSD_WRITE:
    call = &(conn->write_call); break;
  case FUSD_IOCTL:
    call = &(conn->ioctl_call); break;
  default:
    break;
  }

  if (call == NULL)
    elog(LOG_WARNING, "locking wrong type??");

  conn->refcount++;
  thread->pending_msg = msg;
  thread->reply_msg = NULL;
  thread->curr_conn = conn;

  /* nothing to lock for an unrecognised call type */
  if (call == NULL)
    return;

  /* lock request */
  while (*call)
    pthread_cond_wait(&conn->call_block, &conn->parent->mutex);
  *call = 1;
}
/* See the EmStar project for the remaining source files. */