Hey guys, what would be the best approach to extend the single-producer, multiple-consumer work queue (episodes 123-125) to multiple producers? I just started learning multithreading in C, and I was wondering whether my approach is valid, whether there is a better solution, or whether there is some issue/bug I am not aware of.
Note: This is my Linux implementation. The corresponding HH code is:
Github code
Producer code.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | internal void
push_string(sem_t *semaphore_handle, char *string)
{
uint32 original_entry_count = entry_count;
uint32 new_entry_count = original_entry_count + 1;
// Here we increment the entry_count before creating the entry, to ensure only one thread creates this entry.
// We need the ready flag, to signal the consumer that the entry is ready/valid.
uint32 entry_index = __sync_val_compare_and_swap(&entry_count, original_entry_count, new_entry_count);
if(entry_index == original_entry_count)
{
Work_Queue_Entry *entry = entries + entry_index;
entry->string_to_print = string;
entry->ready = true;
complete_past_writes_before_future_writes;
sem_post(semaphore_handle);
}
else
{
// TODO(dgl): try again.
printf("Failed to enqueue");
}
}
|
Consumer code:
Note: This code already contains the fix from episode 126.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | internal bool32
do_worker_work(sem_t *semaphore_handle, int logical_thread_index)
{
bool32 we_should_sleep = false;
uint32 original_next_entry_to_do = next_entry_to_do;
uint32 new_next_entry_to_do = original_next_entry_to_do + 1;
if(original_next_entry_to_do < entry_count)
{
uint32 entry_index = __sync_val_compare_and_swap(&next_entry_to_do, original_next_entry_to_do, new_next_entry_to_do);
if(entry_index == original_next_entry_to_do)
{
Work_Queue_Entry *entry = entries + entry_index;
// Wait until entry is valid.
while(!entry->ready) {};
assert(entry->string_to_print, "String to print is null");
printf("Thread: %u, String: %s\n", logical_thread_index, entry->string_to_print);
// Try to enqueue from thread
push_string(semaphore_handle, entry->string_to_print);
__sync_fetch_and_add(&entry_completion_count, 1);
}
}
else
{
we_should_sleep = true;
}
return(we_should_sleep);
}
|