Bug in multithreaded circular FIFO queue? (HMH Ep 126)

I'm delving into multithreading for the first time, and copy/pasted Casey's code from HMH126 into my own (with a couple of minor changes to make it valid C rather than C++)...

I first tried putting the test code straight into my normal project. If I stepped through the code slowly, it seemed to work fine, but occasionally it threw an exception at Win32DoNextWorkQueueEntry - the Queue pointer was pointing somewhere invalid.

As it's my first time debugging multithreaded code, I'm not too sure what I'm looking for.

I played around with the bug a bit last night in an otherwise empty file. It looks like the main thread carries on while one of the worker threads is still running: the main thread goes through the process-exit path and shuts down the runtime while the workers are still alive (it's in e.g. try_cor_exit_process(...) / _free_base(...)). That would make sense if the exit path invalidates Queue, leading to a write violation...

It fails about half of the time.

Did I make a mistake in copying it over? Is this something that got fixed? I didn't spot any changes in a quick glance through the most recent episodes' files.

I've included my code below:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include <types.h>
#include <stb_sprintf.h>
#include <windows.h>

// Forward declaration so callback signatures can take a work_queue * before
// the struct itself is defined below.
typedef struct work_queue work_queue;
// Declares a work-queue callback called `name`. It receives the queue the entry
// was taken from and the opaque Data pointer that was passed to Win32AddEntry.
#define WORK_QUEUE_CALLBACK(name) void name(work_queue *Queue, void *Data)
// Function type stored in work_queue_entry::Callback.
typedef WORK_QUEUE_CALLBACK(work_queue_callback);

// One unit of queued work: the callback to run and the opaque pointer handed to it.
typedef struct work_queue_entry
{
	work_queue_callback *Callback;
	void *Data;
} work_queue_entry;

// Fixed-size circular FIFO of work entries, filled by Win32AddEntry and drained
// by Win32DoNextWorkQueueEntry (from worker threads and Win32CompleteAllWork).
// NOTE(review): the `volatile` fields rely on MSVC's volatile semantics plus the
// Interlocked* calls and barriers in those functions; `volatile` alone is not a
// portable C synchronization mechanism.
typedef struct work_queue
{
	// Number of entries added since the last Win32CompleteAllWork reset.
	u32 volatile CompletionGoal;
	// Number of entries whose callbacks have returned (incremented with InterlockedIncrement).
	u32 volatile CompletionCount;

	// Index of the next entry to dequeue; consumers advance it with a compare-exchange.
	u32 volatile NextEntryToRead;
	// Index of the next free slot; written only by Win32AddEntry.
	u32 volatile NextEntryToWrite;
	// Released once per added entry so that sleeping workers wake up.
	HANDLE SemaphoreHandle;

	// Ring storage; both indices wrap modulo ArrayCount(Entries).
	work_queue_entry Entries[256];
} work_queue;

/// Appends (Callback, Data) to the queue and signals the semaphore once.
/// NOTE(review): NextEntryToWrite and CompletionGoal are updated with plain
/// (non-interlocked) writes, so this assumes a single producer thread.
internal void
Win32AddEntry(work_queue *Queue, work_queue_callback *Callback, void *Data)
{
    u32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries);
    // The ring keeps one slot unused: stepping onto NextEntryToRead would mean "full".
    Assert(NewNextEntryToWrite != Queue->NextEntryToRead);
    work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite;
    Entry->Callback = Callback;
    Entry->Data = Data;
    ++Queue->CompletionGoal;
    // Make the entry's contents visible before publishing the new write index
    // (compiler barrier followed by a store fence).
    _WriteBarrier();
    _mm_sfence();
    Queue->NextEntryToWrite = NewNextEntryToWrite;
    // Return value ignored. Per the Win32 docs, this call fails if it would raise
    // the count above the maximum given to CreateSemaphoreEx. Workers drain the
    // queue before waiting again, so a dropped signal is presumably harmless --
    // NOTE(review): confirm.
    ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0);
}

/// Tries to claim one entry from the queue and run it on the calling thread.
/// Returns nonzero ("thread should wait") when the queue was observed empty.
/// Returns zero otherwise, including when another thread won the race for the
/// entry; in that case the caller should simply try again.
internal b32
Win32DoNextWorkQueueEntry(work_queue *Queue)
{
    b32 ThreadShouldWait = 0;

    u32 OriginalNextEntryToRead = Queue->NextEntryToRead;
    u32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries);
    if(OriginalNextEntryToRead != Queue->NextEntryToWrite)
    {
        // Copy the entry BEFORE claiming it. Once the compare-exchange below
        // advances NextEntryToRead, the slot counts as free. The producer may then
        // overwrite it after wrapping around the ring, before this thread has
        // read it. If the claim fails, the copy is simply discarded.
        // NOTE(review): ordering this load after the NextEntryToWrite check relies
        // on MSVC giving volatile reads acquire semantics (/volatile:ms, the
        // default on x86/x64).
        work_queue_entry Entry = Queue->Entries[OriginalNextEntryToRead];
        u32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead,
                                               NewNextEntryToRead,
                                               OriginalNextEntryToRead);
        if(Index == OriginalNextEntryToRead)
        {
            // This thread owns the entry: run it, then count it as completed.
            Entry.Callback(Queue, Entry.Data);
            InterlockedIncrement((LONG volatile *)&Queue->CompletionCount);
        }
    }
    else
    {
        ThreadShouldWait = 1;
    }

    return ThreadShouldWait;
}

/// Waits until every entry added since the last reset has completed. Instead of
/// sleeping, the calling thread runs queued entries itself while it waits.
/// Afterwards both completion counters are reset to zero for the next batch.
internal void
Win32CompleteAllWork(work_queue *Queue)
{
    for(;;)
    {
        if(Queue->CompletionGoal == Queue->CompletionCount)
        {
            break;
        }
        Win32DoNextWorkQueueEntry(Queue);
    }

    // Only reached once nothing is outstanding, so the counters can be cleared.
    Queue->CompletionGoal = 0;
    Queue->CompletionCount = 0;
}

// Per-worker startup data handed to WorkQueueProc through CreateThread.
// NOTE(review): WorkQueueProc reads ->Queue on every loop iteration, so both this
// struct and the queue it points to must stay valid for the worker's whole lifetime.
typedef struct thread_info
{
    // Index assigned by WinMain (0 .. thread count - 1); not read by the visible code.
    uint LogicalThreadIndex;
    // Queue this worker takes entries from.
    work_queue *Queue;
} thread_info;

/// Worker thread entry point; lpParameter is this worker's thread_info.
/// Keeps running queue entries while there are any. When the queue looks empty,
/// it sleeps on the queue's semaphore until more work is signalled. The loop
/// itself never exits.
DWORD WINAPI
WorkQueueProc(LPVOID lpParameter)
{
    thread_info *Self = (thread_info *)lpParameter;
    work_queue *Queue = Self->Queue;
    for(;;)
    {
        b32 QueueLooksEmpty = Win32DoNextWorkQueueEntry(Queue);
        if(!QueueLooksEmpty)
        {
            continue;
        }
        WaitForSingleObjectEx(Queue->SemaphoreHandle, INFINITE, FALSE);
    }
}

/// Test callback: writes "Thread <id>: <Data>" to the debugger output.
/// Data must point to a NUL-terminated string (WinMain passes string literals).
internal WORK_QUEUE_CALLBACK(DoWorkerWork)
{
	(void)Queue; // unused
    char Buffer[256];
    // Bounded formatting: a long Data string is truncated instead of overflowing
    // Buffer. stbsp_snprintf always NUL-terminates.
    stbsp_snprintf(Buffer, (int)sizeof(Buffer), "Thread %u: %s",
                   (unsigned)GetCurrentThreadId(), (char *)Data);
    OutputDebugStringA(Buffer);
}

int CALLBACK
WinMain(HINSTANCE Instance, HINSTANCE PrevInstance, LPSTR CommandLine, int ShowCode)
{
	Instance, PrevInstance, CommandLine, ShowCode;
    thread_info ThreadInfo[7];// = {0};
	u32 cThreads = ArrayCount(ThreadInfo);
	work_queue Queue = {0};

	u32 InitialCount = 0;
	Queue.SemaphoreHandle = CreateSemaphoreEx(0, InitialCount, cThreads, 0, 0, SEMAPHORE_ALL_ACCESS);

	for(u32 iThread = 0; iThread < cThreads; ++iThread)
	{
		thread_info *Info = ThreadInfo + iThread;
		Info->Queue = &Queue;
		Info->LogicalThreadIndex = iThread;

		DWORD ThreadID;
		HANDLE ThreadHandle = CreateThread(0, 0, WorkQueueProc, Info, 0, &ThreadID);
		CloseHandle(ThreadHandle);
	}

	Win32AddEntry(&Queue, DoWorkerWork, "String A0\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A1\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A2\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A3\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A4\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A5\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A6\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A7\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A8\n");
	Win32AddEntry(&Queue, DoWorkerWork, "String A9\n");

	Win32CompleteAllWork(&Queue);

	return 0;
}

Edited by Andrew Reece on Reason: code entry missing content
At the very least you'll need to add Win32CompleteAllWork to the bottom of WinMain so the program doesn't terminate before completing the work items.
I deleted some of the text lines at the end just to make the example shorter; it's possible that I deleted that as well.
I'm at work at the moment, so I'll check that when I get back home. Hopefully it's that simple!

Thanks for the sanity check!
Right, tried it again after getting home. Turns out I copied it badly into the forum. It still fails in the same way with Win32CompleteAllWork at the end.
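I built your code and am seeing the same thing. It looks like WinMain returns before all the threads have terminated. Queue and ThreadInfo are local variables on WinMain's stack. Any worker that is still running after WinMain returns (or that gets woken by a leftover semaphore count) therefore dereferences a dangling pointer while the CRT tears the process down. To fix this you need to: (i) terminate the threads when there is no more work -- at the moment they never return because of the for(;;); and (ii) wait for all threads to terminate before returning from WinMain, e.g. with WaitForMultipleObjects on the thread handles (so don't close those handles straight after CreateThread).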

Hope that helps.