Subversion Repositories HelenOS-historic

Rev

Rev 1392 | Rev 1405 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. /*
  2.  * Copyright (C) 2006 Ondrej Palkovsky
  3.  * All rights reserved.
  4.  *
  5.  * Redistribution and use in source and binary forms, with or without
  6.  * modification, are permitted provided that the following conditions
  7.  * are met:
  8.  *
  9.  * - Redistributions of source code must retain the above copyright
  10.  *   notice, this list of conditions and the following disclaimer.
  11.  * - Redistributions in binary form must reproduce the above copyright
  12.  *   notice, this list of conditions and the following disclaimer in the
  13.  *   documentation and/or other materials provided with the distribution.
  14.  * - The name of the author may not be used to endorse or promote products
  15.  *   derived from this software without specific prior written permission.
  16.  *
  17.  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18.  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19.  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20.  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21.  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22.  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23.  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24.  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25.  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26.  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27.  */
  28.  
  29. /**
  30.  * Asynchronous library
  31.  *
  32.  * The aim of this library is facilitating writing programs utilizing
  33.  * the asynchronous nature of Helenos IPC, yet using a normal way
  34.  * of programming.
  35.  *
  36.  * You should be able to write very simple multithreaded programs,
  37.  * the async framework will automatically take care of most synchronization
  38.  * problems.
  39.  *
  40.  * Default semantics:
  41.  * - send() - send asynchronously. If the kernel refuses to send more
  42.  *            messages, [ try to get responses from kernel, if nothing
  43.  *            found, might try synchronous ]
  44.  *
  45.  * Example of use:
  46.  *
  47.  * 1) Multithreaded client application
  48.  *  create_thread(thread1);
  49.  *  create_thread(thread2);
  50.  *  ...
  51.  *  
  52.  *  thread1() {
  53.  *        conn = ipc_connect_me_to();
  54.  *        c1 = send(conn);
  55.  *        c2 = send(conn);
  56.  *        wait_for(c1);
  57.  *        wait_for(c2);
  58.  *  }
  59.  *
  60.  *
  61.  * 2) Multithreaded server application
  62.  * main() {
  63.  *      wait_for_connection(new_connection);
  64.  * }
  65.  *
  66.  *
  67.  * new_connection(int connection) {
  68.  *       accept(connection);
  69.  *       msg = get_msg();
  70.  *       handle(msg);
  71.  *       answer(msg);
  72.  *
  73.  *       msg = get_msg();
  74.  *       ....
  75.  * }
  76.  *
  77.  * TODO: Detaching/joining dead psthreads? */
  78.  */
  79. #include <futex.h>
  80. #include <async.h>
  81. #include <psthread.h>
  82. #include <stdio.h>
  83. #include <libadt/hash_table.h>
  84. #include <libadt/list.h>
  85. #include <ipc/ipc.h>
  86. #include <assert.h>
  87. #include <errno.h>
  88.  
  89. static atomic_t conn_futex = FUTEX_INITIALIZER;
  90. static hash_table_t conn_hash_table;
  91.  
  92. typedef struct {
  93.     link_t link;
  94.     ipc_callid_t callid;
  95.     ipc_call_t call;
  96. } msg_t;
  97.  
  98. typedef struct {
  99.     link_t link;
  100.     ipcarg_t in_phone_hash;     /**< Incoming phone hash. */
  101.     link_t msg_queue;              /**< Messages that should be delivered to this thread */
  102.     pstid_t ptid;                /**< Thread associated with this connection */
  103.     int active;                     /**< If this thread is currently active */
  104.     /* Structures for connection opening packet */
  105.     ipc_callid_t callid;
  106.     ipc_call_t call;
  107. } connection_t;
  108.  
  109. __thread connection_t *PS_connection;
  110.  
  111. /* Hash table functions */
  112.  
  113. #define CONN_HASH_TABLE_CHAINS  32
  114.  
  115. static hash_index_t conn_hash(unsigned long *key)
  116. {
  117.     assert(key);
  118.     return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
  119. }
  120.  
  121. static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
  122. {
  123.     connection_t *hs;
  124.  
  125.     hs = hash_table_get_instance(item, connection_t, link);
  126.    
  127.     return key[0] == hs->in_phone_hash;
  128. }
  129.  
  130. static void conn_remove(link_t *item)
  131. {
  132.     free(hash_table_get_instance(item, connection_t, link));
  133. }
  134.  
  135.  
  136. /** Operations for NS hash table. */
  137. static hash_table_operations_t conn_hash_table_ops = {
  138.     .hash = conn_hash,
  139.     .compare = conn_compare,
  140.     .remove_callback = conn_remove
  141. };
  142.  
  143. /** Try to route a call to an appropriate connection thread
  144.  *
  145.  */
  146. static int route_call(ipc_callid_t callid, ipc_call_t *call)
  147. {
  148.     connection_t *conn;
  149.     msg_t *msg;
  150.     link_t *hlp;
  151.     unsigned long key;
  152.  
  153.     futex_down(&conn_futex);
  154.  
  155.     key = call->in_phone_hash;
  156.     hlp = hash_table_find(&conn_hash_table, &key);
  157.     if (!hlp) {
  158.         futex_up(&conn_futex);
  159.         return 0;
  160.     }
  161.     conn = hash_table_get_instance(hlp, connection_t, link);
  162.  
  163.     msg = malloc(sizeof(*msg));
  164.     msg->callid = callid;
  165.     msg->call = *call;
  166.     list_append(&msg->link, &conn->msg_queue);
  167.    
  168.     if (!conn->active) {
  169.         conn->active = 1;
  170.         psthread_add_ready(conn->ptid);
  171.     }
  172.  
  173.     futex_up(&conn_futex);
  174.  
  175.     return 1;
  176. }
  177.  
  178. /** Return new incoming message for current(thread-local) connection */
  179. ipc_callid_t async_get_call(ipc_call_t *call)
  180. {
  181.     msg_t *msg;
  182.     ipc_callid_t callid;
  183.     connection_t *conn;
  184.    
  185.     futex_down(&conn_futex);
  186.  
  187.     conn = PS_connection;
  188.     /* If nothing in queue, wait until something appears */
  189.     if (list_empty(&conn->msg_queue)) {
  190.         conn->active = 0;
  191.         psthread_schedule_next_adv(PS_TO_MANAGER);
  192.     }
  193.    
  194.     msg = list_get_instance(conn->msg_queue.next, msg_t, link);
  195.     list_remove(&msg->link);
  196.     callid = msg->callid;
  197.     *call = msg->call;
  198.     free(msg);
  199.    
  200.     futex_up(&conn_futex);
  201.     return callid;
  202. }
  203.  
  204. /** Thread function that gets created on new connection
  205.  *
  206.  * This function is defined as a weak symbol - to be redefined in
  207.  * user code.
  208.  */
  209. void client_connection(ipc_callid_t callid, ipc_call_t *call)
  210. {
  211.     ipc_answer_fast(callid, ENOENT, 0, 0);
  212. }
  213.  
  214. /** Wrapper for client connection thread
  215.  *
  216.  * When new connection arrives, thread with this function is created.
  217.  * It calls client_connection and does final cleanup.
  218.  *
  219.  * @parameter arg Connection structure pointer
  220.  */
  221. static int connection_thread(void  *arg)
  222. {
  223.     unsigned long key;
  224.     msg_t *msg;
  225.  
  226.     /* Setup thread local connection pointer */
  227.     PS_connection = (connection_t *)arg;
  228.     client_connection(PS_connection->callid, &PS_connection->call);
  229.  
  230.     /* Remove myself from connection hash table */
  231.     futex_down(&conn_futex);
  232.     key = PS_connection->in_phone_hash;
  233.     hash_table_remove(&conn_hash_table, &key, 1);
  234.     futex_up(&conn_futex);
  235.     /* Answer all remaining messages with ehangup */
  236.     while (!list_empty(&PS_connection->msg_queue)) {
  237.         msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
  238.         list_remove(&msg->link);
  239.         ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
  240.         free(msg);
  241.     }
  242. }
  243.  
  244. /** Create new thread for a new connection
  245.  *
  246.  * Creates new thread for connection, fills in connection
  247.  * structures and inserts it into the hash table, so that
  248.  * later we can easily do routing of messages to particular
  249.  * threads.
  250.  */
  251. static void new_connection(ipc_callid_t callid, ipc_call_t *call)
  252. {
  253.     pstid_t ptid;
  254.     connection_t *conn;
  255.     unsigned long key;
  256.  
  257.     conn = malloc(sizeof(*conn));
  258.     if (!conn) {
  259.         ipc_answer_fast(callid, ENOMEM, 0, 0);
  260.         return;
  261.     }
  262.     conn->in_phone_hash = IPC_GET_ARG3(*call);
  263.     list_initialize(&conn->msg_queue);
  264.     conn->ptid = psthread_create(connection_thread, conn);
  265.     conn->callid = callid;
  266.     conn->call = *call;
  267.     conn->active = 1; /* We will activate it asap */
  268.     list_initialize(&conn->link);
  269.     if (!conn->ptid) {
  270.         free(conn);
  271.         ipc_answer_fast(callid, ENOMEM, 0, 0);
  272.         return;
  273.     }
  274.     key = conn->in_phone_hash;
  275.     futex_down(&conn_futex);
  276.     /* Add connection to hash table */
  277.     hash_table_insert(&conn_hash_table, &key, &conn->link);
  278.     futex_up(&conn_futex);
  279.  
  280.     psthread_add_ready(conn->ptid);
  281. }
  282.  
  283. /** Handle call to a task */
  284. static void handle_call(ipc_callid_t callid, ipc_call_t *call)
  285. {
  286.     if (route_call(callid, call))
  287.         return;
  288.  
  289.     switch (IPC_GET_METHOD(*call)) {
  290.     case IPC_M_INTERRUPT:
  291.         break;
  292.     case IPC_M_CONNECT_ME_TO:
  293.         /* Open new connection with thread etc. */
  294.         new_connection(callid, call);
  295.         break;
  296.     default:
  297.         ipc_answer_fast(callid, EHANGUP, 0, 0);
  298.     }
  299. }
  300.  
  301. /** Endless loop dispatching incoming calls and answers */
  302. int async_manager()
  303. {
  304.     ipc_call_t call;
  305.     ipc_callid_t callid;
  306.  
  307.     while (1) {
  308.         if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
  309.             futex_up(&conn_futex); /* conn_futex is always held
  310.                         * when entering manager thread
  311.                         */
  312.             continue;
  313.         }
  314.         callid = ipc_wait_cycle(&call,SYNCH_NO_TIMEOUT,SYNCH_BLOCKING);
  315.  
  316.         if (callid & IPC_CALLID_ANSWERED)
  317.             continue;
  318.         handle_call(callid, &call);
  319.     }
  320. }
  321.  
  322. /** Function to start async_manager as a standalone thread
  323.  *
  324.  * When more kernel threads are used, one async manager should
  325.  * exist per thread. The particular implementation may change,
  326.  * currently one async_manager is started automatically per kernel
  327.  * thread except main thread.
  328.  */
  329. static int async_manager_thread(void *arg)
  330. {
  331.     futex_up(&conn_futex); /* conn_futex is always locked when entering
  332.                 * manager */
  333.     async_manager();
  334. }
  335.  
  336. /** Add one manager to manager list */
  337. void async_create_manager(void)
  338. {
  339.     pstid_t ptid;
  340.  
  341.     ptid = psthread_create(async_manager_thread, NULL);
  342.     psthread_add_manager(ptid);
  343. }
  344.  
  345. /** Remove one manager from manager list */
  346. void async_destroy_manager(void)
  347. {
  348.     psthread_remove_manager();
  349. }
  350.  
  351. /** Initialize internal structures needed for async manager */
  352. int _async_init(void)
  353. {
  354.     if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
  355.         printf("%s: cannot create hash table\n", "async");
  356.         return ENOMEM;
  357.     }
  358.    
  359. }
  360.