Is it possible to use parfeval and backgroundPool to write to a Mongo database in a background thread?
I am processing a large table of data, for example 100000x100. After processing this data, I want to write each row of the table into a MongoDB database using the mongoc driver. Writing it all takes over 150 s, so I have tried various ways to optimize the code. What I am doing now is below:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
parfor i = 1:numBatches
    start = (i-1)*batchSize + 1;
    stop = min(i*batchSize, height(dataTable));
    % Get table batch
    insertData = table2struct(dataTable(start:stop, :));
    insert(c.Value, 'collection', insertData);
end
The above code works and I am able to write to the Mongo database in about 20 seconds, but this is still too slow. So I want to know if there is a way to do this write in the background using parfeval. I have tried the following, but I either get errors with parallel.pool.Constant or errors saying MongoConnection is not supported on a thread-based worker. I am confused by this: since I am able to use parfor with a MongoConnection, why can't I use parfeval?
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
% f = parfeval(backgroundPool, @(data, const) TableInsert(data, const.Value), 1, dataTable, c);
% f = parfeval(backgroundPool, TableInsert, 1, dataTable, c);
f = parfeval(backgroundPool, @TableInsert, 1, dataTable, c.Value);
o = fetchOutputs(f);
fprintf(o);
function complete = TableInsert(dataTable, C)
    tic;
    batchSize = 10000;
    numBatches = ceil(height(dataTable)/batchSize);
    % mconn = C.Value;
    for i = 1:numBatches
        start = (i-1)*batchSize + 1;
        stop = min(i*batchSize, height(dataTable));
        % Get table batch
        insertData = table2struct(dataTable(start:stop, :));
        insert(C, 'collection', insertData);
    end
    elapsed = toc;
    complete = sprintf('Table insert was completed in: %f\n', elapsed);
end
In other attempts to speed up the large data write, I have used pymongo instead of mongoc. This is definitely faster, but I still have trouble handing the write off to a background worker. Below is what I have tried using pymongo:
% Create const connection to mongoclient using pymongo
C = parallel.pool.Constant(@() py.pymongo.MongoClient('xx.xx.xxx.xx:xxxxx')); % 'host:port'
f = parfeval(backgroundPool, @(data, c) pythonTableInsert(data, c.Value), 1, dataTable, C);
% Get output
o = fetchOutputs(f);
fprintf(o);
function complete = pythonTableInsert(dataTable, C)
    tic;
    % Batch insert
    batchSize = 10000;
    numBatches = ceil(height(dataTable)/batchSize);
    parfor i = 1:numBatches
        start = (i-1)*batchSize + 1;
        stop = min(i*batchSize, height(dataTable));
        % Get table batch
        insertData = table2struct(dataTable(start:stop, :));
        % Convert to 1xN cell array where each cell is a 1x1 struct
        insertList = num2cell(insertData)';
        % Insert
        collection = C.get_database('database').get_collection('collection');
        collection.insert_many(insertList);
    end
    elapsed = toc;
    complete = sprintf('Python insert was completed in: %f\n', elapsed);
end
Answers (1)
Edric Ellis
on 21 Nov 2023
In your first case using parfor, if you haven't changed any settings, you will be using a process pool. (The default profile is "Processes", and with default settings, a pool will be created there when you first hit parfor).
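That is also why parfeval(backgroundPool, ...) fails: backgroundPool runs on thread-based workers, and MongoConnection objects are not supported there. You can instead submit parfeval requests to a process-based pool, which still runs asynchronously in the background. A minimal sketch (the isa check against parallel.ProcessPool assumes a reasonably recent release; on older releases you can simply call parpool("Processes") directly):

```matlab
% Reuse the current pool if it is process-based; otherwise create one.
% backgroundPool is thread-based, and MongoConnection is not supported
% on thread workers, so a process pool is needed here.
pool = gcp("nocreate");
if isempty(pool) || ~isa(pool, "parallel.ProcessPool")
    pool = parpool("Processes");
end
% parfeval requests on this pool run asynchronously, like backgroundPool,
% but on process workers where MongoConnection works.
f = parfeval(pool, @() disp("running on a process worker"), 0);
wait(f);
```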
If you want to run stuff in the background, you need to run a bunch of parfeval requests, a bit like this:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
for i = 1:numBatches
    start = (i-1)*batchSize + 1;
    stop = min(i*batchSize, height(dataTable));
    fut(i) = parfeval(@iRunInsert, 0, c, dataTable(start:stop,:));
end
function iRunInsert(c, data)
    insert(c.Value, 'collection', data);
end
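With the vector of futures fut above, the client can keep working and only block when the inserts must have finished. One way to do that, as a sketch, is to wait on the futures and then surface any error recorded on a worker:

```matlab
% Block until all insert requests have completed.
wait(fut);
% Each future records any error raised on its worker in its Error
% property; rethrow the first one found so failures are not silent.
errored = ~arrayfun(@(f) isempty(f.Error), fut);
if any(errored)
    throw(fut(find(errored, 1)).Error);
end
```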
4 Comments
Edric Ellis
on 22 Nov 2023
Hm, that seems like a long time simply to issue the parfeval calls - I would normally expect each call to parfeval to run in a few milliseconds or so (in the absence of large amounts of data being transmitted). What's the value of numBatches? It might be worth running with the profile command in effect to see what's going on. It's possible that splitting up dataTable is taking a long time.
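One quick way to check, as a sketch, is to time the table slicing separately from the parfeval submission inside the scheduling loop:

```matlab
% Separate the cost of slicing dataTable from the cost of submitting
% the parfeval requests, accumulated across all batches.
tSlice = 0; tSubmit = 0;
for i = 1:numBatches
    start = (i-1)*batchSize + 1;
    stop = min(i*batchSize, height(dataTable));
    t0 = tic;
    batch = dataTable(start:stop, :);
    tSlice = tSlice + toc(t0);
    t0 = tic;
    fut(i) = parfeval(@iRunInsert, 0, c, batch); %#ok<SAGROW>
    tSubmit = tSubmit + toc(t0);
end
fprintf('slicing: %.2f s, submitting: %.2f s\n', tSlice, tSubmit);
```

If the slicing term dominates, the fix lies in how the table is partitioned rather than in parfeval itself.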