Author: bonefish
Date: 2009-12-31 18:03:41 +0100 (Thu, 31 Dec 2009)
New Revision: 34830
Changeset: http://dev.haiku-os.org/changeset/34830/haiku

Modified:
   haiku/trunk/headers/private/kernel/lock.h
   haiku/trunk/src/system/kernel/lock.cpp
   haiku/trunk/src/system/kernel/vm/VMAddressSpaceLocking.cpp
   haiku/trunk/src/tests/add-ons/kernel/kernelland_emu/lock.cpp

Log:
R/W lock implementation:
* Changed the rw_lock_{read,write}_unlock() return values to void. They
  returned a value != B_OK only in case of user error, and no-one checked
  them anyway.
* Optimized rw_lock_read_[un]lock(). They are inline now and, as long as
  there is no contending write locker, only perform an atomic_add().
* Changed the semantics of nested locking after acquiring a write lock:
  Read and write locks are counted separately, so read locks no longer
  implicitly become write locks. Among other things, this makes degrading
  a write lock to a read lock by way of read_lock + write_unlock (as used
  in the VM) actually work.

These changes speed up the -j8 Haiku image build on my machine by a few
percent; more interestingly, they reduce the total kernel time by 25%.
Apparently we now get more contention on other locks.

Modified: haiku/trunk/headers/private/kernel/lock.h
===================================================================
--- haiku/trunk/headers/private/kernel/lock.h	2009-12-31 16:42:30 UTC (rev 34829)
+++ haiku/trunk/headers/private/kernel/lock.h	2009-12-31 17:03:41 UTC (rev 34830)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2008, Ingo Weinhold, ingo_weinhold@xxxxxxx
+ * Copyright 2008-2009, Ingo Weinhold, ingo_weinhold@xxxxxxx
  * Copyright 2002-2009, Axel Dörfler, axeld@xxxxxxxxxxxxxxxxx
  * Distributed under the terms of the MIT License.
  *
@@ -45,12 +45,21 @@
 	const char*				name;
 	struct rw_lock_waiter*	waiters;
 	thread_id				holder;
-	int32					reader_count;
-	int32					writer_count;
+	vint32					count;
 	int32					owner_count;
+	int16					active_readers;
+								// Only > 0 while a writer is waiting: number
+								// of active readers when the first waiting
+								// writer started waiting.
+	int16					pending_readers;
+								// Number of readers that have already
+								// incremented "count", but have not yet started
+								// to wait at the time the last writer unlocked.
 	uint32					flags;
 } rw_lock;
 
+#define RW_LOCK_WRITER_COUNT_BASE	0x10000
+
 #define RW_LOCK_FLAG_CLONE_NAME	0x1
 
 
@@ -114,10 +123,8 @@
 		// name is *not* cloned nor freed in rw_lock_destroy()
 extern void rw_lock_init_etc(rw_lock* lock, const char* name, uint32 flags);
 extern void rw_lock_destroy(rw_lock* lock);
-extern status_t rw_lock_read_lock(rw_lock* lock);
-extern status_t rw_lock_read_unlock(rw_lock* lock);
 extern status_t rw_lock_write_lock(rw_lock* lock);
-extern status_t rw_lock_write_unlock(rw_lock* lock);
+extern void rw_lock_write_unlock(rw_lock* lock);
 
 extern void mutex_init(mutex* lock, const char* name);
 		// name is *not* cloned nor freed in mutex_destroy()
@@ -129,7 +136,12 @@
 	// to, the operation is safe as long as "from" is held while destroying
 	// "to".
+ // implementation private: + +extern status_t _rw_lock_read_lock(rw_lock* lock); +extern void _rw_lock_read_unlock(rw_lock* lock); + extern status_t _mutex_lock(mutex* lock, bool threadsLocked); extern void _mutex_unlock(mutex* lock, bool threadsLocked); extern status_t _mutex_trylock(mutex* lock); @@ -138,6 +150,33 @@ static inline status_t +rw_lock_read_lock(rw_lock* lock) +{ +#if KDEBUG_RW_LOCK_DEBUG + return rw_lock_write_lock(lock); +#else + int32 oldCount = atomic_add(&lock->count, 1); + if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) + return _rw_lock_read_lock(lock); + return B_OK; +#endif +} + + +static inline void +rw_lock_read_unlock(rw_lock* lock) +{ +#if KDEBUG_RW_LOCK_DEBUG + rw_lock_write_unlock(lock); +#else + int32 oldCount = atomic_add(&lock->count, -1); + if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) + _rw_lock_read_unlock(lock); +#endif +} + + +static inline status_t mutex_lock(mutex* lock) { #if KDEBUG Modified: haiku/trunk/src/system/kernel/lock.cpp =================================================================== --- haiku/trunk/src/system/kernel/lock.cpp 2009-12-31 16:42:30 UTC (rev 34829) +++ haiku/trunk/src/system/kernel/lock.cpp 2009-12-31 17:03:41 UTC (rev 34830) @@ -1,5 +1,5 @@ /* - * Copyright 2008, Ingo Weinhold, ingo_weinhold@xxxxxxx + * Copyright 2008-2009, Ingo Weinhold, ingo_weinhold@xxxxxxx * Copyright 2002-2009, Axel Dörfler, axeld@xxxxxxxxxxxxxxxxx All rights reserved. * Distributed under the terms of the MIT License. * @@ -170,43 +170,50 @@ } -static void +static int32 rw_lock_unblock(rw_lock* lock) { // Check whether there are any waiting threads at all and whether anyone // has the write lock. rw_lock_waiter* waiter = lock->waiters; - if (waiter == NULL || lock->holder > 0) - return; + if (waiter == NULL || lock->holder >= 0) + return 0; // writer at head of queue? if (waiter->writer) { - if (lock->reader_count == 0) { - // dequeue writer - lock->waiters = waiter->next; - if (lock->waiters != NULL) - lock->waiters->last = waiter->last; + if (lock->active_readers > 0 || lock->pending_readers > 0) + return 0; - lock->holder = waiter->thread->id; + // dequeue writer + lock->waiters = waiter->next; + if (lock->waiters != NULL) + lock->waiters->last = waiter->last; - // unblock thread - thread_unblock_locked(waiter->thread, B_OK); - } - return; + lock->holder = waiter->thread->id; + + // unblock thread + thread_unblock_locked(waiter->thread, B_OK); + return RW_LOCK_WRITER_COUNT_BASE; } // wake up one or more readers - while ((waiter = lock->waiters) != NULL && !waiter->writer) { + uint32 readerCount = 0; + do { // dequeue reader lock->waiters = waiter->next; if (lock->waiters != NULL) lock->waiters->last = waiter->last; - lock->reader_count++; + readerCount++; // unblock thread thread_unblock_locked(waiter->thread, B_OK); - } + } while ((waiter = lock->waiters) != NULL && !waiter->writer); + + if (lock->count >= RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers += readerCount; + + return readerCount; } @@ -216,9 +223,10 @@ lock->name = name; lock->waiters = NULL; lock->holder = -1; - lock->reader_count = 0; - lock->writer_count = 0; + lock->count = 0; lock->owner_count = 0; + lock->active_readers = 0; + lock->pending_readers = 0; lock->flags = 0; T_SCHEDULING_ANALYSIS(InitRWLock(lock, name)); @@ -232,9 +240,10 @@ lock->name = (flags & RW_LOCK_FLAG_CLONE_NAME) != 0 ? 
strdup(name) : name; lock->waiters = NULL; lock->holder = -1; - lock->reader_count = 0; - lock->writer_count = 0; + lock->count = 0; lock->owner_count = 0; + lock->active_readers = 0; + lock->pending_readers = 0; lock->flags = flags & RW_LOCK_FLAG_CLONE_NAME; T_SCHEDULING_ANALYSIS(InitRWLock(lock, name)); @@ -253,7 +262,7 @@ #if KDEBUG if (lock->waiters != NULL && thread_get_current_thread_id() - != lock->holder) { + != lock->holder) { panic("rw_lock_destroy(): there are blocking threads, but the caller " "doesn't hold the write lock (%p)", lock); @@ -280,89 +289,106 @@ } +#if !KDEBUG_RW_LOCK_DEBUG + status_t -rw_lock_read_lock(rw_lock* lock) +_rw_lock_read_lock(rw_lock* lock) { -#if KDEBUG_RW_LOCK_DEBUG - return rw_lock_write_lock(lock); -#else InterruptsSpinLocker locker(gThreadSpinlock); - if (lock->writer_count == 0) { - lock->reader_count++; - return B_OK; - } + // We might be the writer ourselves. if (lock->holder == thread_get_current_thread_id()) { lock->owner_count++; return B_OK; } + // The writer that originally had the lock when we called atomic_add() might + // already have gone and another writer could have overtaken us. In this + // case the original writer set pending_readers, so we know that we don't + // have to wait. + if (lock->pending_readers > 0) { + lock->pending_readers--; + + if (lock->count >= RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers++; + + return B_OK; + } + + ASSERT(lock->count >= RW_LOCK_WRITER_COUNT_BASE); + + // we need to wait return rw_lock_wait(lock, false); -#endif } -status_t -rw_lock_read_unlock(rw_lock* lock) +void +_rw_lock_read_unlock(rw_lock* lock) { -#if KDEBUG_RW_LOCK_DEBUG - return rw_lock_write_unlock(lock); -#else InterruptsSpinLocker locker(gThreadSpinlock); + // If we're still holding the write lock or if there are other readers, + // no-one can be woken up. if (lock->holder == thread_get_current_thread_id()) { - if (--lock->owner_count > 0) - return B_OK; - - // this originally has been a write lock - lock->writer_count--; - lock->holder = -1; - - rw_lock_unblock(lock); - return B_OK; + ASSERT(lock->owner_count % RW_LOCK_WRITER_COUNT_BASE > 0); + lock->owner_count--; + return; } - if (lock->reader_count <= 0) { - panic("rw_lock_read_unlock(): lock %p not read-locked", lock); - return B_BAD_VALUE; - } + if (--lock->active_readers > 0) + return; - lock->reader_count--; + if (lock->active_readers < 0) { + panic("rw_lock_read_unlock(): lock %p not read-locked", lock); + lock->active_readers = 0; + return; + } rw_lock_unblock(lock); - return B_OK; -#endif } +#endif // !KDEBUG_RW_LOCK_DEBUG + status_t rw_lock_write_lock(rw_lock* lock) { InterruptsSpinLocker locker(gThreadSpinlock); - if (lock->reader_count == 0 && lock->writer_count == 0) { - lock->writer_count++; - lock->holder = thread_get_current_thread_id(); - lock->owner_count = 1; + // If we're already the lock holder, we just need to increment the owner + // count. + thread_id thread = thread_get_current_thread_id(); + if (lock->holder == thread) { + lock->owner_count += RW_LOCK_WRITER_COUNT_BASE; return B_OK; } - if (lock->holder == thread_get_current_thread_id()) { - lock->owner_count++; + + // announce our claim + int32 oldCount = atomic_add(&lock->count, RW_LOCK_WRITER_COUNT_BASE); + + if (oldCount == 0) { + // No-one else held a read or write lock, so it's ours now. + lock->holder = thread; + lock->owner_count = RW_LOCK_WRITER_COUNT_BASE; return B_OK; } - lock->writer_count++; + // We have to wait. If we're the first writer, note the current reader + // count. 
+ if (oldCount < RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers = oldCount - lock->pending_readers; status_t status = rw_lock_wait(lock, true); if (status == B_OK) { - lock->holder = thread_get_current_thread_id(); - lock->owner_count = 1; + lock->holder = thread; + lock->owner_count = RW_LOCK_WRITER_COUNT_BASE; } + return status; } -status_t +void rw_lock_write_unlock(rw_lock* lock) { InterruptsSpinLocker locker(gThreadSpinlock); @@ -370,17 +396,41 @@ if (thread_get_current_thread_id() != lock->holder) { panic("rw_lock_write_unlock(): lock %p not write-locked by this thread", lock); - return B_BAD_VALUE; + return; } - if (--lock->owner_count > 0) - return B_OK; - lock->writer_count--; + ASSERT(lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE); + + lock->owner_count -= RW_LOCK_WRITER_COUNT_BASE; + if (lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE) + return; + + // We gave up our last write lock -- clean up and unblock waiters. + int32 readerCount = lock->owner_count; lock->holder = -1; + lock->owner_count = 0; - rw_lock_unblock(lock); + int32 oldCount = atomic_add(&lock->count, -RW_LOCK_WRITER_COUNT_BASE); + oldCount -= RW_LOCK_WRITER_COUNT_BASE; - return B_OK; + if (oldCount != 0) { + // If writers are waiting, take over our reader count. + if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) { + lock->active_readers = readerCount; + rw_lock_unblock(lock); + } else { + // No waiting writer, but there are one or more readers. We will + // unblock all waiting readers -- that's the easy part -- and must + // also make sure that all readers that haven't entered the critical + // section yet, won't start to wait. Otherwise a writer overtaking + // such a reader will correctly start to wait, but the reader, + // seeing the writer count > 0, would also start to wait. We set + // pending_readers to the number of readers that are still expected + // to enter the critical section. 
+ lock->pending_readers = oldCount - readerCount + - rw_lock_unblock(lock); + } + } } @@ -402,9 +452,10 @@ kprintf("rw lock %p:\n", lock); kprintf(" name: %s\n", lock->name); kprintf(" holder: %ld\n", lock->holder); - kprintf(" reader count: %ld\n", lock->reader_count); - kprintf(" writer count: %ld\n", lock->writer_count); - kprintf(" owner count: %ld\n", lock->owner_count); + kprintf(" count: %#lx\n", lock->count); + kprintf(" active readers %d\n", lock->active_readers); + kprintf(" pending readers %d\n", lock->pending_readers); + kprintf(" owner count: %#lx\n", lock->owner_count); kprintf(" flags: %#lx\n", lock->flags); kprintf(" waiting threads:"); Modified: haiku/trunk/src/system/kernel/vm/VMAddressSpaceLocking.cpp =================================================================== --- haiku/trunk/src/system/kernel/vm/VMAddressSpaceLocking.cpp 2009-12-31 16:42:30 UTC (rev 34829) +++ haiku/trunk/src/system/kernel/vm/VMAddressSpaceLocking.cpp 2009-12-31 17:03:41 UTC (rev 34830) @@ -292,7 +292,6 @@ void AddressSpaceWriteLocker::DegradeToReadLock() { - // TODO: the current R/W lock implementation just keeps the write lock here fSpace->ReadLock(); fSpace->WriteUnlock(); fDegraded = true; Modified: haiku/trunk/src/tests/add-ons/kernel/kernelland_emu/lock.cpp =================================================================== --- haiku/trunk/src/tests/add-ons/kernel/kernelland_emu/lock.cpp 2009-12-31 16:42:30 UTC (rev 34829) +++ haiku/trunk/src/tests/add-ons/kernel/kernelland_emu/lock.cpp 2009-12-31 17:03:41 UTC (rev 34830) @@ -215,43 +215,50 @@ } -static void +static int32 rw_lock_unblock(rw_lock* lock) { // Check whether there any waiting threads at all and whether anyone // has the write lock. rw_lock_waiter* waiter = lock->waiters; if (waiter == NULL || lock->holder > 0) - return; + return 0; // writer at head of queue? if (waiter->writer) { - if (lock->reader_count == 0) { - // dequeue writer - lock->waiters = waiter->next; - if (lock->waiters != NULL) - lock->waiters->last = waiter->last; + if (lock->active_readers > 0 || lock->pending_readers > 0) + return 0; - lock->holder = get_thread_id(waiter->thread); + // dequeue writer + lock->waiters = waiter->next; + if (lock->waiters != NULL) + lock->waiters->last = waiter->last; - // unblock thread - _kern_unblock_thread(get_thread_id(waiter->thread), B_OK); - } - return; + lock->holder = get_thread_id(waiter->thread); + + // unblock thread + _kern_unblock_thread(get_thread_id(waiter->thread), B_OK); + return RW_LOCK_WRITER_COUNT_BASE; } // wake up one or more readers - while ((waiter = lock->waiters) != NULL && !waiter->writer) { + uint32 readerCount = 0; + do { // dequeue reader lock->waiters = waiter->next; if (lock->waiters != NULL) lock->waiters->last = waiter->last; - lock->reader_count++; + readerCount++; // unblock thread _kern_unblock_thread(get_thread_id(waiter->thread), B_OK); - } + } while ((waiter = lock->waiters) != NULL && !waiter->writer); + + if (lock->count >= RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers += readerCount; + + return readerCount; } @@ -261,9 +268,10 @@ lock->name = name; lock->waiters = NULL; lock->holder = -1; - lock->reader_count = 0; - lock->writer_count = 0; + lock->count = 0; lock->owner_count = 0; + lock->active_readers = 0; + lock->pending_readers = 0; lock->flags = 0; } @@ -274,9 +282,10 @@ lock->name = (flags & RW_LOCK_FLAG_CLONE_NAME) != 0 ? 
strdup(name) : name; lock->waiters = NULL; lock->holder = -1; - lock->reader_count = 0; - lock->writer_count = 0; + lock->count = 0; lock->owner_count = 0; + lock->active_readers = 0; + lock->pending_readers = 0; lock->flags = flags & RW_LOCK_FLAG_CLONE_NAME; } @@ -292,7 +301,7 @@ #if KDEBUG if (lock->waiters != NULL && find_thread(NULL) - != lock->holder) { + != lock->holder) { panic("rw_lock_destroy(): there are blocking threads, but the caller " "doesn't hold the write lock (%p)", lock); @@ -319,89 +328,103 @@ } +#if !KDEBUG_RW_LOCK_DEBUG + status_t -rw_lock_read_lock(rw_lock* lock) +_rw_lock_read_lock(rw_lock* lock) { -#if KDEBUG_RW_LOCK_DEBUG - return rw_lock_write_lock(lock); -#else AutoLocker<ThreadSpinlock> locker(sThreadSpinlock); - if (lock->writer_count == 0) { - lock->reader_count++; - return B_OK; - } + // We might be the writer ourselves. if (lock->holder == find_thread(NULL)) { lock->owner_count++; return B_OK; } + // The writer that originally had the lock when we called atomic_add() might + // already have gone and another writer could have overtaken us. In this + // case the original writer set pending_readers, so we know that we don't + // have to wait. + if (lock->pending_readers > 0) { + lock->pending_readers--; + + if (lock->count >= RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers++; + + return B_OK; + } + + // we need to wait return rw_lock_wait(lock, false); -#endif } -status_t -rw_lock_read_unlock(rw_lock* lock) +void +_rw_lock_read_unlock(rw_lock* lock) { -#if KDEBUG_RW_LOCK_DEBUG - return rw_lock_write_unlock(lock); -#else AutoLocker<ThreadSpinlock> locker(sThreadSpinlock); + // If we're still holding the write lock or if there are other readers, + // no-one can be woken up. if (lock->holder == find_thread(NULL)) { - if (--lock->owner_count > 0) - return B_OK; - - // this originally has been a write lock - lock->writer_count--; - lock->holder = -1; - - rw_lock_unblock(lock); - return B_OK; + lock->owner_count--; + return; } - if (lock->reader_count <= 0) { - panic("rw_lock_read_unlock(): lock %p not read-locked", lock); - return B_BAD_VALUE; - } + if (--lock->active_readers > 0) + return; - lock->reader_count--; + if (lock->active_readers < 0) { + panic("rw_lock_read_unlock(): lock %p not read-locked", lock); + lock->active_readers = 0; + return; + } rw_lock_unblock(lock); - return B_OK; -#endif } +#endif // !KDEBUG_RW_LOCK_DEBUG + status_t rw_lock_write_lock(rw_lock* lock) { AutoLocker<ThreadSpinlock> locker(sThreadSpinlock); - if (lock->reader_count == 0 && lock->writer_count == 0) { - lock->writer_count++; - lock->holder = find_thread(NULL); - lock->owner_count = 1; + // If we're already the lock holder, we just need to increment the owner + // count. + thread_id thread = find_thread(NULL); + if (lock->holder == thread) { + lock->owner_count += RW_LOCK_WRITER_COUNT_BASE; return B_OK; } - if (lock->holder == find_thread(NULL)) { - lock->owner_count++; + + // announce our claim + int32 oldCount = atomic_add(&lock->count, RW_LOCK_WRITER_COUNT_BASE); + + if (oldCount == 0) { + // No-one else held a read or write lock, so it's ours now. + lock->holder = thread; + lock->owner_count = RW_LOCK_WRITER_COUNT_BASE; return B_OK; } - lock->writer_count++; + // We have to wait. If we're the first writer, note the current reader + // count. 
+ if (oldCount < RW_LOCK_WRITER_COUNT_BASE) + lock->active_readers = oldCount - lock->pending_readers; status_t status = rw_lock_wait(lock, true); if (status == B_OK) { - lock->holder = find_thread(NULL); - lock->owner_count = 1; + lock->holder = thread; + lock->owner_count = RW_LOCK_WRITER_COUNT_BASE; } + return status; } -status_t +void rw_lock_write_unlock(rw_lock* lock) { AutoLocker<ThreadSpinlock> locker(sThreadSpinlock); @@ -409,17 +432,39 @@ if (find_thread(NULL) != lock->holder) { panic("rw_lock_write_unlock(): lock %p not write-locked by this thread", lock); - return B_BAD_VALUE; + return; } - if (--lock->owner_count > 0) - return B_OK; - lock->writer_count--; + lock->owner_count -= RW_LOCK_WRITER_COUNT_BASE; + if (lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE) + return; + + // We gave up our last write lock -- clean up and unblock waiters. + int32 readerCount = lock->owner_count; lock->holder = -1; + lock->owner_count = 0; - rw_lock_unblock(lock); + int32 oldCount = atomic_add(&lock->count, -RW_LOCK_WRITER_COUNT_BASE); + oldCount -= RW_LOCK_WRITER_COUNT_BASE; - return B_OK; + if (oldCount != 0) { + // If writers are waiting, take over our reader count. + if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) { + lock->active_readers = readerCount; + rw_lock_unblock(lock); + } else { + // No waiting writer, but there are one or more readers. We will + // unblock all waiting readers -- that's the easy part -- and must + // also make sure that all readers that haven't entered the critical + // section yet, won't start to wait. Otherwise a writer overtaking + // such a reader will correctly start to wait, but the reader, + // seeing the writer count > 0, would also start to wait. We set + // pending_readers to the number of readers that are still expected + // to enter the critical section. + lock->pending_readers = oldCount - readerCount + - rw_lock_unblock(lock); + } + } }