Intro to Spark Streaming with sparklyr

The sparklyr interface

As stated on Spark’s official site, Spark Streaming makes it easy to build scalable, fault-tolerant streaming applications. Because it is part of the Spark API, it is possible to reuse code that queries the current state of the stream, as well as to join the streaming data with historical data. Please see Spark’s official documentation for a deeper look into Spark Streaming.

The sparklyr interface provides the following:

  • Ability to run dplyr, SQL, spark_apply(), and PipelineModels against a stream (a small sketch follows this list)
  • Read in multiple formats: CSV, text, JSON, Parquet, Kafka, JDBC, and ORC
  • Write stream results to Spark memory and to the following formats: CSV, text, JSON, Parquet, Kafka, JDBC, and ORC
  • An out-of-the-box graph visualization to monitor the stream
  • A new reactiveSpark() function that allows Shiny apps to poll the contents of the stream
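
As a small illustration of the first three points, the sketch below reads a JSON stream, filters it with dplyr, and writes the results out as Parquet. The folder names and the field x are hypothetical, and each input folder is assumed to already contain at least one file so the schema can be inferred.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# Read a folder of JSON files as a stream
# ("json-in" is a hypothetical folder, already primed with at least one file)
json_in <- stream_read_json(sc, "json-in")

# Apply a dplyr verb to the stream and write the results out as Parquet
# (assumes the JSON records contain a numeric field x)
filtered <- json_in %>%
  filter(x > 100) %>%
  stream_write_parquet("parquet-out")

stream_stop(filtered)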

Interacting with a stream

A good way to think about how Spark streams update is as a three-stage operation:

  1. Input - Spark reads the data inside a given folder. The folder is expected to contain multiple data files, with new files created as the most current stream data arrives.
  2. Processing - Spark applies the desired operations on top of the data. These operations could be data manipulations (dplyr, SQL), data transformations (sdf operations, PipelineModel predictions), or native R manipulations (spark_apply()).
  3. Output - The results of processing the input files are saved in a different folder.

As with all of the read and write operations in sparklyr for Spark Standalone, or in sparklyr’s local mode, the input and output folders are actual OS file system folders. For Hadoop clusters, these will be folder locations inside HDFS.
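
For example, the same stream_read_csv() call works in both environments; only the path changes. This is a minimal sketch: the HDFS URI shown is hypothetical, and both folders are assumed to already contain at least one CSV file so the schema can be inferred.

library(sparklyr)

sc <- spark_connect(master = "local")

# Spark Standalone or local mode: the path is a plain OS file system folder
read_local <- stream_read_csv(sc, "source")

# Hadoop cluster: the same call, but the path resolves inside HDFS
# (the URI below is a hypothetical location)
read_hdfs <- stream_read_csv(sc, "hdfs://name-node:8020/user/me/source")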

Example 1 - Input/Output

The first example is a small script that can be used with a local master. The result should be the stream_view() app showing, live, the number of records processed for each iteration of test data sent to the stream.

library(future)
library(sparklyr)

sc <- spark_connect(master = "local")

if(file.exists("source")) unlink("source", TRUE)
if(file.exists("source-out")) unlink("source-out", TRUE)

stream_generate_test(iterations = 1)
read_folder <- stream_read_csv(sc, "source") 
write_output <- stream_write_csv(read_folder, "source-out")
invisible(future(stream_generate_test(interval = 0.5)))

stream_view(write_output)

stream_stop(write_output)
spark_disconnect(sc)

Code breakdown

  1. Open the Spark connection

    library(sparklyr)
    sc <- spark_connect(master = "local")
  2. Optional step. This resets the input and output folders. It makes it easier to run the code multiple times in a clean manner.

    if(file.exists("source")) unlink("source", TRUE)
    if(file.exists("source-out")) unlink("source-out", TRUE)
  3. Produces a single test file inside the “source” folder. This allows the “read” function to infer the CSV file’s schema.

    stream_generate_test(iterations = 1)
    list.files("source")
    [1] "stream_1.csv"
  4. Points the stream reader to the folder where the streaming files will be placed. Since the folder is primed with a single CSV file, that file’s layout is used as the expected schema of subsequent files. By default, the test data contains a single integer variable, x, so the stream is read as a one-column data frame.

    read_folder <- stream_read_csv(sc, "source")
  5. The output writer is what starts the streaming job. It begins monitoring the input folder and then writes the new results to the “source-out” folder. So as new records stream in, new files are created in the “source-out” folder. Since no operations are applied to the incoming data at this time, the output files contain exactly the same raw data as the input files. The only difference is that the files and subfolders within “source-out” are structured the way Spark structures data folders.

    write_output <- stream_write_csv(read_folder, "source-out")
    list.files("source-out")
    [1] "_spark_metadata"                                     "checkpoint"
    [3] "part-00000-1f29719a-2314-40e1-b93d-a647a3d57154-c000.csv"
  6. The test generation function will write 50 files (its default number of iterations), one every 0.5 seconds. To run the tests “out-of-sync” with the current R session, the future package is used.

    library(future)
    
    invisible(
      future(
        stream_generate_test(interval = 0.5)
      )
    )
  7. The stream_view() function can be used before the 50 tests are complete because the future package runs them out-of-sync with the current session. It monitors the status of the job that write_output points to and reports the amount of data coming into the “source” folder and going out into the “source-out” folder (a programmatic alternative is sketched after this list).

    stream_view(write_output)
  8. The monitor will continue to run even after the tests are complete. To end the experiment, stop the Shiny app and then use the following to stop the stream and close the Spark session.

    stream_stop(write_output)
    
    spark_disconnect(sc)
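
A programmatic alternative to the dashboard mentioned in step 7: stream_stats() returns the latest progress information for the same stream as an R list, which can be handy for logging or custom monitoring. This is a minimal sketch; it assumes the write_output stream from this example is still running.

# Latest progress information reported by Spark for the running stream
stats <- stream_stats(write_output)
str(stats)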

Example 2 - Processing

The second example builds on the first. It adds a processing step that manipulates the input data before saving it to the output folder. In this case, a new binary field is added indicating whether the value of x is over 400 or not. This time, run the second code chunk in this example a few times while the stream tests are running to see the aggregated values change.

library(future)
library(sparklyr)
library(dplyr, warn.conflicts = FALSE)

sc <- spark_connect(master = "local")

if(file.exists("source")) unlink("source", TRUE)
if(file.exists("source-out")) unlink("source-out", TRUE)

stream_generate_test(iterations = 1)
read_folder <- stream_read_csv(sc, "source") 

process_stream <- read_folder %>%
  mutate(x = as.double(x)) %>%
  ft_binarizer(
    input_col = "x",
    output_col = "over",
    threshold = 400
  )

write_output <- stream_write_csv(process_stream, "source-out")
invisible(future(stream_generate_test(interval = 0.2, iterations = 100)))

Run this code a few times during the experiment:

spark_read_csv(sc, "stream", "source-out", memory = FALSE) %>%
  group_by(over) %>%
  tally()

The results would look similar to this. The n totals will increase as the experiment progresses.

# Source:   lazy query [?? x 2]
# Database: spark_connection
   over     n
  <dbl> <dbl>
1     0 40215
2     1 60006

Clean up after the experiment

stream_stop(write_output)
spark_disconnect(sc)

Code breakdown

  1. The processing starts with the read_folder variable that contains the input stream. It coerces the integer field x into type double because the next function, ft_binarizer(), does not accept integers. The binarizer determines whether x is over 400 or not. This is a good illustration of how dplyr can help simplify the manipulation needed during the processing stage.

    process_stream <- read_folder %>%
      mutate(x = as.double(x)) %>%
      ft_binarizer(
        input_col = "x",
        output_col = "over",
        threshold = 400
      )
  2. The output now needs to write out the processed data instead of the raw input data. Swap read_folder for process_stream.

    write_output <- stream_write_csv(process_stream, "source-out")
  3. The “source-out” folder can be treated as if it were a single table within Spark. Using spark_read_csv(), the data can be mapped but not brought into memory (memory = FALSE). This allows the current results to be further analyzed using regular dplyr commands (a short visualization sketch follows this list).

    spark_read_csv(sc, "stream", "source-out", memory = FALSE) %>%
      group_by(over) %>%
      tally()
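
As noted in step 3, the mapped table behaves like any other remote Spark table, so the current counts can also be collected into R and visualized. A small sketch using ggplot2 (which is not loaded in this example’s script):

library(ggplot2)

spark_read_csv(sc, "stream", "source-out", memory = FALSE) %>%
  group_by(over) %>%
  tally() %>%
  collect() %>%                        # bring the two summary rows into R
  ggplot(aes(x = factor(over), y = n)) +
  geom_col() +
  labs(x = "x over 400", y = "records")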

Example 3 - Aggregate in process and output to memory

Another option is to save the results of the processing into an in-memory Spark table. Unless it is intentionally saved to disk, the table and its data will only exist while the Spark session is active.

The biggest advantage of using Spark memory as the target is that it allows aggregation to happen during processing. This matters because aggregation is not allowed for any file output, except Kafka, at the input/process stage.

Using example 2 as the base, this example code will perform some aggregations to the current stream input and save only those summarized results into Spark memory:

library(future)
library(sparklyr)
library(dplyr, warn.conflicts = FALSE)

sc <- spark_connect(master = "local")

if(file.exists("source")) unlink("source", TRUE)

stream_generate_test(iterations = 1)
read_folder <- stream_read_csv(sc, "source") 

process_stream <- read_folder %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  )

write_output <- stream_write_memory(process_stream, name = "stream")

invisible(future(stream_generate_test()))

Run this command a few times while the experiment is running:

tbl(sc, "stream") 

Clean up after the experiment

stream_stop(write_output)

spark_disconnect(sc)

Code breakdown

  1. The stream_watermark() function adds a new timestamp variable that is then used in the group_by() command. Spark Structured Streaming requires this in order to accept summarized results as the output of the stream. The second step is simply to decide what kinds of aggregations to perform. In this case, a simple max, min, and count are performed.

    process_stream <- read_folder %>%
      stream_watermark() %>%
      group_by(timestamp) %>%
      summarise(
        max_x = max(x, na.rm = TRUE),
        min_x = min(x, na.rm = TRUE),
        count = n()
      )
  2. The stream_write_memory() function is used to write the output to Spark memory. The results will appear as a table in the Spark session with the name assigned in the name argument; in this case, the name selected is “stream”.

    write_output <- stream_write_memory(process_stream, name = "stream")
  3. The current data in the “stream” table can be queried using the dplyr tbl() command (a small follow-up sketch appears after this list).

    tbl(sc, "stream") 
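
Because each trigger appends a new set of summarized rows to the in-memory table, sorting by the watermark timestamp is a quick way to look at just the most recent aggregates. A small follow-up sketch:

tbl(sc, "stream") %>%
  arrange(desc(timestamp)) %>%
  head(5)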

Example 4 - Shiny integration

sparklyr provides a new Shiny function called reactiveSpark(). It can take a Spark data frame, in this case the one created as a result of the stream processing, and create a Spark memory stream table, the same way a table is created in Example 3.

library(future)
library(sparklyr)
library(dplyr, warn.conflicts = FALSE)
library(ggplot2)

sc <- spark_connect(master = "local")

if(file.exists("source")) unlink("source", TRUE)
if(file.exists("source-out")) unlink("source-out", TRUE)

stream_generate_test(iterations = 1)
read_folder <- stream_read_csv(sc, "source") 

process_stream <- read_folder %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  )

invisible(future(stream_generate_test(interval = 0.2, iterations = 100)))

library(shiny)
ui <- function(){
  tableOutput("table")
}
server <- function(input, output, session){
  
  ps <- reactiveSpark(process_stream)
  
  output$table <- renderTable({
    ps() %>%
      mutate(timestamp = as.character(timestamp)) 
    })
}
runGadget(ui, server)

Code breakdown

  1. Notice that there is no stream_write_...() command. The reason is that the reactiveSpark() function already includes the stream_write_memory() step.

  2. This very basic Shiny app simply displays the stream’s output as a table. The ui section declares that table output.

    library(shiny)
    
    ui <- function(){
      tableOutput("table")
    }
  3. In the server section, the reactiveSpark() function updates every time there is a change to the stream and returns a data frame. The result is saved to a reactive called ps in this script. Treat ps() as a regular table that can be piped from, as shown in the example. In this case, the timestamp variable is converted to character to make it easier to read.

    server <- function(input, output, session){
    
      ps <- reactiveSpark(process_stream)
    
      output$table <- renderTable({
        ps() %>%
          mutate(timestamp = as.character(timestamp)) 
      })
    }
  4. Use runGadget() to display the Shiny app in the Viewer pane. This is optional; the app can be run using the normal Shiny run functions. A variation that plots the stream with ggplot2 is sketched after this list.

    runGadget(ui, server)
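
The example script loads ggplot2, but the basic app only renders a table. A variation of the same app can plot the aggregates instead; the sketch below assumes the process_stream object from this example and simply swaps tableOutput()/renderTable() for plotOutput()/renderPlot():

library(shiny)
library(ggplot2)

ui <- function(){
  plotOutput("chart")
}

server <- function(input, output, session){

  ps <- reactiveSpark(process_stream)

  output$chart <- renderPlot({
    # ps() returns a regular R data frame with the latest aggregates
    ps() %>%
      ggplot(aes(x = timestamp, y = count)) +
      geom_col()
  })
}

runGadget(ui, server)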

Example 5 - ML Pipeline Model

This example uses a fitted Pipeline Model to process the input, and saves the predictions to the output. This approach would be used to apply Machine Learning on top of streaming data.

library(sparklyr)
library(dplyr, warn.conflicts = FALSE)

sc <- spark_connect(master = "local")

if(file.exists("source")) unlink("source", TRUE)
if(file.exists("source-out")) unlink("source-out", TRUE)

df <- data.frame(x = rep(1:1000), y = rep(2:1001))

stream_generate_test(df = df, iterations = 1)

model_sample <- spark_read_csv(sc, "sample", "source")

pipeline <- sc %>%
  ml_pipeline() %>%
  ft_r_formula(x ~ y) %>%
  ml_linear_regression()

fitted_pipeline <- ml_fit(pipeline, model_sample)

ml_stream <- stream_read_csv(
    sc = sc, 
    path = "source", 
    columns = c(x = "integer", y = "integer")
  )  %>%
  ml_transform(fitted_pipeline, .)  %>%
  select(- features) %>%
  stream_write_csv("source-out")

stream_generate_test(df = df, interval = 0.5)
spark_read_csv(sc, "stream", "source-out", memory = FALSE) 
# Source:   spark<stream> [?? x 4]
       x     y label prediction
 * <int> <int> <dbl>      <dbl>
 1   276   277   276       276.
 2   277   278   277       277.
 3   278   279   278       278.
 4   279   280   279       279.
 5   280   281   280       280.
 6   281   282   281       281.
 7   282   283   282       282.
 8   283   284   283       283.
 9   284   285   284       284.
10   285   286   285       285.
# ... with more rows

Code Breakdown

  1. Creates and fits a pipeline

    df <- data.frame(x = rep(1:1000), y = rep(2:1001))
    stream_generate_test(df = df, iterations = 1)
    model_sample <- spark_read_csv(sc, "sample", "source")
    
    pipeline <- sc %>%
      ml_pipeline() %>%
      ft_r_formula(x ~ y) %>%
      ml_linear_regression()
    
    fitted_pipeline <- ml_fit(pipeline, model_sample)
  2. This example chains the input, processing, and output steps in a single code segment. The ml_transform() function is used to create the predictions. Because the CSV format does not support list-type fields, the features column is removed before the results are sent to the output.

    ml_stream <- stream_read_csv(
        sc = sc, 
        path = "source", 
        columns = c(x = "integer", y = "integer")
      )  %>%
      ml_transform(fitted_pipeline, .)  %>%
      select(- features) %>%
      stream_write_csv("source-out")
  3. Clean up after the experiment by stopping the stream and closing the Spark session.

    stream_stop(ml_stream)
    spark_disconnect(sc)
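
In practice, the pipeline is often fitted in one session and reused for scoring in another. sparklyr’s ml_save() and ml_load() functions cover that hand-off; a minimal sketch, where "saved_model" is just an example path:

# Persist the fitted pipeline to disk
ml_save(fitted_pipeline, "saved_model", overwrite = TRUE)

# Later, possibly in a new Spark session, reload it before scoring the stream
fitted_pipeline <- ml_load(sc, "saved_model")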