91,10 → 91,19 |
#include <assert.h> |
#include <errno.h> |
|
/* NOTE(review): the connection-handling critical sections visible in this
 * file take conn_futex and async_futex back to back, while the amsg_t
 * paths (reply_received, async_wait_for) take async_futex only. This looks
 * like a rename captured mid-change; confirm which futex is meant to stay. */
static atomic_t conn_futex = FUTEX_INITIALIZER;
static atomic_t async_futex = FUTEX_INITIALIZER;
/* Table of connections; searched with call->in_phone_hash as the key. */
static hash_table_t conn_hash_table;
|
/** State of one asynchronous request that is awaiting its answer.
 *
 * Allocated and initialized by async_send_2(), completed by
 * reply_received() and freed by async_wait_for().
 */
typedef struct {
pstid_t ptid; /**< Thread waiting for this message */
int active; /**< Set to 1 when the message is sent; cleared by
* async_wait_for() just before the waiting thread
* suspends itself, and set back to 1 by
* reply_received() when it makes that thread ready */
int done; /**< If reply was received */
ipc_call_t *dataptr; /**< Pointer where the answer data
* should be stored; nothing is copied
* when it is NULL */
ipcarg_t retval; /**< Return value of the answer, stored by
* reply_received() */
} amsg_t;
|
typedef struct { |
link_t link; |
ipc_callid_t callid; |
ipc_call_t call; |
115,7 → 124,6 |
__thread connection_t *PS_connection; |
|
/* Hash table functions */ |
|
#define CONN_HASH_TABLE_CHAINS 32 |
|
static hash_index_t conn_hash(unsigned long *key) |
146,6 → 154,8 |
.remove_callback = conn_remove |
}; |
|
/*************************************************/ |
|
/** Try to route a call to an appropriate connection thread |
* |
*/ |
156,12 → 166,12 |
link_t *hlp; |
unsigned long key; |
|
futex_down(&conn_futex); |
futex_down(&async_futex); |
|
key = call->in_phone_hash; |
hlp = hash_table_find(&conn_hash_table, &key); |
if (!hlp) { |
futex_up(&conn_futex); |
futex_up(&async_futex); |
return 0; |
} |
conn = hash_table_get_instance(hlp, connection_t, link); |
176,7 → 186,7 |
psthread_add_ready(conn->ptid); |
} |
|
futex_up(&conn_futex); |
futex_up(&async_futex); |
|
return 1; |
} |
188,7 → 198,7 |
ipc_callid_t callid; |
connection_t *conn; |
|
futex_down(&conn_futex); |
futex_down(&async_futex); |
|
conn = PS_connection; |
/* If nothing in queue, wait until something appears */ |
203,7 → 213,7 |
*call = msg->call; |
free(msg); |
|
futex_up(&conn_futex); |
futex_up(&async_futex); |
return callid; |
} |
|
236,10 → 246,10 |
conn->cthread(conn->callid, &conn->call); |
|
/* Remove myself from connection hash table */ |
futex_down(&conn_futex); |
futex_down(&async_futex); |
key = conn->in_phone_hash; |
hash_table_remove(&conn_hash_table, &key, 1); |
futex_up(&conn_futex); |
futex_up(&async_futex); |
/* Answer all remaining messages with ehangup */ |
while (!list_empty(&conn->msg_queue)) { |
msg = list_get_instance(conn->msg_queue.next, msg_t, link); |
288,10 → 298,10 |
return NULL; |
} |
key = conn->in_phone_hash; |
futex_down(&conn_futex); |
futex_down(&async_futex); |
/* Add connection to hash table */ |
hash_table_insert(&conn_hash_table, &key, &conn->link); |
futex_up(&conn_futex); |
futex_up(&async_futex); |
|
psthread_add_ready(conn->ptid); |
|
298,7 → 308,7 |
return conn->ptid; |
} |
|
/** Handle call to a task */ |
/** Handle call that was received */ |
static void handle_call(ipc_callid_t callid, ipc_call_t *call) |
{ |
if (route_call(callid, call)) |
324,7 → 334,7 |
|
while (1) { |
if (psthread_schedule_next_adv(PS_FROM_MANAGER)) { |
futex_up(&conn_futex); /* conn_futex is always held |
futex_up(&async_futex); /* async_futex is always held |
* when entering manager thread |
*/ |
continue; |
333,6 → 343,7 |
|
if (callid & IPC_CALLID_ANSWERED) |
continue; |
|
handle_call(callid, &call); |
} |
} |
346,7 → 357,7 |
*/ |
static int async_manager_thread(void *arg)
{
/* NOTE(review): the comment opened on the next line is not closed until two
 * lines further down, so the futex_up(&async_futex) text sits inside that
 * comment and only conn_futex is released here. Confirm which futex is
 * intended. The arg parameter is not used. */
futex_up(&conn_futex); /* conn_futex is always locked when entering
futex_up(&async_futex); /* async_futex is always locked when entering
* manager */
async_manager();
/* NOTE(review): there is no return statement although the function is
 * declared to return int; presumably async_manager() never returns.
 * Confirm, or add an explicit return value. */
}
375,3 → 386,77 |
} |
|
} |
|
/** IPC handler for messages in async framework |
* |
* Notify thread that is waiting for this message, that it arrived |
*/ |
static void reply_received(void *private, int retval, |
ipc_call_t *data) |
{ |
amsg_t *msg = (amsg_t *) private; |
|
msg->retval = retval; |
|
futex_down(&async_futex); |
/* Copy data after futex_down, just in case the |
* call was detached |
*/ |
if (msg->dataptr) |
*msg->dataptr = *data; |
|
msg->done = 1; |
if (! msg->active) { |
msg->active = 1; |
psthread_add_ready(msg->ptid); |
} |
futex_up(&async_futex); |
} |
|
/** Send message and return id of the sent message |
* |
* The return value can be used as input for async_wait() to wait |
* for completion. |
*/ |
aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2, |
ipc_call_t *dataptr) |
{ |
amsg_t *msg; |
|
msg = malloc(sizeof(*msg)); |
msg->active = 1; |
msg->done = 0; |
msg->dataptr = dataptr; |
ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received); |
|
return (aid_t) msg; |
} |
|
/** Wait for a message sent by async framework |
* |
* @param amsgid Message ID to wait for |
* @param retval Pointer to variable where will be stored retval |
* of the answered message. If NULL, it is ignored. |
* |
*/ |
void async_wait_for(aid_t amsgid, ipcarg_t *retval) |
{ |
amsg_t *msg = (amsg_t *) amsgid; |
connection_t *conn; |
|
futex_down(&async_futex); |
if (msg->done) { |
futex_up(&async_futex); |
goto done; |
} |
|
msg->ptid = psthread_get_id(); |
msg->active = 0; |
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |
done: |
if (retval) |
*retval = msg->retval; |
free(msg); |
} |