unbounded job queue

This commit is contained in:
Allen Webster 2016-07-19 19:47:27 -04:00
parent d0dbf70e48
commit 6dd5b3b87b
4 changed files with 353 additions and 29 deletions

View File

@ -2702,12 +2702,19 @@ compute_this_indent(Buffer *buffer, Indent_Parse_State indent,
prev_token.start + prev_token.size > this_line_start){ prev_token.start + prev_token.size > this_line_start){
if (prev_token.type == CPP_TOKEN_COMMENT){ if (prev_token.type == CPP_TOKEN_COMMENT){
Hard_Start_Result hard_start = buffer_find_hard_start(buffer, this_line_start, tab_width); Hard_Start_Result hard_start = buffer_find_hard_start(buffer, this_line_start, tab_width);
i32 line_pos = hard_start.char_pos - this_line_start;
this_indent = line_pos + indent.comment_shift; if (hard_start.all_whitespace){
if (this_indent < 0){ this_indent = previous_indent;
this_indent = 0; did_special_behavior = true;
}
else{
i32 line_pos = hard_start.char_pos - this_line_start;
this_indent = line_pos + indent.comment_shift;
if (this_indent < 0){
this_indent = 0;
}
did_special_behavior = true;
} }
did_special_behavior = true;
} }
else if (prev_token.type == CPP_TOKEN_STRING_CONSTANT){ else if (prev_token.type == CPP_TOKEN_STRING_CONSTANT){
this_indent = previous_indent; this_indent = previous_indent;

View File

@ -26,6 +26,7 @@ typedef int16_t i16;
typedef i32 bool32; typedef i32 bool32;
typedef i8 bool8; typedef i8 bool8;
typedef i32 b32; typedef i32 b32;
typedef i16 b16;
typedef i8 b8; typedef i8 b8;
typedef uint8_t byte; typedef uint8_t byte;

View File

@ -167,20 +167,26 @@ typedef Job_Callback_Sig(Job_Callback);
struct Job_Data{ struct Job_Data{
Job_Callback *callback; Job_Callback *callback;
void *data[2]; void *data[2];
//i32 memory_request;
}; };
struct Full_Job_Data{ struct Full_Job_Data{
Job_Data job; Job_Data job;
u32 job_memory_index;
u32 running_thread; u32 running_thread;
b32 finished;
u32 id; u32 id;
}; };
struct Unbounded_Work_Queue{
Full_Job_Data *jobs;
i32 count, max, skip;
u32 next_job_id;
};
#define QUEUE_WRAP 256
struct Work_Queue{ struct Work_Queue{
Full_Job_Data jobs[256]; Full_Job_Data jobs[QUEUE_WRAP];
Plat_Handle semaphore; Plat_Handle semaphore;
volatile u32 write_position; volatile u32 write_position;
volatile u32 read_position; volatile u32 read_position;
@ -188,9 +194,6 @@ struct Work_Queue{
#define THREAD_NOT_ASSIGNED 0xFFFFFFFF #define THREAD_NOT_ASSIGNED 0xFFFFFFFF
#define JOB_ID_WRAP (ArrayCount(queue->jobs) * 4)
#define QUEUE_WRAP (ArrayCount(queue->jobs))
#define Sys_Post_Job_Sig(name) u32 name(Thread_Group_ID group_id, Job_Data job) #define Sys_Post_Job_Sig(name) u32 name(Thread_Group_ID group_id, Job_Data job)
typedef Sys_Post_Job_Sig(System_Post_Job); typedef Sys_Post_Job_Sig(System_Post_Job);

View File

@ -64,6 +64,8 @@ struct Thread_Group{
Thread_Context *threads; Thread_Context *threads;
i32 count; i32 count;
Unbounded_Work_Queue queue;
i32 cancel_lock0; i32 cancel_lock0;
i32 cancel_cv0; i32 cancel_cv0;
}; };
@ -320,6 +322,7 @@ system_signal_cv(i32 crit_id, i32 cv_id){
WakeConditionVariable(win32vars.condition_vars + cv_id); WakeConditionVariable(win32vars.condition_vars + cv_id);
} }
#if 0
internal DWORD internal DWORD
JobThreadProc(LPVOID lpParameter){ JobThreadProc(LPVOID lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter; Thread_Context *thread = (Thread_Context*)lpParameter;
@ -344,13 +347,16 @@ JobThreadProc(LPVOID lpParameter){
u32 write_index = queue->write_position; u32 write_index = queue->write_position;
if (read_index != write_index){ if (read_index != write_index){
u32 next_read_index = (read_index + 1) % JOB_ID_WRAP; // NOTE(allen): Previously I was wrapping by the job wrap then
// wrapping by the queue wrap. That was super stupid what was that?
// Now it just wraps by the queue wrap.
u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
u32 safe_read_index = u32 safe_read_index =
InterlockedCompareExchange(&queue->read_position, InterlockedCompareExchange(&queue->read_position,
next_read_index, read_index); next_read_index, read_index);
if (safe_read_index == read_index){ if (safe_read_index == read_index){
Full_Job_Data *full_job = queue->jobs + (safe_read_index % QUEUE_WRAP); Full_Job_Data *full_job = queue->jobs + safe_read_index;
// NOTE(allen): This is interlocked so that it plays nice // NOTE(allen): This is interlocked so that it plays nice
// with the cancel job routine, which may try to cancel this job // with the cancel job routine, which may try to cancel this job
// at the same time that we try to run it // at the same time that we try to run it
@ -394,13 +400,12 @@ Sys_Post_Job_Sig(system_post_job){
u32 result = 0; u32 result = 0;
while (!success){ while (!success){
u32 write_index = queue->write_position; u32 write_index = queue->write_position;
u32 next_write_index = (write_index + 1) % JOB_ID_WRAP; u32 next_write_index = (write_index + 1) % QUEUE_WRAP;
u32 safe_write_index = u32 safe_write_index =
InterlockedCompareExchange(&queue->write_position, InterlockedCompareExchange(&queue->write_position,
next_write_index, write_index); next_write_index, write_index);
if (safe_write_index == write_index){ if (safe_write_index == write_index){
result = write_index; result = write_index;
write_index = write_index % QUEUE_WRAP;
queue->jobs[write_index].job = job; queue->jobs[write_index].job = job;
queue->jobs[write_index].running_thread = THREAD_NOT_ASSIGNED; queue->jobs[write_index].running_thread = THREAD_NOT_ASSIGNED;
queue->jobs[write_index].id = result; queue->jobs[write_index].id = result;
@ -430,15 +435,11 @@ Sys_Cancel_Job_Sig(system_cancel_job){
Work_Queue *queue = win32vars.queues + group_id; Work_Queue *queue = win32vars.queues + group_id;
Thread_Group *group = win32vars.groups + group_id; Thread_Group *group = win32vars.groups + group_id;
u32 job_index; u32 job_index = job_id % QUEUE_WRAP;
u32 thread_id; Full_Job_Data *full_job = queue->jobs + job_index;
Full_Job_Data *full_job;
job_index = job_id % QUEUE_WRAP;
full_job = queue->jobs + job_index;
Assert(full_job->id == job_id); Assert(full_job->id == job_id);
thread_id = u32 thread_id =
InterlockedCompareExchange(&full_job->running_thread, InterlockedCompareExchange(&full_job->running_thread,
0, THREAD_NOT_ASSIGNED); 0, THREAD_NOT_ASSIGNED);
@ -505,7 +506,7 @@ INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
Work_Queue *queue = win32vars.queues + id; Work_Queue *queue = win32vars.queues + id;
u32 write = queue->write_position; u32 write = queue->write_position;
u32 read = queue->read_position; u32 read = queue->read_position;
if (write < read) write += JOB_ID_WRAP; if (write < read) write += QUEUE_WRAP;
*pending = (i32)(write - read); *pending = (i32)(write - read);
Thread_Group *group = win32vars.groups + id; Thread_Group *group = win32vars.groups + id;
@ -515,6 +516,315 @@ INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
} }
#endif #endif
#else
internal DWORD
JobThreadProc(LPVOID lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = win32vars.queues + thread->group_id;
Thread_Group *group = win32vars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Memory *thread_memory = win32vars.thread_memory + thread_index;
if (thread_memory->size == 0){
i32 new_size = Kbytes(64);
thread_memory->data = Win32GetMemory(new_size);
thread_memory->size = new_size;
}
for (;;){
u32 read_index = queue->read_position;
u32 write_index = queue->write_position;
if (read_index != write_index){
// NOTE(allen): Previously I was wrapping by the job wrap then
// wrapping by the queue wrap. That was super stupid what was that?
// Now it just wraps by the queue wrap.
u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
u32 safe_read_index =
InterlockedCompareExchange(&queue->read_position,
next_read_index, read_index);
if (safe_read_index == read_index){
Full_Job_Data *full_job = queue->jobs + safe_read_index;
// NOTE(allen): This is interlocked so that it plays nice
// with the cancel job routine, which may try to cancel this job
// at the same time that we try to run it
i32 safe_running_thread =
InterlockedCompareExchange(&full_job->running_thread,
thread->id, THREAD_NOT_ASSIGNED);
if (safe_running_thread == THREAD_NOT_ASSIGNED){
thread->job_id = full_job->id;
thread->running = 1;
full_job->job.callback(&win32vars.system,
thread, thread_memory, full_job->job.data);
PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0);
full_job->running_thread = 0;
thread->running = 0;
system_acquire_lock(cancel_lock);
if (thread->cancel){
thread->cancel = 0;
system_signal_cv(cancel_lock, cancel_cv);
}
system_release_lock(cancel_lock);
}
}
}
else{
WaitForSingleObject(Win32Handle(queue->semaphore), INFINITE);
}
}
}
internal void
initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){
i32 max = 512;
source_queue->jobs = (Full_Job_Data*)system_get_memory(max*sizeof(Full_Job_Data));
source_queue->count = 0;
source_queue->max = max;
source_queue->skip = 0;
}
inline i32
get_work_queue_available_space(i32 write, i32 read){
// NOTE(allen): The only time that queue->write_position == queue->read_position
// is allowed is when the queue is empty. Thus if
// queue->write_position+1 == queue->read_position the available space is zero.
// So these computations both end up leaving one slot unused. The only way I can
// think to easily eliminate this is to have read and write wrap at twice the size
// of the underlying array but modulo their values into the array then if write
// has caught up with read it still will not be equal... but lots of modulos... ehh.
i32 available_space = 0;
if (write >= read){
available_space = QUEUE_WRAP - (write - read) - 1;
}
else{
available_space = (read - write) - 1;
}
return(available_space);
}
#define UNBOUNDED_SKIP_MAX 128
internal void
flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){
// NOTE(allen): It is understood that read_position may be changed by other
// threads but it will only make more space in the queue if it is changed.
// Meanwhile write_position should not ever be changed by anything but the
// main thread in this system, so it will not be interlocked.
u32 read_position = queue->read_position;
u32 write_position = queue->write_position;
u32 available_space = get_work_queue_available_space(write_position, read_position);
u32 available_jobs = source_queue->count - source_queue->skip;
u32 writable_count = Min(available_space, available_jobs);
if (writable_count > 0){
u32 count1 = writable_count;
if (count1+write_position > QUEUE_WRAP){
count1 = QUEUE_WRAP - write_position;
}
u32 count2 = writable_count - count1;
Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip;
Full_Job_Data *job_src2 = job_src1 + count1;
Full_Job_Data *job_dst1 = queue->jobs + write_position;
Full_Job_Data *job_dst2 = queue->jobs;
Assert((job_src1->id % QUEUE_WRAP) == write_position);
memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1);
memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2);
queue->write_position = (write_position + writable_count) % QUEUE_WRAP;
source_queue->skip += writable_count;
if (source_queue->skip == source_queue->count){
source_queue->skip = source_queue->count = 0;
}
else if (source_queue->skip > UNBOUNDED_SKIP_MAX){
u32 left_over = source_queue->count - source_queue->skip;
memmove(queue->jobs, queue->jobs + source_queue->skip, left_over);
source_queue->count = left_over;
source_queue->skip = 0;
}
}
i32 semaphore_release_count = writable_count;
if (semaphore_release_count > thread_count){
semaphore_release_count = thread_count;
}
// NOTE(allen): platform dependent portion...
// TODO(allen): pull out the duplicated part once I see
// that this is pretty much the same on linux.
for (i32 i = 0; i < semaphore_release_count; ++i){
ReleaseSemaphore(Win32Handle(queue->semaphore), 1, 0);
}
}
internal void
flush_thread_group(i32 group_id){
Thread_Group *group = win32vars.groups + group_id;
Work_Queue *queue = win32vars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
flush_to_direct_queue(source_queue, queue, group->count);
}
// Note(allen): post_job puts the job on the unbounded queue.
// The unbounded queue is entirely managed by the main thread.
// The thread safe queue is bounded in size so the unbounded
// queue is periodically flushed into the direct work queue.
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = win32vars.groups + group_id;
Unbounded_Work_Queue *queue = &group->queue;
u32 result = queue->next_job_id++;
while (queue->count >= queue->max){
i32 new_max = queue->max*2;
Full_Job_Data *new_jobs = (Full_Job_Data*)
system_get_memory(new_max*sizeof(Full_Job_Data));
memcpy(new_jobs, queue->jobs, queue->count);
system_free_memory(queue->jobs);
queue->jobs = new_jobs;
queue->max = new_max;
}
Full_Job_Data full_job;
full_job.job = job;
full_job.running_thread = THREAD_NOT_ASSIGNED;
full_job.id = result;
queue->jobs[queue->count++] = full_job;
Work_Queue *direct_queue = win32vars.queues + group_id;
flush_to_direct_queue(queue, direct_queue, group->count);
return(result);
}
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = win32vars.groups + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
b32 handled_in_unbounded = false;
if (source_queue->skip < source_queue->count){
Full_Job_Data *first_job = source_queue->jobs + source_queue->skip;
if (first_job->id <= job_id){
u32 index = source_queue->skip + (job_id - first_job->id);
Full_Job_Data *job = source_queue->jobs + index;
job->running_thread = 0;
handled_in_unbounded = true;
}
}
if (!handled_in_unbounded){
Work_Queue *queue = win32vars.queues + group_id;
Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP);
Assert(job->id == job_id);
u32 thread_id =
InterlockedCompareExchange(&job->running_thread,
0, THREAD_NOT_ASSIGNED);
if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
i32 thread_index = thread_id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Context *thread = group->threads + thread_index;
system_acquire_lock(cancel_lock);
thread->cancel = 1;
system_release_lock(FRAME_LOCK);
do{
system_wait_cv(cancel_lock, cancel_cv);
}while (thread->cancel == 1);
system_acquire_lock(FRAME_LOCK);
system_release_lock(cancel_lock);
}
}
}
internal
Sys_Check_Cancel_Sig(system_check_cancel){
b32 result = 0;
Thread_Group *group = win32vars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
system_acquire_lock(cancel_lock);
if (thread->cancel){
result = 1;
}
system_release_lock(cancel_lock);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
void *old_data;
i32 old_size, new_size;
system_acquire_lock(CANCEL_LOCK0 + memory->id - 1);
old_data = memory->data;
old_size = memory->size;
new_size = LargeRoundUp(memory->size*2, Kbytes(4));
memory->data = system_get_memory(new_size);
memory->size = new_size;
if (old_data){
memcpy(memory->data, old_data, old_size);
system_free_memory(old_data);
}
system_release_lock(CANCEL_LOCK0 + memory->id - 1);
}
#if FRED_INTERNAL
internal void
INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
Thread_Group *group = win32vars.groups + id;
Unbounded_Work_Queue *source_queue = &group->queue;
Work_Queue *queue = win32vars.queues + id;
u32 write = queue->write_position;
u32 read = queue->read_position;
if (write < read) write += QUEUE_WRAP;
*pending = (i32)(write - read) + source_queue->count - source_queue->skip;
for (i32 i = 0; i < group->count; ++i){
running[i] = (group->threads[i].running != 0);
}
}
#endif
#endif
// //
// Coroutines // Coroutines
@ -1675,9 +1985,9 @@ OpenGLDebugCallback(GLenum source, GLenum type, GLuint id, GLenum severity, GLsi
int int
WinMain(HINSTANCE hInstance, WinMain(HINSTANCE hInstance,
HINSTANCE hPrevInstance, HINSTANCE hPrevInstance,
LPSTR lpCmdLine, LPSTR lpCmdLine,
int nCmdShow){ int nCmdShow){
int argc = __argc; int argc = __argc;
char **argv = __argv; char **argv = __argv;
@ -1739,6 +2049,8 @@ WinMain(HINSTANCE hInstance,
thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag, (LPDWORD)&thread->windows_id); thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag, (LPDWORD)&thread->windows_id);
} }
initialize_unbounded_queue(&win32vars.groups[BACKGROUND_THREADS].queue);
ConvertThreadToFiber(0); ConvertThreadToFiber(0);
win32vars.coroutine_free = win32vars.coroutine_data; win32vars.coroutine_free = win32vars.coroutine_data;
for (i32 i = 0; i+1 < ArrayCount(win32vars.coroutine_data); ++i){ for (i32 i = 0; i+1 < ArrayCount(win32vars.coroutine_data); ++i){
@ -2189,13 +2501,15 @@ WinMain(HINSTANCE hInstance,
HDC hdc = GetDC(win32vars.window_handle); HDC hdc = GetDC(win32vars.window_handle);
Win32RedrawScreen(hdc); Win32RedrawScreen(hdc);
ReleaseDC(win32vars.window_handle, hdc); ReleaseDC(win32vars.window_handle, hdc);
win32vars.first = 0; win32vars.first = 0;
if (result.animating){ if (result.animating){
PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0); PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0);
} }
flush_thread_group(BACKGROUND_THREADS);
u64 timer_end = Win32HighResolutionTime(); u64 timer_end = Win32HighResolutionTime();
u64 end_target = timer_start + frame_useconds; u64 end_target = timer_start + frame_useconds;
@ -2217,7 +2531,6 @@ WinMain(HINSTANCE hInstance,
// application at some point. // application at some point.
int main(int argc, char **argv){ int main(int argc, char **argv){
HINSTANCE hInstance = GetModuleHandle(0); HINSTANCE hInstance = GetModuleHandle(0);
}
#endif #endif
// BOTTOM // BOTTOM