Multiple producers/consumers queue

Hi, I have a problem with queuing jobs into a fixed-size circular queue when multiple threads can produce new work. I've read a lot about this problem on the web. One way to extend the code Casey wrote is to use two semaphores instead of one (one to wake up consumer threads when there's a new job to do, and the other to wake up producer threads when there's free space in the work queue), plus a mutex for accessing the queue. There are a lot of resources on this topic; here is the shortest video I was able to find:
https://www.youtube.com/watch?v=nxw2y27z0V4

But if I understand this correctly, all of these examples implicitly assume that producers and consumers are separate threads. In my case, each thread is both a consumer and a producer at the same time. To be more concrete, I'm trying to recursively explore a directory structure using multiple threads. I'm using the Win32 API functions FindFirstFileW / FindNextFileW. The code starts from the root directory, and for each directory inside, it puts a new job in the queue. So job == directory. Other threads kick in and start pulling those jobs. But if a directory has nested directories, the thread will want to put new jobs into the queue after it has finished its own job. When I run this code on, let's say, the Windows directory, I quickly end up in a situation where all threads want to put a new job into the queue, but the queue is full, so they all go to sleep and no thread is left to pull jobs. I don't know if I'm missing something obvious, but I don't think this particular problem can be solved using the general abstraction of a fixed-size circular work queue.

I have two ideas for how I might solve this:
1) Use a growing queue.
2) Instead of queuing the jobs, each thread digs through multiple directories on its own. When it's finished, it can go and help other unfinished threads.

Any opinion on this?

P.S. It would be awesome if Casey at some point implemented a multiple producers/consumers queue on the stream. If not for the game, then for educational purposes.

Thanks,
Vjekoslav

Edited by Vjekoslav on
vjekoslav
2) Instead of queuing the jobs, each thread digs through multiple directories on its own. When it's finished, it can go and help other unfinished threads.


Wouldn't you still need to queue jobs for other threads to be able to pick them up when they are finished? Maybe I misunderstood.

I've never done a multiple-producer queue, but I think having a growing queue that would never fail to add a job would just be better. It could be a linked list or any other structure that would allow you to reuse entries when a thread has finished processing them.

Edited by Simon Anciaux on
It just seems that you have too many jobs being added to the queue and not enough being removed. The queue ends up filling up eventually. It doesn't matter if producer / consumer threads are different threads or the same thread with "2 modes", as long as the code handles that case, i.e. if a thread can't push more work, maybe it's time to switch modes and start pulling work.

The main issue with your threading model is that for each directory you pull from the queue, you add n more directories. The more directories there are, the quicker the queue will fill. There needs to be a balance between the work being removed and the work being added. Or the queue needs to be a lot bigger.
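To make the "switch modes" idea concrete, here is a minimal single-threaded sketch, not anyone's actual code. It assumes a non-blocking push that reports failure instead of sleeping; when the push fails, the thread descends into the subdirectory itself rather than waiting for space. All names (WorkQueue, try_push, try_pop, process, walk_tree) and the synthetic tree with a fixed FANOUT are made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAPACITY 4  /* deliberately tiny so the queue fills up quickly */
#define FANOUT 3          /* hypothetical: every directory has 3 subdirectories */

typedef struct { int depth; } Job;  /* stand-in for "one directory" */

typedef struct {
    Job    slots[QUEUE_CAPACITY];
    size_t head, tail, count;
} WorkQueue;

/* Non-blocking push: returns false when the queue is full instead of sleeping. */
static bool try_push(WorkQueue *q, Job job) {
    if (q->count == QUEUE_CAPACITY) return false;
    q->slots[q->tail] = job;
    q->tail = (q->tail + 1) % QUEUE_CAPACITY;
    q->count++;
    return true;
}

/* Non-blocking pop: returns false when the queue is empty. */
static bool try_pop(WorkQueue *q, Job *out) {
    if (q->count == 0) return false;
    *out = q->slots[q->head];
    q->head = (q->head + 1) % QUEUE_CAPACITY;
    q->count--;
    return true;
}

/* Process one directory. Each subdirectory is queued if there is room;
   otherwise this same thread processes it right away (depth-first).
   Returns the number of directories visited. */
static size_t process(WorkQueue *q, Job job, int max_depth) {
    size_t visited = 1;
    if (job.depth == max_depth) return visited;  /* leaf: no subdirectories */
    for (int i = 0; i < FANOUT; i++) {
        Job child = { job.depth + 1 };
        if (!try_push(q, child)) {
            visited += process(q, child, max_depth);  /* queue full: do it ourselves */
        }
    }
    return visited;
}

/* Worker loop: pull jobs until the queue is empty. */
static size_t walk_tree(int max_depth) {
    assert(max_depth >= 0);
    WorkQueue q = {0};
    Job job = {0};
    size_t visited = 0;
    try_push(&q, job);  /* root directory */
    while (try_pop(&q, &job)) {
        visited += process(&q, job, max_depth);
    }
    return visited;
}
```

With this policy a full queue never puts a thread to sleep; the extra cost is recursion no deeper than the directory tree. A real multi-threaded version would still need a mutex or atomics around try_push/try_pop, plus some way to detect that all work is done (for example a counter of jobs in flight), which this sketch leaves out.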
mrmixer
Wouldn't you still need to queue jobs for other threads to be able to pick them up when they are finished? Maybe I misunderstood.


The way I imagined it, instead of having a queue of jobs I would have a queue of sleeping threads. While a thread does its own work, it first checks whether other threads are available. If there are, it hands the new job over to one of them and continues. If there aren't, it just continues on its own, checking each time whether there are threads that can help. The size of the queue would then be the number of threads I have.
So let's say I have 4 threads. I start with one, and it discovers 10 new directories. It assigns the first three to the other threads and then continues processing the other 7 dirs. Whichever thread is done first waits until it gets a new job from the other threads.
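A rough sketch of how that hand-off could look, assuming one "mailbox" slot per thread and C11 atomics (on Windows the Interlocked* functions would play the same role). The Mailbox type, the SLOT_* states and the function names are all hypothetical, and the actual sleeping/waking (a semaphore or event per thread) is left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Life cycle of one slot: BUSY -> IDLE -> CLAIMED -> READY -> BUSY */
enum { SLOT_BUSY, SLOT_IDLE, SLOT_CLAIMED, SLOT_READY };

typedef struct { const char *path; } Job;  /* stand-in for "one directory" */

typedef struct {
    atomic_int state;
    Job        job;
} Mailbox;

/* Called by a thread that has run out of work. */
static void announce_idle(Mailbox *box) {
    atomic_store(&box->state, SLOT_IDLE);
}

/* Called by a working thread that found a new directory.
   Returns the index of the thread that took the job, or -1 if nobody
   is idle, in which case the caller keeps the job for itself. */
static int try_hand_over(Mailbox *boxes, int count, Job job) {
    assert(boxes != NULL && count >= 0);
    for (int i = 0; i < count; i++) {
        int expected = SLOT_IDLE;
        /* Only one producer can win the IDLE -> CLAIMED transition. */
        if (atomic_compare_exchange_strong(&boxes[i].state, &expected, SLOT_CLAIMED)) {
            boxes[i].job = job;
            atomic_store(&boxes[i].state, SLOT_READY);  /* publish the job */
            return i;
        }
    }
    return -1;
}

/* Called by the idle thread: takes the job if one has been published. */
static bool try_receive(Mailbox *box, Job *out) {
    int expected = SLOT_READY;
    if (!atomic_compare_exchange_strong(&box->state, &expected, SLOT_BUSY)) {
        return false;
    }
    *out = box->job;
    return true;
}
```

The storage here is fixed at one slot per thread, so nothing grows. What the sketch does not cover is the 7 directories the first thread keeps for itself: those still have to live somewhere (the call stack, if it recurses), and the threads need a way to notice that everyone is idle and the walk is over.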

mrmixer
I've never done a multiple-producer queue, but I think having a growing queue that would never fail to add a job would just be better. It could be a linked list or any other structure that would allow you to reuse entries when a thread has finished processing them.


I already have it working with a growing queue. I allocate a bigger chunk at the start (1 MB) and reallocate later if needed. I didn't want to complicate it with a linked list. A job is a struct with 2 pointers (16 bytes on x64). My C drive has 170k dirs, so we're talking about ~2.7 MB at most. A lot more space is used to store the actual data about files and directories. And it's temporary memory anyway. When the iteration is done, the memory is freed.
The reason I don't like this is that, even if that is a small size on modern PCs, it introduces another theoretical memory failure point which I need to handle. That's why a fixed size is cleaner. It means that if it's able to start, it will run to the end.
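For reference, the failure point I mean looks roughly like this. It is a simplified single-threaded sketch, not my actual code: the field names, the JobQueue type and the starting capacity of 1024 entries are just for illustration, and the real version needs a lock around push/pop. With 16-byte jobs, 170,000 entries come to 170,000 * 16 = 2,720,000 bytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct {
    void *directory;  /* hypothetical field names; the point is "two pointers" */
    void *parent;
} Job;

typedef struct {
    Job   *items;
    size_t count;     /* number of jobs ever pushed */
    size_t next;      /* index of the next job to pop */
    size_t capacity;  /* allocated entries */
} JobQueue;

/* Appends a job, doubling the storage when it is full.
   Returns false if memory could not be allocated; the queue is left
   unchanged in that case, so the caller can still drain or free it. */
static bool job_queue_push(JobQueue *q, Job job) {
    assert(q != NULL);
    if (q->count == q->capacity) {
        size_t new_capacity = q->capacity ? q->capacity * 2 : 1024;
        if (new_capacity > SIZE_MAX / sizeof(Job)) return false;  /* size overflow */
        Job *grown = realloc(q->items, new_capacity * sizeof(Job));
        if (grown == NULL) return false;  /* the old block is still valid */
        q->items = grown;
        q->capacity = new_capacity;
    }
    q->items[q->count++] = job;
    return true;
}

/* Takes the oldest job that has not been handed out yet. */
static bool job_queue_pop(JobQueue *q, Job *out) {
    assert(q != NULL && out != NULL);
    if (q->next == q->count) return false;
    *out = q->items[q->next++];
    return true;
}

static void job_queue_free(JobQueue *q) {
    free(q->items);
    *q = (JobQueue){0};
}
```

Entries are never reused here because the whole block is thrown away once the iteration is done. The `return false` in job_queue_push is exactly the extra case I would rather not have: every caller has to decide what to do when it happens.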


marcc
It just seems that you have too many jobs being added to the queue and not enough being removed. The queue ends up filling up eventually. It doesn't matter if producer / consumer threads are different threads or the same thread with "2 modes", as long as the code handles that case, i.e. if a thread can't push more work, maybe it's time to switch modes and start pulling work.

The main issue with your threading model is that for each directory you pull from the queue, you add n more directories. The more directories there are, the quicker the queue will fill. There needs to be a balance between the work being removed and the work being added. Or the queue needs to be a lot bigger.


Yeah, I understand that. That's why I said that this particular problem can't be solved using the general abstraction of a fixed-size circular work queue: a thread can't only consume a job, it has to produce jobs at the same time. Otherwise it would have to store that data in some other temporary queue. I guess it can't work without a growing queue. I just wanted to see if I'm missing something obvious.

Edited by Vjekoslav on