Daniel Glinka
2 posts
Extend work queue to multiple producers - multiple consumers
Edited by Daniel Glinka on Reason: Fixed code
Hey guys, what would be the best approach to extend the single-producer, multiple-consumer work queue (episodes 123 - 125) to multiple producers? I just started learning multithreading in C, and I was wondering whether my approach is valid, whether there is a better solution, or whether there is some issue/bug I am not aware of.

Note: This is my Linux implementation. The corresponding HH code is: Github code

Producer code:

internal void
push_string(sem_t *semaphore_handle, char *string)
{
    uint32 original_entry_count = entry_count;
    uint32 new_entry_count = original_entry_count + 1;

    // Increment entry_count before filling in the entry, so that only one thread can claim this slot.
    // The ready flag signals the consumer that the entry has been fully written.
    uint32 entry_index = __sync_val_compare_and_swap(&entry_count, original_entry_count, new_entry_count);
    if(entry_index == original_entry_count)
    {
        Work_Queue_Entry *entry = entries + entry_index;
        entry->string_to_print = string;

        // The string must be visible to other threads before the ready flag is set.
        complete_past_writes_before_future_writes;
        entry->ready = true;

        sem_post(semaphore_handle);
    }
    else
    {
        // TODO(dgl): try again.
        printf("Failed to enqueue\n");
    }
}


Consumer code:
Note: This code already contains the fix of episode 126.

internal bool32
do_worker_work(sem_t *semaphore_handle, int logical_thread_index)
{
    bool32 we_should_sleep = false;

    uint32 original_next_entry_to_do = next_entry_to_do;
    uint32 new_next_entry_to_do = original_next_entry_to_do + 1;
    if(original_next_entry_to_do < entry_count)
    {
        uint32 entry_index = __sync_val_compare_and_swap(&next_entry_to_do, original_next_entry_to_do, new_next_entry_to_do);
        if(entry_index == original_next_entry_to_do)
        {

            Work_Queue_Entry *entry = entries + entry_index;

            // Wait until entry is valid.
            while(!entry->ready) {};

            assert(entry->string_to_print, "String to print is null");
            printf("Thread: %d, String: %s\n", logical_thread_index, entry->string_to_print);

            // Try to enqueue from thread
            push_string(semaphore_handle, entry->string_to_print);

            __sync_fetch_and_add(&entry_completion_count, 1);
        }
    }
    else
    {
        we_should_sleep = true;
    }

    return(we_should_sleep);
}


Simon Anciaux
1337 posts
When asking a question like that it would help if you provided a complete minimal example that we could compile and run.

If it's working, try to create a stress test (for example, multiple threads that continuously push new tasks) to make sure that no task is left out, no task is performed several times, the task input is correct, and the results are correct.
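
As a rough illustration of such a stress test, here is a minimal sketch. It does not use the code from the first post: `push_task`, `consumer_main`, `run_stress_test`, `MAX_TASKS`, and the queue layout are hypothetical stand-ins, so the push and claim logic would need to be swapped for the real queue. Each task carries a unique id, and the test counts how often each id was executed.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define MAX_TASKS (1u << 16) // assumed capacity of the linear test queue

typedef struct { uint32_t task_id; uint32_t ready; } Task_Entry;

static Task_Entry tasks[MAX_TASKS];
static uint32_t times_done[MAX_TASKS]; // how often each task id was executed
static uint32_t task_count, next_task_to_do, total_tasks, tasks_per_producer;
static sem_t work_available;

// Stand-in producer: claim a slot with CAS, fill it, publish it, wake a consumer.
static void push_task(uint32_t task_id)
{
    for(;;)
    {
        uint32_t slot = __atomic_load_n(&task_count, __ATOMIC_RELAXED);
        if(__sync_bool_compare_and_swap(&task_count, slot, slot + 1))
        {
            tasks[slot].task_id = task_id;
            __atomic_store_n(&tasks[slot].ready, 1u, __ATOMIC_RELEASE);
            sem_post(&work_available);
            return;
        }
    }
}

static void *producer_main(void *arg)
{
    uint32_t first_id = (uint32_t)(uintptr_t)arg * tasks_per_producer;
    for(uint32_t i = 0; i < tasks_per_producer; ++i) push_task(first_id + i);
    return NULL;
}

// Stand-in consumer: one semaphore wait per claimed slot.
static void *consumer_main(void *arg)
{
    (void)arg;
    for(;;)
    {
        while(sem_wait(&work_available) != 0) {} // retry if interrupted
        uint32_t slot = __sync_fetch_and_add(&next_task_to_do, 1);
        if(slot >= total_tasks) return NULL; // extra post from the test: shut down
        while(!__atomic_load_n(&tasks[slot].ready, __ATOMIC_ACQUIRE)) {}
        __sync_fetch_and_add(&times_done[tasks[slot].task_id], 1);
    }
}

// Returns the number of tasks that were NOT executed exactly once.
uint32_t run_stress_test(uint32_t producers, uint32_t consumers, uint32_t per_producer)
{
    memset(tasks, 0, sizeof(tasks));
    memset(times_done, 0, sizeof(times_done));
    task_count = next_task_to_do = 0;
    tasks_per_producer = per_producer;
    total_tasks = producers * per_producer;
    assert(total_tasks <= MAX_TASKS);
    sem_init(&work_available, 0, 0);

    pthread_t *threads = malloc((producers + consumers) * sizeof(*threads));
    assert(threads != NULL);
    for(uint32_t i = 0; i < consumers; ++i)
        pthread_create(&threads[producers + i], NULL, consumer_main, NULL);
    for(uint32_t i = 0; i < producers; ++i)
        pthread_create(&threads[i], NULL, producer_main, (void *)(uintptr_t)i);

    for(uint32_t i = 0; i < producers; ++i) pthread_join(threads[i], NULL);
    // All tasks are queued; one extra post per consumer acts as a shutdown token.
    for(uint32_t i = 0; i < consumers; ++i) sem_post(&work_available);
    for(uint32_t i = 0; i < consumers; ++i) pthread_join(threads[producers + i], NULL);

    uint32_t bad = 0;
    for(uint32_t id = 0; id < total_tasks; ++id)
        if(times_done[id] != 1) ++bad;

    free(threads);
    sem_destroy(&work_available);
    return bad;
}
```

A call such as `run_stress_test(4, 4, 10000)` is expected to return 0. A nonzero result means some task was dropped or executed more than once. Running it in a loop, and with more threads than cores, makes rare interleavings more likely to show up.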

This thread discusses a potential issue with Casey's circular queue; see also Casey's reply on GitHub.
Daniel Glinka
2 posts
Thank you for your reply and the links. I did a little stress test and it worked fine. However, I wanted to make sure my thought process was correct, because this kind of low-level multithreading is new to me.