How do I distribute N 3-dimensional (large) arrays for processing across N workers?

I would like to accelerate processing of a large set of radar event data using the Parallel Computing Toolbox. I have a server with 48 cores and 512 GB of RAM, so all of the data I need to process fits into the local machine's memory, with enough cores to process each independent set of events. The data I want each core to process consists of 8 channels of IQ data, each channel being a matrix of S samples x P pulses -- i.e., I would like to distribute an 8 x S x P matrix to each worker.
Currently the data is loaded from N x 8 files into an N x 8 x S x P matrix, which I would like to distribute to N workers. The file reading is quite slow since it is done by a single process, so perhaps the first question is whether I could instead have each worker load its own subset of the N x 8 files.
Otherwise, how do I distribute each 8xSxP matrix to my workers?
  4 Comments
Walter Roberson on 11 Jun 2020
You want to distribute 8 x S x P to each worker when you currently have N x 8 x S x P. Assuming that you are asking to run
parfor n = 1:N
then if you were to address N8SP(n,:,:,:) inside the worker, parfor is able to transfer only the appropriate portion of data to the worker instead of sending the whole array. However, think about the organization of the data in memory: the first dimension is the one that varies most quickly, so the memory order would be like 1111 2111 3111 1211 2211 3211 1311 2311 3311 and so on. Therefore, to transfer N8SP(n,:,:,:) to the worker, parfor would have to gather every Nth element in memory. That does not permit much in the way of hardware acceleration. The hardware cache line size on Intel i7 CPUs is 64 bytes, so if the 4 bytes at offset 0 are being sent to worker 1, the 4 bytes at offset 4 to worker 2, and the 4 bytes at offset 8 to worker 3, then you get a whole lot of cache line thrashing. This is the least efficient possible memory arrangement.
For the most efficient arrangement, you would prefer that all of the memory being sent to a worker is already consecutive. The arrangement that achieves that is to make the index you are slicing over the last index in the array, i.e. 8SPN(:,:,:,n).
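A minimal sketch of that reordering (the variable names and placeholder computation are mine, not from the thread): permute the array once on the client so N is the last dimension, then slice over that last dimension inside the parfor.
N = 4; S = 100; P = 50;                      % small illustrative sizes
N8SP = complex(rand(N, 8, S, P));            % stand-in for the real data
data8SPN = permute(N8SP, [2 3 4 1]);         % reorder to 8 x S x P x N
results = zeros(N, 1);
parfor n = 1:N
    slice = data8SPN(:, :, :, n);            % one contiguous 8 x S x P block
    results(n) = sum(abs(slice(:)));         % placeholder for the real processing
end
The permute itself makes one full copy on the client, but afterwards each worker's slice is a single contiguous block of memory.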
"I believe it tries to pass a copy of the matrix (or at least the worker's share of the data) to each worker"
Yes, exactly. And for regular arrays, it needs to do that no matter what order the array is in memory in the client; having N as the last dimension reduces the work involved.
"which takes a very long time and requires far greater time than simply processing the data in a serial for loop"
Yes, that is a common problem for parfor. parfor workers need access to the data they are working on, and since they are separate processes, the memory has to be copied over to them.
There are configurations for using SPMD (not parfor) that in theory can use shared memory segments for labSend() instead of having to serialize and copy that; you don't hear much about those, but historically there were a couple of postings.
In the versions before MathWorks redesigned how complex arrays are handled, it was possible to use a File Exchange contribution to create shared memory segments that could be shared with the workers; the change of headers required for the redesigned complex arrays happens not to be compatible with that FEX contribution, and I don't know if anyone has worked out a substitute.
The overhead of sending data to a parfor worker (or a GPU) is a real one, and it is common for parfor to be slower than serial because of it.
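One way to make that overhead visible is ticBytes/tocBytes from the Parallel Computing Toolbox. A minimal sketch (assuming a pool is already open; the array A is just a stand-in for any large broadcast input):
A = rand(5000);                          % ~200 MB of doubles
partialResults = zeros(1, 8);
ticBytes(gcp);
parfor k = 1:8
    partialResults(k) = sum(A(:)) + k;   % A is broadcast: a copy is sent to every worker
end
tocBytes(gcp)                            % reports bytes sent to / received from each worker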
This can lead to needing to redesign the program so that the appropriate data is created on the worker instead of transferred there. For example, if the data consists of layers read in from different files, then have each worker load its appropriate file. This has hidden performance restrictions, which are reduced if each of the files is stored on a different hardware controller (not just a different hard drive); having the files stored on different hard drives on the same controller is better than having them all on the same hard drive, but is still bottlenecked by controller bandwidth. Having the files all on the same hard drive risks disk contention... but in some cases can still improve performance. In particular, if the files are fragmented, then sometimes you can get a bit more performance by having two requests queued at the same time with the controller, as the controller can read data off one track as it happens to spin underneath instead of wasting that time waiting for the drive to spin around to collect parts of the first file. (But having fragments is still slower than having fully de-fragmented files.)
Multiple workers each wanting to read a different file: "throw hardware at it" if practical.
If not practical, if everything has to be stored on the same drive, then typically drive performance is optimized with two processes asking for files at the same time (so that when the data for the first process is finished transferring, there is already a request in the controller that it can move on to handling immediately, without having to wait for a round trip to MATLAB to decide what the next request is). With more than two processes asking for file access at the same time, on the same drive on the same controller, it is common for performance to start degrading due to contention for resources.
Chris Steenhoek on 12 Jun 2020
Edited: Chris Steenhoek on 12 Jun 2020
Thanks Walter. Great explanation on the memory considerations for the data organization. I certainly understand the need for the data to get created on the worker. This is the reason Edric's recommended approach "works" -- with the caveat of the disk controller limitations both he and you have pointed out (great info there too, btw).
I was hoping there was some way to allocate "shared" memory such that the workers would just get a pointer to their slice of the memory (rather than the data itself needing to be transferred to the workers before the parfor executes). The distributed/codistributed functions seem to almost be what I want, but not quite.
I'll do some experiments on our server to see what my limitations are for having the workers load the data files within the parfor loop.


Accepted Answer

Edric Ellis on 9 Jun 2020
The best approach probably depends on the operations you need to perform on this Nx8xSxP array. Are the operations that you wish to perform such that you can consider "slices" of the array independently? I.e. can each 8xSxP slice of the array be operated on independently? If so, you could consider an approach like this:
parfor i = 1:N
    myData = zeros(8, S, P);
    for f = 1:8
        % Here, readData reads one file returning a matrix
        % of size SxP
        myData(f, :, :) = readData(i, f);
    end
    % Here, "compute" operates on the 8xSxP array, giving some result
    result(i) = compute(myData);
end
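As a rough illustration only (the thread does not describe the actual file format, so the interleaved int16 layout, file naming, and dimensions below are assumptions), readData might look something like:
function iq = readData(eventIdx, channelIdx)
% Hypothetical reader: assumes one binary file per event/channel containing
% interleaved 16-bit I/Q samples, S samples x P pulses, stored column-major.
S = 4096; P = 1024;                                            % assumed sizes
fname = sprintf('event%03d_ch%d.bin', eventIdx, channelIdx);   % assumed naming
fid = fopen(fname, 'r');
raw = fread(fid, 2 * S * P, 'int16=>double');                  % I and Q interleaved
fclose(fid);
iq = reshape(complex(raw(1:2:end), raw(2:2:end)), S, P);       % S x P complex matrix
end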
Even with this approach, be aware that the file reading might be slow because of the limitations of the disk hardware you're reading from. If this is a spinning disk, it might actually be counter-productive to have multiple workers attempting to read different files simultaneously.
If the operations you need to perform are not as easily "sliced" as in the example above, then it might be better to consider using "distributed arrays".
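For completeness, a minimal sketch of that route (the sizes and the choice to split along the last dimension are illustrative assumptions; the real computation would need to use functions that support codistributed arrays):
N = 4; C = 8; S = 100; P = 50;              % small illustrative sizes
spmd
    % Create the C x S x P x N array already split across workers along the
    % last (N) dimension, so each worker owns whole C x S x P slices.
    bigData = codistributed.zeros(C, S, P, N, codistributor1d(4));
    % Each worker can fill in and operate on its own piece directly.
    localPart = getLocalPart(bigData);      % C x S x P x (slices on this worker)
end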
  3 Comments
Edric Ellis on 12 Jun 2020
"Assuming I have a very large matrix which is NxCxSxP, is there a way to allocate it and load it such that each of the N workers gets a CxSxP slice of my overall matrix?"
If you read the data on the client, then the parfor machinery knows how to copy only the necessary portions of NCSP needed by each worker. But it is a copy, and there's some additional transitory memory overhead there. As Walter so rightly points out, it's more efficient if you "slice" the matrix in the final dimension so that when parfor needs to copy out a bunch of slices to send to the workers, it's copying a contiguous block of memory.
If your workload is well-balanced (i.e. you can expect each slice operation to take the same amount of time, give or take), then you could try an approach using spmd which gives you more control. The idea here is similar to my original parfor suggestion, but spmd lets you co-ordinate things such that only a single worker is accessing the disk at any time. Here's a rough sketch:
N = 4; C = 2; S = 1200; P = 600;
spmd
    % First, divide up the N pieces across the workers
    partition = codistributor1d.defaultPartition(N);
    % partition is now a vector of length numlabs specifying
    % how many slices each worker takes on.
    % We need an offset for each worker - compute this using
    % cumsum. The result is a vector telling us how many slices
    % the preceding workers own.
    nOffsetByLab = [0, cumsum(partition)];
    % Next, we can force the workers to operate one at a time
    % using labBarrier like so:
    for activeLab = 1:numlabs
        if labindex == activeLab
            % My turn to load data.
            % The partition tells me how many slices to load on this worker
            myNumSlices = partition(labindex);
            % Allocate my local piece of NCSP
            myNCSP = complex(zeros(myNumSlices, C, S, P));
            % The offset tells me what the "global" index is in the
            % first dimension
            myNOffset = nOffsetByLab(labindex);
            % Loop over my slices and "load" the data.
            for nIdx = 1:myNumSlices
                globalN = nIdx + myNOffset;
                myNCSP(nIdx, :, :, :) = globalN .* ones(C, S, P);
            end
        end
        % Force all workers to wait here
        labBarrier
    end
    % At this point, each worker has a myNumSlices x C x S x P array
    % myNCSP, and can perform computations.
    myResult = zeros(myNumSlices, 1);
    for nIdx = 1:myNumSlices
        myResult(nIdx) = sum(myNCSP(nIdx, :));
    end
end
% At the end of the spmd block, myResult is a Composite. We
% can simply concatenate the portions of that to get the overall
% result
overallResult = vertcat(myResult{:});
This is quite a bit more complex than the simple parfor approach, but it ensures no large data transfer, and also that only one worker at a time is "loading" data...
Chris Steenhoek on 12 Jun 2020
Thanks Edric. I had a bit of time on our server today and did a quick test where I reordered my matrix from NCSP to CSPN and did the file reads inside the parfor. I started slow with N=9, and it was loading 9 of the 280 MB files in parallel with seemingly zero issues. There's a file for each channel for each event, so with N=9 and C=8 that's 72 of these files. My load time dropped from 467 seconds to 45 seconds, which is somehow even better than a factor-of-9 reduction. I didn't get a chance to push this to see how far it will scale (my ultimate goal is N=45), but it is certainly promising.
With the SPMD approach you've provided as a backup plan, I feel very good about the path I'm on. I greatly appreciate the explanations and well-commented examples that both you and Walter have provided. I'll try to post an update once I get this all worked out. In my experiment with the first 9 events, I found that the code I'm working with isn't exactly memory efficient, so I need to add some memory management to keep within my 512GB. That will probably keep me busy for a few days.
Many thanks.
