Documentation

This is machine translation

Translated by Microsoft
Mouseover text to see original. Click the button below to return to the English version of the page.

Note: This page has been translated by MathWorks. Click here to see
To view all translated materials including this page, select Country from the country navigator on the bottom of this page.

Example on Deploying Tall Arrays to a Spark Enabled Hadoop Cluster

Supported Platform: Linux® only.

This example shows how to deploy a MATLAB® application containing tall arrays to a Spark™ enabled Hadoop® cluster.

Goal: Compute the mean arrival delay and the biggest arrival delays of airlines from the given dataset.

Dataset:airlinesmall.csv
Description:

Airline departure and arrival information from 1987-2008.

Location:/usr/local/MATLAB/R2019a/toolbox/matlab/demos

Note

You can follow the same instructions to deploy tall array Spark applications to Cloudera® CDH. To see an example on MATLAB Answers™, click here.

To use Cloudera CDH encryption zones, add the JAR file commons-codec-1.9.jar to the static classpath of MATLAB Runtime. Location of the file: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, where $HADOOP_PREFIX is the location where Hadoop is installed.

Note

If you are using Spark version 1.6 or higher, you will need to increase the Java® heap size in MATLAB to at least 512MB. For information on how to increase the Java heap size in MATLAB, see Java Heap Memory Preferences (MATLAB).

Prerequisites

  1. Start this example by creating a new work folder that is visible to the MATLAB search path.

  2. Install the MATLAB Runtime in a folder that is accessible by every worker node in the Hadoop cluster. This example uses /usr/local/MATLAB/MATLAB_Runtime/v## as the location of the MATLAB Runtime folder.

    If you don’t have the MATLAB Runtime, you can download it from the website at: https://www.mathworks.com/products/compiler/mcr.

    Note

    Replace all references to the MATLAB Runtime version v## in this example with the MATLAB Runtime version number corresponding to your MATLAB release. For example, MATLAB R2017b has MATLAB Runtime version number v92. For information about MATLAB Runtime version numbers corresponding MATLAB releases, see this list.

  3. Copy the file airlinesmall.csv into Hadoop Distributed File System (HDFS™) folder /user/<username>/datasets. Here <username> refers to your user name in HDFS.

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

Procedure

  1. Set up the environment variable, HADOOP_PREFIX to point at your Hadoop install folder. These properties are necessary for submitting jobs to your Hadoop cluster.

    setenv('HADOOP_PREFIX','/usr/lib/hadoop')

    The HADOOP_PREFIX environment variable must be set when using the MATLAB datastore function to point to data on HDFS. Setting this environment variable has nothing to do with Spark. See Relationship Between Spark and Hadoop for more information.

    If you plan on using a dataset that’s on your local machine as opposed to one on HDFS, then you can skip this step.

    Note

    This example uses /usr/lib/hadoop as directory where Hadoop is installed. Your Hadoop installation directory maybe different.

  2. Specify Spark properties.

    Use a containers.Map object to specify Spark properties.

    sparkProperties = containers.Map( ...
     {'spark.executor.cores', ...
     'spark.executor.memory', ...
     'spark.yarn.executor.memoryOverhead', ...
     'spark.dynamicAllocation.enabled', ...
     'spark.shuffle.service.enabled', ...
     'spark.eventLog.enabled', ...
     'spark.eventLog.dir'}, ...
     {'1', ...
      '2g', ...
      '1024', ...
      'true', ...
      'true', ...
      'true', ...
      'hdfs://host:54310/user/<username>/log'});

    For more information on Spark properties, expand the prop value of the 'SparkProperties' name-value pair in the Input Arguments section of the SparkConf class. The SparkConf class is part of the MATLAB API for Spark, which provides an alternate way to deploy MATLAB applications to Spark. For more information, see Deploy Applications Using the MATLAB API for Spark.

  3. Configure your MATLAB application containing tall arrays with Spark parameters.

    Use the class matlab.mapreduce.DeploySparkMapReducer to configure your MATLAB application containing tall arrays with Spark parameters as key-value pairs.

    conf = matlab.mapreduce.DeploySparkMapReducer( ...
          'AppName','myTallApp', ...
          'Master','yarn-client', ...
          'SparkProperties',sparkProperties);

    For more information, see matlab.mapreduce.DeploySparkMapReducer.

  4. Define the Spark execution environment.

    Use the mapreducer function to define the Spark execution environment.

    mapreducer(conf)

    For more information, see mapreducer.

  5. Include your MATLAB application code containing tall arrays.

    Use the MATLAB function datastore to create a datastore object pointing to the file airlinesmall.csv in HDFS. Pass the datastore object as an input argument to the tall function. This will create a tall array. You can perform operations on the tall array to compute the mean arrival delay and the biggest arrival delays.

    % Create a |datastore| for a collection of tabular text files representing airline data. 
    % Select the variables of interest, specify a categorical data type for the 
    % |Origin| and |Dest| variables.
    % ds = datastore('airlinesmall.csv') % if using a dataset on your local machine
    ds = datastore('hdfs:///<username>/datasets/airlinesmall.csv');
    ds.TreatAsMissing = 'NA';
    ds.SelectedVariableNames = {'Year','Month','ArrDelay','DepDelay','Origin','Dest'};
    ds.SelectedFormats(5:6) = {'%C','%C'};
    
    % Create Tall Array
    % Tall arrays are like normal MATLAB arrays, except that they can have any
    % number of rows. When a |tall| array is backed by a |datastore|, the underlying class of
    % the tall array is based on the type of datastore.
    tt = tall(ds);
    
    % Remove Rows with Missing Data or NaN Values
    idx = any(ismissing(tt),2); 
    tt(idx,:) = [];
    
    % Compute Mean Delay
    meanArrivalDelay = mean(tt.DepDelay,'omitnan');
    biggestDelays = topkrows(tt,10,'ArrDelay');
    
    % Gather Results
    % The |gather| function forces evaluation of all queued operations and
    % brings the resulting output back into memory. 
    [meanArrivalDelay,biggestDelays] = gather(meanArrivalDelay,biggestDelays)
    
    % Delete mapreducer object
    delete(conf);
  6. Create a Spark application.

    Use the mcc command with the -vCW options to create a Spark application.

    >> mcc -vCW 'Spark:myTallApp' deployTallArrayToSpark.m

    The following files are created.

    FilesDescription
    run_myTallApp.shShell script to run application. The script invokes spark-submit to launch the application on the cluster.
    myTallApp.jarApplication JAR. The application JAR contains packaged MATLAB code and other dependencies.
    readme.txtReadme file containing details on how to run the application.
    requiredMCRProducts.txt 
    mccExcludedFiles.log 

    For more information, see mcc.

  7. Run the application from a Linux shell using the following command:

    $ ./run_myTallApp.sh /usr/local/MATLAB/MATLAB_Runtime/v##

    /usr/local/MATLAB/MATLAB_Runtime/v## is an argument indicating the location of the MATLAB Runtime.

  8. You will see the following output:

    meanArrivalDelay =
        7.1201
    
    biggestDelays =
    
      10x5 table
    
        Year    Month    ArrDelay    Origin    Dest
        ____    _____    ________    ______    ____
    
        1995    11       1014        HNL       LAX 
        2007     4        914        JFK       DTW 
        2001     4        887        MCO       DTW 
        2008     7        845        CMH       ORD 
        1988     3        772        ORD       LEX 
        2008     4        710        EWR       RDU 
        1998    10        679        MCI       DFW 
        2006     6        603        ABQ       PHX 
        2008     6        586        PIT       LGA 
        2007     4        568        RNO       SLC
    

Optionally, if you want to analyze or view the results generated by your application in MATLAB, you need to write the results to a file on HDFS using the write function for tall arrays. You can then read the file using the datastore function.

To write the results to file on HDFS, add the following line of code to your MATLAB application just before the delete(conf) statement and then package your application:

write('hdfs:///user/<username>/results', tall(biggestDelays));

Replace <username> with your user name.

You can only save one variable to a file using the write function for tall arrays. Therefore, you will need to write to multiple files if you want to save multiple variables.

To view the results in MATLAB after executing the application against a Spark enabled cluster, use the datastore function as follows:

>> ds = datastore('hdfs:///user/<username>/results')
>> readall(ds)

You may need to set the environment variable HADOOP_PREFIX using the function setenv in case you are unable to view the results using the datastore function.

Note

If the tall array application being deployed is a MATLAB function as opposed to a MATLAB script, use the following execution syntax:

$ ./run_<applicationName>.sh \
  <MATLAB_Runtime_Location> \
  [Spark arguments] \
  [Application arguments]
For example:
$ ./run_myTallApp.sh \
   /usr/local/MATLAB/MATLAB_Runtime/v92 \
   yarn-client \
   hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
   hdfs://host:54310/user/<username>/result

Code:

 deployTallArrayToSpark.m