Subversion Repositories HelenOS-historic

Compare Revisions

Ignore whitespace Rev 1499 → Rev 1500

/uspace/trunk/libc/generic/async.c
98,15 → 98,21
static LIST_INITIALIZE(timeout_list);
 
/** Wait state of a pseudo-thread that sleeps inside the async framework.
 *
 * Embedded as the wdata member of both amsg_t and the connection
 * structure, so that one timeout list can hold either kind of waiter.
 */
typedef struct {
struct timeval expires; /**< Absolute expiration time (gettimeofday() plus the timeout) */
int inlist; /**< If true, this struct is linked in timeout_list */
link_t link; /**< Link used to chain this struct into timeout_list */
 
pstid_t ptid; /**< Pseudo-thread passed to psthread_add_ready() when the wait ends */
int active; /**< Cleared before switching to the manager, set when the thread is made ready again */
int timedout; /**< Set by handle_expired_timeouts(), cleared by insert_timeout() */
} awaiter_t;
 
typedef struct {
awaiter_t wdata;
 
int done; /**< If reply was received */
ipc_call_t *dataptr; /**< Pointer where the answer data
* should be stored */
struct timeval expires; /**< Expiration time for waiting thread */
int has_timeout; /**< If true, this struct is in timeout list */
link_t link;
 
* is stored */
ipcarg_t retval;
} amsg_t;
 
117,11 → 123,11
} msg_t;
 
typedef struct {
link_t link;
ipcarg_t in_phone_hash; /**< Incoming phone hash. */
link_t msg_queue; /**< Messages that should be delivered to this thread */
pstid_t ptid; /**< Thread associated with this connection */
int active; /**< If this thread is currently active */
awaiter_t wdata;
 
link_t link; /**< Hash table link */
ipcarg_t in_phone_hash; /**< Incoming phone hash. */
link_t msg_queue; /**< Messages that should be delivered to this thread */
/* Structures for connection opening packet */
ipc_callid_t callid;
ipc_call_t call;
208,6 → 214,27
.remove_callback = conn_remove
};
 
/** Link a waiter into the timeout list, keeping the list ordered by
 * expiration time.
 *
 * The waiter is placed in front of the first entry whose expiration time
 * is greater than or equal to its own; if no such entry exists it ends up
 * at the tail. The timedout flag of the waiter is cleared first.
 *
 * Assume async_futex is held.
 *
 * @param wd Waiter to insert; wd->expires must already be filled in.
 */
static void insert_timeout(awaiter_t *wd)
{
	link_t *pos;

	/* A fresh wait has not expired yet */
	wd->timedout = 0;

	/* Stop at the first entry that does not expire before wd */
	for (pos = timeout_list.next; pos != &timeout_list; pos = pos->next) {
		awaiter_t *queued = list_get_instance(pos, awaiter_t, link);

		if (tv_gteq(&queued->expires, &wd->expires))
			break;
	}

	/* pos is either that entry or the list head itself */
	list_append(&wd->link, pos);
}
 
/*************************************************/
 
/** Try to route a call to an appropriate connection thread
235,9 → 262,15
msg->call = *call;
list_append(&msg->link, &conn->msg_queue);
if (!conn->active) {
conn->active = 1;
psthread_add_ready(conn->ptid);
/* If the call is waiting for event, run it */
if (!conn->wdata.active) {
/* If in timeout list, remove it */
if (conn->wdata.inlist) {
conn->wdata.inlist = 0;
list_remove(&conn->wdata.link);
}
conn->wdata.active = 1;
psthread_add_ready(conn->wdata.ptid);
}
 
futex_up(&async_futex);
246,7 → 279,7
}
 
/** Return new incoming message for current(thread-local) connection */
ipc_callid_t async_get_call(ipc_call_t *call)
ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
{
msg_t *msg;
ipc_callid_t callid;
255,10 → 288,29
 
futex_down(&async_futex);
 
if (usecs) {
gettimeofday(&PS_connection->wdata.expires, NULL);
tv_add(&PS_connection->wdata.expires, usecs);
} else {
PS_connection->wdata.inlist = 0;
}
/* If nothing in queue, wait until something appears */
if (list_empty(&PS_connection->msg_queue)) {
PS_connection->active = 0;
while (list_empty(&PS_connection->msg_queue)) {
if (usecs) {
PS_connection->wdata.inlist = 1;
insert_timeout(&PS_connection->wdata);
}
PS_connection->wdata.active = 0;
psthread_schedule_next_adv(PS_TO_MANAGER);
/* Futex is up after getting back from async_manager
* get it again */
futex_down(&async_futex);
if (usecs && PS_connection->wdata.timedout && \
list_empty(&PS_connection->msg_queue)) {
/* If we timed out-> exit */
futex_up(&async_futex);
return 0;
}
}
msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
350,27 → 402,27
}
conn->in_phone_hash = in_phone_hash;
list_initialize(&conn->msg_queue);
conn->ptid = psthread_create(connection_thread, conn);
conn->callid = callid;
if (call)
conn->call = *call;
conn->active = 1; /* We will activate it asap */
conn->wdata.active = 1; /* We will activate it asap */
conn->cthread = cthread;
list_initialize(&conn->link);
if (!conn->ptid) {
 
conn->wdata.ptid = psthread_create(connection_thread, conn);
if (!conn->wdata.ptid) {
free(conn);
ipc_answer_fast(callid, ENOMEM, 0, 0);
return NULL;
}
/* Add connection to hash table */
key = conn->in_phone_hash;
futex_down(&async_futex);
/* Add connection to hash table */
hash_table_insert(&conn_hash_table, &key, &conn->link);
futex_up(&async_futex);
 
psthread_add_ready(conn->ptid);
psthread_add_ready(conn->wdata.ptid);
 
return conn->ptid;
return conn->wdata.ptid;
}
 
/** Handle call that was received */
399,7 → 451,7
static void handle_expired_timeouts(void)
{
struct timeval tv;
amsg_t *amsg;
awaiter_t *waiter;
link_t *cur;
 
gettimeofday(&tv,NULL);
407,18 → 459,19
 
cur = timeout_list.next;
while (cur != &timeout_list) {
amsg = list_get_instance(cur,amsg_t,link);
if (tv_gt(&amsg->expires, &tv))
waiter = list_get_instance(cur,awaiter_t,link);
if (tv_gt(&waiter->expires, &tv))
break;
cur = cur->next;
list_remove(&amsg->link);
amsg->has_timeout = 0;
list_remove(&waiter->link);
waiter->inlist = 0;
waiter->timedout = 1;
/* Redundant condition? The thread should not
* be active when it gets here.
*/
if (!amsg->active) {
amsg->active = 1;
psthread_add_ready(amsg->ptid);
if (!waiter->active) {
waiter->active = 1;
psthread_add_ready(waiter->ptid);
}
}
 
431,7 → 484,7
ipc_call_t call;
ipc_callid_t callid;
int timeout;
amsg_t *amsg;
awaiter_t *waiter;
struct timeval tv;
 
while (1) {
443,13 → 496,13
}
futex_down(&async_futex);
if (!list_empty(&timeout_list)) {
amsg = list_get_instance(timeout_list.next,amsg_t,link);
waiter = list_get_instance(timeout_list.next,awaiter_t,link);
gettimeofday(&tv,NULL);
if (tv_gteq(&tv, &amsg->expires)) {
if (tv_gteq(&tv, &waiter->expires)) {
handle_expired_timeouts();
continue;
} else
timeout = tv_sub(&amsg->expires, &tv);
timeout = tv_sub(&waiter->expires, &tv);
} else
timeout = SYNCH_NO_TIMEOUT;
futex_up(&async_futex);
527,12 → 580,12
 
write_barrier();
/* Remove message from timeout list */
if (msg->has_timeout)
list_remove(&msg->link);
if (msg->wdata.inlist)
list_remove(&msg->wdata.link);
msg->done = 1;
if (! msg->active) {
msg->active = 1;
psthread_add_ready(msg->ptid);
if (! msg->wdata.active) {
msg->wdata.active = 1;
psthread_add_ready(msg->wdata.ptid);
}
futex_up(&async_futex);
}
548,9 → 601,11
amsg_t *msg;
 
msg = malloc(sizeof(*msg));
msg->active = 1;
msg->done = 0;
msg->dataptr = dataptr;
 
msg->wdata.active = 1; /* We may sleep in next method, but it
* will use its own mechanism */
ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
 
return (aid_t) msg;
574,9 → 629,9
goto done;
}
 
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 0;
msg->wdata.ptid = psthread_get_id();
msg->wdata.active = 0;
msg->wdata.inlist = 0;
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
586,25 → 641,6
free(msg);
}
 
/** Link a message into the timeout list, keeping the list ordered by
 * expiration time.
 *
 * The message is placed in front of the first entry whose expiration time
 * is greater than or equal to its own; if no such entry exists it ends up
 * at the tail.
 *
 * Assume async_futex is held.
 *
 * @param msg Message to insert; msg->expires must already be filled in.
 */
static void insert_timeout(amsg_t *msg)
{
	link_t *pos;

	/* Stop at the first entry that does not expire before msg */
	for (pos = timeout_list.next; pos != &timeout_list; pos = pos->next) {
		amsg_t *queued = list_get_instance(pos, amsg_t, link);

		if (tv_gteq(&queued->expires, &msg->expires))
			break;
	}

	/* pos is either that entry or the list head itself */
	list_append(&msg->link, pos);
}
 
/** Wait for a message sent by async framework with timeout
*
* @param amsgid Message ID to wait for
625,14 → 661,15
goto done;
}
 
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 1;
gettimeofday(&msg->wdata.expires, NULL);
tv_add(&msg->wdata.expires, timeout);
 
gettimeofday(&msg->expires, NULL);
tv_add(&msg->expires, timeout);
insert_timeout(msg);
msg->wdata.ptid = psthread_get_id();
msg->wdata.active = 0;
msg->wdata.inlist = 1;
 
insert_timeout(&msg->wdata);
 
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
660,15 → 697,15
if (!msg)
return;
 
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 1;
msg->wdata.ptid = psthread_get_id();
msg->wdata.inlist = 1;
msg->wdata.active = 0;
 
gettimeofday(&msg->expires, NULL);
tv_add(&msg->expires, timeout);
gettimeofday(&msg->wdata.expires, NULL);
tv_add(&msg->wdata.expires, timeout);
 
futex_down(&async_futex);
insert_timeout(msg);
insert_timeout(&msg->wdata);
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/