switched to condition-variable based cancelation

This commit is contained in:
Allen Webster 2016-05-31 14:32:58 -04:00
parent 43f65dd5ef
commit a09851af12
3 changed files with 5754 additions and 5634 deletions

File diff suppressed because it is too large Load Diff

View File

@ -197,6 +197,9 @@ typedef Sys_Post_Job_Sig(System_Post_Job);
#define Sys_Cancel_Job_Sig(name) void name(Thread_Group_ID group_id, u32 job_id) #define Sys_Cancel_Job_Sig(name) void name(Thread_Group_ID group_id, u32 job_id)
typedef Sys_Cancel_Job_Sig(System_Cancel_Job); typedef Sys_Cancel_Job_Sig(System_Cancel_Job);
#define Sys_Check_Cancel_Sig(name) b32 name(Thread_Context *thread)
typedef Sys_Check_Cancel_Sig(System_Check_Cancel);
#define Sys_Grow_Thread_Memory_Sig(name) void name(Thread_Memory *memory) #define Sys_Grow_Thread_Memory_Sig(name) void name(Thread_Memory *memory)
typedef Sys_Grow_Thread_Memory_Sig(System_Grow_Thread_Memory); typedef Sys_Grow_Thread_Memory_Sig(System_Grow_Thread_Memory);
@ -248,9 +251,10 @@ struct System_Functions{
System_CLI_Update_Step *cli_update_step; System_CLI_Update_Step *cli_update_step;
System_CLI_End_Update *cli_end_update; System_CLI_End_Update *cli_end_update;
// threads: 5 // threads: 7
System_Post_Job *post_job; System_Post_Job *post_job;
System_Cancel_Job *cancel_job; System_Cancel_Job *cancel_job;
System_Check_Cancel *check_cancel;
System_Grow_Thread_Memory *grow_thread_memory; System_Grow_Thread_Memory *grow_thread_memory;
System_Acquire_Lock *acquire_lock; System_Acquire_Lock *acquire_lock;
System_Release_Lock *release_lock; System_Release_Lock *release_lock;

View File

@ -40,9 +40,11 @@
struct Thread_Context{ struct Thread_Context{
u32 job_id; u32 job_id;
b32 running; b32 running;
b32 cancel;
Work_Queue *queue; Work_Queue *queue;
u32 id; u32 id;
u32 group_id;
u32 windows_id; u32 windows_id;
HANDLE handle; HANDLE handle;
}; };
@ -50,6 +52,9 @@ struct Thread_Context{
struct Thread_Group{ struct Thread_Group{
Thread_Context *threads; Thread_Context *threads;
i32 count; i32 count;
i32 cancel_lock0;
i32 cancel_cv0;
}; };
#define UseWinDll 1 #define UseWinDll 1
@ -111,6 +116,18 @@ struct Sys_Bubble : public Bubble{
}; };
#endif #endif
enum CV_ID{
CANCEL_CV0,
CANCEL_CV1,
CANCEL_CV2,
CANCEL_CV3,
CANCEL_CV4,
CANCEL_CV5,
CANCEL_CV6,
CANCEL_CV7,
CV_COUNT
};
struct Win32_Vars{ struct Win32_Vars{
System_Functions system; System_Functions system;
App_Functions app; App_Functions app;
@ -128,6 +145,7 @@ struct Win32_Vars{
Work_Queue queues[THREAD_GROUP_COUNT]; Work_Queue queues[THREAD_GROUP_COUNT];
Thread_Group groups[THREAD_GROUP_COUNT]; Thread_Group groups[THREAD_GROUP_COUNT];
CRITICAL_SECTION locks[LOCK_COUNT]; CRITICAL_SECTION locks[LOCK_COUNT];
CONDITION_VARIABLE condition_vars[CV_COUNT];
Thread_Memory *thread_memory; Thread_Memory *thread_memory;
Win32_Coroutine coroutine_data[18]; Win32_Coroutine coroutine_data[18];
Win32_Coroutine *coroutine_free; Win32_Coroutine *coroutine_free;
@ -277,10 +295,29 @@ Sys_Release_Lock_Sig(system_release_lock){
LeaveCriticalSection(&win32vars.locks[id]); LeaveCriticalSection(&win32vars.locks[id]);
} }
internal void
system_wait_cv(i32 crit_id, i32 cv_id){
SleepConditionVariableCS(win32vars.condition_vars + cv_id,
win32vars.locks + crit_id,
INFINITE);
}
internal void
system_signal_cv(i32 crit_id, i32 cv_id){
AllowLocal(crit_id);
WakeConditionVariable(win32vars.condition_vars + cv_id);
}
internal DWORD internal DWORD
JobThreadProc(LPVOID lpParameter){ JobThreadProc(LPVOID lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter; Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = thread->queue; Work_Queue *queue = win32vars.queues + thread->group_id;
Thread_Group *group = win32vars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
for (;;){ for (;;){
u32 read_index = queue->read_position; u32 read_index = queue->read_position;
@ -324,6 +361,13 @@ JobThreadProc(LPVOID lpParameter){
PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0); PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0);
full_job->running_thread = 0; full_job->running_thread = 0;
thread->running = 0; thread->running = 0;
system_acquire_lock(cancel_lock);
if (thread->cancel){
thread->cancel = 0;
system_signal_cv(cancel_lock, cancel_cv);
}
system_release_lock(cancel_lock);
} }
} }
} }
@ -362,8 +406,18 @@ Sys_Post_Job_Sig(system_post_job){
return result; return result;
} }
// TODO(allen): I would like to get rid of job canceling // NOTE(allen): New job cancelling system:
// but I still don't know what exactly I would do without it. //
// Jobs are expected to periodically check their cancelation
// state, especially if they are taking a while.
//
// When the main thread asks to cancel a job it sets the cancel
// state and does not resume until the thread running the job
// signals that it is okay.
//
// Don't hold the frame lock while sleeping, as this can dead-lock
// the job thread and the main thread, and since main is sleeping
// they won't collide anyway.
internal internal
Sys_Cancel_Job_Sig(system_cancel_job){ Sys_Cancel_Job_Sig(system_cancel_job){
Work_Queue *queue = win32vars.queues + group_id; Work_Queue *queue = win32vars.queues + group_id;
@ -372,7 +426,6 @@ Sys_Cancel_Job_Sig(system_cancel_job){
u32 job_index; u32 job_index;
u32 thread_id; u32 thread_id;
Full_Job_Data *full_job; Full_Job_Data *full_job;
Thread_Context *thread;
job_index = job_id % QUEUE_WRAP; job_index = job_id % QUEUE_WRAP;
full_job = queue->jobs + job_index; full_job = queue->jobs + job_index;
@ -382,19 +435,47 @@ Sys_Cancel_Job_Sig(system_cancel_job){
InterlockedCompareExchange(&full_job->running_thread, InterlockedCompareExchange(&full_job->running_thread,
0, THREAD_NOT_ASSIGNED); 0, THREAD_NOT_ASSIGNED);
if (thread_id != THREAD_NOT_ASSIGNED){ if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
system_acquire_lock(CANCEL_LOCK0 + thread_id - 1); i32 thread_index = thread_id - 1;
thread = group->threads + thread_id - 1;
TerminateThread(thread->handle, 0); i32 cancel_lock = group->cancel_lock0 + thread_index;
u32 creation_flag = 0; i32 cancel_cv = group->cancel_cv0 + thread_index;
thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag, (LPDWORD)&thread->windows_id); Thread_Context *thread = group->threads + thread_index;
system_release_lock(CANCEL_LOCK0 + thread_id - 1);
thread->running = 0;
system_acquire_lock(cancel_lock);
thread->cancel = 1;
system_release_lock(FRAME_LOCK);
do{
system_wait_cv(cancel_lock, cancel_cv);
}while (thread->cancel == 1);
system_acquire_lock(FRAME_LOCK);
system_release_lock(cancel_lock);
} }
} }
internal void internal
system_grow_thread_memory(Thread_Memory *memory){ Sys_Check_Cancel_Sig(system_check_cancel){
b32 result = 0;
Thread_Group *group = win32vars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
system_acquire_lock(cancel_lock);
if (thread->cancel){
result = 1;
}
system_release_lock(cancel_lock);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
void *old_data; void *old_data;
i32 old_size, new_size; i32 old_size, new_size;
@ -1178,6 +1259,7 @@ Win32LoadSystemCode(){
win32vars.system.post_job = system_post_job; win32vars.system.post_job = system_post_job;
win32vars.system.cancel_job = system_cancel_job; win32vars.system.cancel_job = system_cancel_job;
win32vars.system.check_cancel = system_check_cancel;
win32vars.system.grow_thread_memory = system_grow_thread_memory; win32vars.system.grow_thread_memory = system_grow_thread_memory;
win32vars.system.acquire_lock = system_acquire_lock; win32vars.system.acquire_lock = system_acquire_lock;
win32vars.system.release_lock = system_release_lock; win32vars.system.release_lock = system_release_lock;
@ -1621,18 +1703,22 @@ WinMain(HINSTANCE hInstance,
memset(background, 0, sizeof(background)); memset(background, 0, sizeof(background));
win32vars.groups[BACKGROUND_THREADS].threads = background; win32vars.groups[BACKGROUND_THREADS].threads = background;
win32vars.groups[BACKGROUND_THREADS].count = ArrayCount(background); win32vars.groups[BACKGROUND_THREADS].count = ArrayCount(background);
win32vars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0;
win32vars.groups[BACKGROUND_THREADS].cancel_cv0 = CANCEL_CV0;
Thread_Memory thread_memory[ArrayCount(background)]; Thread_Memory thread_memory[ArrayCount(background)];
win32vars.thread_memory = thread_memory; win32vars.thread_memory = thread_memory;
win32vars.queues[BACKGROUND_THREADS].semaphore = win32vars.queues[BACKGROUND_THREADS].semaphore =
Win32Handle(CreateSemaphore(0, 0, Win32Handle(CreateSemaphore(0, 0,
win32vars.groups[BACKGROUND_THREADS].count, 0)); win32vars.groups[BACKGROUND_THREADS].count,
0));
u32 creation_flag = 0; u32 creation_flag = 0;
for (i32 i = 0; i < win32vars.groups[BACKGROUND_THREADS].count; ++i){ for (i32 i = 0; i < win32vars.groups[BACKGROUND_THREADS].count; ++i){
Thread_Context *thread = win32vars.groups[BACKGROUND_THREADS].threads + i; Thread_Context *thread = win32vars.groups[BACKGROUND_THREADS].threads + i;
thread->id = i + 1; thread->id = i + 1;
thread->group_id = BACKGROUND_THREADS;
Thread_Memory *memory = win32vars.thread_memory + i; Thread_Memory *memory = win32vars.thread_memory + i;
*memory = thread_memory_zero(); *memory = thread_memory_zero();