Creating Extensions for sparklyr
Introduction
The sparklyr package provides a dplyr interface to Spark DataFrames as well as an R interface to Spark’s distributed machine learning pipelines. However, since Spark is a general-purpose cluster computing system there are many other R interfaces that could be built (e.g. interfaces to custom machine learning pipelines, interfaces to 3rd party Spark packages, etc.).
The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. This guide describes how you can use these tools to create your own custom R interfaces to Spark.
Examples
Here’s an example of an extension function that calls the text file line counting function available via the SparkContext:
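count_lines <- function(sc, file) {
  # obtain the SparkContext, read the file as an RDD with a single partition,
  # and count its lines
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}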
The count_lines function takes a spark_connection (sc) argument which enables it to obtain a reference to the SparkContext object, and in turn call the textFile().count() method.
You can use this function with an existing sparklyr connection as follows:
library(sparklyr)
sc <- spark_connect(master = "local")
count_lines(sc, "hdfs://path/data.csv")
Here are links to some additional examples of extension packages:
Package | Description |
---|---|
spark.sas7bdat | Read in SAS data in parallel into Apache Spark. |
rsparkling | Extension for using H2O machine learning algorithms against Spark DataFrames. |
sparkhello | Simple example of including a custom JAR file within an extension package. |
rddlist | Implements some methods of an R list as a Spark RDD (resilient distributed dataset). |
sparkwarc | Load WARC files into Apache Spark with sparklyr. |
sparkavro | Load Avro data into Spark with sparklyr. It is a wrapper of spark-avro. |
crassy | Connect to Cassandra with sparklyr using the Spark-Cassandra-Connector. |
sparklygraphs | R interface for GraphFrames which aims to provide the functionality of GraphX. |
sparklyr.nested | Extension for working with nested data. |
sparklyudf | Simple example of registering a Scala UDF within an extension package. |
mleap | R interface to MLeap. |
sparkbq | sparklyr extension package to connect to Google BigQuery. |
sparkgeo | sparklyr extension package providing geospatial analytics capabilities. |
sparklytd | sparklyr plugin for td-spark to connect to TD from R. |
sparkts | Extension for the spark-timeseries framework. |
sparkxgb | R interface for XGBoost on Spark. |
sparktf | R interface to the Spark TensorFlow Connector. |
geospark | R interface to GeoSpark for performing spatial analysis in Spark. |
mmlspark | Microsoft Machine Learning for Apache Spark. |
Core Types
Three classes are defined for representing the fundamental types of the R to Java bridge:
Function | Description |
---|---|
spark_connection | Connection between R and the Spark shell process |
spark_jobj | Instance of a remote Spark object |
spark_dataframe | Instance of a remote Spark DataFrame object |
S3 methods are defined for each of these classes so they can be easily converted to or from objects that contain or wrap them. Note that for any given spark_jobj it's possible to discover the underlying spark_connection.
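For example, a Spark DataFrame reference obtained from a dplyr tbl can be used to recover the connection it was created on. Here is a minimal sketch, assuming a local connection and the built-in mtcars data set:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
mtcars_tbl <- copy_to(sc, mtcars)    # a dplyr tbl backed by a Spark DataFrame

sdf <- spark_dataframe(mtcars_tbl)   # spark_jobj wrapping the underlying DataFrame
sc2 <- spark_connection(sdf)         # the spark_connection recovered from the jobj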
Calling Spark from R
There are several functions available for calling the methods of Java objects and static methods of Java classes:
Function | Description |
---|---|
invoke | Call a method on an object |
invoke_new | Create a new object by invoking a constructor |
invoke_static | Call a static method of a class |
For example, to create a new instance of the java.math.BigInteger class and then call the longValue() method on it, you would use code like this:
billionBigInteger <- invoke_new(sc, "java.math.BigInteger", "1000000000")
billion <- invoke(billionBigInteger, "longValue")
Note the sc argument: that's the spark_connection object which is provided by the front-end package (e.g. sparklyr).
The previous example can be rewritten more compactly and clearly using magrittr pipes:
billion <- sc %>%
  invoke_new("java.math.BigInteger", "1000000000") %>%
  invoke("longValue")
This syntax is similar to the method-chaining syntax often used with Scala code, so it is generally preferred.
Calling a static method of a class is also straightforward. For example, to call the Math::hypot() static function you would use this code:
hypot <- sc %>%
  invoke_static("java.lang.Math", "hypot", 10, 20)
Wrapper Functions
Creating an extension typically consists of writing R wrapper functions for a set of Spark services. In this section we’ll describe the typical form of these functions as well as how to handle special types like Spark DataFrames.
Here's the wrapper function for textFile().count() which we defined earlier:
count_lines <- function(sc, file) {
  # obtain the SparkContext, read the file as an RDD with a single partition,
  # and count its lines
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}
The count_lines function takes a spark_connection (sc) argument which enables it to obtain a reference to the SparkContext object, and in turn call the textFile().count() method.
The following functions are useful for implementing wrapper functions of various kinds:
Function | Description |
---|---|
spark_connection | Get the Spark connection associated with an object (S3) |
spark_jobj | Get the Spark jobj associated with an object (S3) |
spark_dataframe | Get the Spark DataFrame associated with an object (S3) |
spark_context | Get the SparkContext for a spark_connection |
hive_context | Get the HiveContext for a spark_connection |
spark_version | Get the version of Spark (as a numeric_version) for a spark_connection |
The use of these functions is illustrated in this simple example:
analyze <- function(x, features) {
  # normalize whatever we were passed (e.g. a dplyr tbl) into a DataFrame
  df <- spark_dataframe(x)

  # get the underlying connection so we can create new objects
  sc <- spark_connection(df)

  # create an object to do the analysis and call its `analyze` and `summary`
  # methods (note that the df and features are passed to the analyze function)
  summary <- sc %>%
    invoke_new("com.example.tools.Analyzer") %>%
    invoke("analyze", df, features) %>%
    invoke("summary")

  # return the results
  summary
}
The first argument is an object that can be accessed using the Spark DataFrame API (this might be an actual reference to a DataFrame or could rather be a dplyr tbl which has a DataFrame reference inside it).
After using the spark_dataframe function to normalize the reference, we extract the underlying Spark connection associated with the data frame using the spark_connection function. Finally, we create a new Analyzer object, call its analyze method with the DataFrame and list of features, and then call the summary method on the results of the analysis.
Accepting a spark_jobj or spark_dataframe as the first argument of a function makes it very easy to incorporate into magrittr pipelines, so this pattern is highly recommended when possible.
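For instance, a wrapper written this way composes naturally with other sparklyr operations. A hypothetical pipeline reusing the analyze() function above (the Analyzer class is the fictional example from that sketch) might look like this:

results <- copy_to(sc, mtcars) %>%
  analyze(c("mpg", "wt", "hp"))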
Dependencies
When creating R packages which implement interfaces to Spark you may need to include additional dependencies. Your dependencies might be a set of Spark packages or a custom JAR file. In either case, you'll need a way to specify that these dependencies should be included during the initialization of a Spark session. A Spark dependency is defined using the spark_dependency function:
Function | Description |
---|---|
spark_dependency | Define a Spark dependency consisting of JAR files and Spark packages |
Your extension package can specify its dependencies by implementing a function named spark_dependencies within the package (this function should not be publicly exported). For example, let's say you were creating an extension package named sparkds that needs to include a custom JAR as well as the Redshift and Apache Avro packages:
spark_dependencies <- function(spark_version, scala_version, ...) {
  spark_dependency(
    jars = c(
      system.file(
        sprintf("java/sparkds-%s-%s.jar", spark_version, scala_version),
        package = "sparkds"
      )
    ),
    packages = c(
      sprintf("com.databricks:spark-redshift_%s:0.6.0", scala_version),
      sprintf("com.databricks:spark-avro_%s:2.0.1", scala_version)
    )
  )
}

.onLoad <- function(libname, pkgname) {
  sparklyr::register_extension(pkgname)
}
The spark_version argument is provided so that a package can support multiple Spark versions for its JARs. Note that the argument will include just the major and minor versions (e.g. 1.6 or 2.0) and will not include the patch level (as JARs built for a given major/minor version are expected to work for all patch levels).
The scala_version argument is provided so that a single package can support multiple Scala compiler versions for its JARs and packages (currently, Spark 1.6 downloadable binaries are compiled with Scala 2.10 and Spark 2.0 downloadable binaries are compiled with Scala 2.11).
The ... argument is unused but should nevertheless be included to ensure compatibility if new arguments are added to spark_dependencies in the future.
The .onLoad function registers your extension package so that its spark_dependencies function will be automatically called when new connections to Spark are made via spark_connect:
library(sparklyr)
library(sparkds)
sc <- spark_connect(master = "local")
Compiling JARs
The sparklyr package includes a utility function (compile_package_jars) that will automatically compile a JAR file from your Scala source code for the required permutations of Spark and Scala compiler versions. To use the function, just invoke it from the root directory of your R package as follows:
sparklyr::compile_package_jars()
Note that a prerequisite to calling compile_package_jars is the installation of the Scala 2.10 and 2.11 compilers to one of the following paths:
- /opt/scala
- /opt/local/scala
- /usr/local/scala
- ~/scala (Windows-only)
See the sparkhello repository for a complete example of including a custom JAR within an extension package.
CRAN
When including a JAR file within an R package distributed on CRAN, you should follow the guidelines provided in Writing R Extensions:
Java code is a special case: except for very small programs, .java files should be byte-compiled (to a .class file) and distributed as part of a .jar file: the conventional location for the .jar file(s) is inst/java. It is desirable (and required under an Open Source license) to make the Java source files available: this is best done in a top-level java directory in the package – the source files should not be installed.
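Putting these guidelines together, the layout of a CRAN-ready extension might look roughly like this (a sketch only, reusing the hypothetical sparkds package from the Dependencies section):

sparkds/
  DESCRIPTION
  NAMESPACE
  R/
    dependencies.R            # spark_dependencies() and .onLoad()
  inst/
    java/
      sparkds-2.0-2.11.jar    # byte-compiled JAR(s), installed with the package
  java/
    SparkDS.scala             # Scala sources, made available but not installed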
Data Types
The ensure_* family of functions can be used to enforce specific data types that are passed to a Spark routine. For example, Spark routines that require an integer will not accept an R numeric element. Use these functions to ensure that certain parameters are scalar integers, scalar doubles, and so on; a short sketch of their use follows the list below.
- ensure_scalar_integer
- ensure_scalar_double
- ensure_scalar_boolean
- ensure_scalar_character
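For example, a wrapper can coerce and validate its arguments up front before invoking any Java methods. The following is a sketch only; count_lines_partitioned() is a hypothetical variant of the earlier count_lines():

count_lines_partitioned <- function(sc, file, partitions = 1) {
  # coerce/validate arguments to the types the underlying Java methods expect
  file <- ensure_scalar_character(file)
  partitions <- ensure_scalar_integer(partitions)

  spark_context(sc) %>%
    invoke("textFile", file, partitions) %>%
    invoke("count")
}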
In order to match the correct data types when calling Scala code from R, or when retrieving results from Scala back to R, consider the following type mapping:
| From R | Scala | To R |
|---|---|---|
| NULL | void | NULL |
| integer | Int | integer |
| character | String | character |
| logical | Boolean | logical |
| double | Double | double |
| numeric | Double | double |
| | Float | double |
| | Decimal | double |
| | Long | double |
| raw | Array[Byte] | raw |
| Date | Date | Date |
| POSIXlt | Time | |
| POSIXct | Time | POSIXct |
| list | Array[T] | list |
| environment | Map[String, T] | |
| jobj | Object | jobj |
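For example, the mapping above means an R integer such as 2L is required where a Java/Scala Int parameter is expected, while a String result comes back to R as a character vector:

sc %>%
  invoke_new("java.math.BigInteger", "12") %>%   # character -> String
  invoke("pow", 2L) %>%                          # integer -> Int
  invoke("toString")                             # String -> character ("144")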
Compiling
Most Spark extensions won't need to define their own compilation specification and can instead rely on the default behavior of compile_package_jars. Users who would like more control over where the scalac compilers are looked up can use the spark_compilation_spec function. The Spark compilation specification is used when compiling Spark extension JARs, and defines which versions of Spark, as well as which versions of Scala, should be used for compilation.
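For example, a custom specification might point compile_package_jars at a specific scalac installation. The following is a sketch only; the Spark version, scalac path, and JAR name are assumptions that should be adapted to your package:

# build against a single Spark/Scala combination using a local scalac install
spec <- sparklyr::spark_compilation_spec(
  spark_version = "2.4.0",
  scalac_path = "/usr/local/scala-2.11.12/bin/scalac",
  jar_name = "sparkds-2.4-2.11.jar"
)

sparklyr::compile_package_jars(spec = list(spec))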