Rev 1427 | Rev 1441 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
1351 | palkovsky | 1 | /* |
2 | * Copyright (C) 2006 Ondrej Palkovsky |
||
3 | * All rights reserved. |
||
4 | * |
||
5 | * Redistribution and use in source and binary forms, with or without |
||
6 | * modification, are permitted provided that the following conditions |
||
7 | * are met: |
||
8 | * |
||
9 | * - Redistributions of source code must retain the above copyright |
||
10 | * notice, this list of conditions and the following disclaimer. |
||
11 | * - Redistributions in binary form must reproduce the above copyright |
||
12 | * notice, this list of conditions and the following disclaimer in the |
||
13 | * documentation and/or other materials provided with the distribution. |
||
14 | * - The name of the author may not be used to endorse or promote products |
||
15 | * derived from this software without specific prior written permission. |
||
16 | * |
||
17 | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR |
||
18 | * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
||
19 | * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. |
||
20 | * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, |
||
21 | * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT |
||
22 | * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||
23 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||
24 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||
25 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF |
||
26 | * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||
27 | */ |
||
28 | |||
1392 | palkovsky | 29 | /** |
30 | * Asynchronous library |
||
31 | * |
||
32 | * The aim of this library is facilitating writing programs utilizing |
||
33 | * the asynchronous nature of Helenos IPC, yet using a normal way |
||
34 | * of programming. |
||
35 | * |
||
36 | * You should be able to write very simple multithreaded programs, |
||
37 | * the async framework will automatically take care of most synchronization |
||
38 | * problems. |
||
39 | * |
||
40 | * Default semantics: |
||
41 | * - send() - send asynchronously. If the kernel refuses to send more |
||
42 | * messages, [ try to get responses from kernel, if nothing |
||
43 | * found, might try synchronous ] |
||
44 | * |
||
45 | * Example of use: |
||
46 | * |
||
47 | * 1) Multithreaded client application |
||
48 | * create_thread(thread1); |
||
49 | * create_thread(thread2); |
||
50 | * ... |
||
51 | * |
||
52 | * thread1() { |
||
53 | * conn = ipc_connect_me_to(); |
||
54 | * c1 = send(conn); |
||
55 | * c2 = send(conn); |
||
56 | * wait_for(c1); |
||
57 | * wait_for(c2); |
||
58 | * } |
||
59 | * |
||
60 | * |
||
61 | * 2) Multithreaded server application |
||
62 | * main() { |
||
1407 | palkovsky | 63 | * async_manager(); |
1392 | palkovsky | 64 | * } |
65 | * |
||
66 | * |
||
1407 | palkovsky | 67 | * client_connection(icallid, *icall) { |
68 | * if (want_refuse) { |
||
69 | * ipc_answer_fast(icallid, ELIMIT, 0, 0); |
||
70 | * return; |
||
71 | * } |
||
72 | * ipc_answer_fast(icallid, 0, 0, 0); |
||
1392 | palkovsky | 73 | * |
1407 | palkovsky | 74 | * callid = async_get_call(&call); |
75 | * handle(callid, call); |
||
76 | * ipc_answer_fast(callid, 1,2,3); |
||
77 | * |
||
78 | * callid = async_get_call(&call); |
||
1392 | palkovsky | 79 | * .... |
80 | * } |
||
1404 | palkovsky | 81 | * |
1405 | decky | 82 | * TODO: Detaching/joining dead psthreads? |
1392 | palkovsky | 83 | */ |
84 | #include <futex.h> |
||
85 | #include <async.h> |
||
86 | #include <psthread.h> |
||
87 | #include <stdio.h> |
||
88 | #include <libadt/hash_table.h> |
||
89 | #include <libadt/list.h> |
||
90 | #include <ipc/ipc.h> |
||
91 | #include <assert.h> |
||
92 | #include <errno.h> |
||
93 | |||
1427 | palkovsky | 94 | static atomic_t async_futex = FUTEX_INITIALIZER; |
1392 | palkovsky | 95 | static hash_table_t conn_hash_table; |
96 | |||
97 | typedef struct { |
||
1427 | palkovsky | 98 | pstid_t ptid; /**< Thread waiting for this message */ |
99 | int active; /**< If this thread is currently active */ |
||
100 | int done; /**< If reply was received */ |
||
101 | ipc_call_t *dataptr; /**< Pointer where the answer data |
||
102 | * should be stored */ |
||
103 | ipcarg_t retval; |
||
104 | } amsg_t; |
||
105 | |||
106 | typedef struct { |
||
1392 | palkovsky | 107 | link_t link; |
108 | ipc_callid_t callid; |
||
109 | ipc_call_t call; |
||
110 | } msg_t; |
||
111 | |||
112 | typedef struct { |
||
113 | link_t link; |
||
114 | ipcarg_t in_phone_hash; /**< Incoming phone hash. */ |
||
115 | link_t msg_queue; /**< Messages that should be delivered to this thread */ |
||
116 | pstid_t ptid; /**< Thread associated with this connection */ |
||
117 | int active; /**< If this thread is currently active */ |
||
118 | /* Structures for connection opening packet */ |
||
119 | ipc_callid_t callid; |
||
120 | ipc_call_t call; |
||
1407 | palkovsky | 121 | void (*cthread)(ipc_callid_t,ipc_call_t *); |
1392 | palkovsky | 122 | } connection_t; |
123 | |||
124 | __thread connection_t *PS_connection; |
||
125 | |||
126 | /* Hash table functions */ |
||
1404 | palkovsky | 127 | #define CONN_HASH_TABLE_CHAINS 32 |
1392 | palkovsky | 128 | |
129 | static hash_index_t conn_hash(unsigned long *key) |
||
1351 | palkovsky | 130 | { |
1392 | palkovsky | 131 | assert(key); |
1404 | palkovsky | 132 | return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS; |
1351 | palkovsky | 133 | } |
134 | |||
1392 | palkovsky | 135 | static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item) |
1351 | palkovsky | 136 | { |
1392 | palkovsky | 137 | connection_t *hs; |
138 | |||
139 | hs = hash_table_get_instance(item, connection_t, link); |
||
140 | |||
141 | return key[0] == hs->in_phone_hash; |
||
1351 | palkovsky | 142 | } |
143 | |||
1392 | palkovsky | 144 | static void conn_remove(link_t *item) |
1351 | palkovsky | 145 | { |
1392 | palkovsky | 146 | free(hash_table_get_instance(item, connection_t, link)); |
1351 | palkovsky | 147 | } |
148 | |||
1392 | palkovsky | 149 | |
150 | /** Operations for NS hash table. */ |
||
151 | static hash_table_operations_t conn_hash_table_ops = { |
||
152 | .hash = conn_hash, |
||
153 | .compare = conn_compare, |
||
154 | .remove_callback = conn_remove |
||
155 | }; |
||
156 | |||
1427 | palkovsky | 157 | /*************************************************/ |
158 | |||
1392 | palkovsky | 159 | /** Try to route a call to an appropriate connection thread |
160 | * |
||
161 | */ |
||
162 | static int route_call(ipc_callid_t callid, ipc_call_t *call) |
||
1351 | palkovsky | 163 | { |
1392 | palkovsky | 164 | connection_t *conn; |
165 | msg_t *msg; |
||
166 | link_t *hlp; |
||
167 | unsigned long key; |
||
168 | |||
1427 | palkovsky | 169 | futex_down(&async_futex); |
1392 | palkovsky | 170 | |
171 | key = call->in_phone_hash; |
||
172 | hlp = hash_table_find(&conn_hash_table, &key); |
||
173 | if (!hlp) { |
||
1427 | palkovsky | 174 | futex_up(&async_futex); |
1392 | palkovsky | 175 | return 0; |
1351 | palkovsky | 176 | } |
1392 | palkovsky | 177 | conn = hash_table_get_instance(hlp, connection_t, link); |
1351 | palkovsky | 178 | |
1392 | palkovsky | 179 | msg = malloc(sizeof(*msg)); |
180 | msg->callid = callid; |
||
181 | msg->call = *call; |
||
182 | list_append(&msg->link, &conn->msg_queue); |
||
183 | |||
184 | if (!conn->active) { |
||
185 | conn->active = 1; |
||
186 | psthread_add_ready(conn->ptid); |
||
187 | } |
||
1351 | palkovsky | 188 | |
1427 | palkovsky | 189 | futex_up(&async_futex); |
1392 | palkovsky | 190 | |
191 | return 1; |
||
192 | } |
||
193 | |||
1404 | palkovsky | 194 | /** Return new incoming message for current(thread-local) connection */ |
1392 | palkovsky | 195 | ipc_callid_t async_get_call(ipc_call_t *call) |
196 | { |
||
197 | msg_t *msg; |
||
198 | ipc_callid_t callid; |
||
199 | connection_t *conn; |
||
200 | |||
1427 | palkovsky | 201 | futex_down(&async_futex); |
1392 | palkovsky | 202 | |
203 | conn = PS_connection; |
||
204 | /* If nothing in queue, wait until something appears */ |
||
205 | if (list_empty(&conn->msg_queue)) { |
||
206 | conn->active = 0; |
||
207 | psthread_schedule_next_adv(PS_TO_MANAGER); |
||
1351 | palkovsky | 208 | } |
209 | |||
1392 | palkovsky | 210 | msg = list_get_instance(conn->msg_queue.next, msg_t, link); |
211 | list_remove(&msg->link); |
||
212 | callid = msg->callid; |
||
213 | *call = msg->call; |
||
214 | free(msg); |
||
215 | |||
1427 | palkovsky | 216 | futex_up(&async_futex); |
1392 | palkovsky | 217 | return callid; |
1351 | palkovsky | 218 | } |
219 | |||
1404 | palkovsky | 220 | /** Thread function that gets created on new connection |
221 | * |
||
222 | * This function is defined as a weak symbol - to be redefined in |
||
223 | * user code. |
||
224 | */ |
||
1392 | palkovsky | 225 | void client_connection(ipc_callid_t callid, ipc_call_t *call) |
226 | { |
||
1404 | palkovsky | 227 | ipc_answer_fast(callid, ENOENT, 0, 0); |
1392 | palkovsky | 228 | } |
1351 | palkovsky | 229 | |
1404 | palkovsky | 230 | /** Wrapper for client connection thread |
231 | * |
||
232 | * When new connection arrives, thread with this function is created. |
||
233 | * It calls client_connection and does final cleanup. |
||
234 | * |
||
235 | * @parameter arg Connection structure pointer |
||
236 | */ |
||
1392 | palkovsky | 237 | static int connection_thread(void *arg) |
1351 | palkovsky | 238 | { |
1404 | palkovsky | 239 | unsigned long key; |
240 | msg_t *msg; |
||
1408 | palkovsky | 241 | connection_t *conn; |
1404 | palkovsky | 242 | |
1392 | palkovsky | 243 | /* Setup thread local connection pointer */ |
244 | PS_connection = (connection_t *)arg; |
||
1408 | palkovsky | 245 | conn = PS_connection; |
246 | conn->cthread(conn->callid, &conn->call); |
||
1392 | palkovsky | 247 | |
1404 | palkovsky | 248 | /* Remove myself from connection hash table */ |
1427 | palkovsky | 249 | futex_down(&async_futex); |
1408 | palkovsky | 250 | key = conn->in_phone_hash; |
1404 | palkovsky | 251 | hash_table_remove(&conn_hash_table, &key, 1); |
1427 | palkovsky | 252 | futex_up(&async_futex); |
1404 | palkovsky | 253 | /* Answer all remaining messages with ehangup */ |
1408 | palkovsky | 254 | while (!list_empty(&conn->msg_queue)) { |
255 | msg = list_get_instance(conn->msg_queue.next, msg_t, link); |
||
1404 | palkovsky | 256 | list_remove(&msg->link); |
257 | ipc_answer_fast(msg->callid, EHANGUP, 0, 0); |
||
258 | free(msg); |
||
259 | } |
||
1351 | palkovsky | 260 | } |
1392 | palkovsky | 261 | |
262 | /** Create new thread for a new connection |
||
263 | * |
||
264 | * Creates new thread for connection, fills in connection |
||
265 | * structures and inserts it into the hash table, so that |
||
266 | * later we can easily do routing of messages to particular |
||
267 | * threads. |
||
1407 | palkovsky | 268 | * |
269 | * @param callid Callid of the IPC_M_CONNECT_ME_TO packet |
||
270 | * @param call Call data of the opening packet |
||
271 | * @param cthread Thread function that should be called upon |
||
272 | * opening the connection |
||
273 | * @return New thread id |
||
1392 | palkovsky | 274 | */ |
1407 | palkovsky | 275 | pstid_t async_new_connection(ipc_callid_t callid, ipc_call_t *call, |
276 | void (*cthread)(ipc_callid_t,ipc_call_t *)) |
||
1392 | palkovsky | 277 | { |
278 | pstid_t ptid; |
||
279 | connection_t *conn; |
||
280 | unsigned long key; |
||
281 | |||
282 | conn = malloc(sizeof(*conn)); |
||
283 | if (!conn) { |
||
284 | ipc_answer_fast(callid, ENOMEM, 0, 0); |
||
1407 | palkovsky | 285 | return NULL; |
1392 | palkovsky | 286 | } |
287 | conn->in_phone_hash = IPC_GET_ARG3(*call); |
||
288 | list_initialize(&conn->msg_queue); |
||
289 | conn->ptid = psthread_create(connection_thread, conn); |
||
290 | conn->callid = callid; |
||
291 | conn->call = *call; |
||
292 | conn->active = 1; /* We will activate it asap */ |
||
1407 | palkovsky | 293 | conn->cthread = cthread; |
1392 | palkovsky | 294 | list_initialize(&conn->link); |
295 | if (!conn->ptid) { |
||
296 | free(conn); |
||
297 | ipc_answer_fast(callid, ENOMEM, 0, 0); |
||
1407 | palkovsky | 298 | return NULL; |
1392 | palkovsky | 299 | } |
300 | key = conn->in_phone_hash; |
||
1427 | palkovsky | 301 | futex_down(&async_futex); |
1392 | palkovsky | 302 | /* Add connection to hash table */ |
303 | hash_table_insert(&conn_hash_table, &key, &conn->link); |
||
1427 | palkovsky | 304 | futex_up(&async_futex); |
1392 | palkovsky | 305 | |
306 | psthread_add_ready(conn->ptid); |
||
1407 | palkovsky | 307 | |
308 | return conn->ptid; |
||
1392 | palkovsky | 309 | } |
310 | |||
1427 | palkovsky | 311 | /** Handle call that was received */ |
1392 | palkovsky | 312 | static void handle_call(ipc_callid_t callid, ipc_call_t *call) |
313 | { |
||
314 | if (route_call(callid, call)) |
||
315 | return; |
||
316 | |||
317 | switch (IPC_GET_METHOD(*call)) { |
||
318 | case IPC_M_INTERRUPT: |
||
319 | break; |
||
320 | case IPC_M_CONNECT_ME_TO: |
||
321 | /* Open new connection with thread etc. */ |
||
1407 | palkovsky | 322 | async_new_connection(callid, call, client_connection); |
1392 | palkovsky | 323 | break; |
324 | default: |
||
325 | ipc_answer_fast(callid, EHANGUP, 0, 0); |
||
326 | } |
||
327 | } |
||
328 | |||
329 | /** Endless loop dispatching incoming calls and answers */ |
||
330 | int async_manager() |
||
331 | { |
||
332 | ipc_call_t call; |
||
333 | ipc_callid_t callid; |
||
1435 | palkovsky | 334 | int timeout; |
1392 | palkovsky | 335 | |
336 | while (1) { |
||
337 | if (psthread_schedule_next_adv(PS_FROM_MANAGER)) { |
||
1427 | palkovsky | 338 | futex_up(&async_futex); /* async_futex is always held |
1392 | palkovsky | 339 | * when entering manager thread |
340 | */ |
||
341 | continue; |
||
342 | } |
||
1435 | palkovsky | 343 | /* |
344 | if (expires) |
||
345 | timeout = .... ; |
||
346 | else |
||
347 | */ |
||
348 | timeout = SYNCH_NO_TIMEOUT; |
||
349 | callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING); |
||
1392 | palkovsky | 350 | |
1435 | palkovsky | 351 | if (!callid) { |
352 | // handle_expired_timeouts.......; |
||
353 | continue; |
||
354 | } |
||
355 | |||
1392 | palkovsky | 356 | if (callid & IPC_CALLID_ANSWERED) |
357 | continue; |
||
1427 | palkovsky | 358 | |
1392 | palkovsky | 359 | handle_call(callid, &call); |
360 | } |
||
361 | } |
||
362 | |||
1404 | palkovsky | 363 | /** Function to start async_manager as a standalone thread |
364 | * |
||
365 | * When more kernel threads are used, one async manager should |
||
366 | * exist per thread. The particular implementation may change, |
||
367 | * currently one async_manager is started automatically per kernel |
||
368 | * thread except main thread. |
||
369 | */ |
||
1392 | palkovsky | 370 | static int async_manager_thread(void *arg) |
371 | { |
||
1427 | palkovsky | 372 | futex_up(&async_futex); /* async_futex is always locked when entering |
1392 | palkovsky | 373 | * manager */ |
374 | async_manager(); |
||
375 | } |
||
376 | |||
377 | /** Add one manager to manager list */ |
||
378 | void async_create_manager(void) |
||
379 | { |
||
380 | pstid_t ptid; |
||
381 | |||
382 | ptid = psthread_create(async_manager_thread, NULL); |
||
383 | psthread_add_manager(ptid); |
||
384 | } |
||
385 | |||
386 | /** Remove one manager from manager list */ |
||
387 | void async_destroy_manager(void) |
||
388 | { |
||
389 | psthread_remove_manager(); |
||
390 | } |
||
391 | |||
392 | /** Initialize internal structures needed for async manager */ |
||
393 | int _async_init(void) |
||
394 | { |
||
1404 | palkovsky | 395 | if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) { |
1392 | palkovsky | 396 | printf("%s: cannot create hash table\n", "async"); |
397 | return ENOMEM; |
||
398 | } |
||
399 | |||
400 | } |
||
1427 | palkovsky | 401 | |
402 | /** IPC handler for messages in async framework |
||
403 | * |
||
404 | * Notify thread that is waiting for this message, that it arrived |
||
405 | */ |
||
406 | static void reply_received(void *private, int retval, |
||
407 | ipc_call_t *data) |
||
408 | { |
||
409 | amsg_t *msg = (amsg_t *) private; |
||
410 | |||
411 | msg->retval = retval; |
||
412 | |||
413 | futex_down(&async_futex); |
||
414 | /* Copy data after futex_down, just in case the |
||
415 | * call was detached |
||
416 | */ |
||
417 | if (msg->dataptr) |
||
418 | *msg->dataptr = *data; |
||
1435 | palkovsky | 419 | |
420 | /* TODO: memory barrier?? */ |
||
1427 | palkovsky | 421 | msg->done = 1; |
422 | if (! msg->active) { |
||
423 | msg->active = 1; |
||
424 | psthread_add_ready(msg->ptid); |
||
425 | } |
||
426 | futex_up(&async_futex); |
||
427 | } |
||
428 | |||
429 | /** Send message and return id of the sent message |
||
430 | * |
||
431 | * The return value can be used as input for async_wait() to wait |
||
432 | * for completion. |
||
433 | */ |
||
434 | aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2, |
||
435 | ipc_call_t *dataptr) |
||
436 | { |
||
437 | amsg_t *msg; |
||
438 | |||
439 | msg = malloc(sizeof(*msg)); |
||
440 | msg->active = 1; |
||
441 | msg->done = 0; |
||
442 | msg->dataptr = dataptr; |
||
443 | ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received); |
||
444 | |||
445 | return (aid_t) msg; |
||
446 | } |
||
447 | |||
448 | /** Wait for a message sent by async framework |
||
449 | * |
||
450 | * @param amsgid Message ID to wait for |
||
451 | * @param retval Pointer to variable where will be stored retval |
||
452 | * of the answered message. If NULL, it is ignored. |
||
453 | * |
||
454 | */ |
||
455 | void async_wait_for(aid_t amsgid, ipcarg_t *retval) |
||
456 | { |
||
457 | amsg_t *msg = (amsg_t *) amsgid; |
||
458 | connection_t *conn; |
||
459 | |||
460 | futex_down(&async_futex); |
||
461 | if (msg->done) { |
||
462 | futex_up(&async_futex); |
||
463 | goto done; |
||
464 | } |
||
465 | |||
466 | msg->ptid = psthread_get_id(); |
||
467 | msg->active = 0; |
||
468 | /* Leave locked async_futex when entering this function */ |
||
469 | psthread_schedule_next_adv(PS_TO_MANAGER); |
||
470 | /* futex is up automatically after psthread_schedule_next...*/ |
||
471 | done: |
||
472 | if (retval) |
||
473 | *retval = msg->retval; |
||
474 | free(msg); |
||
475 | } |
||
1435 | palkovsky | 476 | |
477 | |||
478 | /* int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, int timeout) */ |
||
479 | /* { */ |
||
480 | /* amsg_t *msg = (amsg_t *) amsgid; */ |
||
481 | /* connection_t *conn; */ |
||
482 | |||
483 | /* futex_down(&async_futex); */ |
||
484 | /* if (msg->done) { */ |
||
485 | /* futex_up(&async_futex); */ |
||
486 | /* goto done; */ |
||
487 | /* } */ |
||
488 | |||
489 | /* msg->ptid = psthread_get_id(); */ |
||
490 | /* msg->active = 0; */ |
||
491 | /* msg->expires = gettime() + timeout; */ |
||
492 | /* setup_timeouts_etc...(); */ |
||
493 | |||
494 | /* /\* Leave locked async_futex when entering this function *\/ */ |
||
495 | /* psthread_schedule_next_adv(PS_TO_MANAGER); */ |
||
496 | /* /\* futex is up automatically after psthread_schedule_next...*\/ */ |
||
497 | |||
498 | /* if (!msg->done) */ |
||
499 | /* return casy-casy; */ |
||
500 | |||
501 | /* /\* TODO: When memory barrier in reply_received, we can skip this *\/ */ |
||
502 | /* futex_down(&async_futex); */ |
||
503 | /* futex_up(&async_futex); */ |
||
504 | /* done: */ |
||
505 | |||
506 | /* if (retval) */ |
||
507 | /* *retval = msg->retval; */ |
||
508 | /* free(msg); */ |
||
509 | /* } */ |
||
510 |