Subversion Repositories: HelenOS

Compare Revisions

Rev 2429 → Rev 2430

/branches/rcu/kernel/test/synch/rcu1.c
27,6 → 27,10
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
 
 
/* This is a thorough RCU test. It should also provide basic guidelines on how
to use this implementation of RCU. A condensed usage sketch follows. */
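A condensed, hedged sketch of the pattern this test exercises. The names example_head, example_lock, example_sum and example_replace_head are hypothetical and exist only for illustration; the sketch assumes the data_t list type defined below, and the RCU and spinlock calls are the ones used throughout this file.

static data_t* example_head;
SPINLOCK_INITIALIZE(example_lock);

/* Reader side: bracket the traversal with rcu_read_lock/rcu_read_unlock
   and load shared pointers through rcu_dereference_pointer. */
static int example_sum(void)
{
	data_t* cur;
	int sum;
	rcu_read_lock();
	sum = rcu_dereference_pointer(example_head).number;
	for (cur = rcu_dereference_pointer(example_head).next; cur != NULL; cur = cur->next)
		sum += cur->number;
	rcu_read_unlock();
	return sum;
}

/* Writer side: serialize writers with a spinlock, publish the new node with
   rcu_assign_pointer, and defer freeing the old node past the grace period. */
static void example_replace_head(data_t* newdata, rcu_callback_list_t* rd)
{
	data_t* old;
	spinlock_lock(&example_lock);
	old = example_head;
	newdata->next = old->next;
	rcu_assign_pointer(example_head, newdata);
	//readers may still hold old; free it only after all of them have finished
	rcu_sync_callback(&rcu_callback_free, old, rd);
	spinlock_unlock(&example_lock);
}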
 
#include <synch/rcu.h>
#include <print.h>
#include <test.h>
37,14 → 41,19
#include <preemption.h>
#include <proc/thread.h>
 
 
#define RCU_MAX_I 1000
//number of nodes in the list; the sum of the list values will grow up to RCU_MAX_I^2
#define RCU_MAX_I 500
//number of reader and writer threads in the test
#define READER_THREADS 10
#define WRITER_THREADS 10
#define THREADS_SLEEP_LENGTH (uint32_t)50
//a sleep separates iterations of reading
#define THREADS_SLEEP_LENGTH 50
 
//shared flag specifying whether we should be quiet
bool gquiet;
volatile bool finished;
//count of finished threads
volatile int cfinished;
//the list we will manage with RCU
typedef struct data_struc {
int number;
struct data_struc* next;
54,7 → 63,7
SPINLOCK_INITIALIZE(write_lock);
 
 
 
/** This thread will repeatedly read from the list. */
static void reader(void* a)
{
a = (a); //silence the unused-parameter warning
62,20 → 71,22
int i = 0;
while (true)
{
i = 0;
//entering read critical section
rcu_read_lock();
for (cur = rcu_dereference_pointer(first).next; cur != first; cur = cur->next) {
i += rcu_dereference_pointer(cur).number;
//proper dereferencing
i = rcu_dereference_pointer(first).number;
for (cur = rcu_dereference_pointer(first).next; cur != NULL; cur = cur->next) {
i += cur->number;
}
rcu_read_unlock();
thread_usleep(THREADS_SLEEP_LENGTH);
if (i>RCU_MAX_I || finished)
if (i>RCU_MAX_I*RCU_MAX_I || cfinished>0)
{
printf("@");
break;
}
thread_usleep(THREADS_SLEEP_LENGTH);
}
finished = true;
cfinished++;
}
 
static void writer(void* a)
82,70 → 93,70
{
a = (a); //silence the unused-parameter warning
data_t* cur;
data_t* newdata, *oldata;
data_t* newdata;
data_t* oldata;
rcu_callback_list_t* rcudata;
int i = 0;
printf("a1 ");
rcudata = malloc(sizeof(rcu_callback_list_t),0);
while (true)
{
//we must allocate the rcu structure each time, because it gets freed after the callback
//we allocate it outside any critical section, as it could block
rcudata = malloc(sizeof(rcu_callback_list_t),0);
rcu_read_lock();
i = rcu_dereference_pointer(first).number;
rcu_read_lock();
printf("a2 ");
for (cur = rcu_dereference_pointer(first).next; cur != first; cur = rcu_dereference_pointer(cur).next) {
i += rcu_dereference_pointer(cur).number;
for (cur = rcu_dereference_pointer(first).next; cur != NULL; cur = cur->next) {
i += cur->number;
}
rcu_read_unlock();
if (!gquiet)
printf("i%d ",i);
if (i>RCU_MAX_I || finished)
if (i>RCU_MAX_I*RCU_MAX_I || cfinished>0)
{
printf("!");
break;
}
printf("a2b ");
thread_usleep(THREADS_SLEEP_LENGTH);
 
printf("a3 ");
//insert a new member
newdata = malloc(sizeof(data_t),0);
printf("a4 ");
newdata->number = (i/(RCU_MAX_I/2))+1;
rcu_read_lock();
//prepend a new member
//we have to acquire the lock for writing to the structure
spinlock_lock(&write_lock);
printf("a5 ");
newdata->number = (i/10)+1;
oldata = first;
newdata->next = first;
//rcu_assign_pointer takes care of the necessary write barriers
rcu_assign_pointer(first, newdata);
printf("a6 ");
rcu_sync_callback(&rcu_callback_free, oldata, rcudata);
if (!gquiet)
printf("prepending:%x,n:%d ", newdata, newdata->number);
spinlock_unlock(&write_lock);
printf("a7 ");
rcu_read_unlock();
thread_usleep(THREADS_SLEEP_LENGTH);
printf("a8 ");
if (!gquiet)
printf(".");
 
 
//replace a random member
rcu_read_lock();
for (cur = rcu_dereference_pointer(first).next; rcu_dereference_pointer(cur).number % 5 != 4 && cur != first; cur = rcu_dereference_pointer(cur).next) {
//we have to take the spinlock now, because we'll use the cur pointer later
//RCU doesn't guarantee that cur->next will keep pointing to a member of the list
//note that the read critical section DOES guarantee that *cur stays allocated
//and holds a current or past version of some member of the list
//however, that is not sufficient: we could be modifying an old version, which
//would be freed later on
spinlock_lock(&write_lock);
for (cur = first; cur->next != NULL && cur->next->number >= (i/(RCU_MAX_I/2)); cur = rcu_dereference_pointer(cur).next) {
}
if (cur != first){
//delete the cur->next
spinlock_lock(&write_lock);
oldata = rcu_dereference_pointer(cur).next;
newdata->next = rcu_dereference_pointer(rcu_dereference_pointer(cur).next).next;
rcu_assign_pointer(rcu_dereference_pointer(cur).next, newdata);
if (cur->next != NULL) {
newdata = malloc(sizeof(data_t),0);
//the change of the number member could be done atomically; it's here just to simulate some real work
newdata->number = (i/(RCU_MAX_I/2))+5;
newdata->next = cur->next->next;
oldata = cur->next;
if (!gquiet)
printf("free:%x,n:%d ", cur->next, cur->next->number);
rcu_assign_pointer(cur->next, newdata);
//free the old member when it is safe (i.e. no references are held)
rcu_sync_callback(&rcu_callback_free, oldata, rcudata);
spinlock_unlock(&write_lock);
if (!gquiet)
printf(", ");
 
}
rcu_read_unlock();
thread_usleep(THREADS_SLEEP_LENGTH);
rcu_read_unlock();
}
finished = true;
cfinished++;
}
 
char * test_rcu1(bool quiet)
154,29 → 165,39
data_t* cur, *oldata;
int i;
thread_t* t;
thread_t** threads;
 
//allocate and initialize the list
first = malloc(sizeof(data_t),0);
first->number = 10;
first->next = first;
threads = malloc(sizeof(thread_t*),0);
finished = false;
first->number = 0;
cur = first;
for (i=1;i<RCU_MAX_I;i++) {
cur->next = malloc(sizeof(data_t),0);
cur = cur->next;
cur->number = i;
}
cur->next = NULL;
 
//initialize the counter of finished threads
cfinished=0;
 
//start the writers
for(i = 0; i< WRITER_THREADS; i++) {
threads[i]=t=thread_create(writer,NULL, TASK, 0, "writerthread", false );
t=thread_create(writer,NULL, TASK, 0, "writerthread", false );
if (t != NULL)
thread_ready(t);
}
 
//start the readers
for(i = 0; i< READER_THREADS; i++) {
threads[WRITER_THREADS+i]=t=thread_create(reader,NULL, TASK, 0, "readerthread", false );
t=thread_create(reader,NULL, TASK, 0, "readerthread", false );
if (t != NULL)
thread_ready(t);
}
 
for (i=0;i<WRITER_THREADS+READER_THREADS;i++)
thread_join(threads[i]);
 
for(cur=first->next;cur->next!=first;) {
//wait for completion
while (cfinished<WRITER_THREADS+READER_THREADS);
printf("\nfinished all threads!\n");
//free the list
for(cur=first->next;cur!=NULL;) {
oldata = cur->next;
free(cur);
cur = oldata;
/branches/rcu/kernel/test/synch/rcu1.def
1,6 → 1,6
{
"rcu1",
"RCU test (very basic)",
"RCU test",
&test_rcu1,
true
},
/branches/rcu/kernel/generic/include/proc/tasklet.h
30,7 → 30,8
/** @addtogroup genericddi
* @{
*/
/** @file
/** @file tasklet.h
* @brief Tasklet declarations
*/
 
#ifndef KERN_TASKLET_H_
/branches/rcu/kernel/generic/include/synch/rcu.h
29,7 → 29,8
/** @addtogroup sync
* @{
*/
/** @file
/** @file rcu.h
* @brief Declarations for RCU
*/
 
#ifndef KERN_RCU_H_
41,9 → 42,14
#include <arch.h>
#include <preemption.h>
 
 
/** Structure for callbacks */
typedef struct rcu_callback_list {
/** next in the list */
struct rcu_callback_list* next;
/** pointer to callback function */
void (*func)(void*);
/** argument to pass to the callback */
void* data;
} rcu_callback_list_t;
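Any function matching the func signature above can be registered. As a hedged sketch (example_free_cb is a hypothetical name; the tests register the analogous rcu_callback_free this way):

static void example_free_cb(void* data)
{
	//runs only after every CPU has passed a quiescent state,
	//so no reader can still hold a reference to data
	free(data);
}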
 
/branches/rcu/kernel/generic/src/synch/rcu.c
29,7 → 29,8
/** @addtogroup sync
* @{
*/
/** @file
/** @file rcu.c
* @brief RCU synchronization primitive
*/
 
#include <synch/rcu.h>
45,16 → 46,22
 
 
 
 
/** Main data structure of the RCU implementation */
typedef struct {
#ifdef CONFIG_SMP
/** flags indicating whether the corresponding CPU has passed QS for this RCU batch */
bool* cpu_mask;
#endif
rcu_callback_list_t* next_batch, *current_batch, *done_batch;
} rcu_global_t;
/** RCU batch waiting for the current batch to finish; QS monitoring hasn't been started for this one */
rcu_callback_list_t* next_batch;
/** RCU batch that waits for passing of QSs on all CPUs */
rcu_callback_list_t *current_batch;
/** RCU batch that has passed all QSs and waits for invocation */
rcu_callback_list_t *done_batch;
} rcu_cpu_data_t;
 
/** An array of structures holding the callbacks and the progress of QS for each CPU */
rcu_global_t* rcu_global=NULL;
rcu_cpu_data_t* rcu_global=NULL;
/** reference to the RCU tasklet, for scheduling it */
tasklet_descriptor_t* rcu_tasklet_desc;
 
68,13 → 75,13
int i,j;
#endif
 
rcu_global = malloc(sizeof(rcu_global_t)*(config.cpu_count),0);
rcu_global = malloc(sizeof(rcu_cpu_data_t)*(config.cpu_count),0);
rcu_tasklet_desc = tasklet_register(&rcu_tasklet, NULL);
 
#ifdef CONFIG_SMP
/*
* Note: the array is allocated for the case when every connected CPU will be active.
* In a case when there will be some inactive CPUs, I will use just the first cells.
* In case some CPUs are inactive, just the entries for the active ones will be used.
*/
for (i=0;i<config.cpu_count;i++) {
rcu_global[i].done_batch = NULL;
119,9 → 126,10
 
 
/**
* appends this callback func to the queue of waiting callbacks, the rest
* Appends this callback func to the queue of waiting callbacks; the rest
* is handled in rcu_run_callbacks and in the tasklet. This is a lock-free variant,
* which must be supplied with a preallocated rcu_callback_list_t structure
* that is deallocated after the callback is called
*/
void rcu_sync_callback(void (*func)(void* data), void* data, rcu_callback_list_t* rd)
{
133,18 → 141,19
rd->func = func;
rd->data = data;
 
//disabling interrupts removes the need for any other synchronization - the list
//of callbacks is only ever accessed on the current CPU
ipl = interrupts_disable();
//append to the list of callbacks waiting for their batch to begin
rd->next = rcu_global[CPU->id].next_batch;
rcu_global[CPU->id].next_batch = rd;
rcu_passQS();
interrupts_restore(ipl);
 
rcu_passQS();
#endif
}
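A typical call site, mirroring the test code above (a sketch; old_node stands for a list member already unlinked under the writers' lock):

rcu_callback_list_t* rd;
void* old_node;

//preallocate the descriptor outside any critical section: malloc can block
rd = malloc(sizeof(rcu_callback_list_t), 0);
//... unlink old_node from the shared structure under the writers' lock ...
//schedule the free; rd itself is deallocated after the callback is called
rcu_sync_callback(&rcu_callback_free, old_node, rd);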
 
/**
* RCU tasklet, tests passing through QSs, moves from current to done
* RCU tasklet; checks passing through QSs and moves callbacks from the current list to the done list
*/
void rcu_tasklet(void* data)
{
154,20 → 163,23
int i;
#endif
ipl_t ipl;
passed_all_QS = true;
 
 
ipl = interrupts_disable();
 
rcu_passQS();
passed_all_QS = true;
#ifdef CONFIG_SMP
//check whether all CPUs have passed through QS
for (i = 0; i < config.cpu_active; i++)
passed_all_QS &= rcu_global[CPU->id].cpu_mask[i];
//check whether all CPUs have passed through QS of this CPU's current batch
for (i = 0; i < config.cpu_count; i++)
if (cpus[i].active)
passed_all_QS &= rcu_global[CPU->id].cpu_mask[i];
#endif
if (passed_all_QS) {
//all CPUs have passed through QS -> the grace period is over; we can schedule the calls to the RCU callbacks
if (rcu_global[CPU->id].done_batch) {
rd = rcu_global[CPU->id].done_batch;
 
while (rd->next) rd = rd->next;
//append the current list to done list
rd->next = rcu_global[CPU->id].current_batch;
186,15 → 198,18
{
#ifdef CONFIG_SMP
int i;
for (i=0;i<config.cpu_active;i++)
//on all CPUs indicate that this CPU has gone through QS
rcu_global[i].cpu_mask[CPU->id]=true;
for (i = 0; i < config.cpu_count; i++)
if (cpus[i].active)
//on all CPUs indicate that this CPU has gone through QS
//this can overlap with clearing this flag in rcu_run_callbacks
rcu_global[i].cpu_mask[CPU->id]=true;
#endif
}
 
 
/**
* Moves RCUs from next to current, schedules RCU tasklet, calls the callbacks, frees the rcu_callback_list_t
* Moves RCU callbacks from the next list to the current list, schedules the RCU tasklet when
* needed, calls the callbacks from the done list, and frees their rcu_callback_list_t structures
*/
void rcu_run_callbacks(void)
{
204,7 → 219,7
 
ipl = interrupts_disable();
if (rcu_global[CPU->id].next_batch) {
//we cannot append to the current list because callbacks from next batch
//we cannot append to the current list because the callbacks from next batch
//haven't passed the QSs
if (rcu_global[CPU->id].current_batch == NULL) {
rcu_global[CPU->id].current_batch = rcu_global[CPU->id].next_batch;
211,13 → 226,14
rcu_global[CPU->id].next_batch = NULL;
#ifdef CONFIG_SMP
//initialize our CPU mask
for (i=0;i<config.cpu_active;i++)
rcu_global[CPU->id].cpu_mask[i]=false;
for (i = 0; i < config.cpu_count; i++)
if (cpus[i].active)
rcu_global[CPU->id].cpu_mask[i]=false;
#endif
//schedule the tasklet on all active CPUs
for (i=0;i<config.cpu_active;i++) {
tasklet_schedule_SMP(rcu_tasklet_desc, i);
}
for (i = 0; i < config.cpu_count; i++)
if (cpus[i].active)
tasklet_schedule_SMP(rcu_tasklet_desc, i);
}
}
//this CPU has passed QS
225,9 → 241,12
if (rcu_global[CPU->id].done_batch) {
rd = rcu_global[CPU->id].done_batch;
rcu_global[CPU->id].done_batch = NULL;
//the callbacks (and free) can block, so we must restore interrupts first
interrupts_restore(ipl);
while (rd) {
//call the callback
if (rd->func == NULL)
panic_printf("RCU callback function NULL, desc:%x", rd);
rd->func(rd->data);
rd2 = rd->next;
//free the structure
/branches/rcu/kernel/generic/src/proc/tasklet.c
30,7 → 30,8
/** @addtogroup genericddi
* @{
*/
/** @file
/** @file tasklet.c
* @brief Tasklet implementation
*/
 
#include <arch.h>
91,10 → 92,10
//create the tasklet_thread, it's wired to the current cpu, we'll migrate it ourselves
thread_t* t= thread_create(&tasklet_thread, NULL, kernel_task, THREAD_FLAG_WIRED, "tasklet_thread", false);
if (t==NULL) {
//wtf?
panic_printf("tasklet thread not created\n");
} else {
spinlock_lock(&t->lock);
spinlock_lock(&t->lock);
//we'll default to the first CPU
t->cpu = &cpus[0];
t->priority = TASKLET_THREAD_PRIORITY;
spinlock_unlock(&t->lock);
114,21 → 115,25
waitq_t wq;
waitq_initialize(&wq);
//the infinite loop
while (true) {
//execute any scheduled tasklets
tasklet_do();
#ifdef CONFIG_SMP
//check whether other CPUs have tasklets to execute
if (config.cpu_active>1) {
current_cpu = CPU->id;
//find the first cpu with nonempty tasklet_list
for (new_cpu = (current_cpu + 1) % config.cpu_active; new_cpu!=current_cpu && tasklet_list[new_cpu]==0 && cpus[new_cpu].active;
new_cpu=(new_cpu + 1)% config.cpu_active);
for (new_cpu = (current_cpu + 1) % config.cpu_count; new_cpu!=current_cpu && tasklet_list[new_cpu]==0 ;
new_cpu=(new_cpu + 1)% config.cpu_count);
 
if (new_cpu!=current_cpu) {
//if we found a CPU with a pending tasklet, schedule this thread to run there. It must be active!
if (new_cpu!=current_cpu && cpus[new_cpu].active) {
//we need to migrate this thread to CPU with id new_cpu
cpu = &cpus[new_cpu];
 
spinlock_lock(&THREAD->lock);
//put tasklet_thread on the new_cpu
//move tasklet_thread to the new_cpu
THREAD->cpu = cpu;
spinlock_unlock(&THREAD->lock);
}
203,12 → 208,12
 
 
 
/** Executes scheduled enabled tasklets on current CPU */
/** Executes scheduled enabled tasklets on the current CPU.
* This function may also be called from other parts of the kernel. */
void tasklet_do(void)
{
spinlock_lock(&tasklet_lock);
tasklet_descriptor_t* t = tasklet_list[CPU->id];
//printf(".");
if (t) {
//empty the tasklet_list
tasklet_list[CPU->id]=0;
218,8 → 223,7
if (t->func) {
t->state = TASKLET_STATE_RUNNING;
t->func(t->data);
//clear running flag, set not active - the tasklet can disable itself
//that's why we don't just set it as not active
//clear running flag, set not active
t->state &= ~TASKLET_STATE_RUNNING;
t->state |= TASKLET_STATE_NOTACTIVE;
} else
240,7 → 244,7
}
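Putting the pieces together, a hedged sketch of how another kernel path could use this API (example_tasklet_func and example_schedule are hypothetical; tasklet_register and tasklet_schedule_SMP are the calls shown above):

static void example_tasklet_func(void* data)
{
	//runs from tasklet_do() on whichever CPU the tasklet was scheduled for
}

static void example_schedule(void)
{
	tasklet_descriptor_t* d = tasklet_register(&example_tasklet_func, NULL);
	tasklet_schedule_SMP(d, CPU->id);
	//the tasklet thread will pick it up, or a driver can call tasklet_do() itself
}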
 
/** Frees the tasklet structure when no longer needed. The function doesn't provide
* any synchronization, the caller must be sure, the tasklet is not scheduled.
* any synchronization; the caller must make sure that the tasklet is not scheduled.
*
* @param tasklet to be freed
*/
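For illustration, a hedged teardown sketch; the name tasklet_free is assumed, since the diff cuts off before the function body, and TASKLET_STATE_NOTACTIVE is the flag tasklet_do sets when a tasklet finishes:

//hypothetical call site: safe only once the tasklet can no longer be scheduled
if (t->state & TASKLET_STATE_NOTACTIVE)
	tasklet_free(t); //function name assumed from the comment above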