3.1 --- a/libfsclient/lib/src/client.cc Mon Jun 07 00:50:14 2021 +0200
3.2 +++ b/libfsclient/lib/src/client.cc Fri Jul 02 23:53:13 2021 +0200
3.3 @@ -20,6 +20,7 @@
3.4 */
3.5
3.6 #include <l4/re/env.h>
3.7 +#include <ipc/irq.h>
3.8
3.9 #include <stdio.h>
3.10 #include <stdlib.h>
3.11 @@ -115,7 +116,7 @@
3.12
3.13 /* Handle an inability to access by blocking, exiting if waiting failed. */
3.14
3.15 - if (client_wait(file))
3.16 + if (client_wait_file(file))
3.17 return 0;
3.18 }
3.19
3.20 @@ -458,14 +459,39 @@
3.21
3.22
3.23
3.24 -/* Register for events concerning a file. */
3.25 +/* Bind and initialise files involved with notifications. */
3.26
3.27 -long client_wait(file_t *file)
3.28 +long client_wait_init(file_t *file)
3.29 {
3.30 if (file == NULL)
3.31 return -L4_EINVAL;
3.32
3.33 - return file_notify_wait(file);
3.34 + long err = file_notify_bind_file(file);
3.35 +
3.36 + if (err)
3.37 + return err;
3.38 +
3.39 + ipc_init_irq(file->irq);
3.40 +
3.41 + return L4_EOK;
3.42 +}
3.43 +
3.44 +/* Wait for events involving a specific file. */
3.45 +
3.46 +long client_wait_file(file_t *file)
3.47 +{
3.48 + if (file == NULL)
3.49 + return -L4_EINVAL;
3.50 +
3.51 + return file_notify_wait_file(file);
3.52 +}
3.53 +
3.54 +/* Wait for events concerning files, referencing a file object if an event is
3.55 + delivered. */
3.56 +
3.57 +long client_wait_files(file_t **file)
3.58 +{
3.59 + return file_notify_wait_files(file);
3.60 }
3.61
3.62
4.1 --- a/libfsclient/lib/src/file.cc Mon Jun 07 00:50:14 2021 +0200
4.2 +++ b/libfsclient/lib/src/file.cc Fri Jul 02 23:53:13 2021 +0200
4.3 @@ -387,24 +387,55 @@
4.4
4.5 long file_notify_unsubscribe(file_t *file)
4.6 {
4.7 + if (l4_is_invalid_cap(file->irq))
4.8 + return -L4_EINVAL;
4.9 +
4.10 + l4_irq_detach(file->irq);
4.11 +
4.12 client_Notification notify(file->ref);
4.13
4.14 return notify.unsubscribe();
4.15 }
4.16
4.17 -/* Wait for notification events on a file. */
4.18 +/* Bind a file IRQ to the current thread. */
4.19 +
4.20 +long file_notify_bind_file(file_t *file)
4.21 +{
4.22 + if (l4_is_invalid_cap(file->irq))
4.23 + return -L4_EINVAL;
4.24
4.25 -long file_notify_wait(file_t *file)
4.26 + return ipc_bind_irq(file->irq, (l4_umword_t) file, pthread_l4_cap(pthread_self()));
4.27 +}
4.28 +
4.29 +/* Wait for a notification event on a file. */
4.30 +
4.31 +long file_notify_wait_file(file_t *file)
4.32 {
4.33 - long err = ipc_bind_irq(file->irq, (l4_umword_t) &file->irq, pthread_l4_cap(pthread_self()));
4.34 + long err = file_notify_bind_file(file);
4.35
4.36 if (err)
4.37 return err;
4.38
4.39 - l4_msgtag_t tag = l4_irq_receive(file->irq, L4_IPC_NEVER);
4.40 + return l4_error(l4_irq_receive(file->irq, L4_IPC_NEVER));
4.41 +}
4.42 +
4.43 +/* Wait for notification events on files. */
4.44
4.45 - l4_irq_detach(file->irq);
4.46 - return l4_error(tag);
4.47 +long file_notify_wait_files(file_t **file)
4.48 +{
4.49 + l4_umword_t label;
4.50 + l4_msgtag_t tag = l4_ipc_wait(l4_utcb(), &label, L4_IPC_NEVER);
4.51 + long err = l4_error(tag);
4.52 +
4.53 + if (err)
4.54 + *file = NULL;
4.55 + else
4.56 + {
4.57 + label = label & ~3UL;
4.58 + *file = reinterpret_cast<file_t *>(label);
4.59 + }
4.60 +
4.61 + return err;
4.62 }
4.63
4.64
5.1 --- a/tests/dstest_pipe_client.cc Mon Jun 07 00:50:14 2021 +0200
5.2 +++ b/tests/dstest_pipe_client.cc Fri Jul 02 23:53:13 2021 +0200
5.3 @@ -27,6 +27,7 @@
5.4 #include <stdio.h>
5.5 #include <string.h>
5.6 #include <stdlib.h>
5.7 +#include <unistd.h> /* sleep */
5.8
5.9 #include <fsclient/client.h>
5.10 #include <fsclient/file.h>
5.11 @@ -55,34 +56,88 @@
5.12
5.13 printf("Written %ld/%ld in #%d of %d/%d to pipe...\n", nwritten, size, region, loop, 2);
5.14 }
5.15 +
5.16 + sleep(1);
5.17 }
5.18
5.19 /* Flush to make the final output available. */
5.20
5.21 client_flush(writer);
5.22 +
5.23 + file_close(writer);
5.24 }
5.25
5.26 /* Use the reader to obtain data from the pipe. */
5.27
5.28 -static void read_pipe(file_t *reader)
5.29 +static void read_pipes(file_t *reader1, file_t *reader2)
5.30 {
5.31 - offset_t size = 600;
5.32 + offset_t size = 600, totals[] = {0, 0};
5.33 + bool active[] = {true, true};
5.34 + int num_active = 2;
5.35 char buffer[size];
5.36 offset_t nread;
5.37 + file_t *reader;
5.38 + long err;
5.39
5.40 - do
5.41 + if ((err = client_wait_init(reader1)) || (err = client_wait_init(reader2)))
5.42 + {
5.43 + printf("Could not initialise waiting for files: %s\n", l4sys_errtostr(err));
5.44 + return;
5.45 + }
5.46 +
5.47 + while (1)
5.48 {
5.49 + /* Wait for notification of content. */
5.50 +
5.51 + printf("Waiting...\n");
5.52 + long err = client_wait_files(&reader);
5.53 +
5.54 + if (err)
5.55 + {
5.56 + printf("Error waiting for notifications: %s\n", l4sys_errtostr(err));
5.57 + return;
5.58 + }
5.59 +
5.60 + if ((reader != reader1) && (reader != reader2))
5.61 + {
5.62 + printf("Spurious notification received for %p versus %p, %p.\n", reader, reader1, reader2);
5.63 + continue;
5.64 + }
5.65 +
5.66 nread = client_read(reader, buffer, size);
5.67
5.68 - printf("Read %ld/%ld from pipe...\n", nread, size);
5.69 + // NOTE: Should really be testing for the condition somehow.
5.70
5.71 - for (offset_t i = 0; i < nread; i += 60)
5.72 + int p = reader == reader1 ? 0 : 1;
5.73 +
5.74 + if (!nread)
5.75 {
5.76 - fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout);
5.77 - fputs("\n", stdout);
5.78 + if (active[p])
5.79 + {
5.80 + active[p] = false;
5.81 + num_active--;
5.82 +
5.83 + if (!num_active)
5.84 + break;
5.85 + }
5.86 }
5.87 +
5.88 + do
5.89 + {
5.90 + totals[p] += nread;
5.91 +
5.92 + printf("Read %ld/%ld, total %ld, from pipe #%d...\n", nread, size, totals[p], p + 1);
5.93 +#if 0
5.94 + for (offset_t i = 0; i < nread; i += 60)
5.95 + {
5.96 + fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout);
5.97 + fputs("\n", stdout);
5.98 + }
5.99 +#endif
5.100 + nread = client_read(reader, buffer, size);
5.101 + }
5.102 + while (nread);
5.103 }
5.104 - while (nread);
5.105
5.106 printf("Data shown.\n");
5.107 }
5.108 @@ -95,19 +150,29 @@
5.109
5.110 /* Invoke the open method to receive the file reference. */
5.111
5.112 - file_t reader, writer;
5.113 - long err = pipe_open(page(PIPE_PAGES), &reader, &writer, server);
5.114 + file_t reader1, reader2, writer1, writer2;
5.115 + long err = pipe_open(page(PIPE_PAGES), &reader1, &writer1, server) ||
5.116 + pipe_open(page(PIPE_PAGES), &reader2, &writer2, server);
5.117
5.118 if (err)
5.119 {
5.120 - printf("Could not obtain pipe: %s\n", l4sys_errtostr(err));
5.121 + printf("Could not obtain pipes: %s\n", l4sys_errtostr(err));
5.122 return 1;
5.123 }
5.124
5.125 - /* Make the reader and writer blocking to permit synchronisation. */
5.126 + /* Register the readers for notification. */
5.127
5.128 - if ((err = client_set_blocking(&reader, NOTIFY_CONTENT_AVAILABLE)) ||
5.129 - (err = client_set_blocking(&writer, NOTIFY_SPACE_AVAILABLE)))
5.130 + if ((err = client_subscribe(&reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) ||
5.131 + (err = client_subscribe(&reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)))
5.132 + {
5.133 + printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err));
5.134 + return 1;
5.135 + }
5.136 +
5.137 + /* Make the writers blocking to permit synchronisation. */
5.138 +
5.139 + if ((err = client_set_blocking(&writer1, NOTIFY_SPACE_AVAILABLE)) ||
5.140 + (err = client_set_blocking(&writer2, NOTIFY_SPACE_AVAILABLE)))
5.141 {
5.142 printf("Could not set as blocking: %s\n", l4sys_errtostr(err));
5.143 return 1;
5.144 @@ -115,12 +180,13 @@
5.145
5.146 /* Schedule reader and writer threads. */
5.147
5.148 - std::thread *activities[2];
5.149 + std::thread *activities[3];
5.150
5.151 - activities[0] = new std::thread(read_pipe, &reader);
5.152 - activities[1] = new std::thread(write_pipe, &writer);
5.153 + activities[0] = new std::thread(read_pipes, &reader1, &reader2);
5.154 + activities[1] = new std::thread(write_pipe, &writer1);
5.155 + activities[2] = new std::thread(write_pipe, &writer2);
5.156
5.157 - for (int i = 0; i < 2; i++)
5.158 + for (int i = 0; i < 3; i++)
5.159 activities[i]->join();
5.160 }
5.161