[haiku-commits] BRANCH axeld-github.launch_daemon [bb37de24ab63] in src: servers/launch kits/support

  • From: axeld-github.launch_daemon <community@xxxxxxxxxxxx>
  • To: haiku-commits@xxxxxxxxxxxxx
  • Date: Sun, 31 May 2015 19:16:46 +0200 (CEST)

added 2 changesets to branch 'refs/remotes/axeld-github/launch_daemon'
old head: 8a604e82d785f991629f4b99c8f62205a24ccea2
new head: bb37de24ab6377e4be019c5110d8036947205de8
overview: https://github.com/axeld/haiku/compare/8a604e82d785...bb37de24ab63

----------------------------------------------------------------------------

02a405f8674e: JobQueue: fixed leak, notification, added Pop() variant.

* Was leaking fQueuedJobs on destruction.
* fHaveRunnableJobSem implementation was not completed; it was never
released.
* Added Pop() variant that is a bit more flexible, and allows for a
timeout as well as waiting even when the queue is empty, and can
return a status code.

bb37de24ab63: launch_daemon: use BJobs for everything, multi-threaded worker.

* Implemented a multi-threaded job queue, that launches as many as CPU
count jobs at once -- due to SSDs, this number could possibly even be
made much larger, though.
* Launching a job now happens within that worker queues as a LaunchJob.
* Put all init jobs into an init target, and make the launch jobs depend
on it.

[ Axel Dörfler <axeld@xxxxxxxxxxxxxxxx> ]

----------------------------------------------------------------------------

3 files changed, 281 insertions(+), 29 deletions(-)
headers/private/support/JobQueue.h | 6 +-
src/kits/support/JobQueue.cpp | 63 ++++++---
src/servers/launch/LaunchDaemon.cpp | 241 ++++++++++++++++++++++++++++++--

############################################################################

Commit: 02a405f8674e957ec607d69613834a53456c7a24
Author: Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
Date: Sun May 31 17:03:34 2015 UTC

JobQueue: fixed leak, notification, added Pop() variant.

* Was leaking fQueuedJobs on destruction.
* fHaveRunnableJobSem implementation was not completed; it was never
released.
* Added Pop() variant that is a bit more flexible, and allows for a
timeout as well as waiting even when the queue is empty, and can
return a status code.

----------------------------------------------------------------------------

diff --git a/headers/private/support/JobQueue.h
b/headers/private/support/JobQueue.h
index 6a5efbf..468877a 100644
--- a/headers/private/support/JobQueue.h
+++ b/headers/private/support/JobQueue.h
@@ -30,8 +30,12 @@ public:
//
gives up ownership

BJob* Pop();
+ status_t Pop(bigtime_t timeout,
bool returnWhenEmpty,
+ BJob**
_job);
//
caller owns job

+ size_t CountJobs() const;
+
void Close();

private:
@@ -49,7 +53,7 @@ private:
void
_RequeueDependantJobsOf(BJob* job);
void
_RemoveDependantJobsOf(BJob* job);

- BLocker fLock;
+ mutable BLocker fLock;
uint32 fNextTicketNumber;
JobPriorityQueue* fQueuedJobs;
sem_id fHaveRunnableJobSem;
diff --git a/src/kits/support/JobQueue.cpp b/src/kits/support/JobQueue.cpp
index 04b4624..0246c75 100644
--- a/src/kits/support/JobQueue.cpp
+++ b/src/kits/support/JobQueue.cpp
@@ -3,6 +3,7 @@
* Distributed under the terms of the MIT License.
*
* Authors:
+ * Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
* Oliver Tappe <zooey@xxxxxxxxxxxxxxx>
*/

@@ -50,6 +51,9 @@ class JobQueue::JobPriorityQueue
};


+// #pragma mark -
+
+
JobQueue::JobQueue()
:
fLock("job queue"),
@@ -61,6 +65,8 @@ JobQueue::JobQueue()

JobQueue::~JobQueue()
{
+ Close();
+ delete fQueuedJobs;
}


@@ -89,6 +95,8 @@ JobQueue::AddJob(BJob* job)
}
BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
job->AddStateListener(this);
+ if (job->IsRunnable())
+ release_sem(fHaveRunnableJobSem);
}

return B_OK;
@@ -138,34 +146,53 @@ JobQueue::JobFailed(BJob* job)
BJob*
JobQueue::Pop()
{
+ BJob* job;
+ if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
+ return job;
+
+ return NULL;
+}
+
+
+status_t
+JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
+{
BAutolock lock(&fLock);
if (lock.IsLocked()) {
- JobPriorityQueue::iterator head = fQueuedJobs->begin();
- if (head == fQueuedJobs->end())
- return NULL;
- while (!(*head)->IsRunnable()) {
- // we need to wait until a job becomes runnable
+ while (true) {
+ JobPriorityQueue::iterator head = fQueuedJobs->begin();
+ if (head != fQueuedJobs->end()) {
+ if ((*head)->IsRunnable()) {
+ *_job = *head;
+ fQueuedJobs->erase(head);
+ return B_OK;
+ }
+ } else if (returnWhenEmpty)
+ return B_ENTRY_NOT_FOUND;
+
+ // we need to wait until a job becomes
available/runnable
status_t result;
do {
lock.Unlock();
- result = acquire_sem(fHaveRunnableJobSem);
+ result = acquire_sem_etc(fHaveRunnableJobSem, 1,
+ B_RELATIVE_TIMEOUT, timeout);
if (!lock.Lock())
- return NULL;
+ return B_ERROR;
} while (result == B_INTERRUPTED);
if (result != B_OK)
- return NULL;
-
- // fetch current head, it must be runnable now
- head = fQueuedJobs->begin();
- if (head == fQueuedJobs->end())
- return NULL;
+ return result;
}
- BJob* job = *head;
- fQueuedJobs->erase(head);
- return job;
}

- return NULL;
+ return B_ERROR;
+}
+
+
+size_t
+JobQueue::CountJobs() const
+{
+ BAutolock locker(fLock);
+ return fQueuedJobs->size();
}


@@ -222,6 +249,8 @@ JobQueue::_RequeueDependantJobsOf(BJob* job)
dependantJob->RemoveDependency(job);
try {
fQueuedJobs->insert(dependantJob);
+ if (dependantJob->IsRunnable())
+ release_sem(fHaveRunnableJobSem);
} catch (...) {
}
}

############################################################################

Commit: bb37de24ab6377e4be019c5110d8036947205de8
Author: Axel Dörfler <axeld@xxxxxxxxxxxxxxxx>
Date: Sun May 31 17:07:43 2015 UTC

launch_daemon: use BJobs for everything, multi-threaded worker.

* Implemented a multi-threaded job queue, that launches as many as CPU
count jobs at once -- due to SSDs, this number could possibly even be
made much larger, though.
* Launching a job now happens within that worker queues as a LaunchJob.
* Put all init jobs into an init target, and make the launch jobs depend
on it.

----------------------------------------------------------------------------

diff --git a/src/servers/launch/LaunchDaemon.cpp
b/src/servers/launch/LaunchDaemon.cpp
index 963beaf..7979f56 100644
--- a/src/servers/launch/LaunchDaemon.cpp
+++ b/src/servers/launch/LaunchDaemon.cpp
@@ -112,6 +112,59 @@ private:
};


+class LaunchJob : public BJob {
+public:
+ LaunchJob(Job*
job);
+
+protected:
+ virtual status_t Execute();
+
+private:
+ Job* fJob;
+};
+
+
+class Target : public BJob {
+public:
+ Target(const
char* name);
+
+protected:
+ virtual status_t Execute();
+};
+
+
+class Worker {
+public:
+
Worker(JobQueue& queue);
+ virtual ~Worker();
+
+protected:
+ virtual status_t Process();
+ virtual bigtime_t Timeout() const;
+ virtual status_t Run(BJob* job);
+
+private:
+ static status_t _Process(void* self);
+
+protected:
+ thread_id fThread;
+ JobQueue& fJobQueue;
+};
+
+
+class MainWorker : public Worker {
+public:
+
MainWorker(JobQueue& queue);
+
+protected:
+ virtual bigtime_t Timeout() const;
+ virtual status_t Run(BJob* job);
+
+private:
+ int32 fCPUCount;
+};
+
+
typedef std::map<BString, Job*> JobMap;


@@ -134,20 +187,30 @@ private:
Job* _Job(const char* name);
void _InitJobs();
void _LaunchJobs();
+ void _AddLaunchJob(Job* job);

void
_RetrieveKernelOptions();
void _SetupEnvironment();
void _InitSystem();
+ void _AddInitJob(BJob* job);

bool _IsSafeMode() const;

private:
JobMap fJobs;
JobQueue fJobQueue;
+ MainWorker* fMainWorker;
+ Target* fInitTarget;
bool fSafeMode;
};


+static const bigtime_t kWorkerTimeout = 1000000;
+ // One second until a worker thread quits without a job
+
+static int32 sWorkerCount;
+
+
static const char*
get_leaf(const char* signature)
{
@@ -406,12 +469,148 @@ Job::IsLaunched() const
// #pragma mark -


+LaunchJob::LaunchJob(Job* job)
+ :
+ BJob(job->Name()),
+ fJob(job)
+{
+}
+
+
+status_t
+LaunchJob::Execute()
+{
+ if (!fJob->IsLaunched())
+ return fJob->Launch();
+
+ return B_OK;
+}
+
+
+// #pragma mark -
+
+
+Target::Target(const char* name)
+ :
+ BJob(name)
+{
+}
+
+
+status_t
+Target::Execute()
+{
+ return B_OK;
+}
+
+
+// #pragma mark -
+
+
+Worker::Worker(JobQueue& queue)
+ :
+ fJobQueue(queue)
+{
+ fThread = spawn_thread(&Worker::_Process, "worker", B_NORMAL_PRIORITY,
+ this);
+ if (fThread >= 0 && resume_thread(fThread) == B_OK)
+ atomic_add(&sWorkerCount, 1);
+}
+
+
+Worker::~Worker()
+{
+}
+
+
+status_t
+Worker::Process()
+{
+ while (true) {
+ BJob* job;
+ status_t status = fJobQueue.Pop(Timeout(), false, &job);
+ if (status != B_OK)
+ return status;
+
+ Run(job);
+ // TODO: proper error reporting on failed job!
+ }
+}
+
+
+bigtime_t
+Worker::Timeout() const
+{
+ return kWorkerTimeout;
+}
+
+
+status_t
+Worker::Run(BJob* job)
+{
+ return job->Run();
+}
+
+
+/*static*/ status_t
+Worker::_Process(void* _self)
+{
+ Worker* self = (Worker*)_self;
+ status_t status = self->Process();
+ delete self;
+
+ return status;
+}
+
+
+// #pragma mark -
+
+
+MainWorker::MainWorker(JobQueue& queue)
+ :
+ Worker(queue)
+{
+ // TODO: keep track of workers, and quit them on destruction
+ system_info info;
+ if (get_system_info(&info) == B_OK)
+ fCPUCount = info.cpu_count;
+}
+
+
+bigtime_t
+MainWorker::Timeout() const
+{
+ return B_INFINITE_TIMEOUT;
+}
+
+
+status_t
+MainWorker::Run(BJob* job)
+{
+ int32 count = atomic_get(&sWorkerCount);
+
+ size_t jobCount = fJobQueue.CountJobs();
+ if (jobCount > INT_MAX)
+ jobCount = INT_MAX;
+
+ if ((int32)jobCount > count && count < fCPUCount)
+ new Worker(fJobQueue);
+
+ return Worker::Run(job);
+}
+
+
+// #pragma mark -
+
+
LaunchDaemon::LaunchDaemon(status_t& error)
:
BServer(kLaunchDaemonSignature, NULL,
create_port(B_LOOPER_PORT_DEFAULT_CAPACITY,
- B_LAUNCH_DAEMON_PORT_NAME), false, &error)
+ B_LAUNCH_DAEMON_PORT_NAME), false, &error),
+ fInitTarget(new Target("init"))
{
+ fMainWorker = new MainWorker(fJobQueue);
}


@@ -469,9 +668,7 @@ LaunchDaemon::MessageReceived(BMessage* message)

iterator->second.GetInt32("port", -1));
}

- // Launch job now if it isn't running yet
- if (!job->IsLaunched())
- job->Launch();
+ _AddLaunchJob(job);
}
message->SendReply(&reply);
break;
@@ -618,12 +815,28 @@ LaunchDaemon::_LaunchJobs()
iterator++) {
Job* job = iterator->second;
if (job->IsEnabled() && job->InitCheck() == B_OK)
- job->Launch();
+ _AddLaunchJob(job);
}
}


void
+LaunchDaemon::_AddLaunchJob(Job* job)
+{
+ if (job->IsLaunched())
+ return;
+
+ LaunchJob* launchJob = new LaunchJob(job);
+
+ // All jobs depend on the init target
+ if (fInitTarget->State() < B_JOB_STATE_SUCCEEDED)
+ launchJob->AddDependency(fInitTarget);
+
+ fJobQueue.AddJob(launchJob);
+}
+
+
+void
LaunchDaemon::_RetrieveKernelOptions()
{
char buffer[32];
@@ -656,13 +869,19 @@ LaunchDaemon::_SetupEnvironment()
void
LaunchDaemon::_InitSystem()
{
- fJobQueue.AddJob(new InitRealTimeClockJob());
- fJobQueue.AddJob(new InitSharedMemoryDirectoryJob());
- fJobQueue.AddJob(new InitTemporaryDirectoryJob());
+ _AddInitJob(new InitRealTimeClockJob());
+ _AddInitJob(new InitSharedMemoryDirectoryJob());
+ _AddInitJob(new InitTemporaryDirectoryJob());
+
+ fJobQueue.AddJob(fInitTarget);
+}

- // TODO: these should be done in parallel
- while (BJob* job = fJobQueue.Pop())
- job->Run();
+
+void
+LaunchDaemon::_AddInitJob(BJob* job)
+{
+ fInitTarget->AddDependency(job);
+ fJobQueue.AddJob(job);
}




Other related posts:

  • » [haiku-commits] BRANCH axeld-github.launch_daemon [bb37de24ab63] in src: servers/launch kits/support - axeld-github . launch_daemon