Subversion Repositories HelenOS-historic

Compare Revisions

Ignore whitespace Rev 1426 → Rev 1427

/uspace/trunk/libc/generic/async.c
91,10 → 91,19
#include <assert.h>
#include <errno.h>
 
static atomic_t conn_futex = FUTEX_INITIALIZER;
static atomic_t async_futex = FUTEX_INITIALIZER;
static hash_table_t conn_hash_table;
 
typedef struct {
pstid_t ptid; /**< Thread waiting for this message */
int active; /**< If this thread is currently active */
int done; /**< If reply was received */
ipc_call_t *dataptr; /**< Pointer where the answer data
* should be stored */
ipcarg_t retval;
} amsg_t;
 
typedef struct {
link_t link;
ipc_callid_t callid;
ipc_call_t call;
115,7 → 124,6
__thread connection_t *PS_connection;
 
/* Hash table functions */
 
#define CONN_HASH_TABLE_CHAINS 32
 
static hash_index_t conn_hash(unsigned long *key)
146,6 → 154,8
.remove_callback = conn_remove
};
 
/*************************************************/
 
/** Try to route a call to an appropriate connection thread
*
*/
156,12 → 166,12
link_t *hlp;
unsigned long key;
 
futex_down(&conn_futex);
futex_down(&async_futex);
 
key = call->in_phone_hash;
hlp = hash_table_find(&conn_hash_table, &key);
if (!hlp) {
futex_up(&conn_futex);
futex_up(&async_futex);
return 0;
}
conn = hash_table_get_instance(hlp, connection_t, link);
176,7 → 186,7
psthread_add_ready(conn->ptid);
}
 
futex_up(&conn_futex);
futex_up(&async_futex);
 
return 1;
}
188,7 → 198,7
ipc_callid_t callid;
connection_t *conn;
futex_down(&conn_futex);
futex_down(&async_futex);
 
conn = PS_connection;
/* If nothing in queue, wait until something appears */
203,7 → 213,7
*call = msg->call;
free(msg);
futex_up(&conn_futex);
futex_up(&async_futex);
return callid;
}
 
236,10 → 246,10
conn->cthread(conn->callid, &conn->call);
 
/* Remove myself from connection hash table */
futex_down(&conn_futex);
futex_down(&async_futex);
key = conn->in_phone_hash;
hash_table_remove(&conn_hash_table, &key, 1);
futex_up(&conn_futex);
futex_up(&async_futex);
/* Answer all remaining messages with ehangup */
while (!list_empty(&conn->msg_queue)) {
msg = list_get_instance(conn->msg_queue.next, msg_t, link);
288,10 → 298,10
return NULL;
}
key = conn->in_phone_hash;
futex_down(&conn_futex);
futex_down(&async_futex);
/* Add connection to hash table */
hash_table_insert(&conn_hash_table, &key, &conn->link);
futex_up(&conn_futex);
futex_up(&async_futex);
 
psthread_add_ready(conn->ptid);
 
298,7 → 308,7
return conn->ptid;
}
 
/** Handle call to a task */
/** Handle call that was received */
static void handle_call(ipc_callid_t callid, ipc_call_t *call)
{
if (route_call(callid, call))
324,7 → 334,7
 
while (1) {
if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
futex_up(&conn_futex); /* conn_futex is always held
futex_up(&async_futex); /* async_futex is always held
* when entering manager thread
*/
continue;
333,6 → 343,7
 
if (callid & IPC_CALLID_ANSWERED)
continue;
 
handle_call(callid, &call);
}
}
346,7 → 357,7
*/
static int async_manager_thread(void *arg)
{
futex_up(&conn_futex); /* conn_futex is always locked when entering
futex_up(&async_futex); /* async_futex is always locked when entering
* manager */
async_manager();
}
375,3 → 386,77
}
}
 
/** IPC handler for messages in async framework
*
* Notify thread that is waiting for this message, that it arrived
*/
static void reply_received(void *private, int retval,
ipc_call_t *data)
{
amsg_t *msg = (amsg_t *) private;
 
msg->retval = retval;
 
futex_down(&async_futex);
/* Copy data after futex_down, just in case the
* call was detached
*/
if (msg->dataptr)
*msg->dataptr = *data;
msg->done = 1;
if (! msg->active) {
msg->active = 1;
psthread_add_ready(msg->ptid);
}
futex_up(&async_futex);
}
 
/** Send message and return id of the sent message
*
* The return value can be used as input for async_wait() to wait
* for completion.
*/
aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
ipc_call_t *dataptr)
{
amsg_t *msg;
 
msg = malloc(sizeof(*msg));
msg->active = 1;
msg->done = 0;
msg->dataptr = dataptr;
ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
 
return (aid_t) msg;
}
 
/** Wait for a message sent by async framework
*
* @param amsgid Message ID to wait for
* @param retval Pointer to variable where will be stored retval
* of the answered message. If NULL, it is ignored.
*
*/
void async_wait_for(aid_t amsgid, ipcarg_t *retval)
{
amsg_t *msg = (amsg_t *) amsgid;
connection_t *conn;
 
futex_down(&async_futex);
if (msg->done) {
futex_up(&async_futex);
goto done;
}
 
msg->ptid = psthread_get_id();
msg->active = 0;
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
done:
if (retval)
*retval = msg->retval;
free(msg);
}