I first tried putting the test code straight into my normal project. If I stepped through the code slowly, it seemed to work fine, but occasionally it threw an exception at Win32DoNextWorkQueueEntry - the Queue pointer was pointing somewhere invalid.
As it's my first time debugging multithreaded code, I'm not too sure what I'm looking for.
I played around with the bug a bit last night in an otherwise empty file. It looks like the main thread continues while one of the worker threads is still running: the main thread goes through the exit process, uninitializing the threads (when it breaks, the main thread is in e.g. try_cor_exit_process(...) / _free_base(...)). This would make sense if the exit process invalidates Queue while a worker is still using it, leading to a write violation...
It fails about half of the time.
Did I make a mistake in copying it over? Is this something that got fixed? I didn't spot any changes in a quick glance through the most recent episodes' files.
I've included my code below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | #include <types.h> #include <stb_sprintf.h> #include <windows.h> typedef struct work_queue work_queue; #define WORK_QUEUE_CALLBACK(name) void name(work_queue *Queue, void *Data) typedef WORK_QUEUE_CALLBACK(work_queue_callback); typedef struct work_queue_entry { work_queue_callback *Callback; void *Data; } work_queue_entry; typedef struct work_queue { u32 volatile CompletionGoal; u32 volatile CompletionCount; u32 volatile NextEntryToRead; u32 volatile NextEntryToWrite; HANDLE SemaphoreHandle; work_queue_entry Entries[256]; } work_queue; internal void Win32AddEntry(work_queue *Queue, work_queue_callback *Callback, void *Data) { u32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries); Assert(NewNextEntryToWrite != Queue->NextEntryToRead); work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite; Entry->Callback = Callback; Entry->Data = Data; ++Queue->CompletionGoal; _WriteBarrier(); _mm_sfence(); Queue->NextEntryToWrite = NewNextEntryToWrite; ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0); } internal b32 Win32DoNextWorkQueueEntry(work_queue *Queue) { b32 ThreadShouldWait = 0; u32 OriginalNextEntryToRead = Queue->NextEntryToRead; u32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries); if(OriginalNextEntryToRead != Queue->NextEntryToWrite) { u32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead, NewNextEntryToRead, OriginalNextEntryToRead); if(Index == OriginalNextEntryToRead) { work_queue_entry Entry = Queue->Entries[Index]; 
Entry.Callback(Queue, Entry.Data); InterlockedIncrement((LONG volatile *)&Queue->CompletionCount); } } else { ThreadShouldWait = 1; } return ThreadShouldWait; } /// the calling thread also does work internal void Win32CompleteAllWork(work_queue *Queue) { while(Queue->CompletionGoal != Queue->CompletionCount) { Win32DoNextWorkQueueEntry(Queue); } Queue->CompletionGoal = 0; Queue->CompletionCount = 0; } typedef struct thread_info { uint LogicalThreadIndex; work_queue *Queue; } thread_info; DWORD WINAPI WorkQueueProc(LPVOID lpParameter) { thread_info *ThreadInfo = (thread_info *)lpParameter; for(;;) { if(Win32DoNextWorkQueueEntry(ThreadInfo->Queue)) { WaitForSingleObjectEx(ThreadInfo->Queue->SemaphoreHandle, INFINITE, FALSE); } } } internal WORK_QUEUE_CALLBACK(DoWorkerWork) { Queue; // unused char Buffer[256] = "Callback running"; stbsp_sprintf(Buffer, "Thread %u: %s", GetCurrentThreadId(), (char *)Data); OutputDebugStringA(Buffer); } int CALLBACK WinMain(HINSTANCE Instance, HINSTANCE PrevInstance, LPSTR CommandLine, int ShowCode) { Instance, PrevInstance, CommandLine, ShowCode; thread_info ThreadInfo[7];// = {0}; u32 cThreads = ArrayCount(ThreadInfo); work_queue Queue = {0}; u32 InitialCount = 0; Queue.SemaphoreHandle = CreateSemaphoreEx(0, InitialCount, cThreads, 0, 0, SEMAPHORE_ALL_ACCESS); for(u32 iThread = 0; iThread < cThreads; ++iThread) { thread_info *Info = ThreadInfo + iThread; Info->Queue = &Queue; Info->LogicalThreadIndex = iThread; DWORD ThreadID; HANDLE ThreadHandle = CreateThread(0, 0, WorkQueueProc, Info, 0, &ThreadID); CloseHandle(ThreadHandle); } Win32AddEntry(&Queue, DoWorkerWork, "String A0\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A1\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A2\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A3\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A4\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A5\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A6\n"); Win32AddEntry(&Queue, DoWorkerWork, "String 
A7\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A8\n"); Win32AddEntry(&Queue, DoWorkerWork, "String A9\n"); Win32CompleteAllWork(&Queue); return 0; } |