added 2 changesets to branch 'refs/remotes/pdziepak-github/scheduler' old head: 7aba623f524cb190460597f5d54bd9e890363def new head: f823aacf59cceb227850475367a6c492eef08667 overview: https://github.com/pdziepak/Haiku/compare/7aba623...f823aac ---------------------------------------------------------------------------- 2e0ee59: scheduler_affine: Migrate threads from overloaded cores * Keep number of CPU bound threads on cores balanced. * If possible migrate normal threads from cores with cpu bound ones to the less busy cores. f823aac: scheduler_affine: Remove old code [ Pawel Dziepak <pdziepak@xxxxxxxxxxx> ] ---------------------------------------------------------------------------- 1 file changed, 103 insertions(+), 110 deletions(-) src/system/kernel/scheduler/scheduler_affine.cpp | 213 +++++++++---------- ############################################################################ Commit: 2e0ee59462130209dc9bfd49ad05196687551cd8 Author: Pawel Dziepak <pdziepak@xxxxxxxxxxx> Date: Mon Oct 21 23:11:41 2013 UTC scheduler_affine: Migrate threads from overloaded cores * Keep number of CPU bound threads on cores balanced. * If possible migrate normal threads from cores with cpu bound ones to the less busy cores. ---------------------------------------------------------------------------- diff --git a/src/system/kernel/scheduler/scheduler_affine.cpp b/src/system/kernel/scheduler/scheduler_affine.cpp index 18d3139..8efd3aa 100644 --- a/src/system/kernel/scheduler/scheduler_affine.cpp +++ b/src/system/kernel/scheduler/scheduler_affine.cpp @@ -89,6 +89,8 @@ typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>, HeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> > AffineCoreThreadHeap; static AffineCoreThreadHeap* sCoreThreadHeap; +static int32 sCPUBoundThreads; +static int32 sAssignedThreads; // sPackageUsageHeap is used to decide which core should be woken up from the // idle state. When aiming for performance we should use as many packages as @@ -508,6 +510,7 @@ affine_increase_penalty(Thread* thread) int32 core = schedulerThreadData->previous_core; ASSERT(core >= 0); if (schedulerThreadData->additional_penalty == 0) { + sCPUBoundThreads++; sCoreEntries[core].fCPUBoundThreads++; affine_update_thread_heaps(core); } @@ -526,14 +529,6 @@ affine_cancel_penalty(Thread* thread) if (schedulerThreadData->priority_penalty != 0) TRACE("cancelling thread %ld penalty\n", thread->id); - if (schedulerThreadData->additional_penalty != 0) { - int32 core = schedulerThreadData->previous_core; - ASSERT(core >= 0); - - sCoreEntries[core].fCPUBoundThreads--; - affine_update_thread_heaps(core); - } - schedulerThreadData->priority_penalty = 0; schedulerThreadData->additional_penalty = 0; } @@ -679,30 +674,93 @@ affine_choose_cpu(int32 core) } -static void -affine_assign_thread_to_core(Thread* thread, int32 targetCore) +static bool +affine_should_rebalance(Thread* thread) { + if (thread_is_idle_thread(thread)) + return false; + scheduler_thread_data* schedulerThreadData = thread->scheduler_data; + ASSERT(schedulerThreadData->previous_core >= 0); + + CoreEntry* coreEntry = &sCoreEntries[schedulerThreadData->previous_core]; - int32 oldCore = schedulerThreadData->previous_core; - if (oldCore == targetCore) + // If this is a cpu bound thread and we have significantly more such threads + // than the average get rid of this one. 
+	if (schedulerThreadData->additional_penalty != 0) {
+		int32 averageCPUBound = sCPUBoundThreads / sRunQueueCount;
+		if (coreEntry->fCPUBoundThreads - averageCPUBound > 1)
+			return true;
+		return false;
+	}
+
+	// If this thread is not cpu bound but we have at least one consider giving
+	// this one to someone less busy.
+	int32 averageThread = sAssignedThreads / sRunQueueCount;
+	if (coreEntry->fCPUBoundThreads > 0) {
+		CoreEntry* other = sCoreThreadHeap->PeekRoot();
+		if (AffineCoreThreadHeap::GetKey(other) <= averageThread)
+			return true;
+	}
+
+	// No cpu bound threads - the situation is quite good. Make sure it
+	// won't get much worse...
+	const int32 kBalanceThreshold = 3;
+	return coreEntry->fThreads - averageThread > kBalanceThreshold;
+}
+
+
+static void
+affine_assign_active_thread_to_core(Thread* thread)
+{
+	if (thread_is_idle_thread(thread))
 		return;
 
-	if (oldCore >= 0) {
-		sCoreEntries[oldCore].fThreads--;
-		if (schedulerThreadData->additional_penalty != 0)
-			sCoreEntries[oldCore].fCPUBoundThreads--;
+	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
+
+	ASSERT(schedulerThreadData->previous_core >= 0);
+	int32 core = schedulerThreadData->previous_core;
+
+	sCoreEntries[core].fThreads++;
+	sAssignedThreads++;
 
-		affine_update_thread_heaps(oldCore);
+	if (schedulerThreadData->additional_penalty != 0) {
+		sCoreEntries[core].fCPUBoundThreads++;
+		sCPUBoundThreads++;
 	}
 
-	schedulerThreadData->previous_core = targetCore;
-	if (targetCore >= 0) {
-		sCoreEntries[targetCore].fThreads++;
-		if (schedulerThreadData->additional_penalty != 0)
-			sCoreEntries[targetCore].fCPUBoundThreads++;
-		affine_update_thread_heaps(targetCore);
+	affine_update_thread_heaps(core);
+}
+
+
+static inline void
+affine_thread_goes_away(Thread* thread)
+{
+	if (thread_is_idle_thread(thread))
+		return;
+
+	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
+
+	ASSERT(schedulerThreadData->previous_core >= 0);
+	int32 core = schedulerThreadData->previous_core;
+
+	ASSERT(sCoreEntries[core].fThreads > 0);
+	ASSERT(sCoreEntries[core].fThreads > sCoreEntries[core].fCPUBoundThreads
+		|| (sCoreEntries[core].fThreads == sCoreEntries[core].fCPUBoundThreads
+			&& schedulerThreadData->additional_penalty != 0));
+	sCoreEntries[core].fThreads--;
+	sAssignedThreads--;
+
+	if (schedulerThreadData->additional_penalty != 0) {
+		ASSERT(sCoreEntries[core].fCPUBoundThreads > 0);
+		sCoreEntries[core].fCPUBoundThreads--;
+		sCPUBoundThreads--;
 	}
+
+	affine_update_thread_heaps(core);
+
+	schedulerThreadData->went_sleep = system_time();
+	schedulerThreadData->went_sleep_active = sCoreEntries[core].fActiveTime;
 }
 
 
@@ -728,21 +786,31 @@ affine_enqueue(Thread* thread, bool newOne)
 		targetCPU = thread->previous_cpu->cpu_num;
 		targetCore = sCPUToCore[targetCPU];
 		ASSERT(targetCore == schedulerThreadData->previous_core);
+
+		if (newOne)
+			affine_assign_active_thread_to_core(thread);
 	} else if (schedulerThreadData->previous_core < 0
-		|| (newOne && affine_has_cache_expired(thread))) {
+		|| (newOne && affine_has_cache_expired(thread))
+		|| affine_should_rebalance(thread)) {
 		if (thread_is_idle_thread(thread)) {
 			targetCPU = thread->previous_cpu->cpu_num;
 			targetCore = sCPUToCore[targetCPU];
 		} else {
+			if (!newOne)
+				affine_thread_goes_away(thread);
+
 			targetCore = affine_choose_core(threadPriority);
 			targetCPU = affine_choose_cpu(targetCore);
 		}
 
-		affine_assign_thread_to_core(thread, targetCore);
+		schedulerThreadData->previous_core = targetCore;
+		affine_assign_active_thread_to_core(thread);
 	} else {
 		targetCore = schedulerThreadData->previous_core;
 		targetCPU = affine_choose_cpu(targetCore);
+
+		if (newOne)
+			affine_assign_active_thread_to_core(thread);
 	}
 
 	TRACE("enqueueing thread %ld with priority %ld %ld\n", thread->id,
@@ -815,26 +883,6 @@ affine_put_back(Thread* thread)
 
 
 #if 0
-/*! Dequeues the thread after the given \a prevThread from the run queue.
-*/
-static inline Thread *
-dequeue_from_run_queue(Thread *prevThread, int32 currentCPU)
-{
-	Thread *resultThread = NULL;
-	if (prevThread != NULL) {
-		resultThread = prevThread->queue_next;
-		prevThread->queue_next = resultThread->queue_next;
-	} else {
-		resultThread = sRunQueue[currentCPU];
-		sRunQueue[currentCPU] = resultThread->queue_next;
-	}
-	sRunQueueSize[currentCPU]--;
-	resultThread->scheduler_data->fLastQueue = -1;
-
-	return resultThread;
-}
-
-
 /*! Looks for a possible thread to grab/run from another CPU.
 	Note: thread lock must be held when entering this function
 */
@@ -899,12 +947,17 @@ affine_set_thread_priority(Thread *thread, int32 priority)
 		thread->id, priority, thread->priority,
 		affine_get_effective_priority(thread));
 
-	if (thread->state == B_THREAD_RUNNING)
+	if (thread->state == B_THREAD_RUNNING) {
+		affine_thread_goes_away(thread);
 		affine_update_priority_heaps(thread->cpu->cpu_num, priority);
+	}
 
 	if (thread->state != B_THREAD_READY) {
 		affine_cancel_penalty(thread);
 		thread->priority = priority;
+
+		if (thread->state == B_THREAD_RUNNING)
+			affine_assign_active_thread_to_core(thread);
 		return;
 	}
 
@@ -921,6 +974,7 @@ affine_set_thread_priority(Thread *thread, int32 priority)
 	int32 previousCore = thread->scheduler_data->previous_core;
 	ASSERT(previousCore >= 0);
 	sRunQueues[previousCore].Remove(thread);
+	affine_thread_goes_away(thread);
 
 	// set priority and re-insert
 	affine_cancel_penalty(thread);
@@ -1087,15 +1141,6 @@ affine_track_cpu_activity(Thread* oldThread, Thread* nextThread,
 	int32 thisCore)
 }
 
-static inline void
-affine_thread_goes_sleep(Thread* thread, int32 thisCore)
-{
-	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
-	schedulerThreadData->went_sleep = system_time();
-	schedulerThreadData->went_sleep_active = sCoreEntries[thisCore].fActiveTime;
-}
-
-
 /*!	Runs the scheduler.
 	Note: expects thread spinlock to be held
 */
@@ -1142,14 +1187,14 @@ affine_reschedule(void)
 			break;
 		case B_THREAD_SUSPENDED:
-			affine_thread_goes_sleep(oldThread, thisCore);
+			affine_thread_goes_away(oldThread);
 			TRACE("reschedule(): suspending thread %ld\n", oldThread->id);
 			break;
 		case THREAD_STATE_FREE_ON_RESCHED:
-			affine_assign_thread_to_core(oldThread, -1);
+			affine_thread_goes_away(oldThread);
 			break;
 		default:
-			affine_thread_goes_sleep(oldThread, thisCore);
+			affine_thread_goes_away(oldThread);
 			TRACE("not enqueueing thread %ld into run queue next_state = %ld\n",
 				oldThread->id, oldThread->next_state);
 			break;

############################################################################

Commit:      f823aacf59cceb227850475367a6c492eef08667
Author:      Pawel Dziepak <pdziepak@xxxxxxxxxxx>
Date:        Mon Oct 21 23:21:51 2013 UTC

scheduler_affine: Remove old code

----------------------------------------------------------------------------

diff --git a/src/system/kernel/scheduler/scheduler_affine.cpp b/src/system/kernel/scheduler/scheduler_affine.cpp
index 8efd3aa..c4f4671 100644
--- a/src/system/kernel/scheduler/scheduler_affine.cpp
+++ b/src/system/kernel/scheduler/scheduler_affine.cpp
@@ -882,58 +882,6 @@ affine_put_back(Thread* thread)
 }
 
 
-#if 0
-/*! Looks for a possible thread to grab/run from another CPU.
- Note: thread lock must be held when entering this function -*/ -static Thread * -steal_thread_from_other_cpus(int32 currentCPU) -{ - // look through the active CPUs - find the one - // that has a) threads available to steal, and - // b) out of those, the one that's the most CPU-bound - // TODO: make this more intelligent along with enqueue - // - we need to try and maintain a reasonable balance - // in run queue sizes across CPUs, and also try to maintain - // an even distribution of cpu bound / interactive threads - int32 targetCPU = -1; - for (int32 i = 0; i < smp_get_num_cpus(); i++) { - // skip CPUs that have either no or only one thread - if (i == currentCPU || sRunQueueSize[i] < 2) - continue; - - // out of the CPUs with threads available to steal, - // pick whichever one is generally the most CPU bound. - if (targetCPU < 0 - || sRunQueue[i]->priority > sRunQueue[targetCPU]->priority - || (sRunQueue[i]->priority == sRunQueue[targetCPU]->priority - && sRunQueueSize[i] > sRunQueueSize[targetCPU])) - targetCPU = i; - } - - if (targetCPU < 0) - return NULL; - - Thread* nextThread = sRunQueue[targetCPU]; - Thread* prevThread = NULL; - - while (nextThread != NULL) { - // grab the highest priority non-pinned thread - // out of this CPU's queue, dequeue and return it - if (nextThread->pinned_to_cpu <= 0) { - dequeue_from_run_queue(prevThread, targetCPU); - return nextThread; - } - - prevThread = nextThread; - nextThread = nextThread->queue_next; - } - - return NULL; -} -#endif - - /*! Sets the priority of a thread. Note: thread lock must be held when entering this function */
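
----------------------------------------------------------------------------

For reference, the rebalancing decision that 2e0ee59 adds in
affine_should_rebalance() boils down to three cases. Below is a minimal,
self-contained sketch of that logic; CoreLoad, should_rebalance() and the
linear scan standing in for sCoreThreadHeap->PeekRoot() are illustration-only
names, not the kernel's actual types:

// Hypothetical, self-contained model of the heuristic; none of these names
// exist in the kernel sources.
#include <algorithm>
#include <cstdint>
#include <cstdio>

struct CoreLoad {
	int32_t threads;          // threads currently assigned to this core
	int32_t cpuBoundThreads;  // threads that earned the CPU-bound penalty
};

// Case 1: a CPU-bound thread leaves if its core holds noticeably more
// CPU-bound threads than the per-core average.
// Case 2: a normal thread sharing a core with CPU-bound ones leaves if the
// least loaded core is at or below the average load.
// Case 3: otherwise only a fixed threshold above the average triggers
// migration.
bool
should_rebalance(const CoreLoad cores[], int coreCount, int32_t totalThreads,
	int32_t totalCpuBound, int core, bool threadIsCpuBound)
{
	const int32_t averageCpuBound = totalCpuBound / coreCount;
	const int32_t averageThreads = totalThreads / coreCount;

	if (threadIsCpuBound)
		return cores[core].cpuBoundThreads - averageCpuBound > 1;

	if (cores[core].cpuBoundThreads > 0) {
		// Stand-in for sCoreThreadHeap->PeekRoot(): the least loaded core.
		int32_t leastLoaded = cores[0].threads;
		for (int i = 1; i < coreCount; i++)
			leastLoaded = std::min(leastLoaded, cores[i].threads);
		if (leastLoaded <= averageThreads)
			return true;
	}

	const int32_t kBalanceThreshold = 3;
	return cores[core].threads - averageThreads > kBalanceThreshold;
}

int
main()
{
	CoreLoad cores[2] = { { 6, 3 }, { 2, 0 } };	// core 0 is overloaded
	printf("move CPU-bound thread off core 0: %d\n",
		should_rebalance(cores, 2, 8, 3, 0, true));
	printf("move normal thread off core 0: %d\n",
		should_rebalance(cores, 2, 8, 3, 0, false));
	return 0;
}

Note the slack built into each case (the "> 1" margin and kBalanceThreshold):
a thread is only migrated when its core is clearly worse off than average,
which should keep threads from ping-ponging between roughly equally loaded
cores.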
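Those per-core and global load figures are kept consistent by the new
affine_assign_active_thread_to_core()/affine_thread_goes_away() pair, which
replaces both affine_assign_thread_to_core() and affine_thread_goes_sleep().
A stripped-down model of the pairing, again with hypothetical names (the real
functions also update the thread heaps and the went_sleep timestamps under
the scheduler lock):

#include <cassert>
#include <cstdint>

// Illustration-only stand-ins for the per-core fThreads/fCPUBoundThreads
// fields and the new sAssignedThreads/sCPUBoundThreads globals.
struct LoadCounters {
	int32_t coreThreads = 0;
	int32_t coreCpuBound = 0;
	int32_t totalThreads = 0;
	int32_t totalCpuBound = 0;
};

// Called on every path that puts a thread on a core (enqueueing a new or
// migrated thread, re-assigning a running thread after a priority change).
void
thread_assigned(LoadCounters& c, bool cpuBound)
{
	c.coreThreads++;
	c.totalThreads++;
	if (cpuBound) {
		c.coreCpuBound++;
		c.totalCpuBound++;
	}
}

// Called on every path that takes the thread off the core again (sleep,
// suspend, migration, THREAD_STATE_FREE_ON_RESCHED); the asserts mirror
// those in affine_thread_goes_away() and catch any unpaired call.
void
thread_goes_away(LoadCounters& c, bool cpuBound)
{
	assert(c.coreThreads > 0);
	c.coreThreads--;
	c.totalThreads--;
	if (cpuBound) {
		assert(c.coreCpuBound > 0);
		c.coreCpuBound--;
		c.totalCpuBound--;
	}
}

int
main()
{
	LoadCounters core;
	thread_assigned(core, true);	// CPU-bound thread becomes ready
	thread_goes_away(core, true);	// ...and later blocks
	assert(core.coreThreads == 0 && core.coreCpuBound == 0);
	return 0;
}

Folding the went_sleep bookkeeping into affine_thread_goes_away() is what
lets the B_THREAD_SUSPENDED, THREAD_STATE_FREE_ON_RESCHED and default cases
in affine_reschedule() collapse into a single call.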