98,15 → 98,21 |
static LIST_INITIALIZE(timeout_list); |
|
typedef struct { |
struct timeval expires; /**< Expiration time for waiting thread */ |
int inlist; /**< If true, this struct is in timeout list */ |
link_t link; |
|
pstid_t ptid; /**< Thread waiting for this message */ |
int active; /**< If this thread is currently active */ |
int timedout; /**< If true, we timed out */ |
} awaiter_t; |
|
typedef struct { |
awaiter_t wdata; |
|
int done; /**< If reply was received */ |
ipc_call_t *dataptr; /**< Pointer where the answer data |
* should be stored */ |
struct timeval expires; /**< Expiration time for waiting thread */ |
int has_timeout; /**< If true, this struct is in timeout list */ |
link_t link; |
|
* is stored */ |
ipcarg_t retval; |
} amsg_t; |
|
117,11 → 123,11 |
} msg_t; |
|
typedef struct { |
link_t link; |
ipcarg_t in_phone_hash; /**< Incoming phone hash. */ |
link_t msg_queue; /**< Messages that should be delivered to this thread */ |
pstid_t ptid; /**< Thread associated with this connection */ |
int active; /**< If this thread is currently active */ |
awaiter_t wdata; |
|
link_t link; /**< Hash table link */ |
ipcarg_t in_phone_hash; /**< Incoming phone hash. */ |
link_t msg_queue; /**< Messages that should be delivered to this thread */ |
/* Structures for connection opening packet */ |
ipc_callid_t callid; |
ipc_call_t call; |
208,6 → 214,27 |
.remove_callback = conn_remove |
}; |
|
/** Insert sort timeout msg into timeouts list |
* |
* Assume async_futex is held |
*/ |
static void insert_timeout(awaiter_t *wd) |
{ |
link_t *tmp; |
awaiter_t *cur; |
|
wd->timedout = 0; |
|
tmp = timeout_list.next; |
while (tmp != &timeout_list) { |
cur = list_get_instance(tmp, awaiter_t, link); |
if (tv_gteq(&cur->expires, &wd->expires)) |
break; |
tmp = tmp->next; |
} |
list_append(&wd->link, tmp); |
} |
|
/*************************************************/ |
|
/** Try to route a call to an appropriate connection thread |
235,9 → 262,15 |
msg->call = *call; |
list_append(&msg->link, &conn->msg_queue); |
|
if (!conn->active) { |
conn->active = 1; |
psthread_add_ready(conn->ptid); |
/* If the call is waiting for event, run it */ |
if (!conn->wdata.active) { |
/* If in timeout list, remove it */ |
if (conn->wdata.inlist) { |
conn->wdata.inlist = 0; |
list_remove(&conn->wdata.link); |
} |
conn->wdata.active = 1; |
psthread_add_ready(conn->wdata.ptid); |
} |
|
futex_up(&async_futex); |
246,7 → 279,7 |
} |
|
/** Return new incoming message for current(thread-local) connection */ |
ipc_callid_t async_get_call(ipc_call_t *call) |
ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs) |
{ |
msg_t *msg; |
ipc_callid_t callid; |
255,10 → 288,29 |
|
futex_down(&async_futex); |
|
if (usecs) { |
gettimeofday(&PS_connection->wdata.expires, NULL); |
tv_add(&PS_connection->wdata.expires, usecs); |
} else { |
PS_connection->wdata.inlist = 0; |
} |
/* If nothing in queue, wait until something appears */ |
if (list_empty(&PS_connection->msg_queue)) { |
PS_connection->active = 0; |
while (list_empty(&PS_connection->msg_queue)) { |
if (usecs) { |
PS_connection->wdata.inlist = 1; |
insert_timeout(&PS_connection->wdata); |
} |
PS_connection->wdata.active = 0; |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* Futex is up after getting back from async_manager |
* get it again */ |
futex_down(&async_futex); |
if (usecs && PS_connection->wdata.timedout && \ |
list_empty(&PS_connection->msg_queue)) { |
/* If we timed out-> exit */ |
futex_up(&async_futex); |
return 0; |
} |
} |
|
msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link); |
350,27 → 402,27 |
} |
conn->in_phone_hash = in_phone_hash; |
list_initialize(&conn->msg_queue); |
conn->ptid = psthread_create(connection_thread, conn); |
conn->callid = callid; |
if (call) |
conn->call = *call; |
conn->active = 1; /* We will activate it asap */ |
conn->wdata.active = 1; /* We will activate it asap */ |
conn->cthread = cthread; |
list_initialize(&conn->link); |
if (!conn->ptid) { |
|
conn->wdata.ptid = psthread_create(connection_thread, conn); |
if (!conn->wdata.ptid) { |
free(conn); |
ipc_answer_fast(callid, ENOMEM, 0, 0); |
return NULL; |
} |
/* Add connection to hash table */ |
key = conn->in_phone_hash; |
futex_down(&async_futex); |
/* Add connection to hash table */ |
hash_table_insert(&conn_hash_table, &key, &conn->link); |
futex_up(&async_futex); |
|
psthread_add_ready(conn->ptid); |
psthread_add_ready(conn->wdata.ptid); |
|
return conn->ptid; |
return conn->wdata.ptid; |
} |
|
/** Handle call that was received */ |
399,7 → 451,7 |
static void handle_expired_timeouts(void) |
{ |
struct timeval tv; |
amsg_t *amsg; |
awaiter_t *waiter; |
link_t *cur; |
|
gettimeofday(&tv,NULL); |
407,18 → 459,19 |
|
cur = timeout_list.next; |
while (cur != &timeout_list) { |
amsg = list_get_instance(cur,amsg_t,link); |
if (tv_gt(&amsg->expires, &tv)) |
waiter = list_get_instance(cur,awaiter_t,link); |
if (tv_gt(&waiter->expires, &tv)) |
break; |
cur = cur->next; |
list_remove(&amsg->link); |
amsg->has_timeout = 0; |
list_remove(&waiter->link); |
waiter->inlist = 0; |
waiter->timedout = 1; |
/* Redundant condition? The thread should not |
* be active when it gets here. |
*/ |
if (!amsg->active) { |
amsg->active = 1; |
psthread_add_ready(amsg->ptid); |
if (!waiter->active) { |
waiter->active = 1; |
psthread_add_ready(waiter->ptid); |
} |
} |
|
431,7 → 484,7 |
ipc_call_t call; |
ipc_callid_t callid; |
int timeout; |
amsg_t *amsg; |
awaiter_t *waiter; |
struct timeval tv; |
|
while (1) { |
443,13 → 496,13 |
} |
futex_down(&async_futex); |
if (!list_empty(&timeout_list)) { |
amsg = list_get_instance(timeout_list.next,amsg_t,link); |
waiter = list_get_instance(timeout_list.next,awaiter_t,link); |
gettimeofday(&tv,NULL); |
if (tv_gteq(&tv, &amsg->expires)) { |
if (tv_gteq(&tv, &waiter->expires)) { |
handle_expired_timeouts(); |
continue; |
} else |
timeout = tv_sub(&amsg->expires, &tv); |
timeout = tv_sub(&waiter->expires, &tv); |
} else |
timeout = SYNCH_NO_TIMEOUT; |
futex_up(&async_futex); |
527,12 → 580,12 |
|
write_barrier(); |
/* Remove message from timeout list */ |
if (msg->has_timeout) |
list_remove(&msg->link); |
if (msg->wdata.inlist) |
list_remove(&msg->wdata.link); |
msg->done = 1; |
if (! msg->active) { |
msg->active = 1; |
psthread_add_ready(msg->ptid); |
if (! msg->wdata.active) { |
msg->wdata.active = 1; |
psthread_add_ready(msg->wdata.ptid); |
} |
futex_up(&async_futex); |
} |
548,9 → 601,11 |
amsg_t *msg; |
|
msg = malloc(sizeof(*msg)); |
msg->active = 1; |
msg->done = 0; |
msg->dataptr = dataptr; |
|
msg->wdata.active = 1; /* We may sleep in next method, but it |
* will use it's own mechanism */ |
ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received); |
|
return (aid_t) msg; |
574,9 → 629,9 |
goto done; |
} |
|
msg->ptid = psthread_get_id(); |
msg->active = 0; |
msg->has_timeout = 0; |
msg->wdata.ptid = psthread_get_id(); |
msg->wdata.active = 0; |
msg->wdata.inlist = 0; |
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |
586,25 → 641,6 |
free(msg); |
} |
|
/** Insert sort timeout msg into timeouts list |
* |
* Assume async_futex is held |
*/ |
static void insert_timeout(amsg_t *msg) |
{ |
link_t *tmp; |
amsg_t *cur; |
|
tmp = timeout_list.next; |
while (tmp != &timeout_list) { |
cur = list_get_instance(tmp, amsg_t, link); |
if (tv_gteq(&cur->expires, &msg->expires)) |
break; |
tmp = tmp->next; |
} |
list_append(&msg->link, tmp); |
} |
|
/** Wait for a message sent by async framework with timeout |
* |
* @param amsgid Message ID to wait for |
625,14 → 661,15 |
goto done; |
} |
|
msg->ptid = psthread_get_id(); |
msg->active = 0; |
msg->has_timeout = 1; |
gettimeofday(&msg->wdata.expires, NULL); |
tv_add(&msg->wdata.expires, timeout); |
|
gettimeofday(&msg->expires, NULL); |
tv_add(&msg->expires, timeout); |
insert_timeout(msg); |
msg->wdata.ptid = psthread_get_id(); |
msg->wdata.active = 0; |
msg->wdata.inlist = 1; |
|
insert_timeout(&msg->wdata); |
|
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |
660,15 → 697,15 |
if (!msg) |
return; |
|
msg->ptid = psthread_get_id(); |
msg->active = 0; |
msg->has_timeout = 1; |
msg->wdata.ptid = psthread_get_id(); |
msg->wdata.inlist = 1; |
msg->wdata.active = 0; |
|
gettimeofday(&msg->expires, NULL); |
tv_add(&msg->expires, timeout); |
gettimeofday(&msg->wdata.expires, NULL); |
tv_add(&msg->wdata.expires, timeout); |
|
futex_down(&async_futex); |
insert_timeout(msg); |
insert_timeout(&msg->wdata); |
/* Leave locked async_futex when entering this function */ |
psthread_schedule_next_adv(PS_TO_MANAGER); |
/* futex is up automatically after psthread_schedule_next...*/ |