1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | internal bool32 Win32DoNextWorkQueueEntry(platform_work_queue *Queue) { bool32 WeShouldSleep = false; uint32 OriginalNextEntryToRead = Queue->NextEntryToRead; uint32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries); if(OriginalNextEntryToRead != Queue->NextEntryToWrite) { uint32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead, NewNextEntryToRead, OriginalNextEntryToRead); // -> SPOT X <- if(Index == OriginalNextEntryToRead) { platform_work_queue_entry Entry = Queue->Entries[Index]; Entry.Callback(Queue, Entry.Data); InterlockedIncrement((LONG volatile *)&Queue->CompletionCount); } } else { WeShouldSleep = true; } return(WeShouldSleep); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | internal bool32 Win32DoNextWorkQueueEntry(platform_work_queue *Queue) { bool32 WeShouldSleep = false; uint32 OriginalNextEntryToRead = Queue->NextEntryToRead; uint32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries); if(OriginalNextEntryToRead != Queue->NextEntryToWrite) { // Get the entry before we increment the Read head. // NOTE: Original post had NewNextEntryToRead here, but it's been fixed after a suggestion of mrmixer. We want the non-incremented index here. platform_work_queue_entry Entry = Queue->Entries[OriginalNextEntryToRead]; uint32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead, NewNextEntryToRead, OriginalNextEntryToRead); if(Index == OriginalNextEntryToRead) { // We were the ones who updated the Read head, so we now have a valid Entry we can proceed with. Entry.Callback(Queue, Entry.Data); InterlockedIncrement((LONG volatile *)&Queue->CompletionCount); } } else { WeShouldSleep = true; } return(WeShouldSleep); } |
/* At the start */
Queue->NextEntryToRead = 0;
Queue->NextEntryToWrite = 2;
/* There are 2 entries, at indices 0 and 1. If the main thread wants to add a new entry, it will write it at Queue->NextEntryToWrite, which is 2 in this example. */

/* Thread 1 enters Win32DoNextWorkQueueEntry. */
OriginalNextEntryToRead = 0;
NewNextEntryToRead = 1;
OriginalNextEntryToRead != Queue->NextEntryToWrite /* 0 != 2, there is work to do */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 1, 0 );
Index == OriginalNextEntryToRead /* 0 == 0, we process the entry at index 0 */
/* Thread 1 finishes working on entry 0 */

/* Thread 1 enters Win32DoNextWorkQueueEntry again */
OriginalNextEntryToRead = 1; /* Thread 1 local */
NewNextEntryToRead = 2; /* Thread 1 local */

/* Thread 2 enters Win32DoNextWorkQueueEntry before thread 1 reaches InterlockedCompareExchange. */
OriginalNextEntryToRead = 1; /* Thread 2 local */
NewNextEntryToRead = 2; /* Thread 2 local */
OriginalNextEntryToRead != Queue->NextEntryToWrite /* 1 != 2, there is work to do */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 2, 1 );
Index == OriginalNextEntryToRead /* 1 == 1, thread 2 processes the entry at index 1; Queue->NextEntryToRead is now 2 */

/* Thread 1 */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 2, 1 );
Index != OriginalNextEntryToRead /* 2 != 1, Queue->NextEntryToRead was changed before we reached InterlockedCompareExchange, so we must not process the entry at index 1. */
/* Thread 1 exits Win32DoNextWorkQueueEntry */

/* At some point (it doesn't matter when) the main thread added one more entry */
Queue->NextEntryToWrite = 3;

/* Thread 1 enters Win32DoNextWorkQueueEntry */
OriginalNextEntryToRead = 2;
NewNextEntryToRead = 3;
OriginalNextEntryToRead != Queue->NextEntryToWrite /* 2 != 3, there is work to do */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 3, 2 );
Index == OriginalNextEntryToRead /* 2 == 2, we process the entry at index 2 */
martincohen
Also, would this fix it? I've moved the read of the Entry to just before the exchange.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | internal void Win32AddEntry(platform_work_queue *Queue, platform_work_queue_callback *Callback, void *Data) { // TODO(casey): Switch to InterlockedCompareExchange eventually // so that any thread can add? uint32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries); Assert(NewNextEntryToWrite != Queue->NextEntryToRead); platform_work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite; Entry->Callback = Callback; Entry->Data = Data; ++Queue->CompletionGoal; _WriteBarrier(); _mm_sfence(); Queue->NextEntryToWrite = NewNextEntryToWrite; ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0); } |
/* Suppose the queue has 3 entries. 0 means an entry is not valid, x means it is valid. Indices wrap modulo 3, as in Win32AddEntry. */

/* - Starting state - */
/* Queue is [ 0 0 0 ] */
NextEntryToWrite = 0;
NextEntryToRead = 0;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 1 != 0 */
/* Queue is [ x 0 0 ] */
NextEntryToWrite = 1;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 2 != 0 */
/* Queue is [ x x 0 ] */
NextEntryToWrite = 2;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 == NextEntryToRead; /* 0 == 0 : we can't add at index 2; the code would assert. */
/* Queue is [ x x 0 ] */

/* A worker thread does the interlocked exchange but hasn't read the data at index 0 yet. */
NextEntryToRead = 1;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 0 != 1 */
/* Queue is [ x x x ] */
NextEntryToWrite = 0;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 == NextEntryToRead; /* 1 == 1 : we can't add at index 0; the code would assert. */
/* Queue is [ x x x ] */
/* The worker thread can still safely read the data at index 0. */
/* Caveat: the check only protects the one slot just behind NextEntryToRead. If a second worker now claims index 1 (NextEntryToRead = 2), the next Win32AddEntry passes the check (1 != 2) and overwrites index 0 before the first worker has read it. Reading the entry before the interlocked exchange avoids this. */