I'm delving into multithreading for the first time, and copy/pasted Casey's code from HMH126 into my own (with a couple of minor changes to make it valid C rather than C++)...
I first tried putting the test code straight into my normal project. If I stepped through the code slowly, it seemed to work fine, but occasionally it threw an exception at Win32DoNextWorkQueueEntry - the Queue pointer was pointing somewhere invalid.
As it's my first time debugging multithreaded code, I'm not too sure what I'm looking for.
I played around with the bug a bit last night in an otherwise empty file - looks like the main thread continues while one of the worker threads is still running. The main thread goes through the exit process, uninitializing the threads (it's in e.g. try_cor_exit_process(...) / _free_base(...)). This would make sense if it invalidates Queue, leading to a write violation...
It fails about half of the time.
Did I make a mistake in copying it over? Is this something that got fixed? I didn't spot any changes in a quick glance through the most recent episodes' files.
I've included my code below:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 | #include <types.h>
#include <stb_sprintf.h>
#include <windows.h>
typedef struct work_queue work_queue;
#define WORK_QUEUE_CALLBACK(name) void name(work_queue *Queue, void *Data)
typedef WORK_QUEUE_CALLBACK(work_queue_callback);
typedef struct work_queue_entry
{
work_queue_callback *Callback;
void *Data;
} work_queue_entry;
typedef struct work_queue
{
u32 volatile CompletionGoal;
u32 volatile CompletionCount;
u32 volatile NextEntryToRead;
u32 volatile NextEntryToWrite;
HANDLE SemaphoreHandle;
work_queue_entry Entries[256];
} work_queue;
internal void
Win32AddEntry(work_queue *Queue, work_queue_callback *Callback, void *Data)
{
u32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries);
Assert(NewNextEntryToWrite != Queue->NextEntryToRead);
work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite;
Entry->Callback = Callback;
Entry->Data = Data;
++Queue->CompletionGoal;
_WriteBarrier();
_mm_sfence();
Queue->NextEntryToWrite = NewNextEntryToWrite;
ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0);
}
internal b32
Win32DoNextWorkQueueEntry(work_queue *Queue)
{
b32 ThreadShouldWait = 0;
u32 OriginalNextEntryToRead = Queue->NextEntryToRead;
u32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries);
if(OriginalNextEntryToRead != Queue->NextEntryToWrite)
{
u32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead,
NewNextEntryToRead,
OriginalNextEntryToRead);
if(Index == OriginalNextEntryToRead)
{
work_queue_entry Entry = Queue->Entries[Index];
Entry.Callback(Queue, Entry.Data);
InterlockedIncrement((LONG volatile *)&Queue->CompletionCount);
}
}
else
{ ThreadShouldWait = 1; }
return ThreadShouldWait;
}
/// the calling thread also does work
internal void
Win32CompleteAllWork(work_queue *Queue)
{
while(Queue->CompletionGoal != Queue->CompletionCount)
{ Win32DoNextWorkQueueEntry(Queue); }
Queue->CompletionGoal = 0;
Queue->CompletionCount = 0;
}
typedef struct thread_info
{
uint LogicalThreadIndex;
work_queue *Queue;
} thread_info;
DWORD WINAPI
WorkQueueProc(LPVOID lpParameter)
{
thread_info *ThreadInfo = (thread_info *)lpParameter;
for(;;)
{
if(Win32DoNextWorkQueueEntry(ThreadInfo->Queue))
{ WaitForSingleObjectEx(ThreadInfo->Queue->SemaphoreHandle, INFINITE, FALSE); }
}
}
internal WORK_QUEUE_CALLBACK(DoWorkerWork)
{
Queue; // unused
char Buffer[256] = "Callback running";
stbsp_sprintf(Buffer, "Thread %u: %s", GetCurrentThreadId(), (char *)Data);
OutputDebugStringA(Buffer);
}
int CALLBACK
WinMain(HINSTANCE Instance, HINSTANCE PrevInstance, LPSTR CommandLine, int ShowCode)
{
Instance, PrevInstance, CommandLine, ShowCode;
thread_info ThreadInfo[7];// = {0};
u32 cThreads = ArrayCount(ThreadInfo);
work_queue Queue = {0};
u32 InitialCount = 0;
Queue.SemaphoreHandle = CreateSemaphoreEx(0, InitialCount, cThreads, 0, 0, SEMAPHORE_ALL_ACCESS);
for(u32 iThread = 0; iThread < cThreads; ++iThread)
{
thread_info *Info = ThreadInfo + iThread;
Info->Queue = &Queue;
Info->LogicalThreadIndex = iThread;
DWORD ThreadID;
HANDLE ThreadHandle = CreateThread(0, 0, WorkQueueProc, Info, 0, &ThreadID);
CloseHandle(ThreadHandle);
}
Win32AddEntry(&Queue, DoWorkerWork, "String A0\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A1\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A2\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A3\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A4\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A5\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A6\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A7\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A8\n");
Win32AddEntry(&Queue, DoWorkerWork, "String A9\n");
Win32CompleteAllWork(&Queue);
return 0;
}
|