how to receive ROS Topics in parallel workers?
4 views (last 30 days)
Show older comments
Hello,
I try to run my ROS-Nodes and callback subscribed to topics in a parallel worker. Unfortunately my Callback is never launched.
Without the parallel computing the one and the same lines run fine in Matlab.
Second I try to follow Matlab activities by adding debugging lines into the code. Does anybody know how to debug the parallel workers and callbacks in a better way?
function app=myInit(app) % 1st we init parallel computing and ROS
% Parallel Computing Toolbox
p = gcp('nocreate'); % If no pool, do not create new one.
if isempty(p)
disp('Starting Parallel Computing Toolbox')
app.pool.obj = parpool;
else
disp('Parallel Computing Toolbox already running')
app.pool.obj = gcp('nocreate');
end
app.pool.n = 0; % count number of parallel tasks
% check if ROS is already running, if not, start ROS
[status,cmdout] = system('ps -ef|grep ros');
if status == 0
if contains(cmdout,'/opt/ros/melodic/bin/roscore')
disp('ROS is already running')
else
disp('Starting ROS...')
[status,cmdout] = system('roscore &');
pause(2);
end
else
warning(cmdout);
error(['checking ROS "ps -ef|grep ros" status: ' num2str(status)]);
end
%initialize ROS by creating ROS master in MATLAB and starting a global node connected to master.
app.iP.localIpAddr = '192.168.213.1';
rosinit(app.iP.localIpAddr, 'NodeName', '/MatlabInit')
app.iP.rosNode = ros.Node('/Matlab');
disp(' ')
app.iP.rtl = rostopic("list");
app.iP.d = rosdevice(app.iP.localIpAddr,'user', 'pw');
app.pool.q = parallel.pool.DataQueue();
if isfield(app.pool, 'future') == 0
app.pool.future = afterEach(app.pool.q, @disp);
end
end
function startTopic(app) % here, we start and stop our ROS Topics
if app.Button.Value
startMyRosTopics();
pause(5)
topicType = rostopic("type", 'myTopicType'); % this is fine
topicEcho = rostopic("echo", '/myTopic'); % this is also fine
app.pool.n = app.pool.n +1;
disp('calling parMonitor1')
app.pool.F(app.pool.n) = parfeval(@parMonitor1,1,app.pool.q, 'start');
disp('parMonitor1 called')
app.lP.parPoolFcnIdx1 = app.pool.n;
else
app.pool.F(1, app.lP.parPoolFcnIdx1).Error.message
disp('Error stack:')
if sum(size(app.pool.F(1, app.lP.parPoolFcnIdx1).Error)) == 0
disp('no Error @ parMonitor1')
else
for ii = 1:size(app.pool.F(1, app.lP.parPoolFcnIdx1).Error.stack,1)
disp(['File: ' app.pool.F(1, app.lP.parPoolFcnIdx1).Error.stack(ii).file ...
' Fcn Name: ' app.pool.F(1, app.lP.parPoolFcnIdx1).Error.stack(ii).name ...
' Line Number: ' num2str(app.pool.F(1, app.lP.parPoolFcnIdx1).Error.stack(ii).line)])
end
end
app.pool.F(1, app.lP.parPoolFcnIdx1).Diary
disp(' ')
disp('Fetching parFcn output...')
app.lP.out = fetchOutputs(app.pool.F(app.lP.parPoolFcnIdx1));
disp(app.lP.out);
% app.lP.out = rmfield(app.lP.out,'PCSub1');
% app.lP.out = rmfield(app.lP.out,'MonNode');
app.pool.F(app.lP.parPoolFcnIdx1) = parfeval(@parMonitor1,1,app.pool.q, 'stop');
app.lP.out = fetchOutputs(app.pool.F(app.lP.parPoolFcnIdx1));
disp(app.lP.out);
end
end
function out = parMonitor1(q, task)
% This function already runs in the parallel worker.
% It subscribes to the ROS-Topic and registeres the callback
persistent n
persistent nodeAndSub
if isempty(n)
n = 0;
end
n = n+1;
send(q, sprintf('parMonitor1 launched %d. times. Task: %s',n, task));
switch task
case 'start'
try
rosinit('192.168.213.1')
catch e
disp(e)
end
if contains(e.message, 'The global ROS node is already initialized and connected to the master')
send(q, sprintf('ROS was already runnning @ parMonitor1'))
else
send(q, sprintf('starting ROS'))
pause(10)
end
send(q, sprintf('parMonitor1 rosinit done'));
nodeAndSub.MonNode = ros.Node('MonNode','http://192.168.213.1:11311');
send(q, sprintf('ros.Node created'))
disp(nodeAndSub.MonNode)
pause(5);
try
send(q, sprintf('Subscribing to /myTopic in parMonitor1()'));
nodeAndSub.PCSub1 = ros.Subscriber(nodeAndSub.MonNode,'/myTopic',@Callback1);
send(q, sprintf('/myTopic subscribed'));
catch e
send(q, sprintf('Error at ros.Subscriber in parMonitor1()'));
send(q, sprintf('The identifier was:\n%s',e.identifier));
send(q, sprintf('There was an error! The message was:\n%s',e.message));
end
send(q, sprintf('ros.Subscriber called'))
case 'stop'
nodeAndSub.PCSub1 = [];
nodeAndSub.MonNode = [];
otherwise
send(q, sprintf('Error: parMonitor1 do not know what to do here'))
end
out = task;
send(q, sprintf('parMonitor1 done'))
end
function Callback1(~, message)
% This Callback should be executed periodically in a parallel worker.
persistent nn
here = 'hier';
save('CallbackWasHere.mat','here')
if isempty(nn)
nn = 0;
end
nn = nn+1;
debug = ['Callback1 was running n:' mnum2str(nn) ' times'];
filename = ['Callback1_' num2str(nn) '.mat'];
save(filename, 'debug','message')
end
I'd assume, the Callback1 function should run.
However, the files "CallbackWasHere.mat" and "Callback1_*.mat" were never created.
Does anybody know why?
Why is Callback1 never executed?
This is my workspace output:
Parallel Computing Toolbox already running
ROS is already running
The value of the ROS_IP environment variable, 192.168.213.1, will be used to set the advertised address for the ROS node.
Initializing global node /MatlabInit with NodeURI http://192.168.213.1:40537/
The value of the ROS_MASTER_URI environment variable, http://localhost:11311, will be used to connect to the ROS master.
The value of the ROS_IP environment variable, 192.168.213.1, will be used to set the advertised address for the ROS node.
calling parMonitor1
parMonitor1 called
parMonitor1 launched 3. times. Task: start
ROS was already runnning @ parMonitor1
parMonitor1 rosinit done
The value of the ROS_IP environment variable, 192.168.213.1, will be used to set the advertised address for the ROS node.
ros.Node created
Subscribing to /myTopic in parMonitor1()
/myTopic subscribed
ros.Subscriber called
parMonitor1 done
Error stack:
no Error @ parMonitor1
ans =
' ROSException with properties:
identifier: 'ros:mlros:node:GlobalNodeRunningError'
message: 'The global ROS node is already initialized and connected to the master at URI "http://192.168.213.1:11311". If you want to restart the global node, call "rosshutdown" first.'
cause: {}
stack: [2x1 struct]
Correction: []
The value of the ROS_IP environment variable, 192.168.213.1, will be used to set the advertised address for the ROS node.
Node with properties:
Name: '/MonNode'
MasterURI: 'http://192.168.213.1:11311'
NodeURI: 'http://192.168.213.1:33741/'
CurrentTime: [1x1 Time]
'
Fetching parFcn output...
start
parMonitor1 launched 4. times. Task: stop
parMonitor1 done
stop
Can anybody help, please?
How to debug inside parallel workers and callback functions?
How to check if they must be called and executed or not?
Is there any registry in Matlab where to check if the Callback is subscribed correctly?
Checking "rostopic bw /myTopic" in the linux shell, we see that the topic is present with an update rate of 1 Hz.
Thank you
6 Comments
Jagadeesh Konakalla
on 15 Jun 2021
Hi Andreas,
Good to know that your problem is resolved. But you mentioned that everything works without parallel toolbox.
Thanks,
Jagadeesh K.
See Also
Community Treasure Hunt
Find the treasures in MATLAB Central and discover how the community can help you!
Start Hunting!