/branches/rcu/kernel/test/synch/rcu1.c |
---|
27,6 → 27,10 |
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
*/ |
/* This is a thorough RCU test. It also serves as a basic guideline on how to use |
this implementation of RCU. */ |
#include <synch/rcu.h> |
#include <print.h> |
#include <test.h> |
37,14 → 41,19 |
#include <preemption.h> |
#include <proc/thread.h> |
// NOTE(review): as rendered here RCU_MAX_I (1000 / 500) and THREADS_SLEEP_LENGTH |
// ((uint32_t)50 / 50) are each defined twice with different replacement lists; C only |
// permits redefining a macro with an identical replacement list, so keep one of each. |
#define RCU_MAX_I 1000 |
//number of nodes in the list. Threads stop once the sum of the list exceeds RCU_MAX_I*RCU_MAX_I |
#define RCU_MAX_I 500 |
//number of reader and writer threads in the test |
#define READER_THREADS 10 |
#define WRITER_THREADS 10 |
#define THREADS_SLEEP_LENGTH (uint32_t)50 |
//length passed to thread_usleep() between iterations of the reader and writer threads |
#define THREADS_SLEEP_LENGTH 50 |
//shared flag specifying whether we should be quiet (when set, progress printing is skipped) |
bool gquiet; |
//set to true by each reader/writer thread when it leaves its main loop |
volatile bool finished; |
//count of finished threads; each reader/writer increments it once after leaving its loop |
// NOTE(review): volatile does not make cfinished++ atomic. Several threads increment it |
// concurrently, so increments can be lost and the busy-wait on this counter in test_rcu1 |
// may never terminate. An atomic counter (or joining the threads) would be safer. |
volatile int cfinished; |
//the list we will manage with RCU |
typedef struct data_struc { |
int number; |
struct data_struc* next; |
54,7 → 63,7 |
//serializes writers: writer() holds this spinlock while it modifies the shared list |
SPINLOCK_INITIALIZE(write_lock); |
/** this thread will try to read from the list */ |
static void reader(void* a) |
{ |
a = (a); |
62,20 → 71,22 |
int i = 0; |
while (true) |
{ |
i = 0; |
//entering read critical section |
rcu_read_lock(); |
for (cur = rcu_dereference_pointer(first).next; cur != first; cur = cur->next) { |
i += rcu_dereference_pointer(cur).number; |
//proper dereferencing |
i = rcu_dereference_pointer(first).number; |
for (cur = rcu_dereference_pointer(first).next; cur != NULL; cur = cur->next) { |
i += cur->number; |
} |
rcu_read_unlock(); |
thread_usleep(THREADS_SLEEP_LENGTH); |
if (i>RCU_MAX_I || finished) |
if (i>RCU_MAX_I*RCU_MAX_I || cfinished>0) |
{ |
printf("@"); |
break; |
} |
thread_usleep(THREADS_SLEEP_LENGTH); |
} |
finished = true; |
cfinished++; |
} |
static void writer(void* a) |
82,70 → 93,70 |
{ |
a = (a); |
data_t* cur; |
data_t* newdata, *oldata; |
data_t* newdata; |
data_t* oldata; |
rcu_callback_list_t* rcudata; |
int i = 0; |
printf("a1 "); |
rcudata = malloc(sizeof(rcu_callback_list_t),0); |
while (true) |
{ |
//we must allocate the rcu structure each time, because it gets freed after the callback |
//we allocate it outside any critical section, as it could block |
rcudata = malloc(sizeof(rcu_callback_list_t),0); |
rcu_read_lock(); |
i = rcu_dereference_pointer(first).number; |
rcu_read_lock(); |
printf("a2 "); |
for (cur = rcu_dereference_pointer(first).next; cur != first; cur = rcu_dereference_pointer(cur).next) { |
i += rcu_dereference_pointer(cur).number; |
for (cur = rcu_dereference_pointer(first).next; cur != NULL; cur = cur->next) { |
i += cur->number; |
} |
rcu_read_unlock(); |
if (!gquiet) |
printf("i%d ",i); |
if (i>RCU_MAX_I || finished) |
if (i>RCU_MAX_I*RCU_MAX_I || cfinished>0) |
{ |
printf("!"); |
break; |
} |
printf("a2b "); |
thread_usleep(THREADS_SLEEP_LENGTH); |
printf("a3 "); |
//insert a new member |
newdata = malloc(sizeof(data_t),0); |
printf("a4 "); |
newdata->number = (i/(RCU_MAX_I/2))+1; |
rcu_read_lock(); |
//prepend a new member |
//we have to acquire the lock for writing to the structure |
spinlock_lock(&write_lock); |
printf("a5 "); |
newdata->number = (i/10)+1; |
oldata = first; |
newdata->next = first; |
//rcu_assign_pointer takes care of the necessary write barriers |
rcu_assign_pointer(first, newdata); |
printf("a6 "); |
rcu_sync_callback(&rcu_callback_free, oldata, rcudata); |
if (!gquiet) |
printf("prepending:%x,n:%d ", newdata, newdata->number); |
spinlock_unlock(&write_lock); |
printf("a7 "); |
rcu_read_unlock(); |
thread_usleep(THREADS_SLEEP_LENGTH); |
printf("a8 "); |
if (!gquiet) |
printf(".",i); |
//replace a random member |
rcu_read_lock(); |
for (cur = rcu_dereference_pointer(first).next; rcu_dereference_pointer(cur).number % 5 != 4 && cur != first; cur = rcu_dereference_pointer(cur).next) { |
//we have to lock the spinlock now, because we'll use the cur pointer later |
//RCU doesn't provide guarantee that cur->next will point to a member of the list |
//note that read critical section DOES guarantee that the *cur will be allocated space |
//with current or past version of some member of the list |
//However, that is not sufficient as we would be changing old version, which would be freed later on |
spinlock_lock(&write_lock); |
for (cur = first; cur->next != NULL && cur->next->number >= (i/(RCU_MAX_I/2)); cur = rcu_dereference_pointer(cur).next) { |
} |
if (cur != first){ |
//delete the cur->next |
spinlock_lock(&write_lock); |
oldata = rcu_dereference_pointer(cur).next; |
newdata->next = rcu_dereference_pointer(rcu_dereference_pointer(cur).next).next; |
rcu_assign_pointer(rcu_dereference_pointer(cur).next, newdata); |
if (cur->next != NULL) { |
newdata = malloc(sizeof(data_t),0); |
//the change of number member could be done atomically, its here just to simulate some real work |
newdata->number = (i/(RCU_MAX_I/2))+5; |
newdata->next = cur->next->next; |
oldata = cur->next; |
if (!gquiet) |
printf("free:%x,n:%d ", cur->next, cur->next->number); |
rcu_assign_pointer(cur->next, newdata); |
//free the old member when it is safe (i.e. no references are held) |
rcu_sync_callback(&rcu_callback_free, oldata, rcudata); |
spinlock_unlock(&write_lock); |
if (!gquiet) |
printf(", ",i); |
} |
rcu_read_unlock(); |
thread_usleep(THREADS_SLEEP_LENGTH); |
} |
finished = true; |
cfinished++; |
} |
char * test_rcu1(bool quiet) |
154,29 → 165,39 |
data_t* cur, *oldata; |
int i; |
thread_t* t; |
thread_t** threads; |
//allocate and initialize the list |
first = malloc(sizeof(data_t),0); |
first->number = 10; |
first->next = first; |
threads = malloc(sizeof(thread_t*),0); |
finished = false; |
first->number = 0; |
cur = first; |
for (i=1;i<RCU_MAX_I;i++) { |
cur->next = malloc(sizeof(data_t),0); |
cur = cur->next; |
cur->number = i; |
} |
cur->next = NULL; |
//initialize the counter of finished threads |
cfinished=0; |
//start the writers |
for(i = 0; i< WRITER_THREADS; i++) { |
threads[i]=t=thread_create(writer,NULL, TASK, 0, "writerthread", false ); |
t=thread_create(writer,NULL, TASK, 0, "writerthread", false ); |
if (t != NULL) |
thread_ready(t); |
} |
//start the readers |
for(i = 0; i< READER_THREADS; i++) { |
threads[WRITER_THREADS+i]=t=thread_create(reader,NULL, TASK, 0, "readerthread", false ); |
t=thread_create(reader,NULL, TASK, 0, "readerthread", false ); |
if (t != NULL) |
thread_ready(t); |
} |
for (i=0;i<WRITER_THREADS+READER_THREADS;i++) |
thread_join(threads[i]); |
for(cur=first->next;cur->next!=first;) { |
//wait for completion |
while (cfinished<WRITER_THREADS+READER_THREADS); |
printf("\nfinished all threads!\n"); |
//free the list |
for(cur=first->next;cur!=NULL;) { |
oldata = cur->next; |
free(cur); |
cur = oldata; |
/branches/rcu/kernel/test/synch/rcu1.def |
---|
1,6 → 1,6 |
{ |
"rcu1", |
"RCU test (very basic)", |
"RCU test", |
&test_rcu1, |
true |
}, |
/branches/rcu/kernel/generic/include/proc/tasklet.h |
---|
30,7 → 30,8 |
/** @addtogroup genericddi |
* @{ |
*/ |
/** @file |
/** @file tasklet.h |
* @brief Tasklets declarations |
*/ |
#ifndef KERN_TASKLET_H_ |
/branches/rcu/kernel/generic/include/synch/rcu.h |
---|
29,7 → 29,8 |
/** @addtogroup sync |
* @{ |
*/ |
/** @file |
/** @file rcu.h |
* @brief declarations for RCU |
*/ |
#ifndef KERN_RCU_H_ |
41,9 → 42,14 |
#include <arch.h> |
#include <preemption.h> |
/** Node of a singly linked list of RCU callbacks. |
 * rcu_sync_callback() takes a node preallocated by the caller, fills in func and data, |
 * and pushes it onto the per-CPU list; the node is freed after its callback is called. |
 */ |
typedef struct rcu_callback_list { |
/** next node in the list (NULL terminates it) */ |
struct rcu_callback_list* next; |
/** pointer to callback function; invoked as func(data) */ |
void (*func)(void*); |
/** argument to pass to the callback */ |
void* data; |
} rcu_callback_list_t; |
/branches/rcu/kernel/generic/src/synch/rcu.c |
---|
29,7 → 29,8 |
/** @addtogroup sync |
* @{ |
*/ |
/** @file |
/** @file rcu.c |
* @brief RCU synchronization primitive |
*/ |
#include <synch/rcu.h> |
45,16 → 46,22 |
/** Main data structure of the RCU implementation */ |
typedef struct { |
#ifdef CONFIG_SMP |
/** flags indicating whether the corresponding CPU has passed QS for this RCU batch */ |
bool* cpu_mask; |
#endif |
rcu_callback_list_t* next_batch, *current_batch, *done_batch; |
} rcu_global_t; |
/** RCU batch waiting for finishing of current batch, QS monitoring hasn't been started for this one */ |
rcu_callback_list_t* next_batch; |
/** RCU batch that waits for passing of QSs on all CPUs */ |
rcu_callback_list_t *current_batch; |
/** RCU batch that has passed all QSs and waits for invocation */ |
rcu_callback_list_t *done_batch; |
} rcu_cpu_data_t; |
/** An array of structures holding the callbacks and the progress of QS for each CPU*/ |
rcu_global_t* rcu_global=NULL; |
rcu_cpu_data_t* rcu_global=NULL; |
/** reference to the RCU tasklet, for scheduling it */ |
tasklet_descriptor_t* rcu_tasklet_desc; |
68,13 → 75,13 |
int i,j; |
#endif |
rcu_global = malloc(sizeof(rcu_global_t)*(config.cpu_count),0); |
rcu_global = malloc(sizeof(rcu_cpu_data_t)*(config.cpu_count),0); |
rcu_tasklet_desc = tasklet_register(&rcu_tasklet, NULL); |
#ifdef CONFIG_SMP |
/* |
* Note: I allocate the array for a case when every CPU connected will be active |
* In a case when there will be some inactive CPUs, I will use just the first cells. |
* In a case when there will be some inactive CPUs, I will use just the active ones |
*/ |
for (i=0;i<config.cpu_count;i++) { |
rcu_global[i].done_batch = NULL; |
119,9 → 126,10 |
/** |
* appends this callback func to the queue of waiting callbacks, the rest |
* Appends this callback func to the queue of waiting callbacks, the rest |
* is handled in rcu_run_callbacks and in the tasklet. This is a lock free variant, |
* which must be supplied with a preallocated rcu_callback_list_t structure |
* which is deallocated after the callback is called |
*/ |
void rcu_sync_callback(void (*func)(void* data), void* data, rcu_callback_list_t* rd) |
{ |
133,18 → 141,19 |
rd->func = func; |
rd->data = data; |
//disabling interrupts removes need for any synchronization - the list of callbacks is |
//always accessed only on current CPU |
ipl = interrupts_disable(); |
//append to the list of callbacks waiting for their batch to begin |
rd->next = rcu_global[CPU->id].next_batch; |
rcu_global[CPU->id].next_batch = rd; |
rcu_passQS(); |
interrupts_restore(ipl); |
rcu_passQS(); |
#endif |
} |
/** |
* RCU tasklet, tests passing through QSs, moves from current to done |
* RCU tasklet, tests passing through QSs, moves from current list to done list |
*/ |
void rcu_tasklet(void* data) |
{ |
154,14 → 163,16 |
int i; |
#endif |
ipl_t ipl; |
passed_all_QS = true; |
ipl = interrupts_disable(); |
rcu_passQS(); |
rcu_passQS(); |
passed_all_QS = true; |
#ifdef CONFIG_SMP |
//check whether all CPUs have passed through QS |
for (i = 0; i < config.cpu_active; i++) |
//check whether all CPUs have passed through QS of this CPU's current batch |
for (i = 0; i < config.cpu_count; i++) |
if (cpus[i].active) |
passed_all_QS &= rcu_global[CPU->id].cpu_mask[i]; |
#endif |
if (passed_all_QS) { |
168,6 → 179,7 |
//all CPUs have passed through QS -> grace period is over, we can schedule the call to RCU callback |
if (rcu_global[CPU->id].done_batch) { |
rd = rcu_global[CPU->id].done_batch; |
while (rd->next) rd = rd->next; |
//append the current list to done list |
rd->next = rcu_global[CPU->id].current_batch; |
186,8 → 198,10 |
{ |
#ifdef CONFIG_SMP |
int i; |
for (i=0;i<config.cpu_active;i++) |
for (i = 0; i < config.cpu_count; i++) |
if (cpus[i].active) |
//on all CPUs indicate that this CPU has gone through QS |
//this can overlap with clearing this flag in rcu_run_callbacks |
rcu_global[i].cpu_mask[CPU->id]=true; |
#endif |
} |
194,7 → 208,8 |
/** |
* Moves RCUs from next to current, schedules RCU tasklet, calls the callbacks, frees the rcu_callback_list_t |
* Moves RCU callbacks from next list to current list, schedules the RCU tasklet when needed, |
* calls the callbacks from done list, frees the rcu_callback_list_t |
*/ |
void rcu_run_callbacks(void) |
{ |
204,7 → 219,7 |
ipl = interrupts_disable(); |
if (rcu_global[CPU->id].next_batch) { |
//we cannot append to the current list because callbacks from next batch |
//we cannot append to the current list because the callbacks from next batch |
//haven't passed the QSs |
if (rcu_global[CPU->id].current_batch == NULL) { |
rcu_global[CPU->id].current_batch = rcu_global[CPU->id].next_batch; |
211,23 → 226,27 |
rcu_global[CPU->id].next_batch = NULL; |
#ifdef CONFIG_SMP |
//initialize our CPU mask |
for (i=0;i<config.cpu_active;i++) |
for (i = 0; i < config.cpu_count; i++) |
if (cpus[i].active) |
rcu_global[CPU->id].cpu_mask[i]=false; |
#endif |
//schedule tasklet for all CPUs |
for (i=0;i<config.cpu_active;i++) { |
for (i = 0; i < config.cpu_count; i++) |
if (cpus[i].active) |
tasklet_schedule_SMP(rcu_tasklet_desc, i); |
} |
} |
} |
//this CPU has passed QS |
rcu_passQS(); |
if (rcu_global[CPU->id].done_batch) { |
rd = rcu_global[CPU->id].done_batch; |
rcu_global[CPU->id].done_batch = NULL; |
//the callbacks (and free) can block, we must restore the interrupts |
interrupts_restore(ipl); |
while (rd) { |
//call the callback |
if (rd->func == NULL) |
panic_printf("RCU callback function NULL, desc:%x", rd); |
rd->func(rd->data); |
rd2 = rd->next; |
//free the structure |
/branches/rcu/kernel/generic/src/proc/tasklet.c |
---|
30,7 → 30,8 |
/** @addtogroup genericddi |
* @{ |
*/ |
/** @file |
/** @file tasklet.c |
* @brief Tasklet implementation |
*/ |
#include <arch.h> |
91,10 → 92,10 |
//create the tasklet_thread, it's wired to the current cpu, we'll migrate it ourselves |
thread_t* t= thread_create(&tasklet_thread, NULL, kernel_task, THREAD_FLAG_WIRED, "tasklet_thread", false); |
if (t==NULL) { |
//wtf? |
panic_printf("tasklet thread not created\n"); |
} else { |
spinlock_lock(&t->lock); |
//we'll default on the first CPU |
t->cpu = &cpus[0]; |
t->priority = TASKLET_THREAD_PRIORITY; |
spinlock_unlock(&t->lock); |
114,21 → 115,25 |
waitq_t wq; |
waitq_initialize(&wq); |
//the infinite loop |
while (true) { |
//execute any scheduled tasklets |
tasklet_do(); |
#ifdef CONFIG_SMP |
//check whether other CPUs have tasklets to execute |
if (config.cpu_active>1) { |
current_cpu = CPU->id; |
//find the first cpu with nonempty tasklet_list |
for (new_cpu = (current_cpu + 1) % config.cpu_active; new_cpu!=current_cpu && tasklet_list[new_cpu]==0 && cpus[new_cpu].active; |
new_cpu=(new_cpu + 1)% config.cpu_active); |
for (new_cpu = (current_cpu + 1) % config.cpu_count; new_cpu!=current_cpu && tasklet_list[new_cpu]==0 ; |
new_cpu=(new_cpu + 1)% config.cpu_count); |
if (new_cpu!=current_cpu) { |
//if we found a CPU with unsatisfied tasklet schedule to run there. It must be active! |
if (new_cpu!=current_cpu && cpus[new_cpu].active) { |
//we need to migrate this thread to CPU with id new_cpu |
cpu = &cpus[new_cpu]; |
spinlock_lock(&THREAD->lock); |
//put tasklet_thread on the new_cpu |
//move tasklet_thread on the new_cpu |
THREAD->cpu = cpu; |
spinlock_unlock(&THREAD->lock); |
} |
203,12 → 208,12 |
/** Executes scheduled enabled tasklets on current CPU */ |
/** Executes scheduled enabled tasklets on current CPU |
* this function could be called from other parts of kernel */ |
void tasklet_do(void) |
{ |
spinlock_lock(&tasklet_lock); |
tasklet_descriptor_t* t = tasklet_list[CPU->id]; |
//printf("."); |
if (t) { |
//empty the tasklet_list |
tasklet_list[CPU->id]=0; |
218,8 → 223,7 |
if (t->func) { |
t->state = TASKLET_STATE_RUNNING; |
t->func(t->data); |
//clear running flag, set not active - the tasklet can disable itself |
//thats why we don't just set it as not active |
//clear running flag, set not active |
t->state &= ~TASKLET_STATE_RUNNING; |
t->state |= TASKLET_STATE_NOTACTIVE; |
} else |
240,7 → 244,7 |
} |
/** Frees the tasklet structure when no longer needed. The function doesn't provide |
* any synchronization, the caller must be sure, the tasklet is not scheduled. |
* any synchronization; the caller must ensure that the tasklet is not scheduled. |
* |
* @param tasklet to be freed |
*/ |