# HG changeset patch # User Paul Boddie # Date 1680474934 -7200 # Node ID f74f22ce939dfbb105cec4370a931f16c3f77292 # Parent b3eab5c454521596e02dddeec0f28833cb775aba Changed the notifier arrangement to employ per-thread notifier resources that retain individual notifiable object details. Additional synchronisation around object state might be advisible, however. diff -r b3eab5c45452 -r f74f22ce939d libfsclient/lib/src/file.cc --- a/libfsclient/lib/src/file.cc Wed Mar 29 00:32:20 2023 +0200 +++ b/libfsclient/lib/src/file.cc Mon Apr 03 00:35:34 2023 +0200 @@ -96,6 +96,7 @@ file->notifiable.notifications = 0; file->notifiable.base = (notifiable_base_t *) file; + file->notifiable.handler = NULL; } @@ -634,7 +635,6 @@ void file_notify_close(file_notifier_t *notifier) { - notifier->obj->stop(); delete notifier->obj; delete notifier; } @@ -677,8 +677,7 @@ long file_notify_wait_file(file_t *file, file_notifier_t *notifier) { - SpecificObjectNotifier *specific_notifier = dynamic_cast(notifier->obj); - long err = specific_notifier->wait_object(file_notifiable(file)); + long err = notifier->obj->wait_object(file_notifiable(file)); /* Unsubscribe if a closure notification has been received. */ @@ -692,9 +691,8 @@ long file_notify_wait_files(file_t **file, file_notifier_t *notifier) { - GeneralObjectNotifier *general_notifier = dynamic_cast(notifier->obj); notifiable_t *notifiable; - long err = general_notifier->wait(¬ifiable); + long err = notifier->obj->wait(¬ifiable); *file = (file_t *) notifiable->base; diff -r b3eab5c45452 -r f74f22ce939d libfsclient/lib/src/process.cc --- a/libfsclient/lib/src/process.cc Wed Mar 29 00:32:20 2023 +0200 +++ b/libfsclient/lib/src/process.cc Mon Apr 03 00:35:34 2023 +0200 @@ -65,6 +65,12 @@ void process_init(process_t *process) { process->ref = L4_INVALID_CAP; + + /* Initialise the notifiable section of the structure. */ + + process->notifiable.notifications = 0; + process->notifiable.base = (notifiable_base_t *) process; + process->notifiable.handler = NULL; } /* Start a process using the given arguments. @@ -152,7 +158,6 @@ void process_notify_close(process_notifier_t *notifier) { - notifier->obj->stop(); delete notifier->obj; delete notifier; } @@ -195,8 +200,7 @@ long process_notify_wait_process(process_t *process, process_notifier_t *notifier) { - SpecificObjectNotifier *specific_notifier = dynamic_cast(notifier->obj); - long err = specific_notifier->wait_object(process_notifiable(process)); + long err = notifier->obj->wait_object(process_notifiable(process)); /* Unsubscribe if a termination notification has been received. */ @@ -214,9 +218,8 @@ long process_notify_wait_processes(process_t **process, process_notifier_t *notifier) { - GeneralObjectNotifier *general_notifier = dynamic_cast(notifier->obj); notifiable_t *notifiable; - long err = general_notifier->wait(¬ifiable); + long err = notifier->obj->wait(¬ifiable); *process = (process_t *) notifiable->base; diff -r b3eab5c45452 -r f74f22ce939d libnotifier/include/notifier/notifier.h --- a/libnotifier/include/notifier/notifier.h Wed Mar 29 00:32:20 2023 +0200 +++ b/libnotifier/include/notifier/notifier.h Mon Apr 03 00:35:34 2023 +0200 @@ -23,8 +23,8 @@ #include #include -#include #include +#include #include #include @@ -34,36 +34,10 @@ -/* Object-specific notification details. */ - -class ObjectNotificationState -{ -public: - /* Synchronisation primitives for state access and notification. */ - - std::mutex lock; - std::condition_variable condition; - - /* Pending notifications for monitored objects. */ - - notify_flags_t pending_notifications = 0; - notify_values_t pending_values = NOTIFY_VALUES_NULL; - - /* Endpoints associated with monitored objects. */ - - l4_cap_idx_t endpoint = L4_INVALID_CAP; - - bool is_null() { return l4_is_invalid_cap(endpoint); } -}; - - - /* Collection types. */ -typedef std::map ObjectNotificationStates; -typedef std::map ObjectStateLocks; - -typedef std::vector ServerConfigs; +typedef std::list NotifiableObjectQueue; +typedef std::set NotifiableObjects; @@ -72,102 +46,41 @@ class ObjectNotifier { protected: - /* General state access locking. */ - - std::mutex _state_lock; - - /* Object-specific state locking. */ + /* Locking for the affected queue and monitored object registry. */ - ObjectStateLocks _object_locks; - - /* Notification state. */ - - ObjectNotificationStates _state; + std::mutex _affected_lock, _monitored_lock; + std::condition_variable _condition; - /* Notifier resource details. */ - - ServerConfigs _configs; - bool _started = false; - l4_cap_idx_t _irq = L4_INVALID_CAP; + /* Objects affected by notifications. */ - /* Convenience method to access object state. */ - - virtual ObjectNotificationState &object_state(notifiable_t *object, bool create); - - /* Helper methods. */ - - virtual bool _transfer(ObjectNotificationState &state, notifiable_t *object); + NotifiableObjectQueue _queued; + NotifiableObjects _affected, _monitored; public: virtual ~ObjectNotifier(); /* Local operations. */ - virtual long start(); - - virtual void stop(); - virtual long subscribe(notifiable_t *object, notify_flags_t flags); virtual long unsubscribe(notifiable_t *object); - virtual long remove_endpoint(notifiable_t *object, l4_cap_idx_t endpoint); + virtual long wait(notifiable_t **object); + + virtual long wait_object(notifiable_t *object); /* Helper methods. */ - virtual void notify(notifiable_t *object, notify_flags_t flags, - notify_values_t values) = 0; + virtual void notify(notifiable_t *object); + + virtual void release(notifiable_t *object); }; -/* An object monitoring notifications for a collection of different objects. */ - -class GeneralObjectNotifier : public ObjectNotifier -{ -protected: - /* Locking to protect pending notification members and to coordinate access - to notifications. */ - - std::mutex _general_lock; - - /* General lock synchronisation. */ - - std::condition_variable _general_condition; - - /* Objects affected by notifications. */ - - std::list _affected; - - /* Helper methods. */ - - virtual bool _retrieve(notifiable_t **object); - - virtual bool _retrieve_for_object(notifiable_t *object); +/* Collection types. */ -public: - virtual long wait(notifiable_t **object); - - /* Helper methods. */ - - virtual void notify(notifiable_t *object, notify_flags_t flags, - notify_values_t values); -}; - - - -/* An object monitoring notifications for specific objects. */ - -class SpecificObjectNotifier : public ObjectNotifier -{ -public: - virtual long wait_object(notifiable_t *object); - - /* Helper methods. */ - - virtual void notify(notifiable_t *object, notify_flags_t flags, - notify_values_t values); -}; +typedef std::set ObjectNotifiers; @@ -176,19 +89,39 @@ class NotifierResource : public Notifier, public Resource { protected: - ObjectNotifier *_notifier; + /* Locking for the resource. */ + + std::mutex _lock; + std::condition_variable _condition; + + /* Notifiers and the monitored object. */ + + ObjectNotifiers _notifiers; notifiable_t *_object; + /* Pending notification status. */ + + bool _pending = false; + + /* Utility methods. */ + + virtual void _notify(); + + virtual void _release(); + public: - explicit NotifierResource(ObjectNotifier *notifier, notifiable_t *object) - : _notifier(notifier), _object(object) + l4_cap_idx_t endpoint = L4_INVALID_CAP; + + explicit NotifierResource(notifiable_t *object) + : _object(object) { } - explicit NotifierResource() - : _notifier(NULL), _object(NULL) - { - } + virtual ~NotifierResource(); + + /* Resource methods. */ + + virtual void close(); /* Server details. */ @@ -200,14 +133,22 @@ /* Notifier methods. */ virtual long notify(notify_flags_t flags, notify_values_t values); + + /* Local operations. */ + + virtual long add(ObjectNotifier *notifier, notify_flags_t flags); + + virtual long remove(ObjectNotifier *notifier); + + virtual long wait(); }; /* Helper functions. */ -SpecificObjectNotifier *notifier_get_task_notifier(); +ObjectNotifier *notifier_get_task_notifier(); -GeneralObjectNotifier *notifier_get_local_notifier(); +ObjectNotifier *notifier_get_local_notifier(); // vim: tabstop=4 expandtab shiftwidth=4 diff -r b3eab5c45452 -r f74f22ce939d libnotifier/lib/src/notifier.cc --- a/libnotifier/lib/src/notifier.cc Wed Mar 29 00:32:20 2023 +0200 +++ b/libnotifier/lib/src/notifier.cc Mon Apr 03 00:35:34 2023 +0200 @@ -19,11 +19,6 @@ * Boston, MA 02110-1301, USA */ -#include -#include - -#include - #include #include #include @@ -34,47 +29,35 @@ -/* Null notification state. */ - -static ObjectNotificationState _null_state; - - - /* Lock protecting per-task notifier access. */ -static std::mutex _lock; +static std::mutex _task_lock; /* Per-task storage for specific waiting operations. */ -static SpecificObjectNotifier *_notifier = NULL; +static ObjectNotifier *_notifier = NULL; /* Return the per-task notifier for object-specific waiting operations. */ -SpecificObjectNotifier *notifier_get_task_notifier() +ObjectNotifier *notifier_get_task_notifier() { - std::lock_guard guard(_lock); + std::lock_guard guard(_task_lock); /* Start any new notifier. */ if (_notifier == NULL) - { - _notifier = new SpecificObjectNotifier; - _notifier->start(); - } + _notifier = new ObjectNotifier; return _notifier; } /* Return a local notifier for general object waiting operations. */ -GeneralObjectNotifier *notifier_get_local_notifier() +ObjectNotifier *notifier_get_local_notifier() { - GeneralObjectNotifier *notifier = new GeneralObjectNotifier; - - notifier->start(); - return notifier; + return new ObjectNotifier; } @@ -83,358 +66,170 @@ ObjectNotifier::~ObjectNotifier() { - stop(); + /* Remove this notifier from the individual notifier resources. */ + + NotifiableObjects::iterator it; - ServerConfigs::iterator it; + for (it = _monitored.begin(); it != _monitored.end(); it++) + { + notifiable_t *object = *it; - for (it = _configs.begin(); it != _configs.end(); it++) - delete *it; + if (object->handler != NULL) + { + NotifierResource *resource = reinterpret_cast(object->handler); + resource->remove(this); + } + } - _configs.clear(); + _monitored.clear(); /* Handle deletion of the special task notifier. */ + std::lock_guard guard(_task_lock); + if (this == _notifier) _notifier = NULL; } - - -/* Start listening for notifications. */ - -long ObjectNotifier::start() -{ - if (_started) - return L4_EOK; - - /* Create a new thread to serve a "null" resource. This resource is not used - for notifications but merely for control purposes. */ - - NotifierResource *notifier = new NotifierResource; - ResourceServer server(notifier); - long err = server.start_thread(true, false); - - if (err) - return err; - - _configs.push_back(server.config()); - _started = true; - - /* Retain the IRQ created for the server for control purposes. */ - - _irq = server.config()->irq; - - return L4_EOK; -} - +/* Handle a deletion event for an object. */ - -/* Stop the notifier. */ - -void ObjectNotifier::stop() +void ObjectNotifier::release(notifiable_t *object) { - if (l4_is_valid_cap(_irq)) - { - l4_irq_trigger(_irq); - _irq = L4_INVALID_CAP; - } -} - - + std::lock_guard guard(_monitored_lock); -/* Return notification state for the given object or null state if no record - existed for the object. */ - -ObjectNotificationState &ObjectNotifier::object_state(notifiable_t *object, bool create) -{ - ObjectNotificationStates::iterator it = _state.find(object); - - if (it == _state.end()) - { - if (create) - return _state[object]; - else - return _null_state; - } - - return it->second; + _monitored.erase(object); } /* Subscribe to notification events on an object. */ long ObjectNotifier::subscribe(notifiable_t *object, notify_flags_t flags) { - /* Acquire the lock for state lookup. */ - - std::unique_lock state_guard(_state_lock); + std::lock_guard guard(_monitored_lock); - /* Obtain potentially new state for the object. */ + /* Ensure that a handler resource is available for object notifications. */ - ObjectNotificationState &state = object_state(object, true); + NotifierResource *resource; - if (state.is_null()) - { - /* Serve the new object in the notifier thread. */ + if (object->handler != NULL) + resource = reinterpret_cast(object->handler); + + /* Create a resource if none is recorded. */ - NotifierResource *resource = new NotifierResource(this, object); + else + { + resource = new NotifierResource(object); ResourceServer server(resource); - long err = server.start_in_thread(_configs.front()->thread); + long err = server.start_thread(&resource->endpoint); if (err) return err; - _configs.push_back(server.config()); - state.endpoint = server.config()->server; + object->handler = resource; } - /* Forbid repeated subscription. - NOTE: Could instead rely on being unsubscribed, releasing the existing - endpoint. */ + /* Record the object. */ - else - return -L4_EEXIST; + _monitored.insert(object); - /* Allow this object to be re-entered. This may occur because the subscribe - operation can cause deferred notifications to be sent back to the - subscribed notifier and to this object. */ - - state_guard.unlock(); + /* Record this notifier to get general notifications. */ - /* Subscribe, sending the notification endpoint via the principal reference - for the object. */ - - client_Notification notify(object->base->ref); - - return notify.subscribe(state.endpoint, flags); + return resource->add(this, flags); } /* Unsubscribe from notification events on an object. */ long ObjectNotifier::unsubscribe(notifiable_t *object) { - /* Acquire the lock for state lookup. */ + std::lock_guard guard(_monitored_lock); + + if (object->handler == NULL) + return L4_EOK; - std::unique_lock state_guard(_state_lock); + NotifierResource *resource = reinterpret_cast(object->handler); - ObjectNotificationState &state = object_state(object, false); + _monitored.erase(object); + return resource->remove(this); +} + +/* Handle a notification event for an object. */ - if (state.is_null()) - return -L4_ENOENT; +void ObjectNotifier::notify(notifiable_t *object) +{ + /* Enter critical section to access the queue. */ - /* Unsubscribe via the notification interface. */ + std::lock_guard guard(_affected_lock); + + /* Ensure that a queue entry exists for the object. */ + + NotifiableObjects::iterator it = _affected.find(object); - client_Notification notify(object->base->ref); + if (it == _affected.end()) + { + _queued.push_back(object); + _affected.insert(object); + } - notify.unsubscribe(); + /* Notify any waiting caller that at least one notification is available. */ - return remove_endpoint(object, state.endpoint); + _condition.notify_one(); } -/* Remove a notification endpoint for an object. */ +/* Wait for notification events on objects, returning each object that has a + notification registered for it. */ + +long ObjectNotifier::wait(notifiable_t **object) +{ + /* Enter critical section to access the queue. */ + + std::unique_lock guard(_affected_lock); + + while (1) + { + /* With pending notifications, return the first object. */ -long ObjectNotifier::remove_endpoint(notifiable_t *object, l4_cap_idx_t endpoint) + if (!_affected.empty()) + { + *object = _queued.front(); + _queued.pop_front(); + _affected.erase(*object); + break; + } + + /* Otherwise, wait for notifications. */ + + _condition.wait(guard); + } + + return L4_EOK; +} + +/* Wait for notification events on a specific object. */ + +long ObjectNotifier::wait_object(notifiable_t *object) { - if (l4_is_invalid_cap(endpoint)) + if (object->handler == NULL) return -L4_EINVAL; - ipc_cap_free_um(endpoint); - - _state.erase(object); - - /* Remove the lock for updating object state. */ - - _object_locks.erase(object); - + reinterpret_cast(object->handler)->wait(); return L4_EOK; } -/* Handle a notification event for an object. */ - -void GeneralObjectNotifier::notify(notifiable_t *object, notify_flags_t flags, - notify_values_t values) -{ - /* Enter critical section for the notifier (affecting all objects). */ - - std::unique_lock general_guard(_general_lock); - - /* Acquire the lock for state lookup. */ - - std::unique_lock state_guard(_state_lock); - - ObjectNotificationState &state = object_state(object, false); - - if (state.is_null()) - return; - - /* Acquire the lock for the object state itself. */ - - std::unique_lock object_guard(state.lock); - - /* Record flags and note previous flags. */ - - notify_flags_t recorded = state.pending_notifications; - - state.pending_notifications |= flags; - state.pending_values = values; - - /* Add an object queue entry for any objects without previous notifications. */ - - if (!recorded) - _affected.push_back(object); - - /* Notify any waiting caller. */ - - _general_condition.notify_one(); -} +/* Virtual destructor required for base class instance reference deletion. */ -void SpecificObjectNotifier::notify(notifiable_t *object, notify_flags_t flags, - notify_values_t values) +NotifierResource::~NotifierResource() { - /* Acquire the lock for state lookup. */ - - std::unique_lock state_guard(_state_lock); - - ObjectNotificationState &state = object_state(object, false); - - if (state.is_null()) - return; - - /* Acquire the lock for the object state itself. */ - - std::unique_lock object_guard(state.lock); - - state.pending_notifications |= flags; - state.pending_values = values; - - /* Notify any waiting caller. */ - - state.condition.notify_one(); -} - - - -/* Transfer pending notifications to the given object. This must be called with - a lock acquired on the object notification state. */ - -bool ObjectNotifier::_transfer(ObjectNotificationState &state, notifiable_t *object) -{ - notify_flags_t recorded = state.pending_notifications; - - if (recorded) - { - object->notifications = recorded; - object->values = state.pending_values; - state.pending_notifications = 0; - return true; - } - - return false; } - - -/* Obtain object state and transfer notifications. */ - -bool GeneralObjectNotifier::_retrieve_for_object(notifiable_t *object) -{ - /* Acquire the lock for state lookup. */ - - std::unique_lock state_guard(_state_lock); - - ObjectNotificationState &state = object_state(object, false); - - if (state.is_null()) - return false; - - /* Acquire the lock for the object state itself, then release the state lock. */ - - std::unique_lock object_guard(state.lock); - - state_guard.unlock(); - - /* Call generic method to transfer notifications, if possible. */ - - return _transfer(state, object); -} - -/* Obtain queued objects until one is found that still has events recorded for - it. This must be called with the notifier's general lock acquired. */ - -bool GeneralObjectNotifier::_retrieve(notifiable_t **object) -{ - while (!_affected.empty()) - { - *object = _affected.front(); - _affected.pop_front(); - - if (_retrieve_for_object(*object)) - return true; - } - - return false; -} - - - -/* Wait for notification events on objects. */ - -long GeneralObjectNotifier::wait(notifiable_t **object) -{ - std::unique_lock general_guard(_general_lock); +/* Handle the release of the resource. */ - while (1) - { - /* With pending notifications, update the first object and exit. */ - - if (_retrieve(object)) - break; - - /* Otherwise, wait for notifications. */ - - _general_condition.wait(general_guard); - } - - return L4_EOK; -} - -/* Wait for notifications from a single object. */ - -long SpecificObjectNotifier::wait_object(notifiable_t *object) +void NotifierResource::close() { - /* Acquire the lock for reading object state. */ - - std::unique_lock state_guard(_state_lock); - - ObjectNotificationState &state = object_state(object, false); - - if (state.is_null()) - return -L4_EINVAL; - - /* Acquire the lock for the object state itself, then release the state lock. */ - - std::unique_lock object_guard(state.lock); - - state_guard.unlock(); - - while (1) - { - /* With pending notifications, update the object and exit. */ - - if (_transfer(state, object)) - break; - - /* Otherwise, wait for notifications. */ - - state.condition.wait(object_guard); - } - - return L4_EOK; + _release(); + _object->handler = NULL; } - - /* Object-specific resource methods. */ ipc_server_default_config_type NotifierResource::config() @@ -442,14 +237,110 @@ return config_Notifier; } + + +/* Registration of notifiers. */ + +long NotifierResource::add(ObjectNotifier *notifier, notify_flags_t flags) +{ + std::lock_guard guard(_lock); + + bool is_new = _notifiers.empty(); + + _notifiers.insert(notifier); + + /* Subscribe, sending the notification endpoint via the principal reference + for the object. */ + + if (!is_new) + return L4_EOK; + + client_Notification notify(_object->base->ref); + + return notify.subscribe(endpoint, flags); +} + +long NotifierResource::remove(ObjectNotifier *notifier) +{ + std::lock_guard guard(_lock); + + _notifiers.erase(notifier); + + /* Unsubscribe via the notification interface. */ + + if (!_notifiers.empty()) + return L4_EOK; + + client_Notification notify(_object->base->ref); + + return notify.unsubscribe(); +} + + + /* Register a notification received by an object-specific resource. */ long NotifierResource::notify(notify_flags_t flags, notify_values_t values) { - if (_notifier != NULL) - _notifier->notify(_object, flags, values); + /* Update the notifiable object. */ + _object->notifications |= flags; + _object->values = values; + + _notify(); return L4_EOK; } -// vim: tabstop=2 expandtab shiftwidth=2 +void NotifierResource::_notify() +{ + /* Enter critical section for the resource. */ + + std::lock_guard guard(_lock); + + /* Record a pending notification which persists if nothing is waiting. */ + + _pending = true; + + /* Notify any party waiting specifically on this object. */ + + _condition.notify_one(); + + /* Register the notification with all notifier objects. */ + + ObjectNotifiers::iterator it; + + for (it = _notifiers.begin(); it != _notifiers.end(); it++) + (*it)->notify(_object); +} + +void NotifierResource::_release() +{ + /* Enter critical section for the resource. */ + + std::lock_guard guard(_lock); + + ObjectNotifiers::iterator it; + + for (it = _notifiers.begin(); it != _notifiers.end(); it++) + (*it)->release(_object); +} + +/* Wait for notification events on a specific object. */ + +long NotifierResource::wait() +{ + /* Enter critical section for the resource. */ + + std::unique_lock guard(_lock); + + /* Wait for the notification condition. */ + + if (!_pending) + _condition.wait(guard); + + _pending = false; + return L4_EOK; +} + +/* vim: tabstop=2 expandtab shiftwidth=2 +*/ diff -r b3eab5c45452 -r f74f22ce939d libsystypes/include/systypes/base.h --- a/libsystypes/include/systypes/base.h Wed Mar 29 00:32:20 2023 +0200 +++ b/libsystypes/include/systypes/base.h Mon Apr 03 00:35:34 2023 +0200 @@ -77,6 +77,7 @@ notifiable_base_t *base; /* access to the specific object */ notify_flags_t notifications; /* essential notifications */ notify_values_t values; /* signal-specific values */ + void *handler; /* associated notification handler */ } notifiable_t; diff -r b3eab5c45452 -r f74f22ce939d tests/dstest_file_monitor.cc --- a/tests/dstest_file_monitor.cc Wed Mar 29 00:32:20 2023 +0200 +++ b/tests/dstest_file_monitor.cc Mon Apr 03 00:35:34 2023 +0200 @@ -56,6 +56,7 @@ { sprintf(buffer, "%s/file%02d", filename, i); + printf("Open file %d.\n", i); client_open(buffer, O_CREAT | O_RDWR); sleep(1); } @@ -65,21 +66,13 @@ static void monitor_files(file_t *directory) { - long err = client_subscribe(directory, NOTIFY_FILE_OPENED, client_notifier_task()); - - if (err) - { - printf("Could not subscribe to events on directory.\n"); - return; - } - int expected; for (expected = FILES_TO_OPEN; expected; expected--) { /* Wait for notification of content. */ - err = client_wait_file(directory, client_notifier_task()); + long err = client_wait_file(directory, client_notifier_task()); if (err) { @@ -116,6 +109,14 @@ return 1; } + long err = client_subscribe(directory, NOTIFY_FILE_OPENED, client_notifier_task()); + + if (err) + { + printf("Could not subscribe to events on directory.\n"); + return 1; + } + /* Schedule threads. */ std::thread *activities[2]; @@ -127,6 +128,9 @@ activities[i]->join(); printf("Expected %d 'file opened' notifications.\n", FILES_TO_OPEN); + + client_close(directory); + printf("End of test.\n"); return 0; }