added 2 changesets to branch 'refs/remotes/axeld-github/launch_daemon'
old head: 8a604e82d785f991629f4b99c8f62205a24ccea2
new head: bb37de24ab6377e4be019c5110d8036947205de8
overview: https://github.com/axeld/haiku/compare/8a604e82d785...bb37de24ab63
----------------------------------------------------------------------------
02a405f8674e: JobQueue: fixed leak, notification, added Pop() variant.
* Was leaking fQueuedJobs on destruction.
* fHaveRunnableJobSem implementation was not completed; it was never
released.
* Added Pop() variant that is a bit more flexible, and allows for a
timeout as well as waiting even when the queue is empty, and can
return a status code.
bb37de24ab63: launch_daemon: use BJobs for everything, multi-threaded worker.
* Implemented a multi-threaded job queue that launches as many jobs at
once as there are CPUs -- due to SSDs, this number could possibly even
be made much larger, though.
* Launching a job now happens within that worker queue as a LaunchJob.
* Put all init jobs into an init target, and make the launch jobs depend
on it.
[ Axel Dörfler <axeld@xxxxxxxxxxxxxxxx> ]
----------------------------------------------------------------------------
3 files changed, 281 insertions(+), 29 deletions(-)
headers/private/support/JobQueue.h | 6 +-
src/kits/support/JobQueue.cpp | 63 ++++++---
src/servers/launch/LaunchDaemon.cpp | 241 ++++++++++++++++++++++++++++++--
############################################################################
Commit: 02a405f8674e957ec607d69613834a53456c7a24
Author: Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
Date: Sun May 31 17:03:34 2015 UTC
JobQueue: fixed leak, notification, added Pop() variant.
* Was leaking fQueuedJobs on destruction.
* fHaveRunnableJobSem implementation was not completed; it was never
released.
* Added Pop() variant that is a bit more flexible, and allows for a
timeout as well as waiting even when the queue is empty, and can
return a status code.
----------------------------------------------------------------------------
diff --git a/headers/private/support/JobQueue.h b/headers/private/support/JobQueue.h
index 6a5efbf..468877a 100644
--- a/headers/private/support/JobQueue.h
+++ b/headers/private/support/JobQueue.h
@@ -30,8 +30,12 @@ public:
// gives up ownership
BJob* Pop();
+ status_t Pop(bigtime_t timeout, bool returnWhenEmpty,
+ BJob** _job);
// caller owns job
+ size_t CountJobs() const;
+
void Close();
private:
@@ -49,7 +53,7 @@ private:
void _RequeueDependantJobsOf(BJob* job);
void _RemoveDependantJobsOf(BJob* job);
- BLocker fLock;
+ mutable BLocker fLock;
uint32 fNextTicketNumber;
JobPriorityQueue* fQueuedJobs;
sem_id fHaveRunnableJobSem;
diff --git a/src/kits/support/JobQueue.cpp b/src/kits/support/JobQueue.cpp
index 04b4624..0246c75 100644
--- a/src/kits/support/JobQueue.cpp
+++ b/src/kits/support/JobQueue.cpp
@@ -3,6 +3,7 @@
* Distributed under the terms of the MIT License.
*
* Authors:
+ * Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
* Oliver Tappe <zooey@xxxxxxxxxxxxxxx>
*/
@@ -50,6 +51,9 @@ class JobQueue::JobPriorityQueue
};
+// #pragma mark -
+
+
JobQueue::JobQueue()
:
fLock("job queue"),
@@ -61,6 +65,8 @@ JobQueue::JobQueue()
JobQueue::~JobQueue()
{
+ Close();
+ delete fQueuedJobs;
}
@@ -89,6 +95,8 @@ JobQueue::AddJob(BJob* job)
}
BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
job->AddStateListener(this);
+ if (job->IsRunnable())
+ release_sem(fHaveRunnableJobSem);
}
return B_OK;
@@ -138,34 +146,53 @@ JobQueue::JobFailed(BJob* job)
BJob*
JobQueue::Pop()
{
+ BJob* job;
+ if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
+ return job;
+
+ return NULL;
+}
+
+
+status_t
+JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
+{
BAutolock lock(&fLock);
if (lock.IsLocked()) {
- JobPriorityQueue::iterator head = fQueuedJobs->begin();
- if (head == fQueuedJobs->end())
- return NULL;
- while (!(*head)->IsRunnable()) {
- // we need to wait until a job becomes runnable
+ while (true) {
+ JobPriorityQueue::iterator head = fQueuedJobs->begin();
+ if (head != fQueuedJobs->end()) {
+ if ((*head)->IsRunnable()) {
+ *_job = *head;
+ fQueuedJobs->erase(head);
+ return B_OK;
+ }
+ } else if (returnWhenEmpty)
+ return B_ENTRY_NOT_FOUND;
+
+ // we need to wait until a job becomes available/runnable
status_t result;
do {
lock.Unlock();
- result = acquire_sem(fHaveRunnableJobSem);
+ result = acquire_sem_etc(fHaveRunnableJobSem, 1,
+ B_RELATIVE_TIMEOUT, timeout);
if (!lock.Lock())
- return NULL;
+ return B_ERROR;
} while (result == B_INTERRUPTED);
if (result != B_OK)
- return NULL;
-
- // fetch current head, it must be runnable now
- head = fQueuedJobs->begin();
- if (head == fQueuedJobs->end())
- return NULL;
+ return result;
}
- BJob* job = *head;
- fQueuedJobs->erase(head);
- return job;
}
- return NULL;
+ return B_ERROR;
+}
+
+
+size_t
+JobQueue::CountJobs() const
+{
+ BAutolock locker(fLock);
+ return fQueuedJobs->size();
}
@@ -222,6 +249,8 @@ JobQueue::_RequeueDependantJobsOf(BJob* job)
dependantJob->RemoveDependency(job);
try {
fQueuedJobs->insert(dependantJob);
+ if (dependantJob->IsRunnable())
+ release_sem(fHaveRunnableJobSem);
} catch (...) {
}
}
############################################################################
Commit: bb37de24ab6377e4be019c5110d8036947205de8
Author: Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
Date: Sun May 31 17:07:43 2015 UTC
launch_daemon: use BJobs for everything, multi-threaded worker.
* Implemented a multi-threaded job queue that launches as many jobs at
once as there are CPUs -- due to SSDs, this number could possibly even
be made much larger, though.
* Launching a job now happens within that worker queue as a LaunchJob.
* Put all init jobs into an init target, and make the launch jobs depend
on it.
----------------------------------------------------------------------------
diff --git a/src/servers/launch/LaunchDaemon.cpp b/src/servers/launch/LaunchDaemon.cpp
index 963beaf..7979f56 100644
--- a/src/servers/launch/LaunchDaemon.cpp
+++ b/src/servers/launch/LaunchDaemon.cpp
@@ -112,6 +112,59 @@ private:
};
+class LaunchJob : public BJob {
+public:
+ LaunchJob(Job* job);
+
+protected:
+ virtual status_t Execute();
+
+private:
+ Job* fJob;
+};
+
+
+class Target : public BJob {
+public:
+ Target(const char* name);
+
+protected:
+ virtual status_t Execute();
+};
+
+
+class Worker {
+public:
+ Worker(JobQueue& queue);
+ virtual ~Worker();
+
+protected:
+ virtual status_t Process();
+ virtual bigtime_t Timeout() const;
+ virtual status_t Run(BJob* job);
+
+private:
+ static status_t _Process(void* self);
+
+protected:
+ thread_id fThread;
+ JobQueue& fJobQueue;
+};
+
+
+class MainWorker : public Worker {
+public:
+ MainWorker(JobQueue& queue);
+
+protected:
+ virtual bigtime_t Timeout() const;
+ virtual status_t Run(BJob* job);
+
+private:
+ int32 fCPUCount;
+};
+
+
typedef std::map<BString, Job*> JobMap;
@@ -134,20 +187,30 @@ private:
Job* _Job(const char* name);
void _InitJobs();
void _LaunchJobs();
+ void _AddLaunchJob(Job* job);
void _RetrieveKernelOptions();
void _SetupEnvironment();
void _InitSystem();
+ void _AddInitJob(BJob* job);
bool _IsSafeMode() const;
private:
JobMap fJobs;
JobQueue fJobQueue;
+ MainWorker* fMainWorker;
+ Target* fInitTarget;
bool fSafeMode;
};
+static const bigtime_t kWorkerTimeout = 1000000;
+ // One second until a worker thread quits without a job
+
+static int32 sWorkerCount;
+
+
static const char*
get_leaf(const char* signature)
{
@@ -406,12 +469,148 @@ Job::IsLaunched() const
// #pragma mark -
+LaunchJob::LaunchJob(Job* job)
+ :
+ BJob(job->Name()),
+ fJob(job)
+{
+}
+
+
+status_t
+LaunchJob::Execute()
+{
+ if (!fJob->IsLaunched())
+ return fJob->Launch();
+
+ return B_OK;
+}
+
+
+// #pragma mark -
+
+
+Target::Target(const char* name)
+ :
+ BJob(name)
+{
+}
+
+
+status_t
+Target::Execute()
+{
+ return B_OK;
+}
+
+
+// #pragma mark -
+
+
+Worker::Worker(JobQueue& queue)
+ :
+ fJobQueue(queue)
+{
+ fThread = spawn_thread(&Worker::_Process, "worker", B_NORMAL_PRIORITY,
+ this);
+ if (fThread >= 0 && resume_thread(fThread) == B_OK)
+ atomic_add(&sWorkerCount, 1);
+}
+
+
+Worker::~Worker()
+{
+}
+
+
+status_t
+Worker::Process()
+{
+ while (true) {
+ BJob* job;
+ status_t status = fJobQueue.Pop(Timeout(), false, &job);
+ if (status != B_OK)
+ return status;
+
+ Run(job);
+ // TODO: proper error reporting on failed job!
+ }
+}
+
+
+bigtime_t
+Worker::Timeout() const
+{
+ return kWorkerTimeout;
+}
+
+
+status_t
+Worker::Run(BJob* job)
+{
+ return job->Run();
+}
+
+
+/*static*/ status_t
+Worker::_Process(void* _self)
+{
+ Worker* self = (Worker*)_self;
+ status_t status = self->Process();
+ delete self;
+
+ return status;
+}
+
+
+// #pragma mark -
+
+
+MainWorker::MainWorker(JobQueue& queue)
+ :
+ Worker(queue)
+{
+ // TODO: keep track of workers, and quit them on destruction
+ system_info info;
+ if (get_system_info(&info) == B_OK)
+ fCPUCount = info.cpu_count;
+}
+
+
+bigtime_t
+MainWorker::Timeout() const
+{
+ return B_INFINITE_TIMEOUT;
+}
+
+
+status_t
+MainWorker::Run(BJob* job)
+{
+ int32 count = atomic_get(&sWorkerCount);
+
+ size_t jobCount = fJobQueue.CountJobs();
+ if (jobCount > INT_MAX)
+ jobCount = INT_MAX;
+
+ if ((int32)jobCount > count && count < fCPUCount)
+ new Worker(fJobQueue);
+
+ return Worker::Run(job);
+}
+
+
+// #pragma mark -
+
+
LaunchDaemon::LaunchDaemon(status_t& error)
:
BServer(kLaunchDaemonSignature, NULL,
create_port(B_LOOPER_PORT_DEFAULT_CAPACITY,
- B_LAUNCH_DAEMON_PORT_NAME), false, &error)
+ B_LAUNCH_DAEMON_PORT_NAME), false, &error),
+ fInitTarget(new Target("init"))
{
+ fMainWorker = new MainWorker(fJobQueue);
}
@@ -469,9 +668,7 @@ LaunchDaemon::MessageReceived(BMessage* message)
iterator->second.GetInt32("port", -1));
}
- // Launch job now if it isn't running yet
- if (!job->IsLaunched())
- job->Launch();
+ _AddLaunchJob(job);
}
message->SendReply(&reply);
break;
@@ -618,12 +815,28 @@ LaunchDaemon::_LaunchJobs()
iterator++) {
Job* job = iterator->second;
if (job->IsEnabled() && job->InitCheck() == B_OK)
- job->Launch();
+ _AddLaunchJob(job);
}
}
void
+LaunchDaemon::_AddLaunchJob(Job* job)
+{
+ if (job->IsLaunched())
+ return;
+
+ LaunchJob* launchJob = new LaunchJob(job);
+
+ // All jobs depend on the init target
+ if (fInitTarget->State() < B_JOB_STATE_SUCCEEDED)
+ launchJob->AddDependency(fInitTarget);
+
+ fJobQueue.AddJob(launchJob);
+}
+
+
+void
LaunchDaemon::_RetrieveKernelOptions()
{
char buffer[32];
@@ -656,13 +869,19 @@ LaunchDaemon::_SetupEnvironment()
void
LaunchDaemon::_InitSystem()
{
- fJobQueue.AddJob(new InitRealTimeClockJob());
- fJobQueue.AddJob(new InitSharedMemoryDirectoryJob());
- fJobQueue.AddJob(new InitTemporaryDirectoryJob());
+ _AddInitJob(new InitRealTimeClockJob());
+ _AddInitJob(new InitSharedMemoryDirectoryJob());
+ _AddInitJob(new InitTemporaryDirectoryJob());
+
+ fJobQueue.AddJob(fInitTarget);
+}
- // TODO: these should be done in parallel
- while (BJob* job = fJobQueue.Pop())
- job->Run();
+
+void
+LaunchDaemon::_AddInitJob(BJob* job)
+{
+ fInitTarget->AddDependency(job);
+ fJobQueue.AddJob(job);
}