Question on / possible problem with multithreading queue

I've been watching Casey's multithreading Queue implementation and I'm not sure about one part:

internal bool32
Win32DoNextWorkQueueEntry(platform_work_queue *Queue)
{
    bool32 WeShouldSleep = false;

    uint32 OriginalNextEntryToRead = Queue->NextEntryToRead;
    uint32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries);
    if(OriginalNextEntryToRead != Queue->NextEntryToWrite)
    {
        uint32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead,
                                                  NewNextEntryToRead,
                                                  OriginalNextEntryToRead);
        // -> SPOT X <-
        if(Index == OriginalNextEntryToRead)
        {
            platform_work_queue_entry Entry = Queue->Entries[Index];
            Entry.Callback(Queue, Entry.Data);
            InterlockedIncrement((LONG volatile *)&Queue->CompletionCount);
        }
    }
    else
    {
        WeShouldSleep = true;
    }

    return(WeShouldSleep);
}


At the -> SPOT X <- marked in the code, isn't there a possibility that the main thread will write a new entry into the queue at the given NewNextEntryToRead? That would result in the worker thread picking up the newer entry, and the old entry would be lost. The code might work in the scenario it is used for in HMH (since we wait for all threads to finish every frame before we schedule new work), but other scenarios would have to deal with it.

Also, would this fix it? I've moved the read of the Entry to just before we do the exchange. The Entry might be invalid at that point, but we test its validity with the exchange, and we only use it to do the work once that's confirmed.

internal bool32
Win32DoNextWorkQueueEntry(platform_work_queue *Queue)
{
    bool32 WeShouldSleep = false;

    uint32 OriginalNextEntryToRead = Queue->NextEntryToRead;
    uint32 NewNextEntryToRead = (OriginalNextEntryToRead + 1) % ArrayCount(Queue->Entries);
    if(OriginalNextEntryToRead != Queue->NextEntryToWrite)
    {
        // Get the entry before we increment the Read head.
        // NOTE: Original post had NewNextEntryToRead here, but it's been fixed after a suggestion of mrmixer. We want the non-incremented index here.
        platform_work_queue_entry Entry = Queue->Entries[OriginalNextEntryToRead];
        uint32 Index = InterlockedCompareExchange((LONG volatile *)&Queue->NextEntryToRead,
                                                  NewNextEntryToRead,
                                                  OriginalNextEntryToRead);
        if(Index == OriginalNextEntryToRead)
        {
            // We were the ones who updated the Read head, so we now have a valid Entry we can proceed with.
            Entry.Callback(Queue, Entry.Data);
            InterlockedIncrement((LONG volatile *)&Queue->CompletionCount);
        }
    }
    else
    {
        WeShouldSleep = true;
    }

    return(WeShouldSleep);
}


Edited by Martin Cohen on Reason: Bug in code.
I may be wrong because I don't do multi-threading often, but here we want to read the current entry index, which is Queue->NextEntryToRead, into a local variable and then increment Queue->NextEntryToRead so another thread will not pick up the same entry.

OriginalNextEntryToRead contains the entry index we want to read.
NewNextEntryToRead contains the incremented value.

When we call InterlockedCompareExchange, the function compares Queue->NextEntryToRead with OriginalNextEntryToRead and, if and only if they are equal, sets Queue->NextEntryToRead to NewNextEntryToRead. It returns (into Index) the value Queue->NextEntryToRead had before the call, whether or not the exchange happened. It's an atomic operation, so you are guaranteed that no other thread will change the value while the function is updating it.

So if Index == OriginalNextEntryToRead after the interlocked operation, you know that you are the only thread that has access to the entry at Index.

If Index != OriginalNextEntryToRead, then another thread has modified Queue->NextEntryToRead between the moment you read it into OriginalNextEntryToRead and the moment you called InterlockedCompareExchange, and you should not use the entry at Index.
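For anyone who wants to experiment with this outside Windows, here is a minimal sketch using C11 <stdatomic.h> instead of the Win32 API. CompareExchangeU32 and TryClaimEntry are hypothetical names; CompareExchangeU32 is meant to mimic the InterlockedCompareExchange behaviour described above (it returns the old value whether or not the swap happened), which is what makes the `Index == OriginalNextEntryToRead` test work.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical portable stand-in for InterlockedCompareExchange.
   If *Dest == Comparand, Exchange is stored. Either way, the value
   *Dest held before the call is returned. */
static uint32_t CompareExchangeU32(_Atomic uint32_t *Dest, uint32_t Exchange, uint32_t Comparand)
{
    uint32_t Expected = Comparand;
    /* On failure, C11 writes the value it actually found into Expected;
       on success, Expected stays equal to Comparand, which is what was there. */
    (void)atomic_compare_exchange_strong(Dest, &Expected, Exchange);
    return Expected;
}

/* The claim step from Win32DoNextWorkQueueEntry: the caller owns the entry
   at Original only if the returned old value equals Original. */
static bool TryClaimEntry(_Atomic uint32_t *NextEntryToRead, uint32_t Original, uint32_t EntryCount)
{
    assert(EntryCount > 0);
    uint32_t New = (Original + 1) % EntryCount;
    return CompareExchangeU32(NextEntryToRead, New, Original) == Original;
}
```

For example, with NextEntryToRead at 1, the first TryClaimEntry(&Next, 1, Count) call wins and moves it to 2; a second call that still uses the stale snapshot 1 gets back 2 and returns false.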

Could you point us to the episode this code is from, so we can refer to it if necessary.

Edited by Simon Anciaux on Reason: typo
With my fix, I only use the Entry at the given index if InterlockedCompareExchange actually performs the exchange, so technically it does the same thing, except that it reads the entry before it exchanges NextEntryToRead with the incremented value.

However, my solution might have another problem: the entry at that index might not have been written yet by the main thread?

I'll update the main post with a link to the episode.
What I meant to say is that I don't think there is an error in the original code. But again, I may be wrong: I remember that there was a bug in the multi-threading code, but I don't remember in which episode it was addressed or whether it concerns the code we are discussing.

In your fix, you seem to want to access the entry at NewNextEntryToRead, but we don't want that; we want the entry at the current NextEntryToRead (OriginalNextEntryToRead).

Here is a timeline of how work is picked up by threads.

/* At the start */
Queue->NextEntryToRead = 0;
Queue->NextEntryToWrite = 2;
/* There are 2 entries, at indices 0 and 1. If the main thread wants to add a new entry,
it will write it at Queue->NextEntryToWrite, which is 2 in this example. */

/* Thread 1 enters Win32DoNextWorkQueueEntry. */
OriginalNextEntryToRead = 0;
NewNextEntryToRead = 1;
OriginalNextEntryToRead != Queue->NextEntryToWrite /* 0 != 2, there is work to do */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 1, 0 );
Index == OriginalNextEntryToRead /* 0 == 0, we process the entry at index 0 */
/* Thread 1 finishes working on entry 0 */

/* Thread 1 enters Win32DoNextWorkQueueEntry again */
OriginalNextEntryToRead = 1; /* Thread 1 local */
NewNextEntryToRead = 2; /* Thread 1 local */
/* Thread 2 enters Win32DoNextWorkQueueEntry before thread 1 reaches InterlockedCompareExchange. */
    OriginalNextEntryToRead = 1; /* Thread 2 local */
    NewNextEntryToRead = 2; /* Thread 2 local */
    OriginalNextEntryToRead != Queue->NextEntryToWrite /* 1 != 2, there is work to do */
    Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 2, 1 );
    Index == OriginalNextEntryToRead /* 1 == 1, thread 2 processes the entry; Queue->NextEntryToRead is now 2 */
/* Thread 1 */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 2, 1 );
Index != OriginalNextEntryToRead /* 2 != 1, Queue->NextEntryToRead was changed before we reached
InterlockedCompareExchange, so we should not process the entry at index 1. */
/* Thread 1 exits Win32DoNextWorkQueueEntry */

/* At some point (it doesn't matter when) the main thread added one more entry */
Queue->NextEntryToWrite = 3;

/* Thread 1 enters Win32DoNextWorkQueueEntry */
OriginalNextEntryToRead = 2;
NewNextEntryToRead = 3;
OriginalNextEntryToRead != Queue->NextEntryToWrite /* 2 != 3, there is work to do */
Index = InterlockedCompareExchange( &Queue->NextEntryToRead, 3, 2 );
Index == OriginalNextEntryToRead /* 2 == 2, we process the entry */
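The timeline above can also be replayed deterministically in a single thread by splitting the worker into its two steps (snapshot the read index, then try to claim it) and calling the steps in the interleaved order shown. This is a sketch with C11 atomics, not the HMH code: sim_queue, SimInit, SnapshotRead, Claim and the 256-entry size are hypothetical, and the entries themselves are left out because only the indices matter here.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define ENTRY_COUNT 256u /* hypothetical queue size, large enough not to wrap here */

/* Only the two indices matter for this replay. */
typedef struct
{
    _Atomic uint32_t NextEntryToRead;
    _Atomic uint32_t NextEntryToWrite;
} sim_queue;

static void SimInit(sim_queue *Queue, uint32_t Read, uint32_t Write)
{
    assert(Read < ENTRY_COUNT && Write < ENTRY_COUNT);
    atomic_init(&Queue->NextEntryToRead, Read);
    atomic_init(&Queue->NextEntryToWrite, Write);
}

/* Step 1 of Win32DoNextWorkQueueEntry: snapshot the read index.
   Returns false ("we should sleep") when there is no work. */
static bool SnapshotRead(sim_queue *Queue, uint32_t *Original)
{
    *Original = atomic_load(&Queue->NextEntryToRead);
    return *Original != atomic_load(&Queue->NextEntryToWrite);
}

/* Step 2: try to advance the read index from the snapshot.
   True means this caller now owns the entry at Original. */
static bool Claim(sim_queue *Queue, uint32_t Original)
{
    uint32_t Expected = Original;
    uint32_t New = (Original + 1) % ENTRY_COUNT;
    return atomic_compare_exchange_strong(&Queue->NextEntryToRead, &Expected, New);
}
```

Replaying the timeline with SimInit(&Q, 0, 2): thread 1 snapshots and claims 0; both threads then snapshot 1; thread 2's Claim(&Q, 1) succeeds and thread 1's fails, leaving NextEntryToRead at 2 and the queue looking empty; once NextEntryToWrite becomes 3, thread 1 snapshots 2 and claims it.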

Edited by Simon Anciaux on Reason: typo
Yes, you're right; I'll add a comment to the original post. However, the idea of reading the entry before we increment has not been addressed yet.
I don't think accessing the Entry before or after the interlocked operation changes anything in the logic. But it might (I don't actually know) affect CPU cache coherency, since two threads might try to access the same memory. Since it's only a read, it should be OK though.

Also, InterlockedCompareExchange generates a full memory barrier (fence), which ensures that memory operations complete in order before continuing (note that this is different documentation than the one I linked in the first post).
martincohen
Also would this fix it? I've moved reading of the Entry just before we do the exchange.

This is exactly the same code as before. InterlockedCompareExchange returns the value Queue->NextEntryToRead held before the call, and the entry is only used when that value equals OriginalNextEntryToRead. So your code will always use the same index as the original code.

Edited by Mārtiņš Možeiko on
It wasn't clear to me that the queue was a circular buffer. I think I see the problem you were describing, but I think it's prevented in the Win32AddEntry function.

I'll try to rephrase the problem to see if it's what you meant.

After a worker thread does the interlocked exchange and before it reads the entry, the main thread could overwrite the entry if the queue had wrapped around, because Queue->NextEntryToWrite would no longer be equal to Queue->NextEntryToRead. In that case, I think your fix would have solved the problem.

But in Win32AddEntry, the condition for adding an entry is that (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries) must not be equal to Queue->NextEntryToRead.

internal void
Win32AddEntry(platform_work_queue *Queue, platform_work_queue_callback *Callback, void *Data)
{
    // TODO(casey): Switch to InterlockedCompareExchange eventually
    // so that any thread can add?
    uint32 NewNextEntryToWrite = (Queue->NextEntryToWrite + 1) % ArrayCount(Queue->Entries);
    Assert(NewNextEntryToWrite != Queue->NextEntryToRead);
    platform_work_queue_entry *Entry = Queue->Entries + Queue->NextEntryToWrite;
    Entry->Callback = Callback;
    Entry->Data = Data;
    ++Queue->CompletionGoal;
    _WriteBarrier();
    _mm_sfence();
    Queue->NextEntryToWrite = NewNextEntryToWrite;
    ReleaseSemaphore(Queue->SemaphoreHandle, 1, 0);
}
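A portable sketch of the same producer side, assuming C11 atomics in place of the Win32 calls: the write index is published with a release store, which roughly takes the place of the _WriteBarrier()/_mm_sfence() pair before `Queue->NextEntryToWrite = NewNextEntryToWrite`, and the full check is the same condition as the Assert, except that it returns false instead. work_queue, TryAddEntry, CountWork and the 3-slot size are hypothetical, and CompletionGoal and the semaphore are left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define ENTRY_COUNT 3u /* hypothetical; small so the "full" case is easy to hit */

typedef void work_callback(void *Data);

typedef struct
{
    work_callback *Callback;
    void *Data;
} work_entry;

typedef struct
{
    work_entry Entries[ENTRY_COUNT];
    _Atomic uint32_t NextEntryToRead;
    _Atomic uint32_t NextEntryToWrite;
} work_queue;

/* Hypothetical example callback: counts how many times it ran. */
static void CountWork(void *Data)
{
    ++*(int *)Data;
}

/* Single-producer add modeled on Win32AddEntry. */
static bool TryAddEntry(work_queue *Queue, work_callback *Callback, void *Data)
{
    assert(Callback);
    /* Only this single producer writes NextEntryToWrite, so relaxed is enough. */
    uint32_t Write = atomic_load_explicit(&Queue->NextEntryToWrite, memory_order_relaxed);
    uint32_t NewWrite = (Write + 1) % ENTRY_COUNT;
    /* Acquire pairs with the consumers' (seq_cst) CAS on NextEntryToRead: anything a
       consumer did before its CAS happens-before we overwrite a slot. */
    if(NewWrite == atomic_load_explicit(&Queue->NextEntryToRead, memory_order_acquire))
    {
        return false; /* full: one slot is always left unused */
    }
    Queue->Entries[Write].Callback = Callback;
    Queue->Entries[Write].Data = Data;
    /* Release: a consumer that acquire-loads the new NextEntryToWrite also sees
       the entry fields written above. */
    atomic_store_explicit(&Queue->NextEntryToWrite, NewWrite, memory_order_release);
    return true;
}
```

With 3 slots this reproduces the walk-through below: two adds succeed, the third fails while NextEntryToRead is 0, and after a worker advances NextEntryToRead to 1, one more add succeeds (writing slot 2 and wrapping NextEntryToWrite to 0) before the queue is full again.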


/* Suppose the queue has 3 entries. 0 means the entry is not valid, x means it's valid. */

/* - Starting state - */
/* Queue is [ 0 0 0 ] */
NextEntryToWrite = 0;
NextEntryToRead = 0;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 1 != 0 */
/* Queue is [ x 0 0 ] */
NextEntryToWrite = 1;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 2 != 0 */
/* Queue is [ x x 0 ] */
NextEntryToWrite = 2;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 == NextEntryToRead; /* 0 == 0 : we can't add at index 2, the code would assert. */
/* Queue is [ x x 0 ] */

/* A worker thread does the interlocked exchange but doesn't read the data yet. */
NextEntryToRead = 1;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 != NextEntryToRead; /* 0 != 1 */
/* Queue is [ x x x ] */
NextEntryToWrite = 0;

/* Win32AddEntry is called */
(NextEntryToWrite + 1) % 3 == NextEntryToRead; /* 1 == 1 : we can't add at index 0, the code would assert. */
/* Queue is [ x x x ] */

/* The worker thread can still safely read the data at index 0. */


EDIT: After thinking a little more, a second worker thread advancing NextEntryToRead before the first thread reads its entry would cause a problem. So reading the entry before the interlocked operation is probably the solution, as martincohen said.
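The two-worker case can be replayed deterministically with a cut-down ring of int payloads. ring, Add, Claim and ReplayTwoWorkerHazard are hypothetical names, and the single-threaded script only stands in for the real interleaving: worker A snapshots index 0, worker B claims index 1 before A reads, and the producer then wraps around and overwrites slot 0.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define ENTRY_COUNT 3u /* same 3-slot ring as the walk-through */

/* Simplified queue: each entry is just an int payload (hypothetical). */
typedef struct
{
    int Entries[ENTRY_COUNT];
    _Atomic uint32_t NextEntryToRead;
    _Atomic uint32_t NextEntryToWrite;
} ring;

/* Producer, with the same "full" rule as Win32AddEntry. */
static bool Add(ring *R, int Payload)
{
    uint32_t Write = atomic_load(&R->NextEntryToWrite);
    uint32_t NewWrite = (Write + 1) % ENTRY_COUNT;
    if(NewWrite == atomic_load(&R->NextEntryToRead)) return false;
    R->Entries[Write] = Payload;
    atomic_store(&R->NextEntryToWrite, NewWrite);
    return true;
}

/* True if this caller advanced NextEntryToRead from Original. */
static bool Claim(ring *R, uint32_t Original)
{
    uint32_t Expected = Original;
    return atomic_compare_exchange_strong(&R->NextEntryToRead, &Expected,
                                          (Original + 1) % ENTRY_COUNT);
}

/* Scripted replay: reports what worker A sees with each ordering. */
static void ReplayTwoWorkerHazard(int *ClaimThenRead, int *ReadThenClaim)
{
    ring R;
    atomic_init(&R.NextEntryToRead, 0);
    atomic_init(&R.NextEntryToWrite, 0);
    bool Added = Add(&R, 10) && Add(&R, 11);
    assert(Added);

    uint32_t A = atomic_load(&R.NextEntryToRead); /* worker A snapshots index 0 */
    *ReadThenClaim = R.Entries[A];                /* martincohen's order: copy first */
    bool AWon = Claim(&R, A);
    assert(AWon);

    /* Worker B claims index 1 before A has read its entry. */
    bool BWon = Claim(&R, atomic_load(&R.NextEntryToRead));
    assert(BWon);

    /* The read index is now 2, so the producer may wrap and overwrite slot 0. */
    Added = Add(&R, 12) && Add(&R, 13);
    assert(Added);

    *ClaimThenRead = R.Entries[A]; /* original order: worker A only reads now */
}
```

ReplayTwoWorkerHazard reports 13 (the overwritten entry) for the claim-then-read order and 10 (the entry A actually claimed) for the read-then-claim order. The early copy is only trustworthy because the CAS then confirms NextEntryToRead still held 0; with a small ring and a very stale snapshot the read index could in principle wrap all the way back to the same value (the ABA problem), and a strict reading of the C11 memory model would also call a stale early copy a data race, even though its result is discarded.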

Edited by Simon Anciaux on
@mmozeiko: The difference is when you read the value of the entry. Increasing the NextEntryToDo would basically "unlock" the main thread (if the main-thread code were made more general); at least, that was my concern that I wanted to discuss.

@mrmixer: Yeah, the queue is circular. I've also been looking at other code online; here, for example, Dmitry Vyukov solves a similar problem (except his queue is MPMC rather than SPMC) and also reads the cell before he increments the counter with interlocked*.

After a while of reading through Dmitry's implementation to understand it, I decided to adopt his ideas from the MPMC queue for now, although eventually I'll be implementing SPMC and MPSC variants of it.
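For comparison, here is a sketch of the per-cell sequence-number scheme from Vyukov's bounded MPMC queue as I understand it, translated to C11 atomics with an int payload. Each cell carries a Sequence counter: a consumer reads the cell's sequence before its CAS on DequeuePos to check that the cell holds data for its position, and the cell is only handed back to producers (Sequence = Pos + capacity) after the data has been copied out, so a producer cannot overwrite it in the window discussed in this thread. The names and the capacity of 8 are hypothetical; the capacity must be a power of two.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MPMC_CAPACITY 8u /* hypothetical; must be a power of two */

typedef struct
{
    _Atomic size_t Sequence; /* whose turn it is for this cell */
    int Data;                /* simplified payload */
} mpmc_cell;

typedef struct
{
    mpmc_cell Cells[MPMC_CAPACITY];
    _Atomic size_t EnqueuePos;
    _Atomic size_t DequeuePos;
} mpmc_queue;

static void MpmcInit(mpmc_queue *Q)
{
    assert((MPMC_CAPACITY & (MPMC_CAPACITY - 1)) == 0);
    for(size_t i = 0; i < MPMC_CAPACITY; ++i)
        atomic_init(&Q->Cells[i].Sequence, i); /* cell i is free for position i */
    atomic_init(&Q->EnqueuePos, 0);
    atomic_init(&Q->DequeuePos, 0);
}

static bool MpmcEnqueue(mpmc_queue *Q, int Data)
{
    size_t Pos = atomic_load_explicit(&Q->EnqueuePos, memory_order_relaxed);
    mpmc_cell *Cell;
    for(;;)
    {
        Cell = &Q->Cells[Pos & (MPMC_CAPACITY - 1)];
        size_t Seq = atomic_load_explicit(&Cell->Sequence, memory_order_acquire);
        intptr_t Diff = (intptr_t)Seq - (intptr_t)Pos;
        if(Diff == 0)
        {
            /* Cell is free for Pos: try to claim Pos. On failure, Pos is reloaded. */
            if(atomic_compare_exchange_weak_explicit(&Q->EnqueuePos, &Pos, Pos + 1,
                                                     memory_order_relaxed, memory_order_relaxed))
                break;
        }
        else if(Diff < 0)
        {
            return false; /* full: the previous lap's consumer has not released the cell */
        }
        else
        {
            Pos = atomic_load_explicit(&Q->EnqueuePos, memory_order_relaxed);
        }
    }
    Cell->Data = Data;
    atomic_store_explicit(&Cell->Sequence, Pos + 1, memory_order_release); /* publish */
    return true;
}

static bool MpmcDequeue(mpmc_queue *Q, int *Out)
{
    size_t Pos = atomic_load_explicit(&Q->DequeuePos, memory_order_relaxed);
    mpmc_cell *Cell;
    for(;;)
    {
        Cell = &Q->Cells[Pos & (MPMC_CAPACITY - 1)];
        size_t Seq = atomic_load_explicit(&Cell->Sequence, memory_order_acquire);
        intptr_t Diff = (intptr_t)Seq - (intptr_t)(Pos + 1);
        if(Diff == 0)
        {
            /* Cell holds data for Pos: try to claim Pos. On failure, Pos is reloaded. */
            if(atomic_compare_exchange_weak_explicit(&Q->DequeuePos, &Pos, Pos + 1,
                                                     memory_order_relaxed, memory_order_relaxed))
                break;
        }
        else if(Diff < 0)
        {
            return false; /* empty */
        }
        else
        {
            Pos = atomic_load_explicit(&Q->DequeuePos, memory_order_relaxed);
        }
    }
    *Out = Cell->Data;
    /* Hand the cell to the next lap's producer only after the data is copied. */
    atomic_store_explicit(&Cell->Sequence, Pos + MPMC_CAPACITY, memory_order_release);
    return true;
}
```

Because positions here are unbounded size_t counters rather than indices taken modulo the capacity, the sequence check compares full positions, which makes the wrap-around ABA case from the simple ring practically unreachable.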