eventStreamProcessor

Apply stream analytic function to event stream

    This object requires Streaming Data Framework for MATLAB® Production Server™.

    Description

    Use an EventStreamProcessor object to apply a stream analytic function to an event stream. Using EventStreamProcessor object functions, you can automatically direct events from an event stream to a streaming analytic function, enabling you to process large amounts of data in event streams.

    You can run the streaming analytic function on a known number of event windows synchronously, similar to a for-loop. You can also run it with a desktop-hosted server to simulate asynchronous deployment in a production environment.

    EventStreamProcessor functions process streaming data sequentially in batches by collecting events into windows of configurable size. When a window contains the requested number of events, it is passed to the stream processing function. You can then save any results that the function produces and optionally publish them to a different stream.
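
    As a rough illustration of the windowing described above, the following plain MATLAB sketch splits a timetable of events into fixed-size windows and applies a function to each full window. It does not use the framework itself. The names events, windowSize, and summarizeWindow are hypothetical, and the loop only mimics the batching that the EventStreamProcessor object automates.

```matlab
% Hypothetical event data: six events, each with a numeric payload.
events = timetable(seconds((1:6)'), [1; 1; 2; 3; 5; 8], ...
    VariableNames={'value'});

windowSize = 2;                           % events per window (assumed value)
summarizeWindow = @(w) sum(w.value);      % stand-in for the analytic function

numWindows = floor(height(events) / windowSize);
windowSums = zeros(numWindows, 1);

for k = 1:numWindows
    rows = (k-1)*windowSize + (1:windowSize);
    window = events(rows, :);             % one full window of events
    windowSums(k) = summarizeWindow(window);
end

disp(windowSums')
```

    With this data, windowSums holds one sum per window: 2, 5, and 13.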

    A stream processing function can be stateful or stateless. For stateful functions, the EventStreamProcessor object maintains state between calls to the stream processing function. If the stream processing function changes the state, the function can return the state as a second output argument. The EventStreamProcessor object preserves these changes for the next function iteration.
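
    The state hand-off described above can be sketched in plain MATLAB without the framework. The function runningTotal, its (data, state) input signature, the value variable, and the total field are all assumptions made for illustration. Only the pattern of returning changed state as a second output comes from the description above. The script calls the function by hand, which is the step the EventStreamProcessor object would otherwise perform between iterations.

```matlab
% Hypothetical initial state: a running total that starts at zero.
state = struct("total", 0);

% Two hypothetical windows of events.
window1 = timetable(seconds((1:2)'), [1; 2], VariableNames={'value'});
window2 = timetable(seconds((3:4)'), [3; 4], VariableNames={'value'});

% Pass the state returned by one call into the next call.
[results1, state] = runningTotal(window1, state);
[results2, state] = runningTotal(window2, state);

function [results, state] = runningTotal(data, state)
    % Continue the cumulative sum from the total carried in the state.
    total = cumsum(data.value) + state.total;
    state.total = total(end);             % changed state, returned as 2nd output
    results = timetable(data.Properties.RowTimes, total);
end
```

    After the second call, results2.total is [6; 10] and state.total is 10, because the second window continues from the total of 3 left by the first.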

    Creation

    Description

    esp = eventStreamProcessor(inputStream,streamFcn) creates an EventStreamProcessor object, which applies the stream function streamFcn to the event stream inputStream, and sets the InputStream and StreamFunction properties, respectively, of this object.

    esp = eventStreamProcessor(inputStream,streamFcn,initialState) creates an EventStreamProcessor object that additionally initializes persistent state with the function initialState and sets the InitialState property. If streamFcn is stateful, then initialState is required.

    esp = eventStreamProcessor(___,Name=Value) sets object properties using one or more name-value arguments. Name is a property name and Value is the corresponding value. You can specify multiple name-value arguments in any order as Name1=Value1,...,NameN=ValueN.

    Properties

    ArchiveName

    Name of the deployable archive generated by the package function, specified as a string. The default archive name is the name of the streaming function.

    Data Types: string

    GroupVariable

    Name of the event variable used to group events, specified as a string array or character vector.

    If GroupVariable is nonempty, each event window is split into groups of events that share the same value of the group variable. Each group is then sent to the streaming function separately. GroupVariable is often set to the event key so that events from each event source are processed independently.

    Data Types: string | char
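
    The grouping behavior can be pictured with a plain MATLAB sketch that does not use the framework. The window contents, the variable names key and value, and the per-group sum are hypothetical. The loop is only a stand-in for how each group would be sent to the streaming function separately.

```matlab
% Hypothetical window of events from two sources, identified by "key".
window = timetable(seconds((1:4)'), ["a"; "b"; "a"; "b"], [10; 20; 30; 40], ...
    VariableNames={'key', 'value'});

groupVariable = "key";                    % plays the role of GroupVariable
[groupIdx, groupKeys] = findgroups(window.(groupVariable));

groupSums = zeros(numel(groupKeys), 1);
for g = 1:numel(groupKeys)
    group = window(groupIdx == g, :);     % events that share one key
    groupSums(g) = sum(group.value);      % stand-in for the streaming function
end
```

    Here the events with key "a" sum to 40 and those with key "b" sum to 60, each computed independently of the other source.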

    InitialState

    Function that creates the initial state for the streaming analytic function, specified as a function handle. If the streaming analytic function is stateful, this property must be set when you create the object.

    InputStream

    Event stream from which the streaming analytic function reads events, specified as a KafkaStream, InMemoryStream, or TestStream object.

    OutputStream

    Event stream to which the streaming analytic function writes events, specified as a KafkaStream, InMemoryStream, or TestStream object.

    Note

    If you are packaging your stream processing function into a deployable archive using the package function, do not leave OutputStream set to an InMemoryStream object. This object is not supported by the package function as an output stream.

    StreamFunction

    Streaming analytic function, specified as a function handle.

    Data Types: function_handle

    ReadPosition

    Position in an event stream to read from, specified as one of these values:

    • "Beginning" — First event available in stream

    • "End" — Just past the last event in the stream

    • "Current" — Just past the current event in the stream

    Data Types: string

    ResetStateOnSeek

    Flag to clear persistent state after calling the seek function, specified as a logical scalar.

    Data Types: logical

    Object Functions

    execute      Execute event stream processing function on specific number of event windows
    package      Package stream processing function into deployable archive configured by EventStreamProcessor
    seek         Set position in event stream to begin processing events
    start        Start processing event streams using local test server
    startServer  Start local test server
    stop         Stop processing event streams using local test server
    stopServer   Shut down local test server

    Examples

    Assume that you have a Kafka® server running at the network address kafka.host.com:9092 that has a topic RecamanSequence.

    Create a KafkaStream object connected to the RecamanSequence topic.

    ks = kafkaStream("kafka.host.com",9092,"RecamanSequence");

    Assume that you have a streaming analytic function recamanSum and a function to initialize persistent state called initRecamanSum.

    Create an EventStreamProcessor object that runs the recamanSum function and initializes persistent state with the initRecamanSum function.

    esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum)
    esp = 
    
      EventStreamProcessor with properties:
    
          StreamFunction: @recamanSum
             InputStream: [1×1 matlab.io.stream.event.KafkaStream]
            OutputStream: [1×1 matlab.io.stream.event.InMemoryStream]
            InitialState: @initRecamanSum
           GroupVariable: [0×0 string]
            ReadPosition: Beginning
             ArchiveName: "recamanSum"
        ResetStateOnSeek: 1

    Run the streaming analytic function on ten event windows.

    execute(esp,10);

    Examine the results.

    result = readtimetable(esp.OutputStream)

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic RecamanSequence.

    Also assume that you have a streaming analytic function recamanSum and a function initRecamanSum to initialize persistent state.

    Create a KafkaStream object connected to the RecamanSequence topic.

    ks = kafkaStream("kafka.host.com",9092,"RecamanSequence");

    Create another KafkaStream object to write the results of the streaming analytic function to a different topic called RecamanSequenceResults.

    outKS = kafkaStream("kafka.host.com",9092,"RecamanSequenceResults");

    Create an EventStreamProcessor object that runs the recamanSum function and initializes persistent state with the initRecamanSum function.

    esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum,OutputStream=outKS)
    esp = 
    
      EventStreamProcessor with properties:
    
          StreamFunction: @recamanSum
             InputStream: [1×1 matlab.io.stream.event.KafkaStream]
            OutputStream: [1×1 matlab.io.stream.event.KafkaStream]
            InitialState: @initRecamanSum
           GroupVariable: [0×0 string]
            ReadPosition: Beginning
             ArchiveName: "recamanSum"
        ResetStateOnSeek: 1

    Using the MATLAB editor, you can set breakpoints in the recamanSum function to examine the incoming streaming data when you start the server.

    Start the test server.

    Note

    To use the test server, you require MATLAB Compiler SDK™.

    startServer(esp);

    Doing so opens the Production Server Compiler app. When the app opens, you must start the server manually.

    To start the test server from the app, click Test Client, and then click Start. For an example of how to use the app, see Test Client Data Integration Against MATLAB (MATLAB Compiler SDK).

    Navigate back to the MATLAB command prompt to start processing events.

    start(esp);

    In the Production Server Compiler app, the test server receives data.

    If you set breakpoints in the MATLAB editor, you can use the debugger to examine the data, state, and results of the function processing. Click Continue to continue debugging, or click Stop when you finish debugging.

    From the MATLAB command prompt, stop the server.

    stop(esp);

    Read results from the output stream.

    results = readtimetable(outKS);

    Version History

    Introduced in R2022b