/* Copyright (C) 2014 Szilard Biro Copyright (C) 2018 Harry Sintonen Copyright (C) 2019/20 Stefan "Bebbo" Franke - AmigaOS 3 port / bug fixes This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 3. This notice may not be removed or altered from any source distribution. */ #ifdef __MORPHOS__ #include #endif #include #include #include #include #ifdef __AROS__ #include #define TIMESPEC_TO_TIMEVAL(tv, ts) { \ (tv)->tv_sec = (ts)->tv_sec; \ (tv)->tv_usec = (ts)->tv_nsec / 1000; } #elif !defined(__AMIGA__) #include #define StackSwapArgs PPCStackSwapArgs #define NewStackSwap NewPPCStackSwap #endif #define __PTHREAD_SEMAPHORE_TYPE struct SignalSemaphore #define __PTHREAD_MINLIST_TYPE struct MinList #include #include #include #include #include #include #include "pthread.h" #include "debug.h" #if defined(__AMIGA__) #include #include #define NEWLIST(a) NewList(a) #include #define INITIALSIZE 1 #ifndef IPTR #define IPTR ULONG #endif # define ForeachNode(l,n) \ for (n=(void *)(l)->mlh_Head; \ ((struct Node *)(n))->ln_Succ; \ n=(void *)(((struct Node *)(n))->ln_Succ)) #endif #define SIGB_PARENT SIGBREAKB_CTRL_F #define SIGF_PARENT (1 << SIGB_PARENT) #define SIGB_COND_FALLBACK SIGBREAKB_CTRL_E #define SIGF_COND_FALLBACK (1 << SIGB_COND_FALLBACK) #define SIGB_TIMER_FALLBACK SIGBREAKB_CTRL_D #define SIGF_TIMER_FALLBACK (1 << SIGB_TIMER_FALLBACK) #define NAMELEN 32 #define PTHREAD_FIRST_THREAD_ID (1) #define PTHREAD_BARRIER_FLAG (1UL << 31) //#define USE_ASYNC_CANCEL typedef struct { struct MinNode node; struct Task *task; ULONG sigmask; } CondWaiter; typedef struct { void (*destructor)(void*); short used; } TLSKey; typedef struct { struct MinNode node; void (*routine)(void*); void *arg; } CleanupHandler; typedef struct { void* (*start)(void*); void *arg; struct Task *parent; struct Task *task; void *ret; pthread_attr_t attr; void *tlsvalues[PTHREAD_KEYS_MAX]; struct MinList cleanup; char started; char finished; short cancelstate; short canceltype; short canceled; jmp_buf jmp; } ThreadInfo; static ThreadInfo *_threads; static unsigned numThreads; static struct SignalSemaphore thread_sem; static TLSKey *_tlskeys; static unsigned numTlskeys; static struct SignalSemaphore tls_sem; // // Helper functions // static int SemaphoreIsInvalid(struct SignalSemaphore *sem) { int r = (!sem || sem->ss_Link.ln_Type != NT_SIGNALSEM || sem->ss_WaitQueue.mlh_Tail != NULL); DB2(bug("%s(%8lx)=%ld\n", __FUNCTION__, sem, r)); return r; } static int SemaphoreIsMine(struct SignalSemaphore *sem) { struct Task *me; DB2(bug("%s(%8lx)\n", __FUNCTION__, sem)); #ifdef __AMIGA__ me = SysBase->ThisTask; #else me = FindTask(NULL); #endif return (sem && sem->ss_NestCount > 0 && sem->ss_Owner == me); } static ThreadInfo* GetThreadInfo(pthread_t thread) { DB2(bug("%s(%ld)\n", __FUNCTION__, thread)); // TODO: more robust error handling? if (thread < numThreads) return &_threads[thread]; return 0; } static pthread_t GetThreadId(struct Task *task) { pthread_t i; // would recurse... // DB2(bug("%s(%ld)\n", __FUNCTION__, task)); // 0 is main task, First thread id will be 1 so that it is different than default value of pthread_t for (i = 0; i < numThreads; i++) { if (_threads[i].task == task) break; } return i; } #if defined __mc68000__ /* No CAS instruction on m68k */ static int __m68k_sync_val_compare_and_swap(int *v, int o, int n) { volatile int * vv = (volatile int *)v; Disable(); if (*vv == o) { *vv = n; Enable(); return n; } Enable(); return o; } #undef __sync_val_compare_and_swap #define __sync_val_compare_and_swap(v, o, n) __m68k_sync_val_compare_and_swap(v, o, n) #undef __sync_lock_test_and_set #endif // // Thread specific data functions // int pthread_key_create(pthread_key_t *key, void (*destructor)(void*)) { TLSKey *tls; int i; D(bug("%s(%ld, %ld)\n", __FUNCTION__, key, destructor)); if (key == NULL) return EINVAL; ObtainSemaphore(&tls_sem); for (i = 0; i < numTlskeys; i++) { if (_tlskeys[i].used == FALSE) break; } if (i == PTHREAD_KEYS_MAX) { ReleaseSemaphore(&tls_sem); return EAGAIN; } if (i == numTlskeys) { // resize the thread's storage. unsigned n = numTlskeys + numTlskeys + 2; if (n > PTHREAD_KEYS_MAX) n = PTHREAD_KEYS_MAX; TLSKey *t = (TLSKey*) realloc(_tlskeys, n * sizeof(TLSKey)); if (!t) { ReleaseSemaphore(&tls_sem); return EAGAIN; } _tlskeys = t; numTlskeys = n; } tls = &_tlskeys[i]; tls->used = TRUE; tls->destructor = destructor; ReleaseSemaphore(&tls_sem); *key = i; return 0; } int pthread_key_delete(pthread_key_t key) { TLSKey *tls; D(bug("%s(%ld)\n", __FUNCTION__, key)); if (key >= numTlskeys) return EINVAL; tls = &_tlskeys[key]; ObtainSemaphore(&tls_sem); if (tls->used == FALSE) { ReleaseSemaphore(&tls_sem); return EINVAL; } tls->used = FALSE; tls->destructor = NULL; ReleaseSemaphore(&tls_sem); return 0; } int pthread_setspecific(pthread_key_t key, const void *value) { pthread_t thread; ThreadInfo *inf; TLSKey *tls; D(bug("%s(%ld)\n", __FUNCTION__, key)); if (key >= numTlskeys) return EINVAL; thread = pthread_self(); tls = &_tlskeys[key]; ObtainSemaphoreShared(&tls_sem); if (tls->used == FALSE) { ReleaseSemaphore(&tls_sem); return EINVAL; } ReleaseSemaphore(&tls_sem); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); inf->tlsvalues[key] = (void*) value; ReleaseSemaphore(&thread_sem); return 0; } void* pthread_getspecific(pthread_key_t key) { pthread_t thread; ThreadInfo *inf; void *value = NULL; D(bug("%s(%ld)\n", __FUNCTION__, key)); if (key >= PTHREAD_KEYS_MAX) return NULL; thread = pthread_self(); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); value = inf->tlsvalues[key]; ReleaseSemaphore(&thread_sem); return value; } // // Mutex attribute functions // int pthread_mutexattr_init(pthread_mutexattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; attr->kind = PTHREAD_MUTEX_DEFAULT; return 0; } int pthread_mutexattr_destroy(pthread_mutexattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_mutexattr_t)); return 0; } int pthread_mutexattr_gettype(pthread_mutexattr_t *attr, int *kind) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, attr, kind)); if (attr == NULL) return EINVAL; if (kind) *kind = attr->kind; return 0; } int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL || !(kind >= PTHREAD_MUTEX_NORMAL && kind <= PTHREAD_MUTEX_ERRORCHECK)) return EINVAL; attr->kind = kind; return 0; } // // Mutex functions // static int _pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr, BOOL staticinit) { DB2(bug("%s(%8lx, %8lx)\n", __FUNCTION__, mutex, attr)); if (mutex == NULL) return EINVAL; if (attr) mutex->kind = attr->kind; else if (!staticinit) mutex->kind = PTHREAD_MUTEX_DEFAULT; InitSemaphore(&mutex->semaphore); mutex->incond = 0; return 0; } int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, mutex, attr)); return _pthread_mutex_init(mutex, attr, FALSE); } int pthread_mutex_destroy(pthread_mutex_t *mutex) { D(bug("%s(%8lx)\n", __FUNCTION__, mutex)); if (mutex == NULL) return EINVAL; // probably a statically allocated mutex if (SemaphoreIsInvalid(&mutex->semaphore)) return 0; if (/*mutex->incond ||*/AttemptSemaphore(&mutex->semaphore) == FALSE) return EBUSY; if (mutex->incond) { ReleaseSemaphore(&mutex->semaphore); return EBUSY; } ReleaseSemaphore(&mutex->semaphore); memset(mutex, 0, sizeof(pthread_mutex_t)); return 0; } int pthread_mutex_lock(pthread_mutex_t *mutex) { D(bug("%s(%8lx)\n", __FUNCTION__, mutex)); if (mutex == NULL) return EINVAL; struct SignalSemaphore *sigSem = &mutex->semaphore; // initialize static mutexes if (SemaphoreIsInvalid(sigSem)) _pthread_mutex_init(mutex, NULL, TRUE); // normal mutexes would simply deadlock here if (mutex->kind == PTHREAD_MUTEX_ERRORCHECK && SemaphoreIsMine(sigSem)) return EDEADLK; ObtainSemaphore(sigSem); if (mutex->kind == PTHREAD_MUTEX_NORMAL && sigSem->ss_NestCount > 1) { // should have blocked - fix this ReleaseSemaphore(sigSem); return EDEADLK; } #ifndef __AMIGA__ if (from->tv_secs >= unix_to_amiga) { to->tv_secs = from->tv_secs - unix_to_amiga; to->tv_micro = from->tv_micro; } else { to->tv_secs = 0; to->tv_micro = 0; } #endif return 0; } #if defined(__MORPHOS__) || defined(__AMIGA__) static int _obtain_sema_timed(struct SignalSemaphore *sema, const struct timeval *end, int shared) { struct MsgPort msgport; struct SemaphoreMessage msg; #ifndef __AMIGA__ struct Message *m1, *m2; #endif struct timerequest timerio; struct Task *task; #ifdef __AMIGA__ task = SysBase->ThisTask; #else task = FindTask(NULL); #endif msgport.mp_SigBit = AllocSignal(-1); if ((BYTE) msgport.mp_SigBit == -1) return EINVAL; msgport.mp_Node.ln_Type = NT_MSGPORT; msgport.mp_Flags = PA_SIGNAL; msgport.mp_SigTask = task; NEWLIST(&msgport.mp_MsgList); msg.ssm_Semaphore = 0; msg.ssm_Message.mn_Node.ln_Type = NT_MESSAGE; msg.ssm_Message.mn_Node.ln_Name = (char*) shared; msg.ssm_Message.mn_ReplyPort = &msgport; timerio.tr_node.io_Message.mn_Node.ln_Type = NT_MESSAGE; timerio.tr_node.io_Command = TR_ADDREQUEST; timerio.tr_node.io_Message.mn_ReplyPort = &msgport; #ifdef __AMIGA__ timerio.tr_time = *end; timerio.tr_node.io_Device = DOSBase->dl_TimeReq->tr_node.io_Device; timerio.tr_node.io_Unit = DOSBase->dl_TimeReq->tr_node.io_Unit; // Procure is broken on older systems... hand made... struct SemaphoreRequest sr; sr.sr_Waiter = task; SendIO((APTR) & timerio); ULONG mask = SIGF_SINGLE | (1 << msgport.mp_SigBit); Forbid(); task->tc_SigRecvd &= ~mask; AddTail((struct List*) &sema->ss_WaitQueue, (struct Node*) &sr.sr_Link); ULONG signal = Wait(mask); Permit(); if (signal & SIGF_SINGLE) { msg.ssm_Semaphore = sema; } #else UNIXTIME_TO_AMIGATIME(end, &timerio.tr_time); timerio.tr_node.io_Device = waitutc.tr_node.io_Device; timerio.tr_node.io_Unit = waitutc.tr_node.io_Unit; Procure(sema, &msg); SendIO((APTR)&timerio); WaitPort(&msgport); m1 = GetMsg(&msgport); m2 = GetMsg(&msgport); if (m1 == &timerio.tr_node.io_Message || m2 == &timerio.tr_node.io_Message) Vacate(sema, &msg); #endif else { AbortIO((APTR) & timerio); WaitIO((APTR) & timerio); } FreeSignal(msgport.mp_SigBit); if (msg.ssm_Semaphore == 0) return ETIMEDOUT; return 0; } #endif int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime) { struct timeval end; #ifndef __AMIGA__ struct timeval now; #endif int result; D(bug("%s(%8lx, %7lx)\n", __FUNCTION__, mutex, abstime)); if (mutex == NULL) return EINVAL; if (abstime == NULL) return pthread_mutex_lock(mutex); else if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) return EINVAL; TIMESPEC_TO_TIMEVAL(&end, abstime); #if defined(__MORPHOS__) || defined(__AMIGA__) result = pthread_mutex_trylock(mutex); if (result != EBUSY) return result; return _obtain_sema_timed(&mutex->semaphore, &end, 0); #else // busy waiting is not very nice, but ObtainSemaphore doesn't support timeouts while ((result = pthread_mutex_trylock(mutex)) == EBUSY) { sched_yield(); gettimeofday(&now, NULL); if (timercmp(&end, &now, <)) return ETIMEDOUT; } return result; #endif } int pthread_mutex_trylock(pthread_mutex_t *mutex) { ULONG ret; D(bug("%s(%8lx)\n", __FUNCTION__, mutex)); if (mutex == NULL) return EINVAL; // initialize static mutexes if (SemaphoreIsInvalid(&mutex->semaphore)) _pthread_mutex_init(mutex, NULL, TRUE); if (mutex->kind != PTHREAD_MUTEX_RECURSIVE && SemaphoreIsMine(&mutex->semaphore)) return EDEADLK; ret = AttemptSemaphore(&mutex->semaphore); return (ret == TRUE) ? 0 : EBUSY; } int pthread_mutex_unlock(pthread_mutex_t *mutex) { D(bug("%s(%8lx)\n", __FUNCTION__, mutex)); if (mutex == NULL) return EINVAL; // initialize static mutexes if (SemaphoreIsInvalid(&mutex->semaphore)) _pthread_mutex_init(mutex, NULL, TRUE); if (mutex->kind != PTHREAD_MUTEX_NORMAL && !SemaphoreIsMine(&mutex->semaphore)) return EPERM; ReleaseSemaphore(&mutex->semaphore); return 0; } // // Condition variable attribute functions // int pthread_condattr_init(pthread_condattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_condattr_t)); return 0; } int pthread_condattr_destroy(pthread_condattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_condattr_t)); return 0; } // // Condition variable functions // int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, cond, attr)); if (cond == NULL) return EINVAL; InitSemaphore(&cond->semaphore); NEWLIST((struct List* )&cond->waiters); return 0; } int pthread_cond_destroy(pthread_cond_t *cond) { D(bug("%s(%8lx)\n", __FUNCTION__, cond)); if (cond == NULL) return EINVAL; // probably a statically allocated condition if (SemaphoreIsInvalid(&cond->semaphore)) return 0; if (AttemptSemaphore(&cond->semaphore) == FALSE) return EBUSY; if (!IsListEmpty((struct List*) &cond->waiters)) { ReleaseSemaphore(&cond->semaphore); return EBUSY; } ReleaseSemaphore(&cond->semaphore); memset(cond, 0, sizeof(pthread_cond_t)); return 0; } static int _pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime, BOOL relative) { CondWaiter waiter; BYTE signal; ULONG sigs = 0; ULONG timermask = 0; struct MsgPort timermp; struct timerequest timerio; struct Task *task; DB2(bug("%s(%8lx, %8lx, %8lx)\n", __FUNCTION__, cond, mutex, abstime)); if (cond == NULL || mutex == NULL) return EINVAL; pthread_testcancel(); // initialize static conditions if (SemaphoreIsInvalid(&cond->semaphore)) pthread_cond_init(cond, NULL); #ifdef __AMIGA__ task = SysBase->ThisTask; #else task = FindTask(NULL); #endif if (abstime) { // prepare MsgPort timermp.mp_Node.ln_Type = NT_MSGPORT; timermp.mp_Node.ln_Pri = 0; timermp.mp_Node.ln_Name = NULL; timermp.mp_Flags = PA_SIGNAL; timermp.mp_SigTask = task; signal = AllocSignal(-1); if (signal == -1) { signal = SIGB_TIMER_FALLBACK; SetSignal(SIGF_TIMER_FALLBACK, 0); } timermp.mp_SigBit = signal; NEWLIST(&timermp.mp_MsgList); // prepare IORequest timerio.tr_node.io_Message.mn_Node.ln_Type = NT_MESSAGE; timerio.tr_node.io_Message.mn_Node.ln_Pri = 0; timerio.tr_node.io_Message.mn_Node.ln_Name = NULL; timerio.tr_node.io_Message.mn_ReplyPort = &timermp; timerio.tr_node.io_Message.mn_Length = sizeof(struct timerequest); // open timer.device if (OpenDevice((STRPTR) TIMERNAME, UNIT_MICROHZ, &timerio.tr_node, 0) != 0) { if (timermp.mp_SigBit != SIGB_TIMER_FALLBACK) FreeSignal(timermp.mp_SigBit); return EINVAL; } // prepare the device command and send it timerio.tr_node.io_Command = TR_ADDREQUEST; timerio.tr_node.io_Flags = 0; TIMESPEC_TO_TIMEVAL(&timerio.tr_time, abstime); if (!relative) { struct timeval starttime; // absolute time has to be converted to relative // GetSysTime can't be used due to the timezone offset in abstime gettimeofday(&starttime, NULL); timersub(&timerio.tr_time, &starttime, &timerio.tr_time); } timermask = 1 << timermp.mp_SigBit; sigs |= timermask; SendIO((struct IORequest*) &timerio); } // prepare a waiter node waiter.task = task; signal = AllocSignal(-1); if (signal == -1) { signal = SIGB_COND_FALLBACK; SetSignal(SIGF_COND_FALLBACK, 0); } waiter.sigmask = 1 << signal; sigs |= waiter.sigmask; // add it to the end of the list ObtainSemaphore(&cond->semaphore); AddTail((struct List*) &cond->waiters, (struct Node*) &waiter); ReleaseSemaphore(&cond->semaphore); // wait for the condition to be signalled or the timeout mutex->incond++; pthread_mutex_unlock(mutex); sigs = Wait(sigs); pthread_mutex_lock(mutex); mutex->incond--; // remove the node from the list ObtainSemaphore(&cond->semaphore); Remove((struct Node*) &waiter); ReleaseSemaphore(&cond->semaphore); if (signal != SIGB_COND_FALLBACK) FreeSignal(signal); if (abstime) { // clean up the timerequest if (!CheckIO((struct IORequest*) &timerio)) { AbortIO((struct IORequest*) &timerio); WaitIO((struct IORequest*) &timerio); } CloseDevice((struct IORequest*) &timerio); if (timermp.mp_SigBit != SIGB_TIMER_FALLBACK) FreeSignal(timermp.mp_SigBit); // did we timeout? if (sigs & timermask) return ETIMEDOUT; } return 0; } int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime) { D(bug("%s(%8lx, %8lx, %8lx)\n", __FUNCTION__, cond, mutex, abstime)); return _pthread_cond_timedwait(cond, mutex, abstime, FALSE); } int pthread_cond_timedwait_relative_np(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *reltime) { D(bug("%s(%8lx, %8lx, %8lx)\n", __FUNCTION__, cond, mutex, reltime)); return _pthread_cond_timedwait(cond, mutex, reltime, TRUE); } int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) { D(bug("%s(%8lx)\n", __FUNCTION__, cond)); return _pthread_cond_timedwait(cond, mutex, NULL, FALSE); } static int _pthread_cond_broadcast(pthread_cond_t *cond, BOOL onlyfirst) { CondWaiter *waiter; DB2(bug("%s(%8lx, %ld)\n", __FUNCTION__, cond, onlyfirst)); if (cond == NULL) return EINVAL; // initialize static conditions if (SemaphoreIsInvalid(&cond->semaphore)) pthread_cond_init(cond, NULL); // signal the waiting threads ObtainSemaphore(&cond->semaphore); ForeachNode(&cond->waiters, waiter) { Signal(waiter->task, waiter->sigmask); if (onlyfirst) break; } ReleaseSemaphore(&cond->semaphore); return 0; } int pthread_cond_signal(pthread_cond_t *cond) { D(bug("%s(%8lx)\n", __FUNCTION__, cond)); return _pthread_cond_broadcast(cond, TRUE); } int pthread_cond_broadcast(pthread_cond_t *cond) { D(bug("%s(%8lx)\n", __FUNCTION__, cond)); return _pthread_cond_broadcast(cond, FALSE); } // // Barrier functions // int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count) { D(bug("%s(%8lx, %8lx, %ld)\n", __FUNCTION__, barrier, attr, count)); if (barrier == NULL || count == 0) return EINVAL; barrier->curr_height = count; barrier->total_height = PTHREAD_BARRIER_FLAG; pthread_cond_init(&barrier->breeched, NULL); pthread_mutex_init(&barrier->lock, NULL); return 0; } int pthread_barrier_destroy(pthread_barrier_t *barrier) { D(bug("%s(%8lx)\n", __FUNCTION__, barrier)); if (barrier == NULL) return EINVAL; if (pthread_mutex_trylock(&barrier->lock) != 0) return EBUSY; if (barrier->total_height > PTHREAD_BARRIER_FLAG) { pthread_mutex_unlock(&barrier->lock); return EBUSY; } pthread_mutex_unlock(&barrier->lock); if (pthread_cond_destroy(&barrier->breeched) != 0) return EBUSY; pthread_mutex_destroy(&barrier->lock); barrier->curr_height = barrier->total_height = 0; return 0; } int pthread_barrier_wait(pthread_barrier_t *barrier) { D(bug("%s(%8lx)\n", __FUNCTION__, barrier)); if (barrier == NULL) return EINVAL; pthread_mutex_lock(&barrier->lock); // wait until everyone exits the barrier while (barrier->total_height > PTHREAD_BARRIER_FLAG) pthread_cond_wait(&barrier->breeched, &barrier->lock); // are we the first to enter? if (barrier->total_height == PTHREAD_BARRIER_FLAG) barrier->total_height = 0; barrier->total_height++; if (barrier->total_height == barrier->curr_height) { barrier->total_height += PTHREAD_BARRIER_FLAG - 1; pthread_cond_broadcast(&barrier->breeched); pthread_mutex_unlock(&barrier->lock); return PTHREAD_BARRIER_SERIAL_THREAD; } else { // wait until enough threads enter the barrier while (barrier->total_height < PTHREAD_BARRIER_FLAG) pthread_cond_wait(&barrier->breeched, &barrier->lock); barrier->total_height--; // get entering threads to wake up if (barrier->total_height == PTHREAD_BARRIER_FLAG) pthread_cond_broadcast(&barrier->breeched); pthread_mutex_unlock(&barrier->lock); return 0; } } // // Read-write lock attribute functions // int pthread_rwlockattr_init(pthread_rwlockattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_rwlockattr_t)); return 0; } int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_rwlockattr_t)); return 0; } // // Read-write lock functions // int pthread_rwlock_init(pthread_rwlock_t *lock, const pthread_rwlockattr_t *attr) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, lock, attr)); if (lock == NULL) return EINVAL; InitSemaphore(&lock->semaphore); return 0; } int pthread_rwlock_destroy(pthread_rwlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; // probably a statically allocated rwlock if (SemaphoreIsInvalid(&lock->semaphore)) return 0; if (AttemptSemaphore(&lock->semaphore) == FALSE) return EBUSY; ReleaseSemaphore(&lock->semaphore); memset(lock, 0, sizeof(pthread_rwlock_t)); return 0; } int pthread_rwlock_tryrdlock(pthread_rwlock_t *lock) { ULONG ret; D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; // initialize static rwlocks if (SemaphoreIsInvalid(&lock->semaphore)) pthread_rwlock_init(lock, NULL); ret = AttemptSemaphoreShared(&lock->semaphore); return (ret == TRUE) ? 0 : EBUSY; } int pthread_rwlock_trywrlock(pthread_rwlock_t *lock) { ULONG ret; D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; // initialize static rwlocks if (SemaphoreIsInvalid(&lock->semaphore)) pthread_rwlock_init(lock, NULL); ret = AttemptSemaphore(&lock->semaphore); return (ret == TRUE) ? 0 : EBUSY; } int pthread_rwlock_rdlock(pthread_rwlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; pthread_testcancel(); // initialize static rwlocks if (SemaphoreIsInvalid(&lock->semaphore)) pthread_rwlock_init(lock, NULL); // "Results are undefined if the calling thread holds a write lock on rwlock at the time the call is made." #if !defined(__MORPHOS__) && !defined(__AMIGA__) // we might already have a write lock if (SemaphoreIsMine(&lock->semaphore)) return EDEADLK; #endif ObtainSemaphoreShared(&lock->semaphore); return 0; } int pthread_rwlock_timedrdlock(pthread_rwlock_t *lock, const struct timespec *abstime) { struct timeval end; int result; D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, lock, abstime)); if (lock == NULL) return EINVAL; if (abstime == NULL) return pthread_rwlock_rdlock(lock); else if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) return EINVAL; pthread_testcancel(); TIMESPEC_TO_TIMEVAL(&end, abstime); #if defined(__MORPHOS__) || defined(__AMIGA__) result = pthread_rwlock_tryrdlock(lock); if (result != EBUSY) return result; return _obtain_sema_timed(&lock->semaphore, &end, 1); #else // busy waiting is not very nice, but ObtainSemaphore doesn't support timeouts while ((result = pthread_rwlock_tryrdlock(lock)) == EBUSY) { struct timeval now; sched_yield(); gettimeofday(&now, NULL); if (timercmp(&end, &now, <)) return ETIMEDOUT; } return result; #endif } int pthread_rwlock_wrlock(pthread_rwlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; pthread_testcancel(); // initialize static rwlocks if (SemaphoreIsInvalid(&lock->semaphore)) pthread_rwlock_init(lock, NULL); if (SemaphoreIsMine(&lock->semaphore)) return EDEADLK; ObtainSemaphore(&lock->semaphore); return 0; } int pthread_rwlock_timedwrlock(pthread_rwlock_t *lock, const struct timespec *abstime) { struct timeval end; int result; D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, lock, abstime)); if (lock == NULL) return EINVAL; if (abstime == NULL) return pthread_rwlock_wrlock(lock); else if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) return EINVAL; pthread_testcancel(); TIMESPEC_TO_TIMEVAL(&end, abstime); #if defined(__MORPHOS__) || defined(__AMIGA__) result = pthread_rwlock_trywrlock(lock); if (result != EBUSY) return result; return _obtain_sema_timed(&lock->semaphore, &end, 0); #else // busy waiting is not very nice, but ObtainSemaphore doesn't support timeouts while ((result = pthread_rwlock_trywrlock(lock)) == EBUSY) { struct timeval now; sched_yield(); gettimeofday(&now, NULL); if (timercmp(&end, &now, <)) return ETIMEDOUT; } return result; #endif } int pthread_rwlock_unlock(pthread_rwlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; // initialize static rwlocks if (SemaphoreIsInvalid(&lock->semaphore)) pthread_rwlock_init(lock, NULL); //if (!SemaphoreIsMine(&lock->semaphore)) // if no one has obtained the semaphore don't unlock the rwlock // this can be a leap of faith because we don't maintain a separate list of readers if (lock->semaphore.ss_NestCount < 1) return EPERM; ReleaseSemaphore(&lock->semaphore); return 0; } // // Spinlock functions // int pthread_spin_init(pthread_spinlock_t *lock, int pshared) { D(bug("%s(%8lx, %ld)\n", __FUNCTION__, lock, pshared)); if (lock == NULL) return EINVAL; *lock = 0; return 0; } int pthread_spin_destroy(pthread_spinlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); return 0; } int pthread_spin_lock(pthread_spinlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; #ifdef __MORPHOS__ { unsigned int cnt = 0; while (__sync_val_compare_and_swap((int *)lock, 0, 1) == 0) { asm volatile("" ::: "memory"); if ((cnt++ & 255) == 0) sched_yield(); } } #else while (__sync_val_compare_and_swap((int*) lock, 0, 1) == 0) sched_yield(); // TODO: don't yield the CPU every iteration // SBF: if yield is implemented correctly there's nothing else to do. #endif return 0; } int pthread_spin_trylock(pthread_spinlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; if (__sync_val_compare_and_swap((int*) lock, 0, 1) == 0) return EBUSY; return 0; } int pthread_spin_unlock(pthread_spinlock_t *lock) { D(bug("%s(%8lx)\n", __FUNCTION__, lock)); if (lock == NULL) return EINVAL; __sync_lock_release((int*) lock); return 0; } // // Thread attribute functions // int pthread_attr_init(pthread_attr_t *attr) { struct Task *task; D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_attr_t)); // inherit the priority and stack size of the parent thread #ifdef __AMIGA__ task = SysBase->ThisTask; #else task = FindTask(NULL); #endif attr->param.sched_priority = task->tc_Node.ln_Pri; #ifdef __MORPHOS__ NewGetTaskAttrs(task, &attr->stacksize68k, sizeof(attr->stacksize68k), TASKINFOTYPE_STACKSIZE_M68K, TAG_DONE); NewGetTaskAttrs(task, &attr->stacksize, sizeof(attr->stacksize), TASKINFOTYPE_STACKSIZE, TAG_DONE); #else attr->stacksize = (UBYTE*) task->tc_SPUpper - (UBYTE*) task->tc_SPLower; #endif D(bug("%s(%8lx, detach=%ld)\n", __FUNCTION__, attr, (int)attr->detachstate)); return 0; } int pthread_attr_destroy(pthread_attr_t *attr) { D(bug("%s(%8lx)\n", __FUNCTION__, attr)); if (attr == NULL) return EINVAL; memset(attr, 0, sizeof(pthread_attr_t)); return 0; } int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, attr, detachstate)); if (attr == NULL) return EINVAL; if (detachstate != NULL) *detachstate = attr->detachstate; return 0; } int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate) { D(bug("%s(%8lx, %ld)\n", __FUNCTION__, attr, detachstate)); if (attr == NULL || detachstate != PTHREAD_CREATE_JOINABLE) return EINVAL; attr->detachstate = detachstate; return 0; } int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize) { D(bug("%s(%8lx, %8lx, %8lx)\n", __FUNCTION__, attr, stackaddr, stacksize)); if (attr == NULL) return EINVAL; if (stackaddr != NULL) *stackaddr = attr->stackaddr; if (stacksize != NULL) *stacksize = attr->stacksize; return 0; } int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize) { D(bug("%s(%8lx, %8lx, %ld)\n", __FUNCTION__, attr, stackaddr, stacksize)); if (attr == NULL || (stackaddr != NULL && stacksize == 0)) return EINVAL; attr->stackaddr = stackaddr; attr->stacksize = stacksize; return 0; } int pthread_attr_getstacksize(const pthread_attr_t *attr, size_t *stacksize) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, attr, stacksize)); return pthread_attr_getstack(attr, NULL, stacksize); } int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize) { D(bug("%s(%8lx, %ld)\n", __FUNCTION__, attr, stacksize)); return pthread_attr_setstack(attr, NULL, stacksize); } int pthread_attr_getschedparam(const pthread_attr_t *attr, struct sched_param *param) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, attr, param)); if (attr == NULL) return EINVAL; if (param != NULL) *param = attr->param; return 0; } int pthread_attr_setschedparam(pthread_attr_t *attr, const struct sched_param *param) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, attr, param)); if (attr == NULL || param == NULL) return EINVAL; attr->param = *param; return 0; } // // Thread functions // #ifdef USE_ASYNC_CANCEL #ifdef __MORPHOS__ static ULONG CancelHandlerFunc(void); static struct EmulLibEntry CancelHandler = { TRAP_LIB, 0, (void (*)(void))CancelHandlerFunc }; static ULONG CancelHandlerFunc(void) { ULONG signals = (ULONG)REG_D0; APTR data = (APTR)REG_A1; struct ExecBase *SysBase = (struct ExecBase *)REG_A6; #else AROS_UFH3S(ULONG, CancelHandler, AROS_UFHA(ULONG, signals, D0), AROS_UFHA(APTR, data, A1), AROS_UFHA(struct ExecBase *, SysBase, A6)) { AROS_USERFUNC_INIT #endif DB2(bug("%s(%ld, %ld, %ld)\n", __FUNCTION__, signals, data, SysBase)); pthread_testcancel(); return signals; #ifdef __AROS__ AROS_USERFUNC_EXIT #endif } #endif static void StarterFunc(void) { ThreadInfo *inf; void *ret; pthread_t pid; int i, j; int foundkey = TRUE; #ifdef USE_ASYNC_CANCEL APTR oldexcept; #endif #ifdef __AMIGA__ struct Process *proc = (struct Process*) SysBase->ThisTask; pid = (pthread_t) proc->pr_CIS; proc->pr_CIS = 0; #else inf = (pthread_t)FindTask(NULL)->tc_UserData; #endif DB2(bug("%s(%ld)\n", __FUNCTION__, pid)); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(pid); inf->started = 1; // trim the name //inf->task->tc_Node.ln_Name[inf->oldlen]; // we have to set the priority here to avoid race conditions SetTaskPri(inf->task, inf->attr.param.sched_priority); #ifdef USE_ASYNC_CANCEL // set the exception handler for async cancellation oldexcept = inf->task->tc_ExceptCode; #ifdef __AROS__ inf->task->tc_ExceptCode = &AROS_ASMSYMNAME(CancelHandler); #else inf->task->tc_ExceptCode = &CancelHandler; #endif SetExcept(SIGBREAKF_CTRL_C, SIGBREAKF_CTRL_C); #endif // set a jump point for pthread_exit if (!setjmp(inf->jmp)) { void* (*start)(void*) = inf->start; void *arg = inf->arg; ReleaseSemaphore(&thread_sem); DB2(bug("stack soll %ld, ist %ld\n", inf->attr.stacksize, (unsigned)proc->pr_Task.tc_SPUpper - (unsigned)proc->pr_Task.tc_SPLower)); // custom stack requires special handling if (inf->attr.stackaddr != NULL && inf->attr.stacksize > 0) { #ifdef __AMIGA__ struct StackSwapStruct stack; stack.stk_Lower = inf->attr.stackaddr; stack.stk_Upper = (ULONG)((char*) stack.stk_Lower + inf->attr.stacksize); stack.stk_Pointer = (APTR) stack.stk_Upper; StackSwap(&stack); ret = start(arg); #else struct StackSwapArgs swapargs; struct StackSwapStruct stack; swapargs.Args[0] = (IPTR)inf->arg; stack.stk_Lower = inf->attr.stackaddr; #ifdef __MORPHOS__ stack.stk_Upper = (IPTR)stack.stk_Lower + inf->attr.stacksize; stack.stk_Pointer = (APTR)stack.stk_Upper; #else stack.stk_Upper = (APTR)((IPTR)stack.stk_Lower + inf->attr.stacksize); stack.stk_Pointer = stack.stk_Upper; #endif inf->ret = (void *)NewStackSwap(&stack, inf->start, &swapargs); #endif } else { ret = start(arg); } Forbid(); } // we end up here with Forbid() called before longjmp() Permit(); D(bug("* %s(pid=%ld after long jump)\n", __FUNCTION__, pid)); // destroy all non-NULL TLS key values // since the destructors can set the keys themselves, we have to do multiple iterations ObtainSemaphoreShared(&tls_sem); for (j = 0; foundkey && j < PTHREAD_DESTRUCTOR_ITERATIONS; j++) { foundkey = FALSE; for (i = 0; i < numTlskeys; i++) { if (_tlskeys[i].used && _tlskeys[i].destructor && inf->tlsvalues[i]) { void *oldvalue = inf->tlsvalues[i]; inf->tlsvalues[i] = NULL; _tlskeys[i].destructor(oldvalue); foundkey = TRUE; } } } ReleaseSemaphore(&tls_sem); // inf may be invalid due to realloc ObtainSemaphore(&thread_sem); inf = GetThreadInfo(pid); inf->ret = ret; #ifdef USE_ASYNC_CANCEL // remove the exception handler SetExcept(0, SIGBREAKF_CTRL_C); inf->task->tc_ExceptCode = oldexcept; #endif DB2(bug("%s(releasing inf ptr %8lx)\n", __FUNCTION__, inf)); // tell the parent thread that we are done Forbid(); inf->task = (struct Task *)-1; inf->finished = 1; ReleaseSemaphore(&thread_sem); Signal(inf->parent, SIGF_PARENT); } int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void* (*start)(void*), void *arg) { ThreadInfo *inf; char name[NAMELEN]; size_t oldlen; pthread_t threadnew; D(bug("%s(%8lx, %8lx, %8lx, %8lx)\n", __FUNCTION__, thread, attr, start, arg)); if (thread == NULL || start == NULL) return EINVAL; ObtainSemaphore(&thread_sem); // grab an empty thread slot threadnew = GetThreadId(NULL); if (threadnew == PTHREAD_THREADS_MAX) { ReleaseSemaphore(&thread_sem); return EAGAIN; } if (threadnew == numThreads) { // resize the thread's storage. unsigned n = numThreads + numThreads + 2; D(bug("** %s(resize to %ld)\n", __FUNCTION__, n)); if (n > PTHREAD_THREADS_MAX) n = PTHREAD_THREADS_MAX; if (n != numThreads) { ThreadInfo *t = (ThreadInfo*) realloc(_threads, n * sizeof(ThreadInfo)); if (!t) { ReleaseSemaphore(&thread_sem); return EAGAIN; } memset(&t[numThreads], 0, (n - numThreads) * sizeof(ThreadInfo)); _threads = t; numThreads = n; } } // prepare the ThreadInfo structure inf = GetThreadInfo(threadnew); memset(inf, 0, sizeof(ThreadInfo)); D(bug("new thread=%ld, inf=%8lx\n", threadnew, inf)); inf->start = start; inf->arg = arg; #ifdef __AMIGA__ inf->parent = SysBase->ThisTask; #else inf->parent = FindTask(NULL); #endif if (attr) inf->attr = *attr; else pthread_attr_init(&inf->attr); NEWLIST((struct List* )&inf->cleanup); inf->cancelstate = PTHREAD_CANCEL_ENABLE; inf->canceltype = PTHREAD_CANCEL_DEFERRED; // let's trick CreateNewProc into allocating a larger buffer for the name snprintf(name, sizeof(name), "pthread thread #%ld", (long)threadnew); oldlen = strlen(name); memset(name + oldlen, ' ', sizeof(name) - oldlen - 1); name[sizeof(name) - 1] = '\0'; // start the child thread inf->task = (struct Task*) CreateNewProcTags(NP_Entry, (IPTR) StarterFunc, #ifdef __MORPHOS__ NP_CodeType, CODETYPE_PPC, (inf->attr.stackaddr == NULL && inf->attr.stacksize > 0) ? NP_PPCStackSize : TAG_IGNORE, inf->attr.stacksize, inf->attr.stacksize68k > 0 ? NP_StackSize : TAG_IGNORE, inf->attr.stacksize68k, #else (inf->attr.stackaddr == NULL && inf->attr.stacksize > 0) ? NP_StackSize : TAG_IGNORE, inf->attr.stacksize, #endif #ifdef __AMIGA__ NP_Input, (IPTR) threadnew, #else NP_UserData, threadnew, #endif NP_Name, (IPTR) name, TAG_DONE); ReleaseSemaphore(&thread_sem); D(bug("new thread=%ld, task=%8lx, fini=%ld, detach=%ld\n", threadnew, inf->task, (int)inf->finished, (int)inf->attr.detachstate)); if (!inf->task) { inf->parent = NULL; return EAGAIN; } *thread = threadnew; return 0; } int pthread_detach(pthread_t thread) { D(bug("%s(%ld) not implemented\n", __FUNCTION__, thread)); return ESRCH; } int pthread_join(pthread_t thread, void **value_ptr) { ThreadInfo *inf; struct Task * task = SysBase->ThisTask; pthread_testcancel(); D(bug("%s(%ld, %8lx)\n", __FUNCTION__, thread, value_ptr)); if (!thread || pthread_self() == thread) return EDEADLK; ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); D(bug("%s(%ld, inf=%8lx, fini=%ld, detach=%ld, started=%ld)\n", __FUNCTION__, thread, inf, (int)(inf ? inf->finished : 0), (int)(inf ? inf->attr.detachstate : 0), inf->started)); if (inf == NULL || !inf->task) return ESRCH; if (inf->attr.detachstate) return EINVAL; // busy yield loop... while (!inf->started || !inf->finished) { ReleaseSemaphore(&thread_sem); D(bug("waiting for %ld to start...\n", thread)); --task->tc_Node.ln_Pri; sched_yield(); ++task->tc_Node.ln_Pri; #if 0 // TODO add proper signal waiting - but some that works from any task D(bug("%s waiting for signal (%ld, inf=%8lx, fini=%ld)\n", __FUNCTION__, thread, inf, (int)inf->finished)); Wait(SIGF_PARENT); #endif ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); } D(bug("%s finished after wait (%ld, %ld)\n", __FUNCTION__, thread, (int)inf->finished)); if (value_ptr) *value_ptr = inf->ret; inf->task = NULL; ReleaseSemaphore(&thread_sem); return 0; } int pthread_equal(pthread_t t1, pthread_t t2) { D(bug("%s(%ld, %ld)\n", __FUNCTION__, t1, t2)); return (t1 == t2); } pthread_t pthread_self(void) { struct Task *task; pthread_t thread; // D(bug("%s()\n", __FUNCTION__)); #ifdef __AMIGA__ task = SysBase->ThisTask; #else task = FindTask(NULL); #endif thread = GetThreadId(task); if (thread == PTHREAD_THREADS_MAX) return 0; return thread; } int pthread_cancel(pthread_t thread) { ThreadInfo *inf; D(bug("%s(%ld)\n", __FUNCTION__, thread)); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); if (inf == NULL || inf->parent == NULL || inf->canceled == TRUE) { ReleaseSemaphore(&thread_sem); return ESRCH; } inf->canceled = TRUE; // we might have to cancel the thread immediately if (inf->canceltype == PTHREAD_CANCEL_ASYNCHRONOUS && inf->cancelstate == PTHREAD_CANCEL_ENABLE) { struct Task *task; #ifdef __AMIGA__ task = SysBase->ThisTask; #else task = FindTask(NULL); #endif if (inf->task == task) pthread_testcancel(); // cancel ourselves else Signal(inf->task, SIGBREAKF_CTRL_C); // trigger the exception handler } ReleaseSemaphore(&thread_sem); return 0; } int pthread_setcancelstate(int state, int *oldstate) { pthread_t thread; ThreadInfo *inf; D(bug("%s(%ld, %8lx)\n", __FUNCTION__, state, oldstate)); if (state != PTHREAD_CANCEL_ENABLE && state != PTHREAD_CANCEL_DISABLE) return EINVAL; thread = pthread_self(); inf = GetThreadInfo(thread); if (oldstate) *oldstate = inf->cancelstate; inf->cancelstate = state; return 0; } int pthread_setcanceltype(int type, int *oldtype) { pthread_t thread; ThreadInfo *inf; D(bug("%s(%ld, %8lx)\n", __FUNCTION__, type, oldtype)); if (type != PTHREAD_CANCEL_DEFERRED && type != PTHREAD_CANCEL_ASYNCHRONOUS) return EINVAL; thread = pthread_self(); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); if (oldtype) *oldtype = inf->canceltype; inf->canceltype = type; ReleaseSemaphore(&thread_sem); return 0; } void pthread_testcancel(void) { pthread_t thread; ThreadInfo *inf; int flag; D(bug("%s()\n", __FUNCTION__)); thread = pthread_self(); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); flag = inf->canceled && (inf->cancelstate == PTHREAD_CANCEL_ENABLE); ReleaseSemaphore(&thread_sem); if (flag) pthread_exit(PTHREAD_CANCELED); } void pthread_exit(void *value_ptr) { pthread_t pid; ThreadInfo *inf; CleanupHandler *handler; pid = pthread_self(); D(bug("%s(pid=%ld, %8lx)\n", __FUNCTION__, pid, value_ptr)); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(pid); inf->ret = value_ptr; // execute the clean-up handlers while ((handler = (CleanupHandler*) RemTail((struct List*) &inf->cleanup))) if (handler->routine) handler->routine(handler->arg); // only for real threads. if (pid) { Forbid(); ReleaseSemaphore(&thread_sem); longjmp(inf->jmp, 1); } ReleaseSemaphore(&thread_sem); } static void OnceCleanup(void *arg) { pthread_once_t *once_control; DB2(bug("%s(%8lx)\n", __FUNCTION__, arg)); once_control = (pthread_once_t*) arg; pthread_spin_unlock(&once_control->lock); } int pthread_once(pthread_once_t *once_control, void (*init_routine)(void)) { D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, once_control, init_routine)); if (once_control == NULL || init_routine == NULL) return EINVAL; if (!once_control->done) { pthread_spin_lock(&once_control->lock); if (!once_control->done) { pthread_cleanup_push(OnceCleanup, once_control); (*init_routine)(); pthread_cleanup_pop(0); once_control->done = TRUE; } pthread_spin_unlock(&once_control->lock); } return 0; } // // Scheduling functions // int pthread_setschedparam(pthread_t thread, int policy, const struct sched_param *param) { ThreadInfo *inf; D(bug("%s(%ld, %ld, %8lx)\n", __FUNCTION__, thread, policy, param)); if (param == NULL) return EINVAL; ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); if (inf) SetTaskPri(inf->task, param->sched_priority); ReleaseSemaphore(&thread_sem); if (inf == NULL) return ESRCH; return 0; } // // Non-portable functions // int pthread_setname_np(pthread_t thread, const char *name) { ThreadInfo *inf; char *currentname; size_t namelen; D(bug("%s(%ld, %s)\n", __FUNCTION__, thread, name)); if (name == NULL) return ERANGE; ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); if (inf != NULL) { currentname = inf->task->tc_Node.ln_Name; if (inf->parent == NULL) namelen = strlen(currentname) + 1; else namelen = NAMELEN; if (strlen(name) + 1 > namelen) inf = NULL; else strncpy(currentname, name, namelen); } ReleaseSemaphore(&thread_sem); if (inf == NULL) return ERANGE; return 0; } int pthread_getname_np(pthread_t thread, char *name, size_t len) { ThreadInfo *inf; char *currentname; D(bug("%s(%ld, %s, %ld)\n", __FUNCTION__, thread, name, len)); if (name == NULL || len == 0) return ERANGE; ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); if (inf) { currentname = inf->task->tc_Node.ln_Name; if (strlen(currentname) + 1 > len) inf = NULL; else // length check passed - strcpy is ok. strcpy(name, currentname); } ReleaseSemaphore(&thread_sem); if (inf == NULL) return ERANGE; return 0; } // // Cancellation cleanup // void pthread_cleanup_push(void (*routine)(void*), void *arg) { pthread_t thread; ThreadInfo *inf; CleanupHandler *handler; D(bug("%s(%8lx, %8lx)\n", __FUNCTION__, routine, arg)); if (routine == NULL) return; handler = malloc(sizeof(CleanupHandler)); if (handler == NULL) return; thread = pthread_self(); handler->routine = routine; handler->arg = arg; ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); AddTail((struct List*) &inf->cleanup, (struct Node*) handler); ReleaseSemaphore(&thread_sem); } void pthread_cleanup_pop(int execute) { pthread_t thread; ThreadInfo *inf; CleanupHandler *handler; D(bug("%s(%d)\n", __FUNCTION__, execute)); thread = pthread_self(); ObtainSemaphore(&thread_sem); inf = GetThreadInfo(thread); handler = (CleanupHandler*) RemTail((struct List*) &inf->cleanup); ReleaseSemaphore(&thread_sem); if (handler && handler->routine && execute) handler->routine(handler->arg); free(handler); } // // Signalling // int pthread_kill(pthread_t thread, int sig) { D(bug("%s(%ld, %ld) not implemented\n", __FUNCTION__, thread, sig)); return EINVAL; } // // Constructors, destructors // #ifndef __AMIGA__ static #endif int __pthread_Init_Func(void) { D(bug("%s\n", "********************")); if (sizeof(struct SignalSemaphore) > sizeof(struct __OpaqueSemaphore)) perror("__OpaqueSemaphore to small"); if (sizeof(struct MinList) > sizeof(struct __OpaqueMinList)) perror("__OpaqueMinList to small"); DB2(bug("%s()\n", __FUNCTION__)); #ifdef __MORPHOS__ memset(&waitutc, 0, sizeof(waitutc)); if (OpenDevice("timer.device", UNIT_WAITUTC, (APTR) &waitutc, 0) != 0) return FALSE; #endif //memset(&threads, 0, sizeof(threads)); InitSemaphore(&thread_sem); InitSemaphore(&tls_sem); // reserve ID 0 for the main thread _threads = (ThreadInfo*) malloc(sizeof(ThreadInfo) * INITIALSIZE); if (!_threads) exit(10); memset(_threads, 0, sizeof(ThreadInfo) * INITIALSIZE); numThreads = INITIALSIZE; ThreadInfo *inf = &_threads[0]; #ifdef __AMIGA__ inf->task = SysBase->ThisTask; #else inf->task = FindTask(NULL); #endif NEWLIST((struct List* )&inf->cleanup); return TRUE; } #ifndef __AMIGA__ static #endif void __pthread_Exit_Func(void) { #if defined(__MORPHOS__) || defined(__AMIGA__) pthread_t i; #endif /* main thread is no longer cancelable. */ GetThreadInfo(pthread_self())->cancelstate = PTHREAD_CANCEL_DISABLE; DB2(bug("%s()\n", __FUNCTION__)); // wait for the threads? #if defined(__MORPHOS__) || defined(__AMIGA__) // if we don't do this we can easily end up with unloaded code being executed for (i = 1; i < numThreads; i++) { // a thread might still be in creation! ObtainSemaphore(&thread_sem); ThreadInfo *inf = GetThreadInfo(i); inf->attr.detachstate = PTHREAD_CREATE_JOINABLE; // force it to be joinable or waiting may fail. ReleaseSemaphore(&thread_sem); pthread_join(i, NULL); } #endif #ifdef __MORPHOS__ CloseDevice((APTR) &waitutc); #endif } #if defined(__MORPHOS__) || defined(__AMIGA__) ADD2INIT(__pthread_Init_Func, -6); // below init cpp with -5 - it's needed there ADD2EXIT(__pthread_Exit_Func, -6); #else static CONSTRUCTOR_P(__pthread_Init_Func, 100) { return !__pthread_Init_Func(); } static DESTRUCTOR_P(__pthread_Exit_Func, 100) { __pthread_Exit_Func(); } #endif