Subversion Repositories HelenOS

Compare Revisions

Ignore whitespace Rev 1391 → Rev 1392

/uspace/trunk/libc/generic/async.c/async.c
26,52 → 26,305
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
 
ipc_wait_t call_func(args)
/**
* Asynchronous library
*
* The aim of this library is facilitating writing programs utilizing
* the asynchronous nature of Helenos IPC, yet using a normal way
* of programming.
*
* You should be able to write very simple multithreaded programs,
* the async framework will automatically take care of most synchronization
* problems.
*
* Default semantics:
* - send() - send asynchronously. If the kernel refuses to send more
* messages, [ try to get responses from kernel, if nothing
* found, might try synchronous ]
*
* Example of use:
*
* 1) Multithreaded client application
* create_thread(thread1);
* create_thread(thread2);
* ...
*
* thread1() {
* conn = ipc_connect_me_to();
* c1 = send(conn);
* c2 = send(conn);
* wait_for(c1);
* wait_for(c2);
* }
*
*
* 2) Multithreaded server application
* main() {
* wait_for_connection(new_connection);
* }
*
*
* new_connection(int connection) {
* accept(connection);
* msg = get_msg();
* handle(msg);
* answer(msg);
*
* msg = get_msg();
* ....
* }
*/
#include <futex.h>
#include <async.h>
#include <psthread.h>
#include <stdio.h>
#include <libadt/hash_table.h>
#include <libadt/list.h>
#include <ipc/ipc.h>
#include <assert.h>
#include <errno.h>
 
static atomic_t conn_futex = FUTEX_INITIALIZER;
static hash_table_t conn_hash_table;
 
typedef struct {
link_t link;
ipc_callid_t callid;
ipc_call_t call;
} msg_t;
 
typedef struct {
link_t link;
ipcarg_t in_phone_hash; /**< Incoming phone hash. */
link_t msg_queue; /**< Messages that should be delivered to this thread */
pstid_t ptid; /**< Thread associated with this connection */
int active; /**< If this thread is currently active */
int opened; /* If the connection was accepted */
/* Structures for connection opening packet */
ipc_callid_t callid;
ipc_call_t call;
} connection_t;
 
__thread connection_t *PS_connection;
 
/* Hash table functions */
 
#define ASYNC_HASH_TABLE_CHAINS 32
 
static hash_index_t conn_hash(unsigned long *key)
{
assert(key);
return ((*key) >> 4) % ASYNC_HASH_TABLE_CHAINS;
}
 
ipc_wait_t fire_function(args)
static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
{
stack = malloc(stacksize);
setup(stack);
add_to_list_of_ready_funcs(stack);
if (threads_waiting_for_message)
send_message_to_self_to_one_up();
connection_t *hs;
 
hs = hash_table_get_instance(item, connection_t, link);
return key[0] == hs->in_phone_hash;
}
 
void discard_result(ipc_wait_t funcid)
static void conn_remove(link_t *item)
{
free(hash_table_get_instance(item, connection_t, link));
}
 
int wait_result(ipc_wait_t funcid);
 
/** Operations for NS hash table. */
static hash_table_operations_t conn_hash_table_ops = {
.hash = conn_hash,
.compare = conn_compare,
.remove_callback = conn_remove
};
 
/** Try to route a call to an appropriate connection thread
*
*/
static int route_call(ipc_callid_t callid, ipc_call_t *call)
{
save_context(self);
restart:
if result_available() {
if in_list_of_ready(self):
tear_off_list(self);
return retval;
connection_t *conn;
msg_t *msg;
link_t *hlp;
unsigned long key;
 
futex_down(&conn_futex);
 
key = call->in_phone_hash;
hlp = hash_table_find(&conn_hash_table, &key);
if (!hlp) {
futex_up(&conn_futex);
return 0;
}
add_to_waitlist_of(funcid);
conn = hash_table_get_instance(hlp, connection_t, link);
 
take_something_from_list_of_ready();
if something {
restore_context(something);
} else { /* nothing */
wait_for_call();
if (answer) {
mark_result_ready();
put_waiting_thread_to_waitlist();
msg = malloc(sizeof(*msg));
msg->callid = callid;
msg->call = *call;
list_append(&msg->link, &conn->msg_queue);
if (!conn->active) {
conn->active = 1;
psthread_add_ready(conn->ptid);
}
 
goto restart;
}
futex_up(&conn_futex);
 
return 1;
}
 
ipc_callid_t async_get_call(ipc_call_t *call)
{
msg_t *msg;
ipc_callid_t callid;
connection_t *conn;
futex_down(&conn_futex);
 
conn = PS_connection;
/* If nothing in queue, wait until something appears */
if (list_empty(&conn->msg_queue)) {
conn->active = 0;
psthread_schedule_next_adv(PS_TO_MANAGER);
}
msg = list_get_instance(conn->msg_queue.next, msg_t, link);
list_remove(&msg->link);
callid = msg->callid;
*call = msg->call;
free(msg);
futex_up(&conn_futex);
return callid;
}
 
void client_connection(ipc_callid_t callid, ipc_call_t *call)
{
printf("Got connection - no handler.\n");
_exit(1);
}
 
int ipc_call_sync(args)
static int connection_thread(void *arg)
{
return ipc_wait(call_func(args));
/* Setup thread local connection pointer */
PS_connection = (connection_t *)arg;
client_connection(PS_connection->callid, &PS_connection->call);
 
futex_down(&conn_futex);
/* TODO: remove myself from connection hash table */
futex_up(&conn_futex);
/* TODO: answer all unanswered messages in queue with
* EHANGUP */
}
 
/** Create new thread for a new connection
*
* Creates new thread for connection, fills in connection
* structures and inserts it into the hash table, so that
* later we can easily do routing of messages to particular
* threads.
*/
static void new_connection(ipc_callid_t callid, ipc_call_t *call)
{
pstid_t ptid;
connection_t *conn;
unsigned long key;
 
conn = malloc(sizeof(*conn));
if (!conn) {
ipc_answer_fast(callid, ENOMEM, 0, 0);
return;
}
conn->in_phone_hash = IPC_GET_ARG3(*call);
list_initialize(&conn->msg_queue);
conn->opened = 0;
conn->ptid = psthread_create(connection_thread, conn);
conn->callid = callid;
conn->call = *call;
conn->active = 1; /* We will activate it asap */
list_initialize(&conn->link);
if (!conn->ptid) {
free(conn);
ipc_answer_fast(callid, ENOMEM, 0, 0);
return;
}
key = conn->in_phone_hash;
futex_down(&conn_futex);
/* Add connection to hash table */
hash_table_insert(&conn_hash_table, &key, &conn->link);
futex_up(&conn_futex);
 
psthread_add_ready(conn->ptid);
}
 
/** Handle call to a task */
static void handle_call(ipc_callid_t callid, ipc_call_t *call)
{
if (route_call(callid, call))
return;
 
switch (IPC_GET_METHOD(*call)) {
case IPC_M_INTERRUPT:
break;
case IPC_M_CONNECT_ME_TO:
/* Open new connection with thread etc. */
new_connection(callid, call);
break;
default:
ipc_answer_fast(callid, EHANGUP, 0, 0);
}
}
 
/** Endless loop dispatching incoming calls and answers */
int async_manager()
{
ipc_call_t call;
ipc_callid_t callid;
 
while (1) {
if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
futex_up(&conn_futex); /* conn_futex is always held
* when entering manager thread
*/
continue;
}
callid = ipc_wait_cycle(&call,SYNCH_NO_TIMEOUT,SYNCH_BLOCKING);
 
if (callid & IPC_CALLID_ANSWERED)
continue;
handle_call(callid, &call);
}
}
 
static int async_manager_thread(void *arg)
{
futex_up(&conn_futex); /* conn_futex is always locked when entering
* manager */
async_manager();
}
 
/** Add one manager to manager list */
void async_create_manager(void)
{
pstid_t ptid;
 
ptid = psthread_create(async_manager_thread, NULL);
psthread_add_manager(ptid);
}
 
/** Remove one manager from manager list */
void async_destroy_manager(void)
{
psthread_remove_manager();
}
 
/** Initialize internal structures needed for async manager */
int _async_init(void)
{
if (!hash_table_create(&conn_hash_table, ASYNC_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
printf("%s: cannot create hash table\n", "async");
return ENOMEM;
}
}