1 /* 2 * Test pipe operations. 3 * 4 * Copyright (C) 2020, 2021 Paul Boddie <paul@boddie.org.uk> 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License as 8 * published by the Free Software Foundation; either version 2 of 9 * the License, or (at your option) any later version. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with this program; if not, write to the Free Software 18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, 19 * Boston, MA 02110-1301, USA 20 */ 21 22 #include <l4/re/env.h> 23 #include <l4/sys/err.h> 24 25 #include <thread> 26 27 #include <stdio.h> 28 #include <string.h> 29 #include <stdlib.h> 30 #include <unistd.h> /* sleep */ 31 32 #include <fsclient/client.h> 33 #include <fsclient/file.h> 34 #include <mem/memory_utils.h> 35 36 37 38 /* Minimum pipe region size in pages. */ 39 40 const unsigned int PIPE_PAGES = 2; 41 42 /* Use the writer to fill the pipe with data. */ 43 44 static void write_pipe(file_t *writer) 45 { 46 offset_t size = 600; 47 char buffer[size]; 48 49 for (int loop = 0; loop < 3; loop++) 50 { 51 for (int region = 0; region < 26; region++) 52 { 53 memset(buffer, (int) 'a' + region, size); 54 55 offset_t nwritten = client_write(writer, buffer, size); 56 57 printf("Written %ld/%ld in #%d of %d/%d to pipe...\n", nwritten, size, region, loop, 2); 58 } 59 60 sleep(1); 61 } 62 63 /* Flush to make the final output available. */ 64 65 client_flush(writer); 66 67 file_close(writer); 68 } 69 70 /* Use the reader to obtain data from the pipe. */ 71 72 static void read_pipes(file_t *reader1, file_t *reader2) 73 { 74 offset_t size = 600, totals[] = {0, 0}; 75 bool active[] = {true, true}; 76 int num_active = 2; 77 char buffer[size]; 78 offset_t nread; 79 file_t *reader; 80 long err; 81 82 if ((err = client_wait_init(reader1)) || (err = client_wait_init(reader2))) 83 { 84 printf("Could not initialise waiting for files: %s\n", l4sys_errtostr(err)); 85 return; 86 } 87 88 while (1) 89 { 90 /* Wait for notification of content. */ 91 92 printf("Waiting...\n"); 93 long err = client_wait_files(&reader); 94 95 if (err) 96 { 97 printf("Error waiting for notifications: %s\n", l4sys_errtostr(err)); 98 return; 99 } 100 101 if ((reader != reader1) && (reader != reader2)) 102 { 103 printf("Spurious notification received for %p versus %p, %p.\n", reader, reader1, reader2); 104 continue; 105 } 106 107 nread = client_read(reader, buffer, size); 108 109 // NOTE: Should really be testing for the condition somehow. 110 111 int p = reader == reader1 ? 0 : 1; 112 113 if (!nread) 114 { 115 if (active[p]) 116 { 117 active[p] = false; 118 num_active--; 119 120 if (!num_active) 121 break; 122 } 123 } 124 125 do 126 { 127 totals[p] += nread; 128 129 printf("Read %ld/%ld, total %ld, from pipe #%d...\n", nread, size, totals[p], p + 1); 130 #if 0 131 for (offset_t i = 0; i < nread; i += 60) 132 { 133 fwrite(buffer + i, sizeof(char), nread - i > 60 ? 60 : nread - i, stdout); 134 fputs("\n", stdout); 135 } 136 #endif 137 nread = client_read(reader, buffer, size); 138 } 139 while (nread); 140 } 141 142 printf("Data shown.\n"); 143 } 144 145 int main(void) 146 { 147 /* Obtain access to the filesystem. */ 148 149 l4_cap_idx_t server = l4re_env_get_cap("server"); 150 151 /* Invoke the open method to receive the file reference. */ 152 153 file_t reader1, reader2, writer1, writer2; 154 long err = pipe_open(page(PIPE_PAGES), &reader1, &writer1, server) || 155 pipe_open(page(PIPE_PAGES), &reader2, &writer2, server); 156 157 if (err) 158 { 159 printf("Could not obtain pipes: %s\n", l4sys_errtostr(err)); 160 return 1; 161 } 162 163 /* Register the readers for notification. */ 164 165 if ((err = client_subscribe(&reader1, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED)) || 166 (err = client_subscribe(&reader2, NOTIFY_CONTENT_AVAILABLE | NOTIFY_PEER_CLOSED))) 167 { 168 printf("Could not subscribe to notifications: %s\n", l4sys_errtostr(err)); 169 return 1; 170 } 171 172 /* Make the writers blocking to permit synchronisation. */ 173 174 if ((err = client_set_blocking(&writer1, NOTIFY_SPACE_AVAILABLE)) || 175 (err = client_set_blocking(&writer2, NOTIFY_SPACE_AVAILABLE))) 176 { 177 printf("Could not set as blocking: %s\n", l4sys_errtostr(err)); 178 return 1; 179 } 180 181 /* Schedule reader and writer threads. */ 182 183 std::thread *activities[3]; 184 185 activities[0] = new std::thread(read_pipes, &reader1, &reader2); 186 activities[1] = new std::thread(write_pipe, &writer1); 187 activities[2] = new std::thread(write_pipe, &writer2); 188 189 for (int i = 0; i < 3; i++) 190 activities[i]->join(); 191 } 192 193 // vim: tabstop=2 expandtab shiftwidth=2