Subversion Repositories HelenOS-historic

Compare Revisions

Ignore whitespace Rev 1440 → Rev 1441

/uspace/trunk/libc/generic/async.c
90,9 → 90,12
#include <ipc/ipc.h>
#include <assert.h>
#include <errno.h>
#include <time.h>
#include <arch/barrier.h>
 
static atomic_t async_futex = FUTEX_INITIALIZER;
static hash_table_t conn_hash_table;
static LIST_INITIALIZE(timeout_list);
 
typedef struct {
pstid_t ptid; /**< Thread waiting for this message */
100,6 → 103,10
int done; /**< If reply was received */
ipc_call_t *dataptr; /**< Pointer where the answer data
* should be stored */
struct timeval expires; /**< Expiration time for waiting thread */
int has_timeout; /**< If true, this struct is in timeout list */
link_t link;
 
ipcarg_t retval;
} amsg_t;
 
123,6 → 130,41
 
__thread connection_t *PS_connection;
 
/** Add microseconds to the given timeval
 *
 * The timeval is updated in place. It is expected to be normalized on
 * entry (0 <= tv_usec < 1000000) and is left normalized on return.
 *
 * @param tv    Timeval to be adjusted.
 * @param usecs Number of microseconds to add; may be negative.
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
	tv->tv_sec += usecs / 1000000;
	tv->tv_usec += usecs % 1000000;

	if (tv->tv_usec >= 1000000) {
		/* Carry, including the case of exactly one full second */
		tv->tv_sec++;
		tv->tv_usec -= 1000000;
	} else if (tv->tv_usec < 0) {
		/* Borrow after a negative increment */
		tv->tv_sec--;
		tv->tv_usec += 1000000;
	}
}
 
/** Compute the difference of two timevals
 *
 * @param tv1 Minuend.
 * @param tv2 Subtrahend.
 *
 * @return tv1 - tv2 expressed in microseconds (negative when tv1 is
 *         earlier than tv2).
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
	return (tv1->tv_sec - tv2->tv_sec) * 1000000 +
	    (tv1->tv_usec - tv2->tv_usec);
}
 
/** Decide whether one timeval is strictly later than another
 *
 * @param tv1 First timeval.
 * @param tv2 Second timeval.
 *
 * @return 1 if tv1 > tv2, otherwise 0
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
	/* Seconds decide unless they are equal */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;

	return tv1->tv_usec > tv2->tv_usec;
}
 
/* Hash table functions */
#define CONN_HASH_TABLE_CHAINS 32
 
326,12 → 368,44
}
}
 
/** Fire all timeouts that expired */
static void handle_expired_timeouts(void)
{
struct timeval tv;
amsg_t *amsg;
link_t *cur;
 
gettimeofday(&tv,NULL);
futex_down(&async_futex);
 
cur = timeout_list.next;
while (cur != &timeout_list) {
amsg = list_get_instance(cur,amsg_t,link);
if (tv_gt(&amsg->expires, &tv))
break;
cur = cur->next;
list_remove(&amsg->link);
amsg->has_timeout = 0;
/* Redundant condition? The thread should not
* be active when it gets here.
*/
if (!amsg->active) {
amsg->active = 1;
psthread_add_ready(amsg->ptid);
}
}
 
futex_up(&async_futex);
}
 
/** Endless loop dispatching incoming calls and answers */
int async_manager()
int async_manager(void)
{
ipc_call_t call;
ipc_callid_t callid;
int timeout;
amsg_t *amsg;
struct timeval tv;
 
while (1) {
if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
340,16 → 414,23
*/
continue;
}
/*
if (expires)
timeout = .... ;
else
*/
futex_down(&async_futex);
if (!list_empty(&timeout_list)) {
amsg = list_get_instance(timeout_list.next,amsg_t,link);
gettimeofday(&tv,NULL);
if (tv_gt(&tv, &amsg->expires)) {
handle_expired_timeouts();
continue;
} else
timeout = tv_sub(&amsg->expires, &tv);
} else
timeout = SYNCH_NO_TIMEOUT;
futex_up(&async_futex);
 
callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
 
if (!callid) {
// handle_expired_timeouts.......;
handle_expired_timeouts();
continue;
}
 
417,7 → 498,10
if (msg->dataptr)
*msg->dataptr = *data;
 
/* TODO: memory barrier?? */
write_barrier();
/* Remove message from timeout list */
if (msg->has_timeout)
list_remove(&msg->link);
msg->done = 1;
if (! msg->active) {
msg->active = 1;
465,6 → 549,7
 
msg->ptid = psthread_get_id();
msg->active = 0;
msg->has_timeout = 0;
/* Leave locked async_futex when entering this function */
psthread_schedule_next_adv(PS_TO_MANAGER);
/* futex is up automatically after psthread_schedule_next...*/
474,37 → 559,65
free(msg);
}
 
/** Link a message into the timeout list at its sorted position
 *
 * Scans timeout_list from the head for the first entry that expires
 * strictly later than msg and hands that position (or the list head if
 * there is none) to list_append().
 * NOTE(review): list_append(link, pos) presumably links msg immediately
 * before pos, keeping the list ordered by expiration - confirm against
 * the list implementation.
 *
 * Assume async_futex is held
 *
 * @param msg Message whose expires field is already set.
 */
static void insert_timeout(amsg_t *msg)
{
	link_t *pos;

	for (pos = timeout_list.next; pos != &timeout_list; pos = pos->next) {
		amsg_t *queued = list_get_instance(pos, amsg_t, link);

		if (tv_gt(&queued->expires, &msg->expires))
			break;
	}

	list_append(&msg->link, pos);
}
 
/* futex_down(&async_futex); */
/* if (msg->done) { */
/* futex_up(&async_futex); */
/* goto done; */
/* } */
/** Wait for a message sent by async framework with timeout
 *
 * If the answer has not arrived yet, the calling pseudo thread records
 * itself in the message, puts the message on the timeout list and
 * switches to the manager until it is woken up either by the answer or
 * by the expired timeout.
 *
 * @param amsgid Message ID to wait for
 * @param retval Pointer to variable where will be stored retval
 * of the answered message. If NULL, it is ignored.
 * @param timeout Timeout in usecs; a negative value is treated as zero
 * @return 0 on success, ETIMEOUT if timeout expired
 *
 */
int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
{
	amsg_t *msg = (amsg_t *) amsgid;

	/*
	 * Clamp negative timeouts to zero so that the expiration time is
	 * never computed with a negative increment.
	 */
	if (timeout < 0)
		timeout = 0;

	futex_down(&async_futex);
	if (msg->done) {
		futex_up(&async_futex);
		goto done;
	}

	msg->ptid = psthread_get_id();
	msg->active = 0;
	msg->has_timeout = 1;

	/* Expiration time = now + timeout */
	gettimeofday(&msg->expires, NULL);
	tv_add(&msg->expires, timeout);
	insert_timeout(msg);

	/* Leave locked async_futex when entering this function */
	psthread_schedule_next_adv(PS_TO_MANAGER);
	/* futex is up automatically after psthread_schedule_next...*/

	/*
	 * NOTE(review): on timeout the message is not freed here; a late
	 * answer would presumably still be stored into it - confirm who
	 * owns msg after ETIMEOUT is returned.
	 */
	if (!msg->done)
		return ETIMEOUT;

done:
	if (retval)
		*retval = msg->retval;
	free(msg);

	return 0;
}