# HG changeset patch # User Paul Boddie # Date 1621292120 -7200 # Node ID 1463eae1b2df1789c5c912a21773ba872357fd90 # Parent 9a0450379ec780ba9b8d87d651499fb194f33afe Added initial support for notifications and blocking operations on pipes. Introduced usage of the flush and notification interfaces. diff -r 9a0450379ec7 -r 1463eae1b2df libfsclient/include/fsclient/client.h --- a/libfsclient/include/fsclient/client.h Sat May 15 23:38:39 2021 +0200 +++ b/libfsclient/include/fsclient/client.h Tue May 18 00:55:20 2021 +0200 @@ -27,7 +27,7 @@ EXTERN_C_BEGIN -/* File operations. */ +/* Opening and closing operations. */ void client_close(file_t *file); file_t *client_open(const char *name, flags_t flags); @@ -54,6 +54,13 @@ offset_t client_seek(file_t *file, offset_t offset, int whence); long client_tell(file_t *file); +/* Notification operations. */ + +long client_set_blocking(file_t *file, int can_block); +long client_subscribe(file_t *file, flags_t flags); +long client_unsubscribe(file_t *file); +long client_wait(file_t *file); + EXTERN_C_END // vim: tabstop=2 expandtab shiftwidth=2 diff -r 9a0450379ec7 -r 1463eae1b2df libfsclient/include/fsclient/file.h --- a/libfsclient/include/fsclient/file.h Sat May 15 23:38:39 2021 +0200 +++ b/libfsclient/include/fsclient/file.h Tue May 18 00:55:20 2021 +0200 @@ -37,6 +37,10 @@ l4_cap_idx_t ref; + /* Notification IRQ reference. */ + + l4_cap_idx_t irq; + /* Mapped memory accessing a file region. */ char *memory; @@ -59,6 +63,10 @@ int has_size; + /* Blocking accesses. */ + + int can_block; + } file_t; @@ -103,6 +111,12 @@ void file_data_read(file_t *file, char *buf, offset_t to_transfer); void file_data_write(file_t *file, char *buf, offset_t to_transfer); +/* Notification functions. */ + +long file_notify_subscribe(file_t *file, flags_t flags); +long file_notify_unsubscribe(file_t *file); +long file_notify_wait(file_t *file); + /* Pipe operations. */ @@ -115,6 +129,8 @@ long pipe_next(file_t *pipe); long pipe_written(file_t *pipe, offset_t size); + + EXTERN_C_END // vim: tabstop=2 expandtab shiftwidth=2 diff -r 9a0450379ec7 -r 1463eae1b2df libfsclient/lib/src/Makefile --- a/libfsclient/lib/src/Makefile Sat May 15 23:38:39 2021 +0200 +++ b/libfsclient/lib/src/Makefile Tue May 18 00:55:20 2021 +0200 @@ -15,7 +15,7 @@ # Individual interfaces. -CLIENT_INTERFACES_CC = dataspace file mapped_file opener opener_context pipe pipe_opener +CLIENT_INTERFACES_CC = dataspace file flush mapped_file notification opener opener_context pipe pipe_opener # Generated and plain source files. diff -r 9a0450379ec7 -r 1463eae1b2df libfsclient/lib/src/client.cc --- a/libfsclient/lib/src/client.cc Sat May 15 23:38:39 2021 +0200 +++ b/libfsclient/lib/src/client.cc Tue May 18 00:55:20 2021 +0200 @@ -34,6 +34,114 @@ +/* Access the given position and synchronise state with the file object. */ + +static long _access(file_t *file, offset_t position) +{ + long err; + + if (file->can_mmap) + { + /* Where the position is outside the current region, re-map. */ + + if ((position < file->start_pos) || (position >= file->end_pos)) + { + if (file_mmap(file, position, file_span(file))) + return -L4_EIO; + } + + /* Otherwise, flush any written data in the current region and update the + file size details. */ + + else + { + err = client_flush(file); + if (err) + return err; + } + } + else + { + /* Strict conditions for region navigation in pipes. */ + + if ((position < file->start_pos) || (position > file->end_pos)) + { + return -L4_EIO; + } + + /* The next region is only available at the end of the mapped memory. */ + + else if (position == file->end_pos) + { + err = client_next_region(file); + if (err) + return err; + + file->data_current = 0; + return L4_EOK; + } + + /* Within the current pipe region, synchronise with the pipe object. */ + + else + { + err = client_current_region(file); + if (err) + return err; + } + } + + /* Update the current data offset. */ + + file->data_current = position - file->start_pos; + + return L4_EOK; +} + + + +/* Return whether an access could occur, blocking if necessary. */ + +static int _access_blocking(file_t *file, offset_t position) +{ + long err; + + while ((err = _access(file, position))) + { + /* Exit if blocking is not configured or suitable. */ + + if ((err != -L4_EBUSY) || !file->can_block) + return 0; + + /* Handle an inability to access by blocking, exiting if waiting failed. */ + + if (client_wait(file)) + return 0; + } + + return 1; +} + + + +/* Ensure that memory is mapped for accessing the given file, using the + indicated count as a region size hint. */ + +static void *_map_memory(file_t *file, offset_t count) +{ + if (file->memory == NULL) + { + if (file->can_mmap) + return client_mmap(file, client_tell(file), count); + else if (pipe_current(file)) + return NULL; + } + + return file->memory; +} + + + /* Close a filesystem object. */ void client_close(file_t *file) @@ -108,72 +216,6 @@ -/* Access the given position and synchronise state with the file object. */ - -static long _access(file_t *file, offset_t position) -{ - long err; - - if (file->can_mmap) - { - /* Where the position is outside the current region, re-map. */ - - if ((position < file->start_pos) || (position >= file->end_pos)) - { - if (file_mmap(file, position, file_span(file))) - return -L4_EIO; - } - - /* Otherwise, flush any written data in the current region and update the - file size details. */ - - else - { - err = client_flush(file); - if (err) - return err; - } - } - else - { - /* Strict conditions for region navigation in pipes. */ - - if ((position < file->start_pos) || (position > file->end_pos)) - { - return -L4_EIO; - } - - /* The next region is only available at the end of the mapped memory. */ - - else if (position == file->end_pos) - { - err = client_next_region(file); - if (err) - return err; - } - - /* Within the current pipe region, synchronise with the pipe object. */ - - else - { - err = client_current_region(file); - if (err) - return err; - } - } - - /* Update the current data offset. */ - - if (file->has_size) - file->data_current = position - file->start_pos; - else - file->data_current = 0; - - return L4_EOK; -} - - - /* Flush data explicitly to the filesystem object. */ long client_flush(file_t *file) @@ -224,24 +266,6 @@ -/* Ensure that memory is mapped for accessing the given file, using the - indicated count as a region size hint. */ - -static void *_map_memory(file_t *file, offset_t count) -{ - if (file->memory == NULL) - { - if (file->can_mmap) - return client_mmap(file, client_tell(file), count); - else if (pipe_current(file)) - return NULL; - } - - return file->memory; -} - - - /* Read from the filesystem object into the buffer provided. */ offset_t client_read(file_t *file, void *buf, offset_t count) @@ -268,7 +292,7 @@ /* Flush any unwritten data, preparing to read from the file position at the end of the data, and returning if no new data is available. */ - if (_access(file, file_data_end_position(file))) + if (!_access_blocking(file, file_data_end_position(file))) break; available = file_data_available(file); @@ -323,7 +347,7 @@ default: /* NOTE: Set errno to EINVAL. */ - return -1; + return current; } /* Retain the current position if unchanged. */ @@ -365,12 +389,53 @@ /* Handle unwritten data and reset the buffer for reading. */ - _access(file, position); + if (_access(file, position)) + return current; + return position; } +/* Set or unset blocking access for a file. */ + +long client_set_blocking(file_t *file, int can_block) +{ + long err; + + if (file->can_block == can_block) + return L4_EOK; + + // NOTE: Set appropriate flags. + + if (can_block) + err = client_subscribe(file, 0); + else + err = client_unsubscribe(file); + + if (err) + return err; + + file->can_block = can_block; + return L4_EOK; +} + + + +/* Subscribe from events concerning a file. */ + +long client_subscribe(file_t *file, flags_t flags) +{ + if (file == NULL) + return -L4_EINVAL; + + return file_notify_subscribe(file, flags); +} + + + +/* Return the current position in the file. */ + long client_tell(file_t *file) { if (file == NULL) @@ -381,6 +446,30 @@ +/* Unsubscribe from events concerning a file. */ + +long client_unsubscribe(file_t *file) +{ + if (file == NULL) + return -L4_EINVAL; + + return file_notify_unsubscribe(file); +} + + + +/* Register for events concerning a file. */ + +long client_wait(file_t *file) +{ + if (file == NULL) + return -L4_EINVAL; + + return file_notify_wait(file); +} + + + /* Write to the filesystem object from the buffer provided. */ offset_t client_write(file_t *file, const void *buf, offset_t count) @@ -424,7 +513,7 @@ /* Flush any unwritten data and continue writing from the current data position. */ - if (_access(file, file_data_current_position(file))) + if (!_access_blocking(file, file_data_current_position(file))) break; space = file_data_space(file); diff -r 9a0450379ec7 -r 1463eae1b2df libfsclient/lib/src/file.cc --- a/libfsclient/lib/src/file.cc Sat May 15 23:38:39 2021 +0200 +++ b/libfsclient/lib/src/file.cc Tue May 18 00:55:20 2021 +0200 @@ -21,11 +21,19 @@ #include #include +#include + +#include + +#include +#include #include #include "dataspace_client.h" #include "file_client.h" +#include "flush_client.h" +#include "notification_client.h" #include "opener_client.h" #include "opener_context_client.h" #include "pipe_client.h" @@ -65,12 +73,14 @@ { file->memory = NULL; file->ref = L4_INVALID_CAP; + file->irq = L4_INVALID_CAP; file->start_pos = 0; file->end_pos = 0; file->data_end = 0; file->data_current = 0; file->can_mmap = 1; file->has_size = 1; + file->can_block = 0; } @@ -82,6 +92,9 @@ if (l4_is_valid_cap(file->ref)) ipc_cap_free_um(file->ref); + if (l4_is_valid_cap(file->irq)) + ipc_cap_free_um(file->irq); + if (file->memory != NULL) ipc_detach_dataspace(file->memory); @@ -163,7 +176,7 @@ long file_flush(file_t *file) { - client_File _file(file->ref); + client_Flush _file(file->ref); long err = _file.flush(file->data_current, &file->size); if (err) @@ -361,6 +374,41 @@ +/* Subscribe to notification events on a file. */ + +long file_notify_subscribe(file_t *file, flags_t flags) +{ + client_Notification notify(file->ref); + + return notify.subscribe(flags, &file->irq); +} + +/* Unsubscribe from notification events on a file. */ + +long file_notify_unsubscribe(file_t *file) +{ + client_Notification notify(file->ref); + + return notify.unsubscribe(); +} + +/* Wait for notification events on a file. */ + +long file_notify_wait(file_t *file) +{ + long err = ipc_bind_irq(file->irq, (l4_umword_t) &file->irq, pthread_l4_cap(pthread_self())); + + if (err) + return err; + + l4_msgtag_t tag = l4_irq_receive(file->irq, L4_IPC_NEVER); + + l4_irq_detach(file->irq); + return l4_error(tag); +} + + + /* Open two pipe endpoints using the given pipe server. */ long pipe_open(offset_t size, file_t *reader, file_t *writer, l4_cap_idx_t server) @@ -401,19 +449,20 @@ { client_Pipe _pipe(pipe->ref); long err = _pipe.current_region(&pipe->data_end, &pipe->size); - char *memory = pipe->memory; if (err) return err; pipe->end_pos = pipe->size; - err = ipc_attach_dataspace(pipe->ref, file_span(pipe), (void **) &pipe->memory); - if (err) - return err; + /* Attach memory if necessary. */ - if (memory != NULL) - ipc_detach_dataspace(memory); + if (pipe->memory == NULL) + { + err = ipc_attach_dataspace(pipe->ref, file_span(pipe), (void **) &pipe->memory); + if (err) + return err; + } return L4_EOK; } diff -r 9a0450379ec7 -r 1463eae1b2df libfsserver/include/fsserver/pipe_pager.h --- a/libfsserver/include/fsserver/pipe_pager.h Sat May 15 23:38:39 2021 +0200 +++ b/libfsserver/include/fsserver/pipe_pager.h Tue May 18 00:55:20 2021 +0200 @@ -64,9 +64,21 @@ /* Pipe methods. */ + virtual long closed(int *closed); + virtual long current_region(offset_t *populated_size, offset_t *size); virtual long next_region(offset_t *populated_size, offset_t *size); + + /* Flushing/synchronisation. */ + + virtual long flush(offset_t populated_size, offset_t *size); + + /* Notification methods. */ + + virtual long subscribe(flags_t flags, l4_cap_idx_t *irq); + + virtual long unsubscribe(); }; // vim: tabstop=4 expandtab shiftwidth=4 diff -r 9a0450379ec7 -r 1463eae1b2df libfsserver/include/fsserver/pipe_paging.h --- a/libfsserver/include/fsserver/pipe_paging.h Sat May 15 23:38:39 2021 +0200 +++ b/libfsserver/include/fsserver/pipe_paging.h Tue May 18 00:55:20 2021 +0200 @@ -54,6 +54,10 @@ unsigned int _endpoints = 2; + /* Notification IRQs. */ + + l4_cap_idx_t _irqs[2]; + public: explicit PipePaging(Memory *memory, offset_t size); @@ -62,6 +66,14 @@ virtual offset_t region_size() { return _size; } + /* Notification support. */ + + virtual void notify(bool writing); + + virtual l4_cap_idx_t subscribe(bool writing); + + virtual void unsubscribe(bool writing); + /* Region management. */ virtual PageMapper *add_region(); diff -r 9a0450379ec7 -r 1463eae1b2df libfsserver/lib/Makefile --- a/libfsserver/lib/Makefile Sat May 15 23:38:39 2021 +0200 +++ b/libfsserver/lib/Makefile Tue May 18 00:55:20 2021 +0200 @@ -16,13 +16,13 @@ # Compound interfaces. mapped_file_object_NAME = MappedFileObject -mapped_file_object_INTERFACES = dataspace file mapped_file +mapped_file_object_INTERFACES = dataspace file flush mapped_file opener_context_object_NAME = OpenerContextObject opener_context_object_INTERFACES = dataspace opener_context pipe_object_NAME = PipeObject -pipe_object_INTERFACES = dataspace pipe +pipe_object_INTERFACES = dataspace flush notification pipe COMP_INTERFACES_CC = mapped_file_object opener_context_object pipe_object diff -r 9a0450379ec7 -r 1463eae1b2df libfsserver/lib/pipes/pipe_pager.cc --- a/libfsserver/lib/pipes/pipe_pager.cc Sat May 15 23:38:39 2021 +0200 +++ b/libfsserver/lib/pipes/pipe_pager.cc Tue May 18 00:55:20 2021 +0200 @@ -55,6 +55,10 @@ void PipePager::close() { _paging->detach(); + + /* Notify the other endpoint. */ + + _paging->notify(_writing); } /* Support paging. */ @@ -66,6 +70,14 @@ +/* Return whether the pipe is closed or partly closed. */ + +long PipePager::closed(int *closed) +{ + *closed = _paging->closed(); + return L4_EOK; +} + /* Return details of the current region. */ long PipePager::current_region(offset_t *populated_size, offset_t *size) @@ -131,4 +143,37 @@ return -L4_EBUSY; } + + +/* Update the populated size of a pipe region and notify the other endpoint. */ + +long PipePager::flush(offset_t populated_size, offset_t *size) +{ + if (_mapper != NULL) + _mapper->set_data_size(populated_size); + + *size = _size; + + _paging->notify(_writing); + return L4_EOK; +} + + + +/* Subscribe to notifications. */ + +long PipePager::subscribe(flags_t flags, l4_cap_idx_t *irq) +{ + // NOTE: Need to interpret flags. + + *irq = _paging->subscribe(_writing); + return L4_EOK; +} + +long PipePager::unsubscribe() +{ + _paging->unsubscribe(_writing); + return L4_EOK; +} + // vim: tabstop=4 expandtab shiftwidth=4 diff -r 9a0450379ec7 -r 1463eae1b2df libfsserver/lib/pipes/pipe_paging.cc --- a/libfsserver/lib/pipes/pipe_paging.cc Sat May 15 23:38:39 2021 +0200 +++ b/libfsserver/lib/pipes/pipe_paging.cc Tue May 18 00:55:20 2021 +0200 @@ -19,6 +19,10 @@ * Boston, MA 02110-1301, USA */ +#include + +#include +#include #include #include @@ -40,6 +44,48 @@ for (unsigned int i = 0; i < 2; i++) _regions[i] = NULL; + + /* Initialise IRQ objects for notifications. */ + + for (unsigned int i = 0; i < 2; i++) + _irqs[i] = L4_INVALID_CAP; +} + +/* Create an IRQ to subscribe to an endpoint's notifications. */ + +l4_cap_idx_t PipePaging::subscribe(bool writing) +{ + int i = writing ? 1 : 0; + + if (l4_is_invalid_cap(_irqs[i])) + ipc_create_irq(&_irqs[i]); + + return _irqs[i]; +} + +/* Release any IRQ used for an endpoint's notifications. */ + +void PipePaging::unsubscribe(bool writing) +{ + int i = writing ? 1 : 0; + + if (l4_is_valid_cap(_irqs[i])) + { + ipc_cap_free_um(_irqs[i]); + _irqs[i] = L4_INVALID_CAP; + } +} + +/* Notify the other endpoint. */ + +void PipePaging::notify(bool writing) +{ + /* Let the writer notify the reader, and the other way round. */ + + int i = writing ? 0 : 1; + + if (l4_is_valid_cap(_irqs[i])) + l4_irq_trigger(_irqs[i]); } /* Return whether one or more endpoints have detached. */ @@ -77,6 +123,17 @@ } } + /* Release IRQs. */ + + for (unsigned int i = 0; i < 2; i++) + { + if (l4_is_valid_cap(_irqs[i])) + { + ipc_cap_free_um(_irqs[i]); + _irqs[i] = L4_INVALID_CAP; + } + } + /* Delete the page collection and related objects. */ delete _pages; @@ -108,6 +165,13 @@ mapper->set_data_size(0); _regions[_writing] = mapper; + + /* Let the writer notify the reader. */ + + notify(true); + + /* Return the next region's mapper. */ + return mapper; } @@ -144,6 +208,10 @@ _reading = 1 - _reading; + /* Let the reader notify the writer. */ + + notify(false); + /* Return the next region's mapper. */ return _regions[_reading]; diff -r 9a0450379ec7 -r 1463eae1b2df tests/dstest_pipe_client.cc --- a/tests/dstest_pipe_client.cc Sat May 15 23:38:39 2021 +0200 +++ b/tests/dstest_pipe_client.cc Tue May 18 00:55:20 2021 +0200 @@ -22,6 +22,8 @@ #include #include +#include + #include #include #include @@ -32,8 +34,59 @@ +/* Minimum pipe region size in pages. */ + const unsigned int PIPE_PAGES = 2; +/* Use the writer to fill the pipe with data. */ + +static void write_pipe(file_t *writer) +{ + offset_t size = 600; + char buffer[size]; + + for (int loop = 0; loop < 3; loop++) + { + for (int region = 0; region < 26; region++) + { + memset(buffer, (int) 'a' + region, size); + + offset_t nwritten = client_write(writer, buffer, size); + + printf("Written %ld/%ld in #%d of %d/%d to pipe...\n", nwritten, size, region, loop, 2); + } + } + + /* Flush to make the final output available. */ + + client_flush(writer); +} + +/* Use the reader to obtain data from the pipe. */ + +static void read_pipe(file_t *reader) +{ + offset_t size = 600; + char buffer[size]; + offset_t nread; + + do + { + nread = client_read(reader, buffer, size); + + printf("Read %ld/%ld from pipe...\n", nread, size); + + for (offset_t i = 0; i < nread; i += 60) + { + fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout); + fputs("\n", stdout); + } + } + while (nread); + + printf("Data shown.\n"); +} + int main(void) { /* Obtain access to the filesystem. */ @@ -51,63 +104,23 @@ return 1; } - /* Use the writer to fill the pipe with data. */ - - offset_t size = 600; - char buffer[size]; - int region = 0; - - for (int loop = 0; loop < 3; loop++) - { - while (1) - { - printf("Writing %ld to pipe...\n", size); - - memset(buffer, (int) 'a' + region, size); - - offset_t nwritten = client_write(&writer, buffer, size); - - printf("Written %ld to pipe...\n", nwritten); - - for (offset_t i = 0; i < nwritten; i += 60) - { - fwrite(buffer + i, sizeof(char), nwritten - i > 60 ? 60 : nwritten - i, stdout); - fputs("\n", stdout); - } + /* Make the reader and writer blocking to permit synchronisation. */ - if (!nwritten) - break; - - region++; - - if (region == 26) - region = 0; - } - - /* Use the reader to obtain data from the pipe. */ - - offset_t nread; - - do - { - printf("Reading %ld from pipe...\n", size); - - nread = client_read(&reader, buffer, size); - - printf("Read %ld from pipe...\n", nread); - - for (offset_t i = 0; i < nread; i += 60) - { - fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout); - fputs("\n", stdout); - } - } - while (nread); + if ((err = client_set_blocking(&reader, true)) || (err = client_set_blocking(&writer, true))) + { + printf("Could not set as blocking: %s\n", l4sys_errtostr(err)); + return 1; } - printf("Data shown.\n"); + /* Schedule reader and writer threads. */ + + std::thread *activities[2]; - return 0; + activities[0] = new std::thread(read_pipe, &reader); + activities[1] = new std::thread(write_pipe, &writer); + + for (int i = 0; i < 2; i++) + activities[i]->join(); } // vim: tabstop=2 expandtab shiftwidth=2