Subversion Repositories HelenOS-historic

Compare Revisions

Ignore whitespace Rev 1500 → Rev 1499

/uspace/trunk/libc/generic/async.c
98,21 → 98,15
static LIST_INITIALIZE(timeout_list);
 
/** Wait state of a thread that may be parked on timeout_list
 *
 * Embedded as the wdata member of other structures in this file, so that
 * insert_timeout() and handle_expired_timeouts() can work on the awaiter_t
 * alone.
 */
typedef struct {
struct timeval expires; /**< Expiration time for waiting thread */
int inlist; /**< If true, this struct is in timeout list */
link_t link; /**< Link chaining this struct into timeout_list */
 
pstid_t ptid; /**< Thread waiting for this message */
int active; /**< If this thread is currently active */
int timedout; /**< If true, we timed out; cleared by insert_timeout() */
} awaiter_t;
 
typedef struct {
awaiter_t wdata;
 
int done; /**< If reply was received */
ipc_call_t *dataptr; /**< Pointer where the answer data
* is stored */
* should be stored */
struct timeval expires; /**< Expiration time for waiting thread */
int has_timeout; /**< If true, this struct is in timeout list */
link_t link;
 
ipcarg_t retval;
} amsg_t;
 
123,11 → 117,11
} msg_t;
 
typedef struct {
awaiter_t wdata;
 
link_t link; /**< Hash table link */
ipcarg_t in_phone_hash; /**< Incoming phone hash. */
link_t msg_queue; /**< Messages that should be delivered to this thread */
link_t link;
ipcarg_t in_phone_hash; /**< Incoming phone hash. */
link_t msg_queue; /**< Messages that should be delivered to this thread */
pstid_t ptid; /**< Thread associated with this connection */
int active; /**< If this thread is currently active */
/* Structures for connection opening packet */
ipc_callid_t callid;
ipc_call_t call;
214,27 → 208,6
.remove_callback = conn_remove
};
 
/** Insert a waiter into the timeout list in order of expiration
 *
 * Clears the waiter's timedout flag, then scans timeout_list from its first
 * entry and stops at the first waiter whose expiration time tv_gteq()
 * reports as greater than or equal to that of the new waiter. The new
 * waiter's link and that position are then passed to list_append().
 *
 * The caller is expected to hold async_futex.
 *
 * @param wd Waiter to insert; wd->expires is read for the comparison.
 */
static void insert_timeout(awaiter_t *wd)
{
	link_t *pos;
	awaiter_t *entry;

	wd->timedout = 0;

	for (pos = timeout_list.next; pos != &timeout_list; pos = pos->next) {
		entry = list_get_instance(pos, awaiter_t, link);
		if (tv_gteq(&entry->expires, &wd->expires))
			break;
	}
	/* pos is either the first later-or-equal entry or the list head */
	list_append(&wd->link, pos);
}
 
/*************************************************/
 
/** Try to route a call to an appropriate connection thread
262,15 → 235,9
msg->call = *call;
list_append(&msg->link, &conn->msg_queue);
/* If the call is waiting for event, run it */
if (!conn->wdata.active) {
/* If in timeout list, remove it */
if (conn->wdata.inlist) {
conn->wdata.inlist = 0;
list_remove(&conn->wdata.link);
}
conn->wdata.active = 1;
psthread_add_ready(conn->wdata.ptid);
if (!conn->active) {
conn->active = 1;
psthread_add_ready(conn->ptid);
}
 
futex_up(&async_futex);
279,7 → 246,7
}
 
/** Return new incoming message for current(thread-local) connection */
ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
ipc_callid_t async_get_call(ipc_call_t *call)
{
msg_t *msg;
ipc_callid_t callid;
288,29 → 255,10
 
futex_down(&async_futex);
 
if (usecs) {
gettimeofday(&PS_connection->wdata.expires, NULL);
tv_add(&PS_connection->wdata.expires, usecs);
} else {
PS_connection->wdata.inlist = 0;
}
/* If nothing in queue, wait until something appears */
while (list_empty(&PS_connection->msg_queue)) {
if (usecs) {
PS_connection->wdata.inlist = 1;
insert_timeout(&PS_connection->wdata);
}
PS_connection->wdata.active = 0;
if (list_empty(&PS_connection->msg_queue)) {
PS_connection->active = 0;
psthread_schedule_next_adv(PS_TO_MANAGER);
/* Futex is up after getting back from async_manager,
* so get it again */
futex_down(&async_futex);
if (usecs && PS_connection->wdata.timedout && \
list_empty(&PS_connection->msg_queue)) {
/* If we timed out-> exit */
futex_up(&async_futex);
return 0;
}
}
msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
402,27 → 350,27
}
conn->in_phone_hash = in_phone_hash;
list_initialize(&conn->msg_queue);
conn->ptid = psthread_create(connection_thread, conn);
conn->callid = callid;
if (call)
conn->call = *call;
conn->wdata.active = 1; /* We will activate it asap */
conn->active = 1; /* We will activate it asap */
conn->cthread = cthread;
 
conn->wdata.ptid = psthread_create(connection_thread, conn);
if (!conn->wdata.ptid) {
list_initialize(&conn->link);
if (!conn->ptid) {
free(conn);
ipc_answer_fast(callid, ENOMEM, 0, 0);
return NULL;
}
/* Add connection to hash table */
key = conn->in_phone_hash;
futex_down(&async_futex);
/* Add connection to hash table */
hash_table_insert(&conn_hash_table, &key, &conn->link);
futex_up(&async_futex);
 
psthread_add_ready(conn->wdata.ptid);
psthread_add_ready(conn->ptid);
 
return conn->wdata.ptid;
return conn->ptid;
}
 
/** Handle call that was received */
451,7 → 399,7
static void handle_expired_timeouts(void)
{
struct timeval tv;
awaiter_t *waiter;
amsg_t *amsg;
link_t *cur;
 
gettimeofday(&tv,NULL);
459,19 → 407,18
 
cur = timeout_list.next;
while (cur != &timeout_list) {
waiter = list_get_instance(cur,awaiter_t,link);
if (tv_gt(&waiter->expires, &tv))
amsg = list_get_instance(cur,amsg_t,link);
if (tv_gt(&amsg->expires, &tv))
break;
cur = cur->next;
list_remove(&waiter->link);
waiter->inlist = 0;
waiter->timedout = 1;
list_remove(&amsg->link);
amsg->has_timeout = 0;
/* Redundant condition? The thread should not
* be active when it gets here.
*/
if (!waiter->active) {
waiter->active = 1;
psthread_add_ready(waiter->ptid);
if (!amsg->active) {
amsg->active = 1;
psthread_add_ready(amsg->ptid);
}
}
 
484,7 → 431,7
ipc_call_t call;
ipc_callid_t callid;
int timeout;
awaiter_t *waiter;
amsg_t *amsg;
struct timeval tv;
 
while (1) {
496,13 → 443,13
}
futex_down(&async_futex);
if (!list_empty(&timeout_list)) {
waiter = list_get_instance(timeout_list.next,awaiter_t,link);
amsg = list_get_instance(timeout_list.next,amsg_t,link);
gettimeofday(&tv,NULL);
if (tv_gteq(&tv, &waiter->expires)) {
if (tv_gteq(&tv, &amsg->expires)) {
handle_expired_timeouts();
continue;
} else
timeout = tv_sub(&waiter->expires, &tv);
timeout = tv_sub(&amsg->expires, &tv);
} else
timeout = SYNCH_NO_TIMEOUT;
futex_up(&async_futex);
580,12 → 527,12
 
write_barrier();
/* Remove message from timeout list */
if (msg->wdata.inlist)
list_remove(&msg->wdata.link);
if (msg->has_timeout)
list_remove(&msg->link);
msg->done = 1;
if (! msg->wdata.active) {
msg->wdata.active = 1;
psthread_add_ready(msg->wdata.ptid);
if (! msg->active) {
msg->active = 1;
psthread_add_ready(msg->ptid);
}
futex_up(&async_futex);
}
601,11 → 548,9
amsg_t *msg;
 
msg = malloc(sizeof(*msg));
msg->active = 1;
msg->done = 0;
msg->dataptr = dataptr;
 
msg->wdata.active = 1; /* We may sleep in next method, but it
* will use its own mechanism */
ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
 
return (aid_t) msg;
629,9 → 574,9
goto done;
}
 
msg->wdata.ptid = psthread_get_id();
msg->wdata.active = 0;
msg->wdata.inlist = 0;
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 0;
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
641,6 → 586,25
free(msg);
}
 
/** Insert a message into the timeout list in order of expiration
 *
 * Scans timeout_list from its first entry and stops at the first message
 * whose expiration time tv_gteq() reports as greater than or equal to that
 * of the new message. The new message's link and that position are then
 * passed to list_append().
 *
 * The caller is expected to hold async_futex.
 *
 * @param msg Message to insert; msg->expires is read for the comparison.
 */
static void insert_timeout(amsg_t *msg)
{
	link_t *pos;
	amsg_t *entry;

	for (pos = timeout_list.next; pos != &timeout_list; pos = pos->next) {
		entry = list_get_instance(pos, amsg_t, link);
		if (tv_gteq(&entry->expires, &msg->expires))
			break;
	}
	/* pos is either the first later-or-equal entry or the list head */
	list_append(&msg->link, pos);
}
 
/** Wait for a message sent by async framework with timeout
*
* @param amsgid Message ID to wait for
661,15 → 625,14
goto done;
}
 
gettimeofday(&msg->wdata.expires, NULL);
tv_add(&msg->wdata.expires, timeout);
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 1;
 
msg->wdata.ptid = psthread_get_id();
msg->wdata.active = 0;
msg->wdata.inlist = 1;
gettimeofday(&msg->expires, NULL);
tv_add(&msg->expires, timeout);
insert_timeout(msg);
 
insert_timeout(&msg->wdata);
 
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
697,15 → 660,15
if (!msg)
return;
 
msg->wdata.ptid = psthread_get_id();
msg->wdata.inlist = 1;
msg->wdata.active = 0;
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 1;
 
gettimeofday(&msg->wdata.expires, NULL);
tv_add(&msg->wdata.expires, timeout);
gettimeofday(&msg->expires, NULL);
tv_add(&msg->expires, timeout);
 
futex_down(&async_futex);
insert_timeout(&msg->wdata);
insert_timeout(msg);
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/