Parallel processing of data stream

Radek on 28 Mar 2024
Answered: Radek on 10 Apr 2024
I am trying to process big data in parallel using Parallel Computing Toolbox. The idea is quite simple. First, I would have a function running in the main process (client) that reads the data batch by batch and puts each batch into a queue (let's call it the input queue). Next, I would create a bunch of workers that pull data from this queue, process it, and send the results to a different, output queue. Finally, I would have a final worker that reads the data from this output queue, serializes it, and most probably saves it (or whatever).
The problem I am having with this approach is the following. Ideally I would have a single input queue. However, for a worker to be able to pull from a queue, the queue has to be created within that worker. OK, let's make a separate queue for each worker. But now the length of those queues is invisible to the main thread, and I need it for two reasons. First, I would send the data to the emptiest queue to equalize the load across all workers. Second, I would like to monitor the length of the queues so as not to overfill them (like having a fixed-length buffer).
Can you recommend a pattern for dealing with such problems? I believe this is a fairly typical problem that many people actually solve. Thanks.

Accepted Answer

Radek on 10 Apr 2024
Just for the record...
The solution is (obviously) to use an spmd block. To communicate between workers, one can use the functions spmdSend() and spmdReceive(). The problem is that these functions were introduced in R2022b, while my version is R2021b. However, the MATLAB documentation very successfully hides that these functions actually replace the former labSend() and labReceive() (and others), which are therefore available in my version.
So I would recommend that the MATLAB documentation state (there is even a Version History section) when a function was introduced as a replacement for another one. Or that users be allowed to switch to the documentation version that matches their MATLAB version (and I know you can look into your local documentation).
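For readers who land here, the spmd approach described above can be sketched roughly as follows. This is only an illustration under several assumptions, not the code actually used: worker 1 plays the reader role (the client is blocked while an spmd block runs), the batch contents and the processing step are placeholders, and the tag values are arbitrary. It assumes a pool with at least two workers is already open. It is written with the R2022b names; on R2021b the older equivalents labindex, numlabs, labSend, labReceive and gplus would be used instead.

```matlab
% Sketch only: assumes an open pool with at least 2 workers, e.g. parpool(4).
numBatches = 8;                       % hypothetical number of batches in the stream
spmd
    TAG_READY = 1; TAG_WORK = 2; TAG_STOP = 3;   % arbitrary message tags
    nDone = 0;                        % batches processed by this worker
    if spmdIndex == 1
        % Reader/dispatcher: read sequentially, hand out one batch per request.
        for k = 1:numBatches
            batch = k * ones(1000, 1);                % placeholder for reading batch k
            [~, src] = spmdReceive('any', TAG_READY); % wait for any idle worker
            spmdSend(batch, src, TAG_WORK);
        end
        % Every processing worker asks once more; answer each with a stop message.
        for w = 2:spmdSize
            [~, src] = spmdReceive('any', TAG_READY);
            spmdSend([], src, TAG_STOP);
        end
    else
        % Processing worker: request work, process it, repeat until told to stop.
        while true
            spmdSend(true, 1, TAG_READY);
            [batch, ~, tag] = spmdReceive(1);
            if tag == TAG_STOP
                break
            end
            result = sum(batch);      % placeholder for the real processing
            nDone = nDone + 1;
        end
    end
    totalDone = spmdPlus(nDone);      % sum of nDone over all workers
end
disp(totalDone{1})                    % number of batches processed in total
```

Because workers ask for data only when they are idle, the reader never holds more than one batch at a time and slow batches do not starve the other workers, so no explicit queue length has to be monitored.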

More Answers (1)

Strider on 28 Mar 2024
I do not see a need for multiple queues or for having the client load everything in serial. I would let the workers do the loading and saving. If you have to send the results back to the client to save them, as in this case, then the code below should work for you.
I have never had to worry about the queue length. I suppose you could put some kind of check in mySavefunc, or just control the size by limiting the number of pool workers if running out of memory is an issue.
saveQue = parallel.pool.DataQueue;
files = dir('*.csv'); % assuming some pattern you know
fn = fullfile({files.folder}, {files.name});
% run mySavefunc on the client for every item sent to the queue
% (afterEach takes the queue first, then the function handle)
afterEach(saveQue, @mySavefunc);
parfor k = 1:numel(fn)
    % load it in a worker
    rawData = myLoadfun(fn{k});
    % do the work
    processedData = myProcessfun(rawData);
    % send back to client
    send(saveQue, processedData);
end
1 Comment
Radek on 2 Apr 2024
Edited: Radek on 2 Apr 2024
The reasons why I need (want?) to load the data in the client rather than in the workers are these. First, the 'read_functions' use a lot of file-related function calls (fopen, fseek, fread, ...) which (to my limited knowledge) are not supported while doing parallel processing. (Actually, I have a couple of 'read_functions' written as ordinary functions, and they seem to work, but others are class methods and they complain that my fid is not actually a fid; obviously everything works just fine in a single thread.) The second (and more important) reason is that the data are serialized in the stream. Letting workers read the data in random order would have serious performance implications.
The point of monitoring the queue length is to save memory, obviously. The processing is much slower than loading the data, and the data won't fit in memory either. Therefore monitoring the queue length seems like an elegant solution. I am not sure how limiting the number of workers, or any check within a worker thread, can stop the client from overfilling the queue.
Also, I think it would be beneficial to see the queue length (if I have to have an individual queue for each worker) to balance the load, as some batches may take much longer to process than others.
Finally, sending processed data back to the client works just fine. Distributing data from the client to the workers is where I am facing some issues.
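One way to get the fixed-length-buffer behaviour described above while still reading on the client is to cap the number of outstanding parfeval requests. This is a different technique from the queues discussed in this thread, shown only as a minimal sketch: readNextBatch and processBatch are hypothetical stand-ins, and the cap of two tasks per worker is an arbitrary assumption.

```matlab
% Sketch only: the client reads sequentially and keeps a bounded number of tasks queued.
pool = gcp();                              % current pool, or start the default one
maxInFlight = 2 * pool.NumWorkers;         % assumed buffer size
numBatches = 20;                           % hypothetical number of batches
readNextBatch = @(k) k * ones(1000, 1);    % placeholder for the sequential reader
processBatch = @(x) sum(x);                % placeholder for the real processing

futures = parallel.FevalFuture.empty(0, 1);
results = zeros(0, 1);
inFlight = 0;
for k = 1:numBatches
    if inFlight >= maxInFlight
        % Buffer is full: wait for any task to finish before reading more data.
        [~, r] = fetchNext(futures);
        results(end+1, 1) = r;             % placeholder for saving the result
        inFlight = inFlight - 1;
    end
    batch = readNextBatch(k);
    futures(end+1, 1) = parfeval(pool, processBatch, 1, batch);
    inFlight = inFlight + 1;
end
% Drain the tasks that are still running.
while inFlight > 0
    [~, r] = fetchNext(futures);
    results(end+1, 1) = r;
    inFlight = inFlight - 1;
end
```

The pool hands each queued task to whichever worker becomes free, so load balancing comes for free, and the client never holds more than maxInFlight batches in flight. Results arrive in completion order, not in stream order, so the first output of fetchNext would be needed if the original order matters.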

Release

R2021b
