90,9 → 90,12 |
#include <ipc/ipc.h> |
#include <assert.h> |
#include <errno.h> |
#include <time.h> |
#include <arch/barrier.h> |
|
static atomic_t async_futex = FUTEX_INITIALIZER; |
static hash_table_t conn_hash_table; |
static LIST_INITIALIZE(timeout_list); |
|
typedef struct { |
pstid_t ptid; /**< Thread waiting for this message */ |
100,6 → 103,10 |
int done; /**< If reply was received */ |
ipc_call_t *dataptr; /**< Pointer where the answer data |
* should be stored */ |
struct timeval expires; /**< Expiration time for waiting thread */ |
int has_timeout; /**< If true, this struct is in timeout list */ |
link_t link; |
|
ipcarg_t retval; |
} amsg_t; |
|
123,6 → 130,41 |
|
__thread connection_t *PS_connection; |
|
/** Add microseconds to give timeval */ |
static void tv_add(struct timeval *tv, suseconds_t usecs) |
{ |
tv->tv_sec += usecs / 1000000; |
tv->tv_usec += usecs % 1000000; |
if (tv->tv_usec > 1000000) { |
tv->tv_sec++; |
tv->tv_usec -= 1000000; |
} |
} |
|
/** Subtract 2 timevals, return microseconds difference */ |
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2) |
{ |
suseconds_t result; |
|
result = tv1->tv_usec - tv2->tv_usec; |
result += (tv1->tv_sec - tv2->tv_sec) * 1000000; |
|
return result; |
} |
|
/** Compare timeval |
* |
* @return 1 if tv1 > tv2, otherwise 0 |
*/ |
static int tv_gt(struct timeval *tv1, struct timeval *tv2) |
{ |
if (tv1->tv_sec > tv2->tv_sec) |
return 1; |
if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec > tv2->tv_usec) |
return 1; |
return 0; |
} |
|
/* Hash table functions */ |
#define CONN_HASH_TABLE_CHAINS 32 |
|
326,12 → 368,44 |
} |
} |
|
/** Fire all timeouts that expired */ |
static void handle_expired_timeouts(void) |
{ |
struct timeval tv; |
amsg_t *amsg; |
link_t *cur; |
|
gettimeofday(&tv,NULL); |
futex_down(&async_futex); |
|
cur = timeout_list.next; |
while (cur != &timeout_list) { |
amsg = list_get_instance(cur,amsg_t,link); |
if (tv_gt(&amsg->expires, &tv)) |
break; |
cur = cur->next; |
list_remove(&amsg->link); |
amsg->has_timeout = 0; |
/* Redundant condition? The thread should not |
* be active when it gets here. |
*/ |
if (!amsg->active) { |
amsg->active = 1; |
psthread_add_ready(amsg->ptid); |
} |
} |
|
futex_up(&async_futex); |
} |
|
/** Endless loop dispatching incoming calls and answers */ |
int async_manager() |
int async_manager(void) |
{ |
ipc_call_t call; |
ipc_callid_t callid; |
int timeout; |
amsg_t *amsg; |
struct timeval tv; |
|
while (1) { |
if (psthread_schedule_next_adv(PS_FROM_MANAGER)) { |
340,16 → 414,23 |
*/ |
continue; |
} |
/* |
if (expires) |
timeout = .... ; |
else |
*/ |
futex_down(&async_futex); |
if (!list_empty(&timeout_list)) { |
amsg = list_get_instance(timeout_list.next,amsg_t,link); |
gettimeofday(&tv,NULL); |
if (tv_gt(&tv, &amsg->expires)) { |
handle_expired_timeouts(); |
continue; |
} else |
timeout = tv_sub(&amsg->expires, &tv); |
} else |
timeout = SYNCH_NO_TIMEOUT; |
futex_up(&async_futex); |
|
callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING); |
|
if (!callid) { |
// handle_expired_timeouts.......; |
handle_expired_timeouts(); |
continue; |
} |
|
417,7 → 498,10 |
if (msg->dataptr) |
*msg->dataptr = *data; |
|
/* TODO: memory barrier?? */ |
write_barrier(); |
/* Remove message from timeout list */ |
if (msg->has_timeout) |
list_remove(&msg->link); |
msg->done = 1; |
if (! msg->active) { |
msg->active = 1; |
465,6 → 549,7 |
|
msg->ptid = psthread_get_id(); |
msg->active = 0; |
msg->has_timeout = 0; |
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |
474,37 → 559,65 |
free(msg); |
} |
|
/** Insert sort timeout msg into timeouts list |
* |
* Assume async_futex is held |
*/ |
static void insert_timeout(amsg_t *msg) |
{ |
link_t *tmp; |
amsg_t *cur; |
|
/* int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, int timeout) */ |
/* { */ |
/* amsg_t *msg = (amsg_t *) amsgid; */ |
/* connection_t *conn; */ |
tmp = timeout_list.next; |
while (tmp != &timeout_list) { |
cur = list_get_instance(tmp, amsg_t, link); |
if (tv_gt(&cur->expires, &msg->expires)) |
break; |
tmp = tmp->next; |
} |
list_append(&msg->link, tmp); |
} |
|
/* futex_down(&async_futex); */ |
/* if (msg->done) { */ |
/* futex_up(&async_futex); */ |
/* goto done; */ |
/* } */ |
/** Wait for a message sent by async framework with timeout |
* |
* @param amsgid Message ID to wait for |
* @param retval Pointer to variable where will be stored retval |
* of the answered message. If NULL, it is ignored. |
* @param timeout Timeout in usecs |
* @return 0 on success, ETIMEOUT if timeout expired |
* |
*/ |
int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout) |
{ |
amsg_t *msg = (amsg_t *) amsgid; |
connection_t *conn; |
|
/* msg->ptid = psthread_get_id(); */ |
/* msg->active = 0; */ |
/* msg->expires = gettime() + timeout; */ |
/* setup_timeouts_etc...(); */ |
futex_down(&async_futex); |
if (msg->done) { |
futex_up(&async_futex); |
goto done; |
} |
|
/* /\* Leave locked async_futex when entering this function *\/ */ |
/* psthread_schedule_next_adv(PS_TO_MANAGER); */ |
/* /\* futex is up automatically after psthread_schedule_next...*\/ */ |
msg->ptid = psthread_get_id(); |
msg->active = 0; |
msg->has_timeout = 1; |
|
/* if (!msg->done) */ |
/* return casy-casy; */ |
gettimeofday(&msg->expires, NULL); |
tv_add(&msg->expires, timeout); |
insert_timeout(msg); |
|
/* /\* TODO: When memory barrier in reply_received, we can skip this *\/ */ |
/* futex_down(&async_futex); */ |
/* futex_up(&async_futex); */ |
/* done: */ |
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |
|
/* if (retval) */ |
/* *retval = msg->retval; */ |
/* free(msg); */ |
/* } */ |
if (!msg->done) |
return ETIMEOUT; |
|
done: |
if (retval) |
*retval = msg->retval; |
free(msg); |
|
return 0; |
} |
|