/* libusfusd_fops.c -- from the EmStar project (source listing retrieved via Krugle). */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "libusfusd_i.h"
/* Shorthand for the return-value field of the file-operation part of a
 * message; x is a pointer to the message. */
#define FOPS_RETVAL(x) ((x)->parm.fops_msg.retval)
/*
 * Process-wide state shared by every wrapper in this file.  Only the
 * two mutexes receive an explicit initializer (both the recursive
 * kind); every other member starts out zeroed, as for any object with
 * static storage duration.
 */
static usfusd_t state = {
  .mutex      = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
  .read_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
};
/*
 * __usfusd_thread_lookup()
 *
 * Argument-free entry point: runs usfusd_thread_lookup() against the
 * process-wide state and hands the result back as an untyped pointer.
 */
void *__usfusd_thread_lookup(void)
{
  return usfusd_thread_lookup(&state);
}
/*
 * usfusd_close()
 *
 * Closes an existing connection to usfusd, if open.
 *
 * Returns 0 when a connection was registered for fd and has been passed
 * to usfusd_conn_destroy().  Returns -1 with errno set to EBADF when fd
 * lies outside the connection table or has no connection attached.
 */
int usfusd_close(int fd)
{
  int retval = 0;

  /* state.conns[] is indexed directly by descriptor, so a descriptor
   * such as -1 must be rejected before it is used as an index.
   * FD_SETSIZE is the bound usfusd_poll() uses when walking the table. */
  if (fd < 0 || fd >= FD_SETSIZE) {
    errno = EBADF;
    return -1;
  }

  pthread_mutex_lock(&state.mutex);

  if (state.conns[fd])
    usfusd_conn_destroy(state.conns[fd]);
  else {
    errno = EBADF;
    retval = -1;
  }

  pthread_mutex_unlock(&state.mutex);

  return retval;
}
/*
 * usfusd_open()
 *
 * Establishes a new connection to a usfusd daemon.  Sends the
 * open request and waits for a reply.  Then it writes an initial
 * poll_diff request.  Also sets up new connection state, etc.
 *
 * path is parsed by usfusd_parse_path() into the connection record;
 * mode is stored as the connection's flags.
 *
 * Returns 0 on success, after a heap copy of the connection record has
 * been stored in state.conns[] under the socket descriptor.  On failure
 * returns -1 with errno set; the socket (if one was opened) is closed
 * and the parsed path is freed.
 *
 * NOTE(review): success yields 0 rather than the socket descriptor the
 * connection is registered under -- confirm how callers are expected to
 * learn the descriptor they later pass to usfusd_close()/usfusd_read().
 */
int usfusd_open(const char *path, mode_t mode)
{
  int saved_errno;
  int sock = -1;
  usfusd_conn_t *new_conn = NULL;
  fusd_msg_t open_msg = {
    .magic  = FUSD_MSG_MAGIC,
    .cmd    = FUSD_FOPS_CALL,
    .subcmd = FUSD_OPEN
  };
  usfusd_conn_t conn = {
    .flags       = mode,
    .call_block  = PTHREAD_COND_INITIALIZER,
    .write_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
    .parent      = &state
  };

  /* fill in rest of messages */
  usfusd_fill_msg(&open_msg, &conn);

  /* parse the path and fill it into conn struct */
  if (usfusd_parse_path(&conn, path) < 0)
    goto out;

  /* check name length; datalen includes the terminating NUL */
  open_msg.datalen = strlen(conn.path) + 1;
  if (open_msg.datalen > FUSD_MAX_NAME_LENGTH) {
    errno = ENAMETOOLONG;   /* errno codes are positive values */
    goto out;
  }

  /* connect to this address */
  sock = usfusd_connect(&(conn.dest));
  if (sock < 0)
    goto out;

  /* The descriptor doubles as the index into state.conns[]; refuse one
   * that would fall outside the range usfusd_poll() walks. */
  if (sock >= FD_SETSIZE) {
    errno = EMFILE;
    goto out;
  }

  /* save the socket */
  conn.socket = sock;

  /* send a client open request */
  if (usfusd_write_msg(conn.socket, &open_msg, conn.path) < 0)
    goto out;

  /* read back reply */
  if (usfusd_read_msg(conn.socket, &open_msg, NULL, 0) < 0)
    goto out;

  /* check retval; it is negated before being stored in errno */
  if (FOPS_RETVAL(&open_msg)) {
    errno = -FOPS_RETVAL(&open_msg);
    goto out;
  }

  /* start off with a poll_diff request */
  if (usfusd_poll_request(&conn) < 0)
    goto out;

  /* Build the heap copy before taking the lock so that an allocation
   * failure can leave through the common error path. */
  new_conn = malloc(sizeof *new_conn);
  if (new_conn == NULL) {
    errno = ENOMEM;
    goto out;
  }
  memcpy(new_conn, &conn, sizeof *new_conn);

  /* OK, now publish the new connection state */
  pthread_mutex_lock(&state.mutex);

  /* should not be stuff there already */
  if (state.conns[sock]) {
    elog(LOG_CRIT, "Conn state already present??");
    kill(getpgrp(), SIGSEGV);
  }

  state.conns[sock] = new_conn;
  pthread_mutex_unlock(&state.mutex);

  /* success! */
  return 0;

out:
  /* keep the errno of the failure across the cleanup calls */
  saved_errno = errno;
  free(conn.path);
  if (sock >= 0) __close(sock);
  errno = saved_errno;
  return -1;
}
/*
* usfusd_poll()
*
* This function is a wrapper around poll() that also enables
* hooks to the usfusd sockets protocol and to unblocking.
*/
int usfusd_poll(struct pollfd *ufds, unsigned int nfds, int timeout)
{
/* reformulate the poll vector. count which fds are fusd sockets,
* and modify */
int retval = 0;
int i,j;
int fusd = 0;
int non_fusd = 0;
int new_poll = 0;
int read_mode = 0;
buf_t *fds = buf_new();
struct pollfd *pfds;
buf_t *sock_fds = buf_new();
usfusd_thread_t *thread = usfusd_thread_lookup(&state);
struct timeval start;
struct timeval now;
int remain_time;
int first_pass = 1;
int trigger = 0;
int v_errno = 0;
/* record time of start */
gettimeofday(&start, NULL);
pthread_mutex_lock(&state.mutex);
/* count fusd and non-fusd */
for (i=0; i<nfds; i++) {
if (state.conns[ufds[i].fd]) {
/* check for destroyed flag */
if (usfusd_conn_destroyed(state.conns[ufds[i].fd])) {
v_errno = EBADF;
goto free;
}
bufcpy(sock_fds, &(ufds[i]), sizeof(struct pollfd));
fusd++;
}
else {
/* add non-fusd fds */
non_fusd++;
bufcpy(fds, &(ufds[i]), sizeof(struct pollfd));
}
}
/* add the wakeup pipe */
{
struct pollfd fd = {
fd: thread->pipe[0],
events: POLLIN
};
bufcpy(fds, &fd, sizeof(struct pollfd));
}
/* try to get read mutex */
if (pthread_mutex_trylock(&state.read_mutex) == 0) {
read_mode = 1;
state.reader = thread;
/* add in fusd devs */
for (i=0; i<FD_SETSIZE; i++)
if (!usfusd_conn_destroyed(state.conns[i])) {
struct pollfd fd = {
fd: i,
events: POLLIN
};
state.conns[i]->refcount++;
state.conns[i]->read_active++;
bufcpy(fds, &fd, sizeof(struct pollfd));
}
}
/* advertise the requested poll state, trigger immediately if needed */
trigger = usfusd_update_poll_state(thread, (struct pollfd *)sock_fds->buf, fusd, &v_errno);
pthread_mutex_unlock(&state.mutex);
/* compute fd count */
new_poll = fds->len / sizeof(struct pollfd);
/* if we already triggered, just to a polling poll call */
if (trigger) timeout = 0;
repeat_poll:
/* run poll */
if (timeout > 0) {
gettimeofday(&now, NULL);
remain_time = timeout - (misc_tv_offset_neg(&now, &start) / 1000);
if (remain_time < 0) {
remain_time = 0;
if (!first_pass) goto timedout;
}
}
else if (timeout < 0) remain_time = -1;
else if (!first_pass) goto timedout;
else remain_time = 0;
/* poll at least once.. */
first_pass = 0;
/* ok, now poll! */
pfds = (struct pollfd *)fds->buf;
retval = poll(pfds, new_poll, remain_time);
/* on error save errno */
if (retval < 0)
v_errno = errno;
/* if nothing was ready or on error, there's nothing to do.. */
if (retval <= 0)
goto timedout;
/* Handle the wakeup pipe */
usfusd_process_wakeup(thread, pfds + non_fusd);
/* if in read mode process the sockets */
if (read_mode)
usfusd_process_sockets(thread, pfds + non_fusd + 1, new_poll - non_fusd - 1);
/* count the triggers */
trigger = 0;
/* count real triggers */
pfds = (struct pollfd *)fds->buf;
for (i=0; i<non_fusd; i++)
if (pfds[i].revents) trigger++;
/* count virtual triggers */
pthread_mutex_lock(&state.mutex);
if (thread->sfds)
for (i=0; i<thread->sfd_count; i++)
if (thread->sfds[i].revents) trigger++;
pthread_mutex_unlock(&state.mutex);
/* keep polling if no requests have triggered */
if ((trigger == 0) &&
((thread->pending_msg == NULL) ||
(thread->reply_msg == NULL)))
goto repeat_poll;
timedout:
pthread_mutex_lock(&state.mutex);
/* copy back poll reply */
for (i=0,j=0; i<nfds; i++) {
if (state.conns[ufds[i].fd]) {
if (usfusd_conn_destroyed(state.conns[ufds[i].fd]))
goto except;
ufds[i].revents = ufds[i].events & state.conns[ufds[i].fd]->poll_state;
}
else {
if (pfds[j].fd != ufds[i].fd) {
int k;
for (k=0; k<new_poll; k++)
if (pfds[k].fd == ufds[i].fd) {
j = k;
goto copy;
}
goto except;
}
copy:
ufds[i].revents = pfds[j].revents;
j++;
continue;
except:
ufds[i].revents = ufds[i].events & (POLLPRI | POLLERR | POLLHUP);
}
}
free:
/* free stuff */
buf_free(fds);
buf_free(sock_fds);
/* clear poll requested state */
thread->sfds = NULL;
thread->sfd_count = 0;
/* unlock the read mutex */
usfusd_thread_release_read_mutex(thread);
pthread_mutex_unlock(&state.mutex);
/* set return value */
if (v_errno) {
errno = v_errno;
return -1;
}
return trigger;
}
/*
 * usfusd_read()
 *
 * Issues a FUSD_READ request on the connection registered for fd, waits
 * in usfusd_poll() for the reply, and copies at most length bytes of
 * the reply payload (the bytes following the message header) into buf.
 *
 * Returns the number of bytes copied, or -1 with errno set:
 *   EINVAL - length is negative
 *   EBADF  - fd has no live connection, or no FUSD_READ reply arrived
 *   other  - the value carried in the reply's retval field
 *
 * NOTE(review): usfusd_open() negates FOPS_RETVAL before storing it in
 * errno, while this function stores it unchanged -- confirm which sign
 * the daemon uses for read replies.
 */
int usfusd_read(int fd, char *buf, int length)
{
  usfusd_thread_t *thread = usfusd_thread_lookup(&state);
  usfusd_conn_t *conn = NULL;
  int retval = -1;
  int v_errno = 0;
  fusd_msg_t read_msg = {
    .subcmd = FUSD_READ
  };

  /* a negative length would otherwise reach memmove() as a huge size */
  if (length < 0) {
    errno = EINVAL;
    return -1;
  }

  /* lock read request */
  pthread_mutex_lock(&state.mutex);
  /* an out-of-range descriptor is treated like one with no connection;
   * FD_SETSIZE is the bound usfusd_poll() uses for state.conns[] */
  if (fd >= 0 && fd < FD_SETSIZE)
    conn = state.conns[fd];
  if (conn)
    usfusd_lock_type(thread, conn, &read_msg);
  pthread_mutex_unlock(&state.mutex);

  /* poll with an empty vector and no timeout until the reply is in */
  if (!usfusd_conn_destroyed(conn))
    usfusd_poll(NULL, 0, -1);

  pthread_mutex_lock(&state.mutex);

  if (!usfusd_conn_destroyed(conn)) {
    /* check returned message */
    if (thread->reply_msg && (thread->reply_msg->subcmd == FUSD_READ)) {
      if (FOPS_RETVAL(thread->reply_msg))
        v_errno = FOPS_RETVAL(thread->reply_msg);
      else {
        /* never copy more than the caller's buffer can hold */
        retval = i_min(thread->reply_msg->datalen, length);
        memmove(buf, ((char*)thread->reply_msg) + sizeof(fusd_msg_t),
                retval);
      }
      goto release;
    }
  }

  /* connection gone, or the reply was not a read reply */
  v_errno = EBADF;

release:
  usfusd_thread_release_conn(thread);
  pthread_mutex_unlock(&state.mutex);

  if (v_errno) errno = v_errno;
  return retval;
}
/* See the EmStar project listing for more files. */