# HG changeset patch # User Paul Boddie # Date 1625525125 -7200 # Node ID 637bdd44285e90076170ec83ecb139f24b2214ed # Parent 36bfd4efefb38595dcc2de004d122701d8fcdbf9 Replaced IRQ usage with a dedicated notifier abstraction for file/pipe events. Currently, this employs thread-specific notifiers, but a global notifier with support for monitoring of specific files might be more appropriate. diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/include/fsclient/client.h --- a/libfsclient/include/fsclient/client.h Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsclient/include/fsclient/client.h Tue Jul 06 00:45:25 2021 +0200 @@ -59,7 +59,6 @@ long client_set_blocking(file_t *file, notify_flags_t flags); long client_subscribe(file_t *file, notify_flags_t flags); long client_unsubscribe(file_t *file); -long client_wait_init(file_t *file); long client_wait_file(file_t *file); long client_wait_files(file_t **file); diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/include/fsclient/file.h --- a/libfsclient/include/fsclient/file.h Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsclient/include/fsclient/file.h Tue Jul 06 00:45:25 2021 +0200 @@ -37,9 +37,9 @@ l4_cap_idx_t ref; - /* Notification IRQ reference. */ + /* Notification endpoint reference. */ - l4_cap_idx_t irq; + l4_cap_idx_t notifier; /* Mapped memory accessing a file region. */ @@ -67,6 +67,10 @@ notify_flags_t can_block; + /* Saved notifications. */ + + notify_flags_t notifications; + } file_t; @@ -115,7 +119,6 @@ long file_notify_subscribe(file_t *file, notify_flags_t flags); long file_notify_unsubscribe(file_t *file); -long file_notify_bind_file(file_t *file); long file_notify_wait_file(file_t *file); long file_notify_wait_files(file_t **file); diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/include/fsclient/notifier.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libfsclient/include/fsclient/notifier.h Tue Jul 06 00:45:25 2021 +0200 @@ -0,0 +1,79 @@ +/* + * File event notification support. + * + * Copyright (C) 2021 Paul Boddie + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + + + +/* An object for monitoring file event notifications. */ + +class FileNotifier +{ +protected: + std::list _affected; + l4_cap_idx_t _thread = L4_INVALID_CAP; + bool _started = false; + + std::mutex _lock; + std::condition_variable _notified; + + /* Helper methods. */ + + virtual void _notify(file_t *file, notify_flags_t flags); + +public: + /* Server details. */ + + int expected_items(); + + ipc_server_handler_type handler(); + + void *interface() + { return static_cast(this); } + + /* Local operations. */ + + virtual long subscribe(file_t *file, notify_flags_t flags); + + virtual long unsubscribe(file_t *file); + + virtual void mainloop(); + + virtual long start(); + + virtual long wait(file_t **file); +}; + + + +/* Helper functions. */ + +FileNotifier *get_notifier(); + +// vim: tabstop=4 expandtab shiftwidth=4 diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/lib/src/Makefile --- a/libfsclient/lib/src/Makefile Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsclient/lib/src/Makefile Tue Jul 06 00:45:25 2021 +0200 @@ -21,7 +21,7 @@ CLIENT_INTERFACES_SRC_CC = $(call interfaces_to_client_cc,$(CLIENT_INTERFACES_CC)) -PLAIN_SRC_CC = client.cc file.cc +PLAIN_SRC_CC = client.cc file.cc notifier.cc # Normal definitions. diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/lib/src/client.cc --- a/libfsclient/lib/src/client.cc Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsclient/lib/src/client.cc Tue Jul 06 00:45:25 2021 +0200 @@ -459,23 +459,6 @@ -/* Bind and initialise files involved with notifications. */ - -long client_wait_init(file_t *file) -{ - if (file == NULL) - return -L4_EINVAL; - - long err = file_notify_bind_file(file); - - if (err) - return err; - - ipc_init_irq(file->irq); - - return L4_EOK; -} - /* Wait for events involving a specific file. */ long client_wait_file(file_t *file) diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/lib/src/file.cc --- a/libfsclient/lib/src/file.cc Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsclient/lib/src/file.cc Tue Jul 06 00:45:25 2021 +0200 @@ -21,19 +21,12 @@ #include #include -#include - -#include - -#include -#include #include #include "dataspace_client.h" #include "file_client.h" #include "flush_client.h" -#include "notification_client.h" #include "opener_client.h" #include "opener_context_client.h" #include "pipe_client.h" @@ -41,6 +34,7 @@ #include "mapped_file_client.h" #include "file.h" +#include "notifier.h" @@ -73,7 +67,7 @@ { file->memory = NULL; file->ref = L4_INVALID_CAP; - file->irq = L4_INVALID_CAP; + file->notifier = L4_INVALID_CAP; file->start_pos = 0; file->end_pos = 0; file->data_end = 0; @@ -81,6 +75,7 @@ file->can_mmap = 1; file->has_size = 1; file->can_block = 0; + file->notifications = 0; } @@ -92,8 +87,8 @@ if (l4_is_valid_cap(file->ref)) ipc_cap_free_um(file->ref); - if (l4_is_valid_cap(file->irq)) - ipc_cap_free_um(file->irq); + if (l4_is_valid_cap(file->notifier)) + ipc_cap_free_um(file->notifier); if (file->memory != NULL) ipc_detach_dataspace(file->memory); @@ -378,64 +373,48 @@ long file_notify_subscribe(file_t *file, notify_flags_t flags) { - client_Notification notify(file->ref); + FileNotifier *notifier = get_notifier(); - return notify.subscribe(flags, &file->irq); + return notifier->subscribe(file, flags); } /* Unsubscribe from notification events on a file. */ long file_notify_unsubscribe(file_t *file) { - if (l4_is_invalid_cap(file->irq)) + if (l4_is_invalid_cap(file->notifier)) return -L4_EINVAL; - l4_irq_detach(file->irq); - - client_Notification notify(file->ref); - - return notify.unsubscribe(); -} + FileNotifier *notifier = get_notifier(); -/* Bind a file IRQ to the current thread. */ - -long file_notify_bind_file(file_t *file) -{ - if (l4_is_invalid_cap(file->irq)) - return -L4_EINVAL; - - return ipc_bind_irq(file->irq, (l4_umword_t) file, pthread_l4_cap(pthread_self())); + return notifier->unsubscribe(file); } /* Wait for a notification event on a file. */ long file_notify_wait_file(file_t *file) { - long err = file_notify_bind_file(file); + file_t *affected; + + do + { + long err = file_notify_wait_files(&affected); - if (err) - return err; + if (err) + return err; + } + while (affected != file); - return l4_error(l4_irq_receive(file->irq, L4_IPC_NEVER)); + return L4_EOK; } /* Wait for notification events on files. */ long file_notify_wait_files(file_t **file) { - l4_umword_t label; - l4_msgtag_t tag = l4_ipc_wait(l4_utcb(), &label, L4_IPC_NEVER); - long err = l4_error(tag); + FileNotifier *notifier = get_notifier(); - if (err) - *file = NULL; - else - { - label = label & ~3UL; - *file = reinterpret_cast(label); - } - - return err; + return notifier->wait(file); } diff -r 36bfd4efefb3 -r 637bdd44285e libfsclient/lib/src/notifier.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libfsclient/lib/src/notifier.cc Tue Jul 06 00:45:25 2021 +0200 @@ -0,0 +1,205 @@ +/* + * File event notification support. + * + * Copyright (C) 2021 Paul Boddie + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA + */ + +#include +#include + +#include +#include + +#include +#include + +#include "notification_client.h" +#include "notifier.h" + + + +/* Thread-local storage workaround. */ + +static std::mutex lock; +static std::map notifiers; + +FileNotifier *get_notifier() +{ + std::lock_guard guard(lock); + + l4_cap_idx_t thread = pthread_l4_cap(pthread_self()); + FileNotifier *notifier = notifiers[thread]; + + /* Start any new notifier. */ + + if (notifier == NULL) + { + notifier = new FileNotifier; + notifiers[thread] = notifier; + notifier->start(); + } + + return notifier; +} + + + +/* Subscribe to notification events on a file. */ + +long FileNotifier::subscribe(file_t *file, notify_flags_t flags) +{ + /* Create a notification endpoint, if necessary. */ + + if (l4_is_invalid_cap(file->notifier)) + { + long err = ipc_server_new_for_thread(&file->notifier, file, _thread); + + if (err) + return err; + } + + client_Notification notify(file->ref); + + return notify.subscribe(file->notifier, flags); +} + +/* Unsubscribe from notification events on a file. */ + +long FileNotifier::unsubscribe(file_t *file) +{ + if (l4_is_invalid_cap(file->notifier)) + return -L4_EINVAL; + + ipc_cap_free_um(file->notifier); + + client_Notification notify(file->ref); + + return notify.unsubscribe(); +} + +/* Handle a notification event for a file. Ideally, this would be invoked by the + generic server dispatch mechanism, with the gate label being interpreted and + provided as the first parameter. */ + +void FileNotifier::_notify(file_t *file, notify_flags_t flags) +{ + std::unique_lock guard(_lock); + + /* Record the flags in the file object. */ + + file->notifications = flags; + _affected.push_back(file); + + /* Notify any waiting caller. */ + + _notified.notify_one(); +} + +/* Listen for notifications. */ + +void FileNotifier::mainloop() +{ + ipc_message_t msg; + l4_umword_t label; + + while (1) + { + ipc_message_wait(&msg, &label); + + /* Clear lower label bits. */ + + label = label & ~3UL; + + /* Ignore erroneous messages. */ + + if (l4_ipc_error(msg.tag, l4_utcb())) + continue; + + /* Reply to notifications. */ + + ipc_message_reply(&msg); + + /* Interpret gate labels as file objects. */ + + file_t *file = (file_t *) label; + notify_flags_t flags = ipc_message_get_word(&msg, 0); + + /* Register the notification. */ + + _notify(file, flags); + } + + ipc_message_free(&msg); +} + +/* Invoke the mainloop in a thread. */ + +static void *notifier_mainloop(void *data) +{ + FileNotifier *notifier = reinterpret_cast(data); + + notifier->mainloop(); + return 0; +} + +/* Start listening for notifications. */ + +long FileNotifier::start() +{ + if (_started) + return L4_EOK; + + pthread_t thread; + pthread_attr_t attr; + long err; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + err = pthread_create(&thread, &attr, notifier_mainloop, this); + if (err) + return err; + + _thread = pthread_l4_cap(thread); + _started = true; + + return L4_EOK; +} + +/* Wait for notification events on files. */ + +long FileNotifier::wait(file_t **file) +{ + std::unique_lock guard(_lock); + + while (1) + { + if (!_affected.empty()) + { + *file = _affected.front(); + _affected.pop_front(); + return L4_EOK; + } + else + _notified.wait(guard); + } + + return L4_EOK; +} + +// vim: tabstop=2 expandtab shiftwidth=2 diff -r 36bfd4efefb3 -r 637bdd44285e libfsserver/include/fsserver/pipe_pager.h --- a/libfsserver/include/fsserver/pipe_pager.h Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsserver/include/fsserver/pipe_pager.h Tue Jul 06 00:45:25 2021 +0200 @@ -76,7 +76,7 @@ /* Notification methods. */ - virtual long subscribe(notify_flags_t flags, l4_cap_idx_t *irq); + virtual long subscribe(l4_cap_idx_t notifier, notify_flags_t flags); virtual long unsubscribe(); }; diff -r 36bfd4efefb3 -r 637bdd44285e libfsserver/include/fsserver/pipe_paging.h --- a/libfsserver/include/fsserver/pipe_paging.h Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsserver/include/fsserver/pipe_paging.h Tue Jul 06 00:45:25 2021 +0200 @@ -54,10 +54,10 @@ unsigned int _endpoints = 2; - /* Notification IRQs. */ + /* Notification endpoints. */ - l4_cap_idx_t _irqs[2]; - notify_flags_t _flags[2]; + l4_cap_idx_t _notifiers[2]; + notify_flags_t _flags[2], _deferred[2]; /* Common functionality. */ @@ -75,7 +75,7 @@ virtual void notify(bool writing, notify_flags_t flags); - virtual l4_cap_idx_t subscribe(bool writing, notify_flags_t flags); + virtual void subscribe(bool writing, l4_cap_idx_t notifier, notify_flags_t flags); virtual void unsubscribe(bool writing); diff -r 36bfd4efefb3 -r 637bdd44285e libfsserver/lib/Makefile --- a/libfsserver/lib/Makefile Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsserver/lib/Makefile Tue Jul 06 00:45:25 2021 +0200 @@ -28,10 +28,14 @@ # Individual interfaces. +CLIENT_INTERFACES_CC = notifier + SERVER_INTERFACES_CC = opener pipe_opener $(call common_interfaces,$(COMP_INTERFACES_CC)) # Generated and plain source files. +CLIENT_INTERFACES_SRC_CC = $(call interfaces_to_client_cc,$(CLIENT_INTERFACES_CC)) + SERVER_INTERFACES_SRC_CC = $(call interfaces_to_server_cc,$(SERVER_INTERFACES_CC) $(COMP_INTERFACES_CC)) PLAIN_SRC_CC = \ @@ -66,6 +70,7 @@ # Normal definitions. SRC_CC = \ + $(CLIENT_INTERFACES_SRC_CC) \ $(SERVER_INTERFACES_SRC_CC) \ $(PLAIN_SRC_CC) @@ -81,4 +86,6 @@ include $(L4DIR)/mk/lib.mk include $(IDL_MK_DIR)/interface_rules.mk +$(PLAIN_SRC_CC): $(CLIENT_INTERFACES_SRC_CC) + $(PLAIN_SRC_CC): $(SERVER_INTERFACES_SRC_CC) diff -r 36bfd4efefb3 -r 637bdd44285e libfsserver/lib/pipes/pipe_pager.cc --- a/libfsserver/lib/pipes/pipe_pager.cc Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsserver/lib/pipes/pipe_pager.cc Tue Jul 06 00:45:25 2021 +0200 @@ -164,12 +164,12 @@ /* Subscribe to notifications. */ -long PipePager::subscribe(notify_flags_t flags, l4_cap_idx_t *irq) +long PipePager::subscribe(l4_cap_idx_t notifier, notify_flags_t flags) { /* Readers can subscribe to new data (at end), and pipe closed events. Writers can subscribe to new space and pipe closed events. */ - *irq = _paging->subscribe(_writing, flags); + _paging->subscribe(_writing, notifier, flags); return L4_EOK; } diff -r 36bfd4efefb3 -r 637bdd44285e libfsserver/lib/pipes/pipe_paging.cc --- a/libfsserver/lib/pipes/pipe_paging.cc Fri Jul 02 23:53:13 2021 +0200 +++ b/libfsserver/lib/pipes/pipe_paging.cc Tue Jul 06 00:45:25 2021 +0200 @@ -19,15 +19,14 @@ * Boston, MA 02110-1301, USA */ -#include - #include -#include #include #include "page_queue_partitioned.h" #include "pipe_paging.h" +#include "notifier_client.h" + PipePaging::PipePaging(Memory *memory, offset_t size) @@ -44,40 +43,50 @@ for (unsigned int i = 0; i < 2; i++) _regions[i] = NULL; - /* Initialise IRQ objects and flags for notifications. */ + /* Initialise endpoints and flags for notifications. */ for (unsigned int i = 0; i < 2; i++) { - _irqs[i] = L4_INVALID_CAP; + _notifiers[i] = L4_INVALID_CAP; _flags[i] = 0; + _deferred[i] = 0; } } -/* Create an IRQ to subscribe to an endpoint's notifications. */ +/* Subscribe to an endpoint's notifications using a notification endpoint. */ -l4_cap_idx_t PipePaging::subscribe(bool writing, notify_flags_t flags) +void PipePaging::subscribe(bool writing, l4_cap_idx_t notifier, notify_flags_t flags) { int i = writing ? 1 : 0; - if (l4_is_invalid_cap(_irqs[i])) - ipc_create_irq(&_irqs[i]); + if (l4_is_valid_cap(_notifiers[i])) + unsubscribe(writing); + _notifiers[i] = notifier; _flags[i] = flags; - return _irqs[i]; + /* Send deferred conditions on behalf of the other endpoint held before + subscription occurred. */ + + if (_deferred[i]) + { + notify(!writing, _deferred[i]); + _deferred[i] = 0; + } } -/* Release any IRQ used for an endpoint's notifications. */ +/* Unsubscribe from an endpoint's notifications. */ void PipePaging::unsubscribe(bool writing) { int i = writing ? 1 : 0; - if (l4_is_valid_cap(_irqs[i])) + if (l4_is_valid_cap(_notifiers[i])) { - ipc_cap_free_um(_irqs[i]); - _irqs[i] = L4_INVALID_CAP; + ipc_cap_free_um(_notifiers[i]); + _notifiers[i] = L4_INVALID_CAP; _flags[i] = 0; + _deferred[i] = 0; } } @@ -89,8 +98,20 @@ int i = writing ? 0 : 1; - if (l4_is_valid_cap(_irqs[i]) && (flags & _flags[i])) - l4_irq_trigger(_irqs[i]); + /* Notify the other endpoint or hold any notification for potential future + subscription. */ + + if (l4_is_valid_cap(_notifiers[i])) + { + if (flags & _flags[i]) + { + client_Notifier notifier(_notifiers[i]); + + notifier.notify(flags & _flags[i]); + } + } + else + _deferred[i] = flags; } /* Return whether one or more endpoints have detached. */ @@ -131,14 +152,14 @@ for (unsigned int i = 0; i < 2; i++) discard_region(i); - /* Release IRQs. */ + /* Release notifiers. */ for (unsigned int i = 0; i < 2; i++) { - if (l4_is_valid_cap(_irqs[i])) + if (l4_is_valid_cap(_notifiers[i])) { - ipc_cap_free_um(_irqs[i]); - _irqs[i] = L4_INVALID_CAP; + ipc_cap_free_um(_notifiers[i]); + _notifiers[i] = L4_INVALID_CAP; } } diff -r 36bfd4efefb3 -r 637bdd44285e tests/dstest_pipe_client.cc --- a/tests/dstest_pipe_client.cc Fri Jul 02 23:53:13 2021 +0200 +++ b/tests/dstest_pipe_client.cc Tue Jul 06 00:45:25 2021 +0200 @@ -41,10 +41,19 @@ /* Use the writer to fill the pipe with data. */ -static void write_pipe(file_t *writer) +static void write_pipe(file_t *writer, int number) { offset_t size = 600; char buffer[size]; + long err; + + /* Make writers blocking to permit synchronisation. */ + + if ((err = client_set_blocking(writer, NOTIFY_SPACE_AVAILABLE))) + { + printf("Could not set pipe #%d as blocking: %s\n", number, l4sys_errtostr(err)); + return; + } for (int loop = 0; loop < 3; loop++) { @@ -54,7 +63,7 @@ offset_t nwritten = client_write(writer, buffer, size); - printf("Written %ld/%ld in #%d of %d/%d to pipe...\n", nwritten, size, region, loop, 2); + printf("Written %ld/%ld in #%d of %d/%d to pipe #%d...\n", nwritten, size, region, loop, 2, number); } sleep(1); @@ -74,23 +83,28 @@ offset_t size = 600, totals[] = {0, 0}; bool active[] = {true, true}; int num_active = 2; - char buffer[size]; - offset_t nread; + long err; file_t *reader; - long err; + + /* Register the readers for notification. */ + + // NOTE: Use the flags to detect initial conditions! - if ((err = client_wait_init(reader1)) || (err = client_wait_init(reader2))) + if ((err = client_subscribe(reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) || + (err = client_subscribe(reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED))) { - printf("Could not initialise waiting for files: %s\n", l4sys_errtostr(err)); + printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err)); return; } while (1) { + char buffer[size]; + offset_t nread; + /* Wait for notification of content. */ - printf("Waiting...\n"); - long err = client_wait_files(&reader); + err = client_wait_files(&reader); if (err) { @@ -122,11 +136,11 @@ } } - do + while (nread) { totals[p] += nread; - printf("Read %ld/%ld, total %ld, from pipe #%d...\n", nread, size, totals[p], p + 1); + printf("Read %ld/%ld, total %ld, first %c, last %c, from pipe #%d...\n", nread, size, totals[p], *buffer, *(buffer + nread - 1), p + 1); #if 0 for (offset_t i = 0; i < nread; i += 60) { @@ -136,7 +150,6 @@ #endif nread = client_read(reader, buffer, size); } - while (nread); } printf("Data shown.\n"); @@ -160,31 +173,13 @@ return 1; } - /* Register the readers for notification. */ - - if ((err = client_subscribe(&reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) || - (err = client_subscribe(&reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED))) - { - printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err)); - return 1; - } - - /* Make the writers blocking to permit synchronisation. */ - - if ((err = client_set_blocking(&writer1, NOTIFY_SPACE_AVAILABLE)) || - (err = client_set_blocking(&writer2, NOTIFY_SPACE_AVAILABLE))) - { - printf("Could not set as blocking: %s\n", l4sys_errtostr(err)); - return 1; - } - /* Schedule reader and writer threads. */ std::thread *activities[3]; activities[0] = new std::thread(read_pipes, &reader1, &reader2); - activities[1] = new std::thread(write_pipe, &writer1); - activities[2] = new std::thread(write_pipe, &writer2); + activities[1] = new std::thread(write_pipe, &writer1, 1); + activities[2] = new std::thread(write_pipe, &writer2, 2); for (int i = 0; i < 3; i++) activities[i]->join();