# HG changeset patch # User Paul Boddie # Date 1629236676 -7200 # Node ID a354a465462b116a35227a6db3386477806107d8 # Parent 06f07bedbc21fd2853a0dd3d50f79364cfe7437f Introduced separate notification domains for file-specific and general waiting and notification operations. This involves moving the notification state into the notifier for each domain. A per-task notifier should allow the blocking status of file descriptors/structures to be preserved across multiple threads, whereas a per-thread notifier (or perhaps a local notifier instead in future) permits general or "open" waiting for notifications associated with a restricted set of file descriptors. diff -r 06f07bedbc21 -r a354a465462b libfsclient/include/fsclient/client.h --- a/libfsclient/include/fsclient/client.h Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/include/fsclient/client.h Tue Aug 17 23:44:36 2021 +0200 @@ -72,8 +72,8 @@ /* Notification operations. */ long client_set_blocking(file_t *file, notify_flags_t flags); -long client_subscribe(file_t *file, notify_flags_t flags); -long client_unsubscribe(file_t *file); +long client_subscribe(file_t *file, notify_flags_t flags, notifier_t notifier_type); +long client_unsubscribe(file_t *file, notifier_t notifier_type); long client_wait_file(file_t *file); long client_wait_files(file_t **file); diff -r 06f07bedbc21 -r a354a465462b libfsclient/include/fsclient/file.h --- a/libfsclient/include/fsclient/file.h Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/include/fsclient/file.h Tue Aug 17 23:44:36 2021 +0200 @@ -38,10 +38,6 @@ l4_cap_idx_t ref; - /* Notification endpoint reference. */ - - l4_cap_idx_t notifier; - /* Mapped memory accessing a file region. */ char *memory; @@ -72,6 +68,17 @@ +/* Notifier types. */ + +typedef enum +{ + NOTIFIER_TASK = 0, + NOTIFIER_THREAD = 1, + +} notifier_t; + + + /* Filesystem operations. */ long file_open_for_user(user_t user, l4_cap_idx_t server, l4_cap_idx_t *opener); @@ -118,8 +125,8 @@ /* Notification functions. */ -long file_notify_subscribe(file_t *file, notify_flags_t flags); -long file_notify_unsubscribe(file_t *file); +long file_notify_subscribe(file_t *file, notify_flags_t flags, notifier_t notifier_type); +long file_notify_unsubscribe(file_t *file, notifier_t notifier_type); long file_notify_wait_file(file_t *file); long file_notify_wait_files(file_t **file); diff -r 06f07bedbc21 -r a354a465462b libfsclient/include/fsclient/notifier.h --- a/libfsclient/include/fsclient/notifier.h Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/include/fsclient/notifier.h Tue Aug 17 23:44:36 2021 +0200 @@ -27,14 +27,37 @@ #include #include -#include #include +/* File-specific notification details. */ + +class FileNotificationState +{ +public: + /* Synchronisation primitives for state access and notification. */ + + std::mutex lock; + std::condition_variable condition; + + /* Pending notifications for monitored files. */ + + notify_flags_t pending = 0; + + /* Endpoints associated with monitored files. */ + + l4_cap_idx_t endpoint = L4_INVALID_CAP; + + bool is_null() { return l4_is_invalid_cap(endpoint); } +}; + + + /* Collection types. */ -typedef std::map FileNotifications; +typedef std::map FileNotificationStates; +typedef std::map FileStateLocks; @@ -43,28 +66,36 @@ class FileNotifier { protected: - std::list _affected; - FileNotifications _affected_flags; + /* General state access locking. */ + + std::mutex _state_lock; + + /* File-specific state locking. */ + + FileStateLocks _file_locks; + + /* Notification state. */ + + FileNotificationStates _state; + + /* Notifier thread details. */ + l4_cap_idx_t _thread = L4_INVALID_CAP; bool _started = false; - std::mutex _lock; - std::condition_variable _notified; + /* Convenience method to access file state. */ + + virtual FileNotificationState &file_state(file_t *file, bool create); /* Helper methods. */ - virtual void _notify(file_t *file, notify_flags_t flags); + virtual void _notify(file_t *file, notify_flags_t flags) = 0; + + virtual bool _transfer(FileNotificationState &state, file_t *file); + + virtual void _unsubscribe(FileNotificationState &state, file_t *file); public: - /* Server details. */ - - int expected_items(); - - ipc_server_handler_type handler(); - - void *interface() - { return static_cast(this); } - /* Local operations. */ virtual long start(); @@ -73,10 +104,6 @@ virtual long unsubscribe(file_t *file); - virtual long wait(file_t **file); - - virtual long wait_file(file_t *file); - /* Event handling support. */ virtual void mainloop(); @@ -84,8 +111,59 @@ +/* An object monitoring notifications for a collection of different files. */ + +class GeneralFileNotifier : public FileNotifier +{ +protected: + /* Locking to protect pending notification members and to coordinate access + to notifications. */ + + std::mutex _general_lock; + + /* General lock synchronisation. */ + + std::condition_variable _general_condition; + + /* Files affected by notifications. */ + + std::list _affected; + + /* Helper methods. */ + + virtual void _notify(file_t *file, notify_flags_t flags); + + virtual bool _retrieve(file_t **file); + + virtual bool _retrieve_for_file(file_t *file); + +public: + virtual long wait(file_t **file); +}; + + + +/* An object monitoring notifications for specific files. */ + +class SpecificFileNotifier : public FileNotifier +{ +protected: + /* Helper methods. */ + + virtual void _notify(file_t *file, notify_flags_t flags); + +public: + virtual long wait_file(file_t *file); +}; + + + /* Helper functions. */ -FileNotifier *get_notifier(); +FileNotifier *get_notifier(notifier_t notifier_type); + +SpecificFileNotifier *get_task_notifier(); + +GeneralFileNotifier *get_thread_notifier(); // vim: tabstop=4 expandtab shiftwidth=4 diff -r 06f07bedbc21 -r a354a465462b libfsclient/lib/src/client.cc --- a/libfsclient/lib/src/client.cc Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/lib/src/client.cc Tue Aug 17 23:44:36 2021 +0200 @@ -545,12 +545,13 @@ if (file->can_block == flags) return L4_EOK; - // NOTE: Set appropriate flags. + /* Since blocking access is used with specific file notifications, the + per-task notifier is used. */ if (flags) - err = client_subscribe(file, flags); + err = client_subscribe(file, flags, NOTIFIER_TASK); else - err = client_unsubscribe(file); + err = client_unsubscribe(file, NOTIFIER_TASK); if (err) return err; @@ -563,12 +564,12 @@ /* Subscribe from events concerning a file. */ -long client_subscribe(file_t *file, notify_flags_t flags) +long client_subscribe(file_t *file, notify_flags_t flags, notifier_t notifier_type) { if (file == NULL) return -L4_EINVAL; - return file_notify_subscribe(file, flags); + return file_notify_subscribe(file, flags, notifier_type); } @@ -587,12 +588,12 @@ /* Unsubscribe from events concerning a file. */ -long client_unsubscribe(file_t *file) +long client_unsubscribe(file_t *file, notifier_t notifier_type) { if (file == NULL) return -L4_EINVAL; - return file_notify_unsubscribe(file); + return file_notify_unsubscribe(file, notifier_type); } diff -r 06f07bedbc21 -r a354a465462b libfsclient/lib/src/file.cc --- a/libfsclient/lib/src/file.cc Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/lib/src/file.cc Tue Aug 17 23:44:36 2021 +0200 @@ -79,7 +79,6 @@ { file->memory = NULL; file->ref = L4_INVALID_CAP; - file->notifier = L4_INVALID_CAP; file->start_pos = 0; file->end_pos = 0; file->data_end = 0; @@ -98,8 +97,10 @@ if (l4_is_valid_cap(file->ref)) ipc_cap_free_um(file->ref); - if (l4_is_valid_cap(file->notifier)) - ipc_cap_free_um(file->notifier); + /* Only unsubscribe after actually closing the file and sending any + notifications. */ + + get_task_notifier()->unsubscribe(file); if (file->memory != NULL) ipc_detach_dataspace(file->memory); @@ -382,21 +383,18 @@ /* Subscribe to notification events on a file. */ -long file_notify_subscribe(file_t *file, notify_flags_t flags) +long file_notify_subscribe(file_t *file, notify_flags_t flags, notifier_t notifier_type) { - FileNotifier *notifier = get_notifier(); + FileNotifier *notifier = get_notifier(notifier_type); return notifier->subscribe(file, flags); } /* Unsubscribe from notification events on a file. */ -long file_notify_unsubscribe(file_t *file) +long file_notify_unsubscribe(file_t *file, notifier_t notifier_type) { - if (l4_is_invalid_cap(file->notifier)) - return -L4_EINVAL; - - FileNotifier *notifier = get_notifier(); + FileNotifier *notifier = get_notifier(notifier_type); return notifier->unsubscribe(file); } @@ -405,7 +403,7 @@ long file_notify_wait_file(file_t *file) { - FileNotifier *notifier = get_notifier(); + SpecificFileNotifier *notifier = get_task_notifier(); return notifier->wait_file(file); } @@ -414,7 +412,7 @@ long file_notify_wait_files(file_t **file) { - FileNotifier *notifier = get_notifier(); + GeneralFileNotifier *notifier = get_thread_notifier(); return notifier->wait(file); } diff -r 06f07bedbc21 -r a354a465462b libfsclient/lib/src/notifier.cc --- a/libfsclient/lib/src/notifier.cc Sat Aug 14 18:41:57 2021 +0200 +++ b/libfsclient/lib/src/notifier.cc Tue Aug 17 23:44:36 2021 +0200 @@ -33,93 +33,88 @@ -/* Thread-local storage workaround. */ +/* Null notification state. */ + +static FileNotificationState _null_state; + + + +/* Lock protecting per-task notifier access. */ + +static std::mutex _lock; + +/* Per-task storage for specific waiting operations. */ + +static SpecificFileNotifier *_notifier = NULL; + +/* Per-thread storage for "open" waiting operations. + (This workaround for thread-local storage maps thread capabilities to + notifiers). */ + +static std::map _notifiers; + + -static std::mutex lock; -static std::map notifiers; +/* Return the per-task notifier for file-specific waiting operations. */ + +SpecificFileNotifier *get_task_notifier() +{ + std::lock_guard guard(_lock); + + /* Start any new notifier. */ -FileNotifier *get_notifier() + if (_notifier == NULL) + { + _notifier = new SpecificFileNotifier; + _notifier->start(); + } + + return _notifier; +} + +/* Return the per-thread notifier for general file waiting operations. */ + +GeneralFileNotifier *get_thread_notifier() { - std::lock_guard guard(lock); + std::lock_guard guard(_lock); l4_cap_idx_t thread = pthread_l4_cap(pthread_self()); - FileNotifier *notifier = notifiers[thread]; + GeneralFileNotifier *notifier = _notifiers[thread]; /* Start any new notifier. */ if (notifier == NULL) { - notifier = new FileNotifier; - notifiers[thread] = notifier; + notifier = new GeneralFileNotifier; + _notifiers[thread] = notifier; notifier->start(); } return notifier; } - - -/* Subscribe to notification events on a file. */ +/* Helper function to obtain the appropriate notifier. */ -long FileNotifier::subscribe(file_t *file, notify_flags_t flags) +FileNotifier *get_notifier(notifier_t notifier_type) { - /* Create a notification endpoint, if necessary. */ - - if (l4_is_invalid_cap(file->notifier)) + switch (notifier_type) { - long err = ipc_server_new_for_thread(&file->notifier, file, _thread); - - if (err) - return err; + case NOTIFIER_TASK: return get_task_notifier(); + case NOTIFIER_THREAD: return get_thread_notifier(); + default: return NULL; } - - client_Notification notify(file->ref); - - return notify.subscribe(file->notifier, flags); } -/* Unsubscribe from notification events on a file. */ -long FileNotifier::unsubscribe(file_t *file) -{ - if (l4_is_invalid_cap(file->notifier)) - return -L4_EINVAL; - - client_Notification notify(file->ref); - long err = notify.unsubscribe(file->notifier); - - if (err) - return err; - - ipc_cap_free_um(file->notifier); - return L4_EOK; -} +/* Invoke the mainloop in a thread. */ -/* Handle a notification event for a file. Ideally, this would be invoked by the - generic server dispatch mechanism, with the gate label being interpreted and - provided as the first parameter. */ - -void FileNotifier::_notify(file_t *file, notify_flags_t flags) +static void *notifier_mainloop(void *data) { - std::unique_lock guard(_lock); - - /* Record the flags for the file object. Where no flags are already recorded, - the new flags will be recorded and the file object queued. Otherwise, the - new flags will be combined with the recorded flags. */ + FileNotifier *notifier = reinterpret_cast(data); - notify_flags_t recorded = _affected_flags[file]; - - _affected_flags[file] = recorded | flags; - - /* Add a file queue entry for any files without recorded notifications. */ - - if (!recorded) - _affected.push_back(file); - - /* Notify any waiting caller. */ - - _notified.notify_one(); + notifier->mainloop(); + return 0; } /* Listen for notifications. */ @@ -165,16 +160,6 @@ ipc_message_free(&msg); } -/* Invoke the mainloop in a thread. */ - -static void *notifier_mainloop(void *data) -{ - FileNotifier *notifier = reinterpret_cast(data); - - notifier->mainloop(); - return 0; -} - /* Start listening for notifications. */ long FileNotifier::start() @@ -199,36 +184,238 @@ return L4_EOK; } + + +/* Return a notification state object for the given file or a null object if no + record existed for the file. */ + +FileNotificationState &FileNotifier::file_state(file_t *file, bool create) +{ + FileNotificationStates::iterator it = _state.find(file); + + if (it == _state.end()) + { + if (create) + return _state[file]; + else + return _null_state; + } + + return it->second; +} + +/* Subscribe to notification events on a file. */ + +long FileNotifier::subscribe(file_t *file, notify_flags_t flags) +{ + /* Acquire the lock for state lookup. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, true); + + /* Create a notification endpoint, if necessary. */ + + if (state.is_null()) + { + long err = ipc_server_new_for_thread(&state.endpoint, file, _thread); + + if (err) + return err; + } + + client_Notification notify(file->ref); + + return notify.subscribe(state.endpoint, flags); +} + +/* Unsubscribe from notification events on a file. */ + +long FileNotifier::unsubscribe(file_t *file) +{ + /* Acquire the lock for state lookup. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, false); + + if (state.is_null()) + return -L4_EINVAL; + + client_Notification notify(file->ref); + + long err = notify.unsubscribe(state.endpoint); + + if (err) + return err; + + _unsubscribe(state, file); + + /* Remove the lock for updating file state. */ + + _file_locks.erase(file); + + return L4_EOK; +} + +/* Remove file notification state from the notifier. */ + +void FileNotifier::_unsubscribe(FileNotificationState &state, file_t *file) +{ + /* Acquire the lock for updating file state. */ + + std::mutex &file_lock = _file_locks[file]; + std::unique_lock file_guard(file_lock); + + /* Remove file-specific state. */ + + ipc_cap_free_um(state.endpoint); + _state.erase(file); +} + + + +/* Handle a notification event for a file. Ideally, this would be invoked by the + generic server dispatch mechanism, with the gate label being interpreted and + provided as the first parameter. */ + +void GeneralFileNotifier::_notify(file_t *file, notify_flags_t flags) +{ + /* Enter critical section for the notifier (affecting all files). */ + + std::unique_lock general_guard(_general_lock); + + /* Acquire the lock for state lookup. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, false); + + if (state.is_null()) + return; + + /* Acquire the lock for the file state itself. */ + + std::unique_lock file_guard(state.lock); + + /* Record flags and return previous flags. */ + + notify_flags_t recorded = state.pending; + + state.pending |= flags; + + /* Add a file queue entry for any files without previous notifications. */ + + if (!recorded) + _affected.push_back(file); + + /* Notify any waiting caller. */ + + _general_condition.notify_one(); +} + +void SpecificFileNotifier::_notify(file_t *file, notify_flags_t flags) +{ + /* Acquire the lock for state lookup. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, false); + + if (state.is_null()) + return; + + /* Acquire the lock for the file state itself. */ + + std::unique_lock file_guard(state.lock); + + state.pending |= flags; + + /* Notify any waiting caller. */ + + state.condition.notify_one(); +} + + + +/* Transfer pending notifications to the given file. This must be called with a + lock acquired on the file notification state. */ + +bool FileNotifier::_transfer(FileNotificationState &state, file_t *file) +{ + notify_flags_t recorded = state.pending; + + if (recorded) + { + file->notifications = recorded; + state.pending = 0; + return true; + } + + return false; +} + + + +/* Obtain file state and transfer notifications. */ + +bool GeneralFileNotifier::_retrieve_for_file(file_t *file) +{ + /* Acquire the lock for state lookup. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, false); + + if (state.is_null()) + return false; + + /* Acquire the lock for the file state itself, then release the state lock. */ + + std::unique_lock file_guard(state.lock); + + state_guard.unlock(); + + /* Call generic method to transfer notifications, if possible. */ + + return _transfer(state, file); +} + +/* Obtain queued files until one is found that still has events recorded for it. + This must be called with the notifier's general lock acquired. */ + +bool GeneralFileNotifier::_retrieve(file_t **file) +{ + while (!_affected.empty()) + { + *file = _affected.front(); + _affected.pop_front(); + + if (_retrieve_for_file(*file)) + return true; + } + + return false; +} + + + /* Wait for notification events on files. */ -long FileNotifier::wait(file_t **file) +long GeneralFileNotifier::wait(file_t **file) { - std::unique_lock guard(_lock); + std::unique_lock general_guard(_general_lock); while (1) { - /* Obtain queued files until one is found that still has events recorded - for it. (Waiting for events specific to one file will remove recorded - events but not any file queue entries.) */ - - while (!_affected.empty()) - { - *file = _affected.front(); - _affected.pop_front(); - - notify_flags_t recorded = _affected_flags[*file]; + /* With pending notifications, update the first file and exit. */ - if (recorded) - { - (*file)->notifications = recorded; - _affected_flags.erase(*file); - return L4_EOK; - } - } + if (_retrieve(file)) + break; - /* No queued events. */ + /* Otherwise, wait for notifications. */ - _notified.wait(guard); + _general_condition.wait(general_guard); } return L4_EOK; @@ -236,24 +423,33 @@ /* Wait for notifications from a single file. */ -long FileNotifier::wait_file(file_t *file) +long SpecificFileNotifier::wait_file(file_t *file) { - std::unique_lock guard(_lock); + /* Acquire the lock for reading file state. */ + + std::unique_lock state_guard(_state_lock); + + FileNotificationState &state = file_state(file, false); + + if (state.is_null()) + return -L4_EINVAL; + + /* Acquire the lock for the file state itself, then release the state lock. */ + + std::unique_lock file_guard(state.lock); + + state_guard.unlock(); while (1) { - notify_flags_t recorded = _affected_flags[file]; + /* With pending notifications, update the file and exit. */ - if (recorded) - { - file->notifications = recorded; - _affected_flags.erase(file); - return L4_EOK; - } + if (_transfer(state, file)) + break; - /* No recorded events for the file. */ + /* Otherwise, wait for notifications. */ - _notified.wait(guard); + state.condition.wait(file_guard); } return L4_EOK; diff -r 06f07bedbc21 -r a354a465462b tests/dstest_file_client.cc --- a/tests/dstest_file_client.cc Sat Aug 14 18:41:57 2021 +0200 +++ b/tests/dstest_file_client.cc Tue Aug 17 23:44:36 2021 +0200 @@ -61,7 +61,7 @@ /* Register for notifications. */ - if ((err = client_subscribe(file, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED))) + if ((err = client_subscribe(file, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED, NOTIFIER_TASK))) { printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err)); return; diff -r 06f07bedbc21 -r a354a465462b tests/dstest_pipe_client.cc --- a/tests/dstest_pipe_client.cc Sat Aug 14 18:41:57 2021 +0200 +++ b/tests/dstest_pipe_client.cc Tue Aug 17 23:44:36 2021 +0200 @@ -79,8 +79,8 @@ /* Register the readers for notification. */ - if ((err = client_subscribe(reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) || - (err = client_subscribe(reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED))) + if ((err = client_subscribe(reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED, NOTIFIER_THREAD)) || + (err = client_subscribe(reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED, NOTIFIER_THREAD))) { printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err)); return;