Prepare Data

sparklyr provides multiple methods to prepare data inside Spark: dplyr commands, SQL queries, and Spark's feature transformers.

This article will introduce each method and provide a simple example.

Exercise

For the exercises in this article, start a local session of Spark. We’ll begin by copying a data set from R into the Spark cluster (note that you may need to install the nycflights13 package).

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
flights_tbl <- copy_to(sc, nycflights13::flights, "spark_flights")

Using dplyr

We can use familiar dplyr commands to prepare data inside Spark. The commands run inside Spark, so there are no unnecessary data transfers between R and Spark.

In this example, we can see how easy it is to summarize the flights data without having to know how to write Spark SQL:

delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(
    count = n(), 
    dist = mean(distance, na.rm = TRUE), 
    delay = mean(arr_delay, na.rm = TRUE)
    ) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) 

delay
# Source: spark<?> [?? x 4]
   tailnum count  dist  delay
   <chr>   <dbl> <dbl>  <dbl>
 1 N24211    130 1330.  7.7  
 2 N793JB    283 1529.  4.72 
 3 N657JB    285 1286.  5.03 
 4 N633AA     24 1587. -0.625
 5 N9EAMQ    248  675.  9.24 
 6 N3GKAA     77 1247.  4.97 
 7 N997DL     63  868.  4.90 
 8 N318NB    202  814. -1.12 
 9 N651JB    261 1408.  7.58 
10 N841UA     96 1208.  2.10 
# … with more rows

sparklyr and dplyr translate the R commands into Spark SQL for us. To see the resulting query, use show_query():

dplyr::show_query(delay)
<SQL>
SELECT *
FROM (SELECT `tailnum`, COUNT(*) AS `count`, AVG(`distance`) AS `dist`, AVG(`arr_delay`) AS `delay`
FROM `spark_flights`
GROUP BY `tailnum`) `q01`
WHERE ((`count` > 20.0) AND (`dist` < 2000.0) AND (NOT(((`delay`) IS NULL))))

Notice that the delay variable does not contain the data itself. It only contains the dplyr commands that are to be run against the Spark connection.
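
The query runs, and the results transfer to R, only when the data is requested, for example when printing delay or when collecting it explicitly. A minimal sketch using dplyr::collect(), which executes the query in Spark and returns the result as a local tibble:

delay_local <- collect(delay)  # runs the Spark SQL query and copies the result into R
head(delay_local)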

For additional documentation on using dplyr with Spark, see the Manipulating Data with dplyr article on this site.

Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection() object implements a DBI interface for Spark, so you can use dbGetQuery() to execute SQL and return the result as an R data frame:

library(DBI)

dbGetQuery(sc, "SELECT carrier, sched_dep_time, dep_time, dep_delay FROM spark_flights LIMIT 5")
  carrier sched_dep_time dep_time dep_delay
1      UA            515      517         2
2      UA            529      533         4
3      AA            540      542         2
4      B6            545      544        -1
5      DL            600      554        -6
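
Since the connection implements the DBI interface, other DBI generics can also be used with it. As a small sketch, dbListTables() should list the tables registered in the cluster, including the spark_flights table we copied earlier:

dbListTables(sc)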

Using Feature Transformers

Both of the previous methods rely on SQL statements. Spark also provides commands that make some data transformations more convenient, without the use of SQL.

For example, the ft_binarizer() command simplifies the creation of a new column that indicates if the value of another column is above a certain threshold.

flights_tbl %>% 
  ft_binarizer("dep_delay", "over_one", threshold = 1) %>% 
  select(dep_delay, over_one) %>% 
  head(5)
# Source: spark<?> [?? x 2]
  dep_delay over_one
      <dbl>    <dbl>
1         2        1
2         4        1
3         2        1
4        -1        0
5        -6        0
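
Other feature transformers follow the same pattern: they take an input column and add a transformed output column. As an illustrative sketch (the column and split points here are just an example), ft_bucketizer() assigns each value of distance to a bucket defined by a vector of split points:

flights_tbl %>%
  ft_bucketizer("distance", "dist_bucket", splits = c(0, 500, 1000, 2000, Inf)) %>%
  select(distance, dist_bucket) %>%
  head(5)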

Find a full list of the Spark Feature Transformers available through sparklyr here: Reference - FT.

Disconnect from Spark

Lastly, clean up your session by disconnecting from Spark:
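
# close the connection to the local Spark cluster
spark_disconnect(sc)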