added 2 changesets to branch 'refs/remotes/HaikuPM-github/package-management' old head: 6978941aac85ea329a8552253f384924dcccd1b3 new head: 8fb3930a4290eb1156bbe53fba7947b0cd2be237 overview: https://github.com/haiku/HaikuPM/compare/6978941...8fb3930 ---------------------------------------------------------------------------- 7136419: Add shared PthreadMutexLocker, an AutoLocker for pthread_mutex_t 8fb3930: package daemon: Make all work with the packages asynchronous There's now a worker thread per Root that does all the work that can take time. Node monitoring notifications received in the main thread are just pushed into the worker's job queue, so the application looper remains responsive. [ Ingo Weinhold <ingo_weinhold@xxxxxx> ] ---------------------------------------------------------------------------- 11 files changed, 537 insertions(+), 37 deletions(-) headers/private/shared/PthreadMutexLocker.h | 42 +++++++ src/servers/package/Jamfile | 2 + src/servers/package/Job.cpp | 20 ++++ src/servers/package/Job.h | 25 +++++ src/servers/package/JobQueue.cpp | 99 ++++++++++++++++ src/servers/package/JobQueue.h | 43 +++++++ src/servers/package/PackageDaemon.cpp | 22 +--- src/servers/package/Root.cpp | 143 +++++++++++++++++++++++- src/servers/package/Root.h | 19 ++++ src/servers/package/Volume.cpp | 142 ++++++++++++++++++++--- src/servers/package/Volume.h | 17 ++- ############################################################################ Commit: 71364193839bcbefb2df5223133f901e25c3a99f Author: Ingo Weinhold <ingo_weinhold@xxxxxx> Date: Sun Apr 7 21:59:43 2013 UTC Add shared PthreadMutexLocker, an AutoLocker for pthread_mutex_t ---------------------------------------------------------------------------- diff --git a/headers/private/shared/PthreadMutexLocker.h b/headers/private/shared/PthreadMutexLocker.h new file mode 100644 index 0000000..db0e695 --- /dev/null +++ b/headers/private/shared/PthreadMutexLocker.h @@ -0,0 +1,42 @@ +/* + * Copyright 2013, Haiku, Inc. All Rights Reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Ingo Weinhold <ingo_weinhold@xxxxxx> + */ +#ifndef _PTHREAD_MUTEX_LOCKER_H +#define _PTHREAD_MUTEX_LOCKER_H + + +#include <pthread.h> + +#include <AutoLocker.h> + + +namespace BPrivate { + + +class AutoLockerMutexLocking { +public: + inline bool Lock(pthread_mutex_t* lockable) + { + return pthread_mutex_lock(lockable) == 0; + } + + inline void Unlock(pthread_mutex_t* lockable) + { + pthread_mutex_unlock(lockable); + } +}; + + +typedef AutoLocker<pthread_mutex_t, AutoLockerMutexLocking> PthreadMutexLocker; + + +} // namespace BPrivate + +using BPrivate::PthreadMutexLocker; + + +#endif // _PTHREAD_MUTEX_LOCKER_H ############################################################################ Commit: 8fb3930a4290eb1156bbe53fba7947b0cd2be237 Author: Ingo Weinhold <ingo_weinhold@xxxxxx> Date: Sun Apr 7 22:05:51 2013 UTC package daemon: Make all work with the packages asynchronous There's now a worker thread per Root that does all the work that can take time. Node monitoring notifications received in the main thread are just pushed into the worker's job queue, so the application looper remains responsive. ---------------------------------------------------------------------------- diff --git a/src/servers/package/Jamfile b/src/servers/package/Jamfile index bc6bc7d..0542589 100644 --- a/src/servers/package/Jamfile +++ b/src/servers/package/Jamfile @@ -6,6 +6,8 @@ UsePrivateHeaders app kernel package shared ; Server package_daemon : DebugSupport.cpp + Job.cpp + JobQueue.cpp Package.cpp PackageDaemon.cpp Root.cpp diff --git a/src/servers/package/Job.cpp b/src/servers/package/Job.cpp new file mode 100644 index 0000000..092d1d7 --- /dev/null +++ b/src/servers/package/Job.cpp @@ -0,0 +1,20 @@ +/* + * Copyright 2013, Haiku, Inc. All Rights Reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Ingo Weinhold <ingo_weinhold@xxxxxx> + */ + + +#include "Job.h" + + +Job::Job() +{ +} + + +Job::~Job() +{ +} diff --git a/src/servers/package/Job.h b/src/servers/package/Job.h new file mode 100644 index 0000000..a7e91c7 --- /dev/null +++ b/src/servers/package/Job.h @@ -0,0 +1,25 @@ +/* + * Copyright 2013, Haiku, Inc. All Rights Reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Ingo Weinhold <ingo_weinhold@xxxxxx> + */ +#ifndef JOB_H +#define JOB_H + + +#include <Referenceable.h> +#include <util/DoublyLinkedList.h> + + +class Job : public BReferenceable, public DoublyLinkedListLinkImpl<Job> { +public: + Job(); + virtual ~Job(); + + virtual void Do() = 0; +}; + + +#endif // JOB_H diff --git a/src/servers/package/JobQueue.cpp b/src/servers/package/JobQueue.cpp new file mode 100644 index 0000000..8572a08 --- /dev/null +++ b/src/servers/package/JobQueue.cpp @@ -0,0 +1,99 @@ +/* + * Copyright 2013, Haiku, Inc. All Rights Reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Ingo Weinhold <ingo_weinhold@xxxxxx> + */ + + +#include "JobQueue.h" + +#include <PthreadMutexLocker.h> + + +JobQueue::JobQueue() + : + fMutexInitialized(false), + fNewJobConditionInitialized(false), + fJobs(), + fClosed(false) +{ +} + + +JobQueue::~JobQueue() +{ + if (fMutexInitialized) { + PthreadMutexLocker mutexLocker(fMutex); + while (Job* job = fJobs.RemoveHead()) + job->ReleaseReference(); + } + + if (fNewJobConditionInitialized) + pthread_cond_destroy(&fNewJobCondition); + + if (fMutexInitialized) + pthread_mutex_destroy(&fMutex); +} + + +status_t +JobQueue::Init() +{ + status_t error = pthread_mutex_init(&fMutex, NULL); + if (error != B_OK) + return error; + fMutexInitialized = true; + + error = pthread_cond_init(&fNewJobCondition, NULL); + if (error != B_OK) + return error; + fNewJobConditionInitialized = true; + + return B_OK; +} + + +void +JobQueue::Close() +{ + if (fMutexInitialized && fNewJobConditionInitialized) { + PthreadMutexLocker mutexLocker(fMutex); + fClosed = true; + pthread_cond_broadcast(&fNewJobCondition); + } +} + + +bool +JobQueue::QueueJob(Job* job) +{ + PthreadMutexLocker mutexLocker(fMutex); + if (fClosed) + return false; + + fJobs.Add(job); + job->AcquireReference(); + + pthread_cond_signal(&fNewJobCondition); + return true; +} + + +Job* +JobQueue::DequeueJob() +{ + PthreadMutexLocker mutexLocker(fMutex); + + while (!fClosed) { + Job* job = fJobs.RemoveHead(); + if (job != NULL) + return job; + + if (!fClosed) + pthread_cond_wait(&fNewJobCondition, &fMutex); + } + + return NULL; +} diff --git a/src/servers/package/JobQueue.h b/src/servers/package/JobQueue.h new file mode 100644 index 0000000..4d6d93e --- /dev/null +++ b/src/servers/package/JobQueue.h @@ -0,0 +1,43 @@ +/* + * Copyright 2013, Haiku, Inc. All Rights Reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Ingo Weinhold <ingo_weinhold@xxxxxx> + */ +#ifndef JOB_QUEUE_H +#define JOB_QUEUE_H + + +#include <pthread.h> + +#include "Job.h" + + +class JobQueue { +public: + JobQueue(); + ~JobQueue(); + + status_t Init(); + void Close(); + + bool QueueJob(Job* job); + // acquires a reference, if successful + Job* DequeueJob(); + // returns a reference + +private: + typedef DoublyLinkedList<Job> JobList; + +private: + pthread_mutex_t fMutex; + pthread_cond_t fNewJobCondition; + bool fMutexInitialized; + bool fNewJobConditionInitialized; + JobList fJobs; + bool fClosed; +}; + + +#endif // JOB_QUEUE_H diff --git a/src/servers/package/PackageDaemon.cpp b/src/servers/package/PackageDaemon.cpp index 8229e48..4380102 100644 --- a/src/servers/package/PackageDaemon.cpp +++ b/src/servers/package/PackageDaemon.cpp @@ -96,7 +96,7 @@ PackageDaemon::_RegisterVolume(dev_t deviceID) RETURN_ERROR(B_BAD_VALUE); // create a volume - Volume* volume = new(std::nothrow) Volume; + Volume* volume = new(std::nothrow) Volume(this); if (volume == NULL) RETURN_ERROR(B_NO_MEMORY); ObjectDeleter<Volume> volumeDeleter(volume); @@ -126,19 +126,6 @@ PackageDaemon::_RegisterVolume(dev_t deviceID) } volumeDeleter.Detach(); - AddHandler(volume); - - // node-monitor the volume's packages directory - error = watch_node(&volume->PackagesDirectoryRef(), B_WATCH_DIRECTORY, - BMessenger(volume, this)); - if (error != B_OK) { - ERROR("PackageDaemon::_RegisterVolume(): failed to start watching the " - "packages directory of the volume at \"%s\": %s\n", - volume->Path().String(), strerror(error)); - // Not good, but not fatal. Only the manual package operations in the - // packages directory won't work correctly. - } - INFORM("volume at \"%s\" registered\n", volume->Path().String()); return B_OK; @@ -148,16 +135,13 @@ PackageDaemon::_RegisterVolume(dev_t deviceID) void PackageDaemon::_UnregisterVolume(Volume* volume) { - stop_watching(BMessenger(volume, this)); + volume->Unmounted(); - RemoveHandler(volume); + INFORM("volume at \"%s\" unregistered\n", volume->Path().String()); Root* root = volume->GetRoot(); root->UnregisterVolume(volume); - INFORM("volume at \"%s\" unregistered\n", volume->Path().String()); - - delete volume; _PutRoot(root); } diff --git a/src/servers/package/Root.cpp b/src/servers/package/Root.cpp index d72a504..6553f4b 100644 --- a/src/servers/package/Root.cpp +++ b/src/servers/package/Root.cpp @@ -17,19 +17,88 @@ #include "Volume.h" +// #pragma mark - InitVolumePackagesJob + + +struct Root::InitPackagesJob : public Job { + InitPackagesJob(Volume* volume) + : + fVolume(volume) + { + } + + virtual void Do() + { + fVolume->InitPackages(); + } + +private: + Volume* fVolume; +}; + + +// #pragma mark - DeleteVolumeJob + + +struct Root::DeleteVolumeJob : public Job { + DeleteVolumeJob(Volume* volume) + : + fVolume(volume) + { + } + + virtual void Do() + { + delete fVolume; + } + +private: + Volume* fVolume; +}; + + +// #pragma mark - HandleNodeMonitorEventsJob + + +struct Root::HandleNodeMonitorEventsJob : public Job { + HandleNodeMonitorEventsJob(Volume* volume) + : + fVolume(volume) + { + } + + virtual void Do() + { + fVolume->ProcessPendingNodeMonitorEvents(); + } + +private: + Volume* fVolume; +}; + + +// #pragma mark - Root + + Root::Root() : fNodeRef(), fPath(), fSystemVolume(NULL), fCommonVolume(NULL), - fHomeVolume(NULL) + fHomeVolume(NULL), + fJobQueue(), + fJobRunner(-1) { } Root::~Root() { + fJobQueue.Close(); + + if (fJobRunner >= 0) + wait_for_thread(fJobRunner, NULL); } @@ -38,12 +107,22 @@ Root::Init(const node_ref& nodeRef) { fNodeRef = nodeRef; + // init job queue and spawn job runner thread + status_t error = fJobQueue.Init(); + if (error != B_OK) + RETURN_ERROR(error); + + fJobRunner = spawn_thread(&_JobRunnerEntry, "job runner", B_NORMAL_PRIORITY, + this); + if (fJobRunner < 0) + RETURN_ERROR(fJobRunner); + // get the path BDirectory directory; - status_t error = directory.SetTo(&fNodeRef); + error = directory.SetTo(&fNodeRef); if (error != B_OK) { ERROR("Root::Init(): failed to open directory: %s\n", strerror(error)); - return error; + RETURN_ERROR(error); } BEntry entry; @@ -63,6 +142,8 @@ Root::Init(const node_ref& nodeRef) if (fPath.IsEmpty()) RETURN_ERROR(B_NO_MEMORY); + resume_thread(fJobRunner); + return B_OK; } @@ -84,6 +165,14 @@ Root::RegisterVolume(Volume* volume) *volumeToSet = volume; volume->SetRoot(this); + // queue a job for reading the volume's packages + status_t error = _QueueJob(new(std::nothrow) InitPackagesJob(volume)); + if (error != B_OK) { + volume->SetRoot(NULL); + *volumeToSet = NULL; + return error; + } + return B_OK; } @@ -99,7 +188,10 @@ Root::UnregisterVolume(Volume* volume) } *volumeToSet = NULL; - volume->SetRoot(NULL); + + // Use the job queue to delete the volume to make sure there aren't any + // pending jobs that reference the volume. + _QueueJob(new(std::nothrow) DeleteVolumeJob(volume)); } @@ -118,6 +210,14 @@ Root::FindVolume(dev_t deviceID) const void +Root::HandleNodeMonitorEvents(Volume* volume) +{ +// TODO: Don't push a new one, if one is already pending! + _QueueJob(new(std::nothrow) HandleNodeMonitorEventsJob(volume)); +} + + +void Root::LastReferenceReleased() { } @@ -138,3 +238,38 @@ Root::_GetVolume(PackageFSMountType mountType) return NULL; } } + + +status_t +Root::_QueueJob(Job* job) +{ + if (job == NULL) + return B_NO_MEMORY; + + BReference<Job> jobReference(job, true); + if (!fJobQueue.QueueJob(job)) { + // job queue already closed + return B_BAD_VALUE; + } + + return B_OK; +} + + +/*static*/ status_t +Root::_JobRunnerEntry(void* data) +{ + return ((Root*)data)->_JobRunner(); +} + + +status_t +Root::_JobRunner() +{ + while (Job* job = fJobQueue.DequeueJob()) { + job->Do(); + job->ReleaseReference(); + } + + return B_OK; +} diff --git a/src/servers/package/Root.h b/src/servers/package/Root.h index 8afa89c..288f508 100644 --- a/src/servers/package/Root.h +++ b/src/servers/package/Root.h @@ -10,12 +10,16 @@ #include <Node.h> +#include <ObjectList.h> +#include <OS.h> #include <String.h> #include <Referenceable.h> #include <packagefs.h> +#include "JobQueue.h" + class Volume; @@ -34,21 +38,36 @@ public: status_t RegisterVolume(Volume* volume); void UnregisterVolume(Volume* volume); + // deletes the volume (eventually) Volume* FindVolume(dev_t deviceID) const; + void HandleNodeMonitorEvents(Volume* volume); + protected: virtual void LastReferenceReleased(); private: + struct InitPackagesJob; + struct DeleteVolumeJob; + struct HandleNodeMonitorEventsJob; + +private: Volume** _GetVolume(PackageFSMountType mountType); + status_t _QueueJob(Job* job); + + static status_t _JobRunnerEntry(void* data); + status_t _JobRunner(); + private: node_ref fNodeRef; BString fPath; Volume* fSystemVolume; Volume* fCommonVolume; Volume* fHomeVolume; + JobQueue fJobQueue; + thread_id fJobRunner; }; diff --git a/src/servers/package/Volume.cpp b/src/servers/package/Volume.cpp index 526a7ed..8c33084 100644 --- a/src/servers/package/Volume.cpp +++ b/src/servers/package/Volume.cpp @@ -16,15 +16,50 @@ #include <Directory.h> #include <Entry.h> +#include <Looper.h> #include <NodeMonitor.h> #include <Path.h> #include <AutoDeleter.h> +#include <AutoLocker.h> #include "DebugSupport.h" +#include "Root.h" -Volume::Volume() +// #pragma mark - NodeMonitorEvent + + +struct Volume::NodeMonitorEvent + : public DoublyLinkedListLinkImpl<NodeMonitorEvent> { +public: + NodeMonitorEvent(const BString& entryName, bool created) + : + fEntryName(entryName), + fCreated(created) + { + } + + const BString& EntryName() const + { + return fEntryName; + } + + bool WasCreated() const + { + return fCreated; + } + +private: + BString fEntryName; + bool fCreated; +}; + + +// #pragma mark - Volume + + +Volume::Volume(BLooper* looper) : BHandler(), fPath(), @@ -33,8 +68,11 @@ Volume::Volume() fPackagesDirectoryRef(), fRoot(NULL), fPackagesByFileName(), - fPackagesByNodeRef() + fPackagesByNodeRef(), + fPendingNodeMonitorEventsLock("pending node monitor events"), + fPendingNodeMonitorEvents() { + looper->AddHandler(this); } @@ -107,19 +145,55 @@ Volume::Init(const node_ref& rootDirectoryRef, node_ref& _packageRootRef) fPackagesDirectoryRef.device = info.packagesDeviceID; fPackagesDirectoryRef.node = info.packagesDirectoryID; - // read in all packages in the directory + _packageRootRef.device = info.rootDeviceID; + _packageRootRef.node = info.rootDirectoryID; + + return B_OK; +} + + +status_t +Volume::InitPackages() +{ + // node-monitor the volume's packages directory + status_t error = watch_node(&fPackagesDirectoryRef, B_WATCH_DIRECTORY, + BMessenger(this)); + if (error != B_OK) { + ERROR("Volume::InitPackages(): failed to start watching the packages " + "directory of the volume at \"%s\": %s\n", + fPath.String(), strerror(error)); + // Not good, but not fatal. Only the manual package operations in the + // packages directory won't work correctly. + } + + // read the packages directory and get the active packages + int fd = OpenRootDirectory(); + if (fd < 0) { + ERROR("Volume::InitPackages(): failed to open root directory: %s\n", + strerror(fd)); + RETURN_ERROR(fd); + } + FileDescriptorCloser fdCloser(fd); + error = _ReadPackagesDirectory(); if (error != B_OK) RETURN_ERROR(error); - _GetActivePackages(fd); - - _packageRootRef.device = info.rootDeviceID; - _packageRootRef.node = info.rootDirectoryID; + error = _GetActivePackages(fd); + if (error != B_OK) + RETURN_ERROR(error); return B_OK; } + +void +Volume::Unmounted() +{ + stop_watching(BMessenger(this)); +} + + void Volume::MessageReceived(BMessage* message) { @@ -169,6 +243,28 @@ Volume::OpenRootDirectory() const void +Volume::ProcessPendingNodeMonitorEvents() +{ + // get the events + NodeMonitorEventList events; + { + AutoLocker<BLocker> eventsLock(fPendingNodeMonitorEventsLock); + events.MoveFrom(&fPendingNodeMonitorEvents); + } + + // process them +// TODO: Don't do that individually. + while (NodeMonitorEvent* event = events.RemoveHead()) { + ObjectDeleter<NodeMonitorEvent> eventDeleter(event); + if (event->WasCreated()) + _PackagesEntryCreated(event->EntryName()); + else + _PackagesEntryRemoved(event->EntryName()); + } +} + + +void Volume::_HandleEntryCreatedOrRemoved(const BMessage* message, bool created) { // only moves to or from our packages directory are interesting @@ -182,10 +278,7 @@ Volume::_HandleEntryCreatedOrRemoved(const BMessage* message, bool created) return; } - if (created) - _PackagesEntryCreated(name); - else - _PackagesEntryRemoved(name); + _QueueNodeMonitorEvent(name, created); } @@ -209,9 +302,32 @@ Volume::_HandleEntryMoved(const BMessage* message) } if (fromDirectoryID == fPackagesDirectoryRef.node) - _PackagesEntryRemoved(fromName); + _QueueNodeMonitorEvent(fromName, false); if (toDirectoryID == fPackagesDirectoryRef.node) - _PackagesEntryCreated(toName); + _QueueNodeMonitorEvent(toName, true); +} + + +void +Volume::_QueueNodeMonitorEvent(const BString& name, bool wasCreated) +{ + if (name.IsEmpty()) { + ERROR("Volume::_QueueNodeMonitorEvent(): got empty name.\n"); + return; + } + + NodeMonitorEvent* event + = new(std::nothrow) NodeMonitorEvent(name, wasCreated); + if (event == NULL) { + ERROR("Volume::_QueueNodeMonitorEvent(): out of memory.\n"); + return; + } + + AutoLocker<BLocker> eventsLock(fPendingNodeMonitorEventsLock); + fPendingNodeMonitorEvents.Add(event); + eventsLock.Unlock(); + + fRoot->HandleNodeMonitorEvents(this); } diff --git a/src/servers/package/Volume.h b/src/servers/package/Volume.h index 4dc5450..c791cae 100644 --- a/src/servers/package/Volume.h +++ b/src/servers/package/Volume.h @@ -10,9 +10,11 @@ #include <Handler.h> +#include <Locker.h> #include <String.h> #include <packagefs.h> +#include <util/DoublyLinkedList.h> #include "Package.h" @@ -24,11 +26,14 @@ class Root; class Volume : public BHandler { public: - Volume(); + Volume(BLooper* looper); virtual ~Volume(); status_t Init(const node_ref& rootDirectoryRef, node_ref& _packageRootRef); + status_t InitPackages(); + + void Unmounted(); virtual void MessageReceived(BMessage* message); @@ -58,10 +63,18 @@ public: int OpenRootDirectory() const; + void ProcessPendingNodeMonitorEvents(); + +private: + struct NodeMonitorEvent; + typedef DoublyLinkedList<NodeMonitorEvent> NodeMonitorEventList; + private: void _HandleEntryCreatedOrRemoved( const BMessage* message, bool created); void _HandleEntryMoved(const BMessage* message); + void _QueueNodeMonitorEvent(const BString& name, + bool wasCreated); void _PackagesEntryCreated(const char* name); void _PackagesEntryRemoved(const char* name); @@ -77,6 +90,8 @@ private: Root* fRoot; PackageFileNameHashTable fPackagesByFileName; PackageNodeRefHashTable fPackagesByNodeRef; + BLocker fPendingNodeMonitorEventsLock; + NodeMonitorEventList fPendingNodeMonitorEvents; };