Prepare Data

sparklyr provides multiple methods to prepare data inside Spark:

- Using dplyr commands
- Using SQL
- Using Spark’s feature transformers

This article will introduce each method and provide a simple example.

Exercise

For the exercise, start a local session of Spark. We’ll begin by copying a data set from R into the Spark cluster (note that you may need to install the nycflights13 package):

library(sparklyr)
sc <- spark_connect(master = "local")
flights_tbl <- copy_to(sc, nycflights13::flights, "spark_flights")
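To confirm that the copy succeeded, you can preview the remote table. A quick sketch, assuming the flights_tbl handle created above; only the previewed rows are transferred back to R:

# Preview the first rows of the remote Spark table
flights_tbl %>% head(3)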
Using dplyr
We can use familiar dplyr commands to prepare data inside Spark. The commands run inside Spark, so there are no unnecessary data transfers between R and Spark.
In this example, we can see how easy it is to summarize the flights data without having to know how to write Spark SQL:
delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(
    count = n(),
    dist = mean(distance, na.rm = TRUE),
    delay = mean(arr_delay, na.rm = TRUE)
  ) %>%
  filter(count > 20, dist < 2000, !is.na(delay))

delay
# Source: spark<?> [?? x 4]
   tailnum count  dist  delay
   <chr>   <dbl> <dbl>  <dbl>
 1 N24211    130 1330.  7.7
 2 N793JB    283 1529.  4.72
 3 N657JB    285 1286.  5.03
 4 N633AA     24 1587. -0.625
 5 N9EAMQ    248  675.  9.24
 6 N3GKAA     77 1247.  4.97
 7 N997DL     63  868.  4.90
 8 N318NB    202  814. -1.12
 9 N651JB    261 1408.  7.58
10 N841UA     96 1208.  2.10
# … with more rows
sparklyr and dplyr translate the R commands into Spark SQL for us. To see the resulting query, use show_query():
dplyr::show_query(delay)
<SQL>
SELECT *
FROM (SELECT `tailnum`, COUNT(*) AS `count`, AVG(`distance`) AS `dist`, AVG(`arr_delay`) AS `delay`
FROM `spark_flights`
GROUP BY `tailnum`) `q01`
WHERE ((`count` > 20.0) AND (`dist` < 2000.0) AND (NOT(((`delay`) IS NULL))))
Notice that the delay variable does not contain data. It only contains the dplyr commands that are to be run against the Spark connection.
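To bring the results into R, for example for plotting, you have to collect them explicitly. A minimal sketch, assuming the delay table defined above; collect() executes the query in Spark and returns a local tibble:

# Run the Spark SQL shown above and copy the results into R
delay_local <- dplyr::collect(delay)
delay_local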
For additional documentation on using dplyr with Spark, see the Manipulating Data with dplyr article on this site.
Using SQL
It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection() object implements a DBI interface for Spark, so you can use dbGetQuery() to execute SQL and return the result as an R data frame:
library(DBI)
dbGetQuery(sc, "SELECT carrier, sched_dep_time, dep_time, dep_delay FROM spark_flights LIMIT 5")
  carrier sched_dep_time dep_time dep_delay
1      UA            515      517         2
2      UA            529      533         4
3      AA            540      542         2
4      B6            545      544        -1
5      DL            600      554        -6
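Because sc implements the DBI interface, other DBI helpers should work as well. A brief sketch, assuming the same connection, that lists the tables registered with the Spark session:

library(DBI)

# "spark_flights" should appear among the registered tables
dbListTables(sc)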
Using Feature Transformers
Both of the previous methods rely on SQL statements. Spark also provides commands that make some data transformations more convenient without the use of SQL. For example, the ft_binarizer() command simplifies the creation of a new column that indicates whether the value of another column is above a certain threshold.
flights_tbl %>%
  ft_binarizer("dep_delay", "over_one", threshold = 1) %>%
  select(dep_delay, over_one) %>%
  head(5)
# Source: spark<?> [?? x 2]
  dep_delay over_one
      <dbl>    <dbl>
1         2        1
2         4        1
3         2        1
4        -1        0
5        -6        0
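Other feature transformers follow the same pattern. As an illustrative sketch (the delay_bucket column name and the split points are our own choices, not from this article), ft_bucketizer() assigns each departure delay to a bucket:

# Bucket departure delays into ranges: early/on time, short,
# moderate, and long delays; rows with missing dep_delay are
# filtered out first, since the bucketizer rejects them by default
flights_tbl %>%
  filter(!is.na(dep_delay)) %>%
  ft_bucketizer("dep_delay", "delay_bucket",
                splits = c(-Inf, 0, 15, 60, Inf)) %>%
  select(dep_delay, delay_bucket) %>%
  head(5)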
Find a full list of the Spark Feature Transformers available through sparklyr here: Reference - FT.
Disconnect from Spark
Lastly, clean up your session by disconnecting from Spark:
spark_disconnect(sc)