Bullet Collision Detection & Physics Library

PosixThreadSupport.cpp

Go to the documentation of this file.
00001 /*
00002 Bullet Continuous Collision Detection and Physics Library
00003 Copyright (c) 2003-2007 Erwin Coumans  http://bulletphysics.com
00004 
00005 This software is provided 'as-is', without any express or implied warranty.
00006 In no event will the authors be held liable for any damages arising from the use of this software.
00007 Permission is granted to anyone to use this software for any purpose, 
00008 including commercial applications, and to alter it and redistribute it freely, 
00009 subject to the following restrictions:
00010 
00011 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
00012 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
00013 3. This notice may not be removed or altered from any source distribution.
00014 */
00015 
00016 #include <stdio.h>
00017 #include "PosixThreadSupport.h"
00018 #ifdef USE_PTHREADS
00019 #include <errno.h>
00020 #include <unistd.h>
00021 
00022 #include "SpuCollisionTaskProcess.h"
00023 #include "SpuNarrowPhaseCollisionTask/SpuGatheringCollisionTask.h"
00024 
00025 #define checkPThreadFunction(returnValue) \
00026     if(0 != returnValue) { \
00027         printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \
00028     }
00029 
00030 // The number of threads should be equal to the number of available cores
00031 // Todo: each worker should be linked to a single core, using SetThreadIdealProcessor.
00032 
00033 // PosixThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication
00034 // Setup and initialize SPU/CELL/Libspe2
00035 PosixThreadSupport::PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo)
00036 {
00037         startThreads(threadConstructionInfo);
00038 }
00039 
00040 // cleanup/shutdown Libspe2
00041 PosixThreadSupport::~PosixThreadSupport()
00042 {
00043         stopSPU();
00044 }
00045 
00046 #if (defined (__APPLE__))
00047 #define NAMED_SEMAPHORES
00048 #endif
00049 
00050 // this semaphore will signal, if and how many threads are finished with their work
00051 static sem_t* mainSemaphore=0;
00052 
00053 static sem_t* createSem(const char* baseName)
00054 {
00055         static int semCount = 0;
00056 #ifdef NAMED_SEMAPHORES
00057 
00058         char name[32];
00059         snprintf(name, 32, "/%s-%d-%4.4d", baseName, getpid(), semCount++); 
00060         sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
00061 
00062         if (tempSem != reinterpret_cast<sem_t *>(SEM_FAILED))
00063         {
00064 //        printf("Created \"%s\" Semaphore %p\n", name, tempSem);
00065         }
00066         else
00067         {
00068                 //printf("Error creating Semaphore %d\n", errno);
00069                 exit(-1);
00070         }
00072 #else
00073         sem_t* tempSem = new sem_t;
00074         checkPThreadFunction(sem_init(tempSem, 0, 0));
00075 #endif
00076         return tempSem;
00077 }
00078 
00079 static void destroySem(sem_t* semaphore)
00080 {
00081 #ifdef NAMED_SEMAPHORES
00082         checkPThreadFunction(sem_close(semaphore));
00083 #else
00084         checkPThreadFunction(sem_destroy(semaphore));
00085         delete semaphore;
00086 #endif  
00087 }
00088 
00089 static void *threadFunction(void *argument) 
00090 {
00091 
00092         PosixThreadSupport::btSpuStatus* status = (PosixThreadSupport::btSpuStatus*)argument;
00093 
00094         
00095         while (1)
00096         {
00097             checkPThreadFunction(sem_wait(status->startSemaphore));
00098                 
00099                 void* userPtr = status->m_userPtr;
00100 
00101                 if (userPtr)
00102                 {
00103                         btAssert(status->m_status);
00104                         status->m_userThreadFunc(userPtr,status->m_lsMemory);
00105                         status->m_status = 2;
00106                         checkPThreadFunction(sem_post(mainSemaphore));
00107                         status->threadUsed++;
00108                 } else {
00109                         //exit Thread
00110                         status->m_status = 3;
00111                         checkPThreadFunction(sem_post(mainSemaphore));
00112                         printf("Thread with taskId %i exiting\n",status->m_taskId);
00113                         break;
00114                 }
00115                 
00116         }
00117 
00118         printf("Thread TERMINATED\n");
00119         return 0;
00120 
00121 }
00122 
00124 void PosixThreadSupport::sendRequest(uint32_t uiCommand, ppu_address_t uiArgument0, uint32_t taskId)
00125 {
00127         
00129         
00130 
00131 
00132         switch (uiCommand)
00133         {
00134         case    CMD_GATHER_AND_PROCESS_PAIRLIST:
00135                 {
00136                         btSpuStatus&    spuStatus = m_activeSpuStatus[taskId];
00137                         btAssert(taskId >= 0);
00138                         btAssert(taskId < m_activeSpuStatus.size());
00139 
00140                         spuStatus.m_commandId = uiCommand;
00141                         spuStatus.m_status = 1;
00142                         spuStatus.m_userPtr = (void*)uiArgument0;
00143 
00144                         // fire event to start new task
00145                         checkPThreadFunction(sem_post(spuStatus.startSemaphore));
00146                         break;
00147                 }
00148         default:
00149                 {
00151                         btAssert(0);
00152                 }
00153 
00154         };
00155 
00156 
00157 }
00158 
00159 
00161 void PosixThreadSupport::waitForResponse(unsigned int *puiArgument0, unsigned int *puiArgument1)
00162 {
00164         
00166 
00167 
00168         btAssert(m_activeSpuStatus.size());
00169 
00170         // wait for any of the threads to finish
00171         checkPThreadFunction(sem_wait(mainSemaphore));
00172         
00173         // get at least one thread which has finished
00174         size_t last = -1;
00175         
00176         for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t) {
00177             if(2 == m_activeSpuStatus[t].m_status) {
00178                 last = t;
00179                 break;
00180             }
00181         }
00182 
00183         btSpuStatus& spuStatus = m_activeSpuStatus[last];
00184 
00185         btAssert(spuStatus.m_status > 1);
00186         spuStatus.m_status = 0;
00187 
00188         // need to find an active spu
00189         btAssert(last >= 0);
00190 
00191         *puiArgument0 = spuStatus.m_taskId;
00192         *puiArgument1 = spuStatus.m_status;
00193 }
00194 
00195 
00196 
00197 void PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructionInfo)
00198 {
00199         printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
00200         m_activeSpuStatus.resize(threadConstructionInfo.m_numThreads);
00201         
00202         mainSemaphore = createSem("main");                
00203         //checkPThreadFunction(sem_wait(mainSemaphore));
00204    
00205         for (int i=0;i < threadConstructionInfo.m_numThreads;i++)
00206         {
00207                 printf("starting thread %d\n",i);
00208 
00209                 btSpuStatus&    spuStatus = m_activeSpuStatus[i];
00210 
00211                 spuStatus.startSemaphore = createSem("threadLocal");                
00212                 
00213                 checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus));
00214 
00215                 spuStatus.m_userPtr=0;
00216 
00217                 spuStatus.m_taskId = i;
00218                 spuStatus.m_commandId = 0;
00219                 spuStatus.m_status = 0;
00220                 spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
00221                 spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
00222         spuStatus.threadUsed = 0;
00223 
00224                 printf("started thread %d \n",i);
00225                 
00226         }
00227 
00228 }
00229 
00230 void PosixThreadSupport::startSPU()
00231 {
00232 }
00233 
00234 
00236 void PosixThreadSupport::stopSPU()
00237 {
00238         for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t) 
00239         {
00240             btSpuStatus&        spuStatus = m_activeSpuStatus[t];
00241             printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed);
00242 
00243         spuStatus.m_userPtr = 0;       
00244         checkPThreadFunction(sem_post(spuStatus.startSemaphore));
00245         checkPThreadFunction(sem_wait(mainSemaphore));
00246 
00247         printf("destroy semaphore\n"); 
00248             destroySem(spuStatus.startSemaphore);
00249             printf("semaphore destroyed\n");
00250                 checkPThreadFunction(pthread_join(spuStatus.thread,0));
00251         }
00252         printf("destroy main semaphore\n");
00253         destroySem(mainSemaphore);
00254         printf("main semaphore destroyed\n");
00255         m_activeSpuStatus.clear();
00256 }
00257 
00258 class PosixCriticalSection : public btCriticalSection 
00259 {
00260         pthread_mutex_t m_mutex;
00261         
00262 public:
00263         PosixCriticalSection() 
00264         {
00265                 pthread_mutex_init(&m_mutex, NULL);
00266         }
00267         virtual ~PosixCriticalSection() 
00268         {
00269                 pthread_mutex_destroy(&m_mutex);
00270         }
00271         
00272         ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]);
00273         
00274         virtual unsigned int getSharedParam(int i)
00275         {
00276                 return mCommonBuff[i];
00277         }
00278         virtual void setSharedParam(int i,unsigned int p)
00279         {
00280                 mCommonBuff[i] = p;
00281         }
00282         
00283         virtual void lock()
00284         {
00285                 pthread_mutex_lock(&m_mutex);
00286         }
00287         virtual void unlock()
00288         {
00289                 pthread_mutex_unlock(&m_mutex);
00290         }
00291 };
00292 
00293 
00294 #if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
00295 /* OK to use barriers on this platform */
00296 class PosixBarrier : public btBarrier 
00297 {
00298         pthread_barrier_t m_barr;
00299         int m_numThreads;
00300 public:
00301         PosixBarrier()
00302         :m_numThreads(0)        {       }
00303         virtual ~PosixBarrier() {
00304                 pthread_barrier_destroy(&m_barr);
00305         }
00306         
00307         virtual void sync()
00308         {
00309                 int rc = pthread_barrier_wait(&m_barr);
00310                 if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
00311                 {
00312                         printf("Could not wait on barrier\n");
00313                         exit(-1);
00314                 }
00315         }
00316         virtual void setMaxCount(int numThreads)
00317         {
00318                 int result = pthread_barrier_init(&m_barr, NULL, numThreads);
00319                 m_numThreads = numThreads;
00320                 btAssert(result==0);
00321         }
00322         virtual int  getMaxCount()
00323         {
00324                 return m_numThreads;
00325         }
00326 };
00327 #else
00328 /* Not OK to use barriers on this platform - insert alternate code here */
00329 class PosixBarrier : public btBarrier 
00330 {
00331         pthread_mutex_t m_mutex;
00332         pthread_cond_t m_cond;
00333         
00334         int m_numThreads;
00335         int     m_called;
00336         
00337 public:
00338         PosixBarrier()
00339         :m_numThreads(0)
00340         {
00341         }
00342         virtual ~PosixBarrier() 
00343         {
00344                 if (m_numThreads>0)
00345                 {
00346                         pthread_mutex_destroy(&m_mutex);
00347                         pthread_cond_destroy(&m_cond);
00348                 }
00349         }
00350         
00351         virtual void sync()
00352         {               
00353                 pthread_mutex_lock(&m_mutex);
00354                 m_called++;
00355                 if (m_called == m_numThreads) {
00356                         m_called = 0;
00357                         pthread_cond_broadcast(&m_cond);
00358                 } else {
00359                         pthread_cond_wait(&m_cond,&m_mutex);
00360                 }
00361                 pthread_mutex_unlock(&m_mutex);
00362                 
00363         }
00364         virtual void setMaxCount(int numThreads)
00365         {
00366                 if (m_numThreads>0)
00367                 {
00368                         pthread_mutex_destroy(&m_mutex);
00369                         pthread_cond_destroy(&m_cond);
00370                 }
00371                 m_called = 0;
00372                 pthread_mutex_init(&m_mutex,NULL);
00373                 pthread_cond_init(&m_cond,NULL);
00374                 m_numThreads = numThreads;
00375         }
00376         virtual int  getMaxCount()
00377         {
00378                 return m_numThreads;
00379         }
00380 };
00381 
00382 #endif//_POSIX_BARRIERS
00383 
00384 
00385 
00386 btBarrier* PosixThreadSupport::createBarrier()
00387 {
00388         PosixBarrier* barrier = new PosixBarrier();
00389         barrier->setMaxCount(getNumTasks());
00390         return barrier;
00391 }
00392 
00393 btCriticalSection* PosixThreadSupport::createCriticalSection()
00394 {
00395         return new PosixCriticalSection();
00396 }
00397 
00398 #endif // USE_PTHREADS
00399