Code Search for Developers
 
 
  

thread.cpp from Equalizer - Multipipe Rendering at Krugle


Show thread.cpp syntax highlighted


/* Copyright (c) 2005-2007, Stefan Eilemann <eile@equalizergraphics.com> 
   All rights reserved. */

#include "thread.h"

#include "base.h"
#include "debug.h"
#include "lock.h"
#include "log.h"
#include "scopedMutex.h"
#include "executionListener.h"

#include <errno.h>
#include <pthread.h>

using namespace eqBase;
using namespace std;

/* 
 * EQ_WIN32_SDP_JOIN_WAR: When using SDP connections on Win32, the join() of the
 * receiver thread blocks indefinitely, even after the thread has exited. This
 * workaround uses a monitor to implement the join functionality independently
 * of pthreads.
 */

SpinLock                        Thread::_listenerLock;
std::vector<ExecutionListener*> Thread::_listeners;
pthread_key_t                   Thread::_cleanupKey=Thread::_createCleanupKey();

#ifdef EQ_CHECK_THREADSAFETY
static pthread_t getThreadIdZero()
{
    pthread_t id;
    memset( &id, 0, sizeof( pthread_t ));
    return id;
}

pthread_t                       eqBase::threadIdZero = getThreadIdZero();
#endif

pthread_key_t Thread::_createCleanupKey()
{
    const int error = pthread_key_create(&_cleanupKey, Thread::_notifyStopping);
    if( error )
    {
        EQERROR
            << "Can't create thread-specific key for thread cleanup handler: " 
            << strerror( error ) << std::endl;
        EQASSERT( !error );
    }
    return _cleanupKey;
}

Thread::Thread()
        : _state(STATE_STOPPED)
#ifdef EQ_WIN32_SDP_JOIN_WAR
        , _retVal( 0 )
#endif
{
    memset( &_threadID, 0, sizeof( pthread_t ));
    _syncChild.set();
}

Thread::~Thread()
{
}

void* Thread::runChild( void* arg )
{
    Thread* thread = static_cast<Thread*>(arg);
    thread->_runChild();
    return NULL; // not reached
}

void Thread::_runChild()
{
#ifdef EQ_WIN32_SDP_JOIN_WAR
    _running = true;
#endif
    _threadID = pthread_self(); // XXX remove, set during create already?

    if( !init( ))
    {
        EQINFO << "Thread failed to initialise" << endl;
        _state = STATE_STOPPED;
        _syncChild.unset();
        pthread_exit( NULL );
    }

    _state    = STATE_RUNNING;
    EQINFO << "Thread successfully initialised" << endl;
    pthread_setspecific( _cleanupKey, this ); // install cleanup handler
    _notifyStarted();
    _syncChild.unset(); // sync w/ parent

    void* result = run();
    this->exit( result );
    EQUNREACHABLE;
}

void Thread::_notifyStarted()
{
    ScopedMutex< SpinLock > mutex( _listenerLock );

    EQINFO << "Calling " << _listeners.size() << " thread started listeners"
           << endl;
    for( vector<ExecutionListener*>::const_iterator iter = _listeners.begin();
         iter != _listeners.end(); ++iter )
        
        (*iter)->notifyExecutionStarted();
}

void Thread::_notifyStopping( void* )
{
    pthread_setspecific( _cleanupKey, NULL );

    ScopedMutex< SpinLock > mutex( _listenerLock );
    EQINFO << "Calling " << _listeners.size() << " thread stopping listeners"
           <<endl;
    for( vector<ExecutionListener*>::const_iterator iter = _listeners.begin();
         iter != _listeners.end(); ++iter )
        
        (*iter)->notifyExecutionStopping();
}

bool Thread::start()
{
    if( _state != STATE_STOPPED )
        return false;

    _state = STATE_STARTING;

    pthread_attr_t attributes;
    pthread_attr_init( &attributes );
    pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );

    int nTries = 10;
    while( nTries-- )
    {
        const int error = pthread_create( &_threadID, &attributes,
                                          runChild, this );

        if( error == 0 ) // succeeded
        {
            EQVERB << "Created pthread " << this << endl;
            break;
        }
        if( error != EAGAIN || nTries==0 )
        {
            EQWARN << "Could not create thread: " << strerror( error )
                   << endl;
            return false;
        }
    }

    _syncChild.set(); // sync with child's entry func
    _state = STATE_RUNNING;
    return true;
}

void Thread::exit( void* retVal )
{
    EQASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );

    EQINFO << "Exiting thread" << endl;
    _state = STATE_STOPPING;

#ifdef EQ_WIN32_SDP_JOIN_WAR
    _running = false;
    _retVal  = retVal;
#endif

    pthread_exit( (void*)retVal );
    EQUNREACHABLE;
}

void Thread::cancel()
{
    EQASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );

    EQINFO << "Cancelling thread" << endl;
    _state = STATE_STOPPING;

    pthread_cancel( _threadID );
    EQUNREACHABLE;
}

bool Thread::join( void** retVal )
{
    if( _state == STATE_STOPPED )
        return false;
    if( isCurrent( )) // can't join self
        return false;

    EQVERB << "Joining thread" << endl;
#ifdef EQ_WIN32_SDP_JOIN_WAR
    _running.waitEQ( false );
#else
    void *_retVal;
    const int error = pthread_join( _threadID, &_retVal);
    if( error != 0 )
    {
        EQWARN << "Error joining thread: " << strerror(error) << endl;
        return false;
    }
#endif

    _state = STATE_STOPPED;
    if( retVal )
        *retVal = _retVal;
    return true;
}

bool Thread::isCurrent() const
{
    return pthread_equal( pthread_self(), _threadID );
}

void Thread::addListener( ExecutionListener* listener )
{
    ScopedMutex< SpinLock > mutex( _listenerLock );
    _listeners.push_back( listener );
}

std::ostream& eqBase::operator << ( std::ostream& os, const Thread* thread )
{
#ifdef WIN32_VC
    os << "Thread " << thread->_threadID.p;
#else
    os << "Thread " << thread->_threadID;
#endif
	os << " state " 
		<< ( thread->_state == Thread::STATE_STOPPED ? "stopped" :
			thread->_state == Thread::STATE_STARTING ? "starting" :
			thread->_state == Thread::STATE_RUNNING ? "running" :
			thread->_state == Thread::STATE_STOPPING ? "stopping" : "unknown" );

#ifdef WIN32_VC
	os << " called from " << pthread_self().p;
#else
	os << " called from " << pthread_self();
#endif

    return os;
}




See more files for this project here

Equalizer - Multipipe Rendering

Equalizer is a programming interface and resource management system for scalable graphics applications for clusters and shared memory systems. It is build upon a scalable programming interface solving the problems common to any multipipe application.

Project homepage: http://sourceforge.net/projects/equalizer
Programming language(s): C,C++
License: lgpl21

  barrier.cpp
  barrier.h
  base.h
  clock.h
  debug.cpp
  debug.h
  definesWin32.h
  executionListener.h
  hash.h
  idPool.cpp
  idPool.h
  launcher.cpp
  launcher.h
  lock.cpp
  lock.h
  log.cpp
  log.h
  monitor.h
  mtQueue.h
  nonCopyable.h
  perThread.h
  process.cpp
  process.h
  refPtr.h
  referenced.h
  requestHandler.cpp
  requestHandler.h
  rng.h
  scopedMutex.h
  sema.cpp
  sema.h
  spinLock.cpp
  spinLock.h
  stdExt.h
  thread.cpp
  thread.h
  timedLock.cpp
  timedLock.h
  types.h
  userdata.h