Note: This is my Linux implementation. The corresponding HH code is available on GitHub.
Producer code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | internal void push_string(sem_t *semaphore_handle, char *string) { uint32 original_entry_count = entry_count; uint32 new_entry_count = original_entry_count + 1; // Here we increment the entry_count before creating the entry, to ensure only one thread creates this entry. // We need the ready flag, to signal the consumer that the entry is ready/valid. uint32 entry_index = __sync_val_compare_and_swap(&entry_count, original_entry_count, new_entry_count); if(entry_index == original_entry_count) { Work_Queue_Entry *entry = entries + entry_index; entry->string_to_print = string; entry->ready = true; complete_past_writes_before_future_writes; sem_post(semaphore_handle); } else { // TODO(dgl): try again. printf("Failed to enqueue"); } } |
Consumer code:
Note: This code already contains the fix from episode 126.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | internal bool32 do_worker_work(sem_t *semaphore_handle, int logical_thread_index) { bool32 we_should_sleep = false; uint32 original_next_entry_to_do = next_entry_to_do; uint32 new_next_entry_to_do = original_next_entry_to_do + 1; if(original_next_entry_to_do < entry_count) { uint32 entry_index = __sync_val_compare_and_swap(&next_entry_to_do, original_next_entry_to_do, new_next_entry_to_do); if(entry_index == original_next_entry_to_do) { Work_Queue_Entry *entry = entries + entry_index; // Wait until entry is valid. while(!entry->ready) {}; assert(entry->string_to_print, "String to print is null"); printf("Thread: %u, String: %s\n", logical_thread_index, entry->string_to_print); // Try to enqueue from thread push_string(semaphore_handle, entry->string_to_print); __sync_fetch_and_add(&entry_completion_count, 1); } } else { we_should_sleep = true; } return(we_should_sleep); } |