1.1 --- a/tests/dstest_pipe_client.cc Mon Jun 07 00:50:14 2021 +0200
1.2 +++ b/tests/dstest_pipe_client.cc Fri Jul 02 23:53:13 2021 +0200
1.3 @@ -27,6 +27,7 @@
1.4 #include <stdio.h>
1.5 #include <string.h>
1.6 #include <stdlib.h>
1.7 +#include <unistd.h> /* sleep */
1.8
1.9 #include <fsclient/client.h>
1.10 #include <fsclient/file.h>
1.11 @@ -55,34 +56,88 @@
1.12
1.13 printf("Written %ld/%ld in #%d of %d/%d to pipe...\n", nwritten, size, region, loop, 2);
1.14 }
1.15 +
1.16 + sleep(1);
1.17 }
1.18
1.19 /* Flush to make the final output available. */
1.20
1.21 client_flush(writer);
1.22 +
1.23 + file_close(writer);
1.24 }
1.25
1.26 /* Use the reader to obtain data from the pipe. */
1.27
1.28 -static void read_pipe(file_t *reader)
1.29 +static void read_pipes(file_t *reader1, file_t *reader2)
1.30 {
1.31 - offset_t size = 600;
1.32 + offset_t size = 600, totals[] = {0, 0};
1.33 + bool active[] = {true, true};
1.34 + int num_active = 2;
1.35 char buffer[size];
1.36 offset_t nread;
1.37 + file_t *reader;
1.38 + long err;
1.39
1.40 - do
1.41 + if ((err = client_wait_init(reader1)) || (err = client_wait_init(reader2)))
1.42 + {
1.43 + printf("Could not initialise waiting for files: %s\n", l4sys_errtostr(err));
1.44 + return;
1.45 + }
1.46 +
1.47 + while (1)
1.48 {
1.49 + /* Wait for notification of content. */
1.50 +
1.51 + printf("Waiting...\n");
1.52 + long err = client_wait_files(&reader);
1.53 +
1.54 + if (err)
1.55 + {
1.56 + printf("Error waiting for notifications: %s\n", l4sys_errtostr(err));
1.57 + return;
1.58 + }
1.59 +
1.60 + if ((reader != reader1) && (reader != reader2))
1.61 + {
1.62 + printf("Spurious notification received for %p versus %p, %p.\n", reader, reader1, reader2);
1.63 + continue;
1.64 + }
1.65 +
1.66 nread = client_read(reader, buffer, size);
1.67
1.68 - printf("Read %ld/%ld from pipe...\n", nread, size);
1.69 + // NOTE: Should really be testing for the condition somehow.
1.70
1.71 - for (offset_t i = 0; i < nread; i += 60)
1.72 + int p = reader == reader1 ? 0 : 1;
1.73 +
1.74 + if (!nread)
1.75 {
1.76 - fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout);
1.77 - fputs("\n", stdout);
1.78 + if (active[p])
1.79 + {
1.80 + active[p] = false;
1.81 + num_active--;
1.82 +
1.83 + if (!num_active)
1.84 + break;
1.85 + }
1.86 }
1.87 +
1.88 + do
1.89 + {
1.90 + totals[p] += nread;
1.91 +
1.92 + printf("Read %ld/%ld, total %ld, from pipe #%d...\n", nread, size, totals[p], p + 1);
1.93 +#if 0
1.94 + for (offset_t i = 0; i < nread; i += 60)
1.95 + {
1.96 + fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout);
1.97 + fputs("\n", stdout);
1.98 + }
1.99 +#endif
1.100 + nread = client_read(reader, buffer, size);
1.101 + }
1.102 + while (nread);
1.103 }
1.104 - while (nread);
1.105
1.106 printf("Data shown.\n");
1.107 }
1.108 @@ -95,19 +150,29 @@
1.109
1.110 /* Invoke the open method to receive the file reference. */
1.111
1.112 - file_t reader, writer;
1.113 - long err = pipe_open(page(PIPE_PAGES), &reader, &writer, server);
1.114 + file_t reader1, reader2, writer1, writer2;
1.115 + long err = pipe_open(page(PIPE_PAGES), &reader1, &writer1, server) ||
1.116 + pipe_open(page(PIPE_PAGES), &reader2, &writer2, server);
1.117
1.118 if (err)
1.119 {
1.120 - printf("Could not obtain pipe: %s\n", l4sys_errtostr(err));
1.121 + printf("Could not obtain pipes: %s\n", l4sys_errtostr(err));
1.122 return 1;
1.123 }
1.124
1.125 - /* Make the reader and writer blocking to permit synchronisation. */
1.126 + /* Register the readers for notification. */
1.127
1.128 - if ((err = client_set_blocking(&reader, NOTIFY_CONTENT_AVAILABLE)) ||
1.129 - (err = client_set_blocking(&writer, NOTIFY_SPACE_AVAILABLE)))
1.130 + if ((err = client_subscribe(&reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) ||
1.131 + (err = client_subscribe(&reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)))
1.132 + {
1.133 + printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err));
1.134 + return 1;
1.135 + }
1.136 +
1.137 + /* Make the writers blocking to permit synchronisation. */
1.138 +
1.139 + if ((err = client_set_blocking(&writer1, NOTIFY_SPACE_AVAILABLE)) ||
1.140 + (err = client_set_blocking(&writer2, NOTIFY_SPACE_AVAILABLE)))
1.141 {
1.142 printf("Could not set as blocking: %s\n", l4sys_errtostr(err));
1.143 return 1;
1.144 @@ -115,12 +180,13 @@
1.145
1.146 /* Schedule reader and writer threads. */
1.147
1.148 - std::thread *activities[2];
1.149 + std::thread *activities[3];
1.150
1.151 - activities[0] = new std::thread(read_pipe, &reader);
1.152 - activities[1] = new std::thread(write_pipe, &writer);
1.153 + activities[0] = new std::thread(read_pipes, &reader1, &reader2);
1.154 + activities[1] = new std::thread(write_pipe, &writer1);
1.155 + activities[2] = new std::thread(write_pipe, &writer2);
1.156
1.157 - for (int i = 0; i < 2; i++)
1.158 + for (int i = 0; i < 3; i++)
1.159 activities[i]->join();
1.160 }
1.161