Move from parfor to parfeval?

I have a large simulation that runs on an LSF cluster with Parallel Computing Toolbox support. Right now, the meat of the effort is in a parfor loop:
% Loop over cells of storm
celllist = find(~gotResult);
stormtmp = storm(celllist); % storm is array of handle-type classes
parfor i = 1:length(celllist)
    icell = celllist(i);
    fprintf('Cell %d...\n', icell);
    matfileObj = poolData.Value; %#ok<PFBNS>
    ofac = zeros(sizevec, 'single');
    ofac = stormtmp(i).obsMatrix(grid, ofac, geom, rparms);
    matfileObj.gotResult(1,icell) = true;
    matfileObj.testOut(1,icell) = {ofac};
end
poolData is a parallel.pool.Constant holding a matfile object specific to each worker. At the end of the loop, I consolidate the results, clear the temporary files, and go to the next iteration of the model. This gives me robustness against cluster crashes, which can be distressingly common when the total job takes a month (hence the check for gotResult at the start; I can have a partial result prior to a crash).

The primary annoyance with parfor is that the allocation of units to workers happens rigidly at the start. With 60+ workers on 5-10 computers in a shared facility, there is no guarantee that they take a similar amount of time to finish. I find the last 25% or more of the execution time of each iteration is spent waiting for a dwindling number of my workers to finish their assignments.

I've read the documentation for parfeval, and it seems to give me a way of managing each execution more carefully, but it's too convoluted to see how I get there. Any tips? It would seem that I would start with a find() to get the first N cells (N = number of workers) that need completing, and then enter a while loop using afterEach() where I could check for a valid result, then get the next one on the list and assign it. Or maybe I can do it with one main while loop and just remove entries from celllist as they complete? Head is spinning...

1 Comment

So the problem is that lots of workers are sitting idle while waiting for the slow ones to finish their assignments in this parfor loop? Instead of waiting here, you'd like to have them start on their assignments for the next parfor loop for the next "iteration of the model".
If that's right, maybe a simpler approach is to change this parfor loop so that it runs through all the cell/model combinations, something like this:
parfor i = 1:length(celllist)*NofModelIterations
For each i you'd need a little logic to work out which model and which cell you wanted, plus invoke the right model, but that might not be hard.
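That index arithmetic might be sketched like this (a hypothetical outline; `NofModelIterations` and the model-dispatch step are placeholders for whatever your model loop actually looks like):

```matlab
parfor k = 1:length(celllist)*NofModelIterations
    % Recover the (cell, model) pair from the flattened index
    [i, m] = ind2sub([length(celllist), NofModelIterations], k);
    icell = celllist(i);
    % ... invoke model m on cell icell here ...
end
```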


 Accepted Answer

To run using parfeval, you basically need to pull out the body of your parfor loop into a function, something like this:
celllist = find(~gotResult);
stormtmp = storm(celllist); % storm is array of handle-type classes
futures = parallel.FevalFuture.empty;
for i = 1:length(celllist)
    % Schedule computation for each index in celllist. Each individual
    % function evaluation is executed separately on the workers.
    futures(i) = parfeval(@oneComputation, 0, celllist(i), stormtmp(i), poolData);
end
% You could simply wait for completion like this:
wait(futures)

function oneComputation(icell, stormEl, poolData)
% (sizevec, grid, geom, and rparms would need to be passed in or loaded here)
matfileObj = poolData.Value;
ofac = zeros(sizevec, 'single');
ofac = stormEl.obsMatrix(grid, ofac, geom, rparms);
matfileObj.gotResult(1,icell) = true;
matfileObj.testOut(1,icell) = {ofac};
end
Note I simply added a call to wait(futures) after scheduling the work - as I understand it, the worker results are all stored in the matfileObj.
It might be worth taking this approach a step further. If each worker computation takes a "long" time, then you might be better off using batch jobs. This will share resources on your cluster better because you don't need to keep a parallel pool running, with possibly-idle workers. The API to batch is similar to the API to parfeval, and specifies a single function evaluation on a worker. One difference is that it doesn't support parallel.pool.Constant, so you'd need to build the matfileObj directly on the worker.

13 Comments

Thanks. Each computation on a worker takes about 15 minutes, so that's long relative to most overhead functions. I'm concerned that scheduling everything at once with parfeval won't solve the problem, since I would presume the assignment of futures to workers occurs at each future's creation (this is probably how parfor works under the hood, right?). If it actually maintains a queue of futures and assigns them to workers as they become available, that would solve my problem. Otherwise I'll have to code the queue management by hand. I could try batch; my quick reading is that it would submit a new job for each submission, which could interact reasonably with the LSF scheduler in that it would hold all the jobs above my concurrent user limit as pending and then schedule them itself as each one finishes.
parfeval does maintain a queue of work at the client, and sends it to a worker only when one becomes idle. parfor does something broadly similar, but it tries to be smart and divide the work up into chunks of a "reasonable" size - but it does this without any knowledge of how long the work will actually take, and in your case, it might not be making the best choice. (You can tweak this division using parforOptions if you wish, but I suspect other options might work better for you).
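For reference, forcing parfor to hand out one iteration at a time (so an idle worker picks up the next iteration as soon as it finishes) might look something like this sketch with parforOptions; `pool` is assumed to be your current parallel pool:

```matlab
pool = gcp;
% Partition the loop into subranges of one iteration each, instead of
% letting parfor choose its own chunk sizes up front
opts = parforOptions(pool, 'RangePartitionMethod', 'fixed', 'SubrangeSize', 1);
parfor (i = 1:length(celllist), opts)
    % ... existing loop body ...
end
```

The trade-off is more scheduling overhead per iteration, which should be negligible when each iteration takes minutes.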
One other thing that occurred to me is that I think you're using the matfileObj simply to provide a degree of fault-tolerance - i.e. you're checkpointing the work on the worker, because if the pool goes down, unfortunately parfor loses everything that's happened so far. You might actually be able to simplify things by using batch submissions - each batch function evaluation automatically stores its results on disk, so there's no need to checkpoint yourself - just return the output data.
So, here's a sketch of how you might use batch in this way:
c = parcluster('lsf');
jobs = parallel.Job.empty;
for i = 1:length(celllist)
    % Schedule computation for each index in celllist. Each individual
    % function evaluation is executed separately on the workers.
    jobs(i) = c.batch(@oneComputation, 1, {celllist(i), stormtmp(i)});
end
% You could simply wait for completion like this:
wait(jobs)
% If any of the jobs fail, you could simply re-submit. The output
% arguments are in jobs(idx).OutputArguments, and remain available
% until you explicitly call delete(jobs).

function out = oneComputation(icell, stormEl)
% (sizevec, grid, geom, and rparms would need to be loaded or passed in here)
ofac = zeros(sizevec, 'single');
out = stormEl.obsMatrix(grid, ofac, geom, rparms);
end
There is a slightly more efficient way of doing the above using createTask - this has a vectorised API, but it's quite fiddly to get right.
(Actually, with parfeval, you could also omit having the workers save stuff to the matfileObj by collecting the results at the client using fetchNext...)
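That fetchNext pattern might be sketched as follows (a hedged outline, assuming oneComputation is changed to return ofac rather than writing to a matfile; variable names follow the earlier snippets):

```matlab
futures = parallel.FevalFuture.empty;
for i = 1:length(celllist)
    % Request one output instead of checkpointing on the worker
    futures(i) = parfeval(@oneComputation, 1, celllist(i), stormtmp(i));
end
for k = 1:length(futures)
    % fetchNext blocks until the next future, in completion order, finishes
    [idx, ofac] = fetchNext(futures);
    ocells(:,:,:,1,celllist(idx)) = ofac;
    gotResult(1,celllist(idx)) = true;
end
```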
Thanks; I could have each batch function write its portion of the answer to the master matfile, but would I hit contention issues (the storage directory is visible to both client and workers)? If contention is an issue, then I'd do as you show and have the batch function return the array (it's not super-huge, typically 400x100x3), and have the client fetch all the good outputs and write the result into the single matfile. My concern is that if the jobs won't clear from LSF until I delete them, the cluster won't assign the next job after I hit my simultaneous job limit until I delete the ones that finish. So instead of a basic wait(), I would need a timer-driven loop that checks for finished jobs, gets their results, saves the answer, and then deletes them.
When using batch with LSF, the LSF cluster should definitely start processing the next job in its queue as soon as one finishes. Deleting the job object from MATLAB just removes some files from disk (in your JobStorageLocation folder). With the batch workflow, it's designed so that you can create all the jobs, and then turn off your client MATLAB completely, and come back to it later (you can get back to the jobs that you created previously using the Jobs property of the cluster you get back from parcluster).
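Retrieving previously submitted jobs from a fresh client session might look like this (a sketch; 'lsf' is the assumed profile name):

```matlab
c = parcluster('lsf');
% All jobs previously created in this JobStorageLocation
jobs = c.Jobs;
% Pick out the finished ones and fetch their outputs
done = jobs(strcmp({jobs.State}, 'finished'));
for j = 1:length(done)
    X = fetchOutputs(done(j));
    % ... store X{1} as appropriate, then delete(done(j)) if desired ...
end
```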
So some behavior I've been struggling with:
  • One of my arrays (grid), which is shared (read-only) by all the workers, is fairly big (~160 MB). I noticed initially that this data was written to the subfolder for each Job. So, I tried writing it to a temporary MAT file instead (see below). That doesn't seem to help; the Jobxx.in.mat file is still about that size. When I passed it as a parameter, that file was small but the Jobxx/Task1.in.mat file was large. This writing of data makes each job submission take a minute or more. I added some chunking to help.
  • After about 10 or so submissions from the loop I was getting a lot of immediate failures; unfortunately I had no logs left as to why. I hoped removing the big file writing would help, but as per above it just changed which files were large. Any tips for large "broadcast" variables?
NCells = size(ocells,5);
% How many cells to assign each job
chunksize = 30;
%% Store the grid, geom, and rparms into a common file
% so all the jobs can easily access this common data
save('dwellCommon.mat','grid','geom','rparms');
% Loop over cells of storm
celllist = find(~gotResult);
njobs = ceil(length(celllist)/chunksize);
for i = 1:njobs
    % Create a batch job for each cell group
    istart = (i-1)*chunksize + 1;
    istop = min(i*chunksize, length(celllist));
    cnums = celllist(istart:istop);
    stormi = storm(cnums);
    jobs(i) = batch(clus, @oneCell, 1, {stormi});
    jobs(i).UserData = cnums;
end
fprintf('created %d batch jobs\n', length(jobs));
%% Monitor jobs and process the ones that finish
ndone = sum(gotResult);
hw = waitbar(ndone/NCells);
pause(60);
while ~isempty(jobs)
    dlist = [];
    for i = 1:length(jobs)
        cnums = jobs(i).UserData;
        switch jobs(i).State
            case 'finished'
                try
                    X = fetchOutputs(jobs(i));
                    ocells(:,:,:,1,cnums) = X{1};
                    gotResult(1,cnums) = true;
                catch
                    fprintf('Job %d no valid result\n', jobs(i).ID);
                end
                dlist = [dlist i]; %#ok<*AGROW>
            case 'failed'
                fprintf('Job %d failed!\n', jobs(i).ID);
                dlist = [dlist i];
            otherwise
                % Do nothing
        end
    end
    % Delete the completed jobs and clear from the open list
    delete(jobs(dlist));
    jobs(dlist) = [];
    ndone = sum(gotResult);
    waitbar(ndone/NCells, hw);
    pause(60);
end
close(hw)

function ofac = oneCell(stormi)
% Batch processing call for one group of cells
nCells = length(stormi);
% load the common data
load('dwellCommon.mat','grid','geom','rparms');
sizevec = [rparms.NDop rparms.NRng geom.NRx nCells];
ofac = zeros(sizevec,'single');
for icell = 1:nCells
    ofac(:,:,:,icell) = stormi(icell).obsMatrix(grid,ofac(:,:,:,icell),geom,rparms);
end
end
Thinking about it this morning, perhaps it's because I used a nested function? I understand that variables in the parent function can be made visible to nested ones; therefore the grid array would still end up in the scope of the batch function. I'll try putting it into its own file.
Well, that didn't help. I even tried clearing those variables after writing them out, and that didn't do anything either.
I'm not sure why you'd get jobs failing immediately with no logs. It might be worth contacting MathWorks support for help with that.
One way to "broadcast" the data is to use createJob and createTask (instead of batch), and then set the large data to be the JobData of a single job object. The idea is to create multiple tasks of a single job, and then have each task call job = getCurrentJob(); data = job.JobData; to access the "broadcast" data.
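Putting those pieces together, the createJob/createTask pattern with JobData might be sketched like this (an outline only; names are carried over from the earlier snippets, and `oneCellTask` is a hypothetical variant of oneCell that reads the broadcast data from the job):

```matlab
c = parcluster('lsf');
job = createJob(c);
% Attach the large read-only arrays once, at the job level
job.JobData = struct('grid', grid, 'geom', geom, 'rparms', rparms);
for i = 1:njobs
    istart = (i-1)*chunksize + 1;
    istop = min(i*chunksize, length(celllist));
    createTask(job, @oneCellTask, 1, {storm(celllist(istart:istop))});
end
submit(job);
wait(job);

function ofac = oneCellTask(stormi)
% Each task pulls the broadcast data from its parent job
job = getCurrentJob();
jd = job.JobData;
nCells = length(stormi);
sizevec = [jd.rparms.NDop jd.rparms.NRng jd.geom.NRx nCells];
ofac = zeros(sizevec, 'single');
for icell = 1:nCells
    ofac(:,:,:,icell) = stormi(icell).obsMatrix(jd.grid, ofac(:,:,:,icell), jd.geom, jd.rparms);
end
end
```

Because the data lives on the job rather than in each task's input arguments, the large array should be written to disk only once per job rather than once per task.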
Thanks. I did find that the Task*.log files recorded what was happening on the Tasks that failed (failing Jobs tended to disappear too fast, at least with batch). With createJob/createTask I am of course responsible for cleanup. I did move to createJob and createTask and that's working much better. The crashes had two causes. One was jobs landing on a RHEL6 (instead of RHEL7) machine (we have a mixture to support some older versions of tools for programs with legacy hardware); I just had to narrow my resource requirements. The other issue was that occasionally clients fail in their initial call to the license manager for MATLAB_Distrib_Comp_Engine (do they temporarily take a license, or are they just checking for product availability? Once they are running I see only the one license checked out). Those failures are probably why I often had trouble starting large parallel pools, which is partly why I'm trying to get away from parfor/parfeval.
Each worker process on your cluster needs to check out a single MATLAB Parallel Server licence (known to the licence manager as MATLAB_Distrib_Comp_Engine) for the duration of the task. This applies to all cases - parpool, batch, and createTask. I'm not an expert, but I believe it is possible to teach LSF about the number of licences that are available, to avoid workers trying to start up when all licences are in use.
OK, I'll check on this. It appears they check out the licenses briefly on task initiation, but then just the primary host holds on to one. So, if we have 64 licenses, I can start a Job with up to 64 Tasks but then some of the others will fail in the checkout. If a task is held pending for a while, then when it starts it's OK since 63 of the licenses get returned. Very strange.
OK, my admins informed me that the lmstat statistics don't reflect Parallel Server license states correctly, and pointed me to the right tool.
Hi Edric, thanks for the thoughtful and useful answers, but I think I uncovered a bug in how MATLAB invokes the LSF bsub command for the resulting job array. All independent jobs are submitted with a resource request for 1 worker, as shown in this line from submitIndSharedJob.m:
resourceArg = ThirdPartySchedulerUtils.generateResourceArgument( lsf.ResourceTemplate, 1, lsf.NumThreads );
However, a job array with a number of tasks is really a parallel job, and the submission should use the worker count values (min, max) from the cluster definition. Right now, with bsub -n 1, the LSF scheduler either ignores or throws errors on the spanning instructions I try giving it, so I end up overloading one host and not spreading things out enough for good efficiency. I'm also working the IBM LSF side to see if it's an issue on their end with job arrays, but I think the -n 1 isn't helping.
Your reaction/counter-arguments are of course greatly appreciated. If you want to take this to email, please use the address in my profile. I know I also don't want to disable job arrays or split this into numerous separate jobs; each job generates a 180 MB JobXX.in.mat file.


Release: R2021a