Multiple producer/consumer work queue

Hi there,

I was looking over the HH source for the work queue and noticed this comment in Win32AddEntry: "TODO(casey): Switch to InterlockedCompareExchange eventually so that any thread can add?"

I'm interested in creating a multiple producer/consumer queue, so I thought I'd try it out, but I quickly realised that it's quite a tricky problem to solve. Given the code...

internal void
Win32AddEntry(platform_work_queue *Queue, platform_work_queue_callback *Callback, void *Data)
{
    // TODO(casey): Switch to InterlockedCompareExchange eventually
    // so that any thread can add?
    uint32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries);
    Assert(NewNextEntryToWrite != Queue->NextEntryToRead);
    platform_work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite;
    Entry->Callback = Callback;
    Entry->Data = Data;
    ++Queue->CompletionGoal;
    _WriteBarrier();
    Queue->NextEntryToWrite = NewNextEntryToWrite;
    ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0);
}


We can't use an InterlockedCompareExchange (ICE) on NextEntryToWrite to decide whether we should add the task, because the update to NextEntryToWrite has to be the last thing written; otherwise, as soon as the ICE updates NextEntryToWrite, a consumer might pick up a task whose entry hasn't been written yet. It seems to me that we need to set the task entry and NextEntryToWrite atomically, all at once, to keep the queue in a valid state.
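One common way around this (a sketch, not HH code; all names here are made up) is to split the add into two steps: a CAS reserves a slot index, and a per-slot "ready" flag publishes the entry afterwards. Portable C++ atomics stand in for the Win32 intrinsics:

```cpp
// Sketch: multi-producer enqueue with a two-phase add. A CAS on the
// write index reserves a slot; a per-slot flag publishes the entry.
#include <atomic>
#include <cstdint>

constexpr uint32_t Capacity = 256;

struct Entry {
    void (*Callback)(void *);
    void *Data;
    std::atomic<bool> Ready{false};   // publication flag, one per slot
};

struct Queue {
    Entry Entries[Capacity];
    std::atomic<uint32_t> NextEntryToWrite{0};
    std::atomic<uint32_t> NextEntryToRead{0};
};

bool AddEntry(Queue *q, void (*Callback)(void *), void *Data)
{
    uint32_t Index = q->NextEntryToWrite.load(std::memory_order_relaxed);
    for (;;) {
        uint32_t Next = (Index + 1) % Capacity;
        if (Next == q->NextEntryToRead.load(std::memory_order_acquire)) {
            return false;   // full: caller decides whether to spin or fail
        }
        // Reserve the slot: only one producer wins this CAS; a losing
        // producer gets the fresh index back in Index and retries.
        if (q->NextEntryToWrite.compare_exchange_weak(
                Index, Next, std::memory_order_relaxed)) {
            break;
        }
    }
    Entry *E = &q->Entries[Index];
    E->Callback = Callback;
    E->Data = Data;
    // Publish last, with release ordering, so a consumer that observes
    // Ready == true also observes Callback and Data.
    E->Ready.store(true, std::memory_order_release);
    return true;
}
```

Consumers would only take a slot whose Ready flag is set, and would have to clear it before the slot can be reused on wraparound; per-slot sequence numbers are the usual generalization of that flag.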

I was just wondering if anyone had any insights on this?
I think I've discovered a problem with the threading code as well. It's possible for entries to be overwritten before they're executed. It's very unlikely, but still possible if a lot of entries are being added at once.

Here's how:

First, suppose that the queue only has two slots in it (for simplicity). Queue state starts like so:

NextEntryToWrite = 0
NextEntryToRead = 0
Queue: [Empty slot][Empty slot]

Thread A adds an entry to the queue, queue state becomes:

NextEntryToWrite = 1
NextEntryToRead = 0
Queue: [Task 1][Empty slot]

This triggers thread B to wake up and call Win32DoNextWorkQueueEntry. Since OriginalNextEntryToRead != Queue->NextEntryToWrite, the InterlockedCompareExchange happens; let's suppose it succeeds, but that right after that call, thread B gets interrupted for some reason. So the state of the queue becomes:

NextEntryToWrite = 1
NextEntryToRead = 1 <- Because the ICE succeeded
Queue: [Task 1][Empty slot] <- Note that Task 1 is still in the queue because thread B was interrupted before it executed it.

Now suppose that thread A adds another task to the queue. NewNextEntryToWrite will be 0, which is not equal to NextEntryToRead so the assert won't trigger. The task is added and the state becomes:

NextEntryToWrite = 0
NextEntryToRead = 1
Queue: [Task 1][Task 2]

This triggers thread C to wake up and do Task 2. Suppose it succeeds:

NextEntryToWrite = 0
NextEntryToRead = 0
Queue: [Task 1][Empty slot]

Now if thread A adds another task again (which is possible because the queue isn't full), as long as B hasn't resumed yet, it'll overwrite the task that B is supposed to do.

So yeah, I think this is all correct. And it's only a problem if you're adding a lot of tasks so the queue wraps around, and only if a thread gets interrupted right after the ICE call, and only if other threads pick up later tasks, but it is possible (I think :P).
I am pretty sure that we actually already said that the queue always had to be big enough that it never overruns. I seem to remember talking about that explicitly, and also this assert would seem to indicate that I baked that requirement right into the code:

    Assert(NewNextEntryToWrite != Queue->NextEntryToRead);


So I think the understanding here was that the write pointer is never allowed to approach the read pointer, because you don't even need the race condition you are describing for there to be a problem, right - the write pointer will just keep on going and romp pending reads all day long if the queue overflows!

It's not particularly difficult to do a system that is better (one that does not have overflow problems, and that allows multiple producer/consumer), but I don't know whether we want to really get into multithreading on HH. We'll see! The hard part tends not to be the queueing, but the data access part.
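For what it's worth, one well-known shape for such a "better" system is Dmitry Vyukov's bounded MPMC queue, where every cell carries its own sequence number; that single mechanism handles both the publication ordering and the wraparound overwrite at once. A sketch in portable C++ atomics (illustrative names, not HH code):

```cpp
// Sketch of a bounded MPMC queue in the style of Dmitry Vyukov's
// design. Each cell's sequence number says whose turn the cell is:
// equal to the position means "free to write", position + 1 means
// "published, free to read". Capacity must be a power of two.
#include <atomic>
#include <cstddef>
#include <cstdint>

template <typename T, size_t Capacity>
struct MpmcQueue {
    struct Cell {
        std::atomic<size_t> Sequence;
        T Value;
    };
    Cell Cells[Capacity];
    std::atomic<size_t> EnqueuePos{0};
    std::atomic<size_t> DequeuePos{0};

    MpmcQueue() {
        for (size_t i = 0; i < Capacity; ++i)
            Cells[i].Sequence.store(i, std::memory_order_relaxed);
    }

    bool Enqueue(const T &v) {
        size_t pos = EnqueuePos.load(std::memory_order_relaxed);
        for (;;) {
            Cell *c = &Cells[pos & (Capacity - 1)];
            size_t seq = c->Sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if (diff == 0) {
                // Cell is free for this position; claim it via CAS.
                if (EnqueuePos.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed)) {
                    c->Value = v;
                    // Publishing bumps the sequence so readers may proceed.
                    c->Sequence.store(pos + 1, std::memory_order_release);
                    return true;
                }
            } else if (diff < 0) {
                return false;  // full: a pending read still owns this cell
            } else {
                pos = EnqueuePos.load(std::memory_order_relaxed);
            }
        }
    }

    bool Dequeue(T *out) {
        size_t pos = DequeuePos.load(std::memory_order_relaxed);
        for (;;) {
            Cell *c = &Cells[pos & (Capacity - 1)];
            size_t seq = c->Sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if (diff == 0) {
                if (DequeuePos.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed)) {
                    *out = c->Value;
                    // Hand the cell to the producer one full lap ahead.
                    c->Sequence.store(pos + Capacity,
                                      std::memory_order_release);
                    return true;
                }
            } else if (diff < 0) {
                return false;  // empty
            } else {
                pos = DequeuePos.load(std::memory_order_relaxed);
            }
        }
    }
};
```

Because a producer refuses to touch a cell until its sequence shows the previous read finished, a writer can never romp a pending read - it just reports the queue as full.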

- Casey