stream_lag
Apply lag function to columns of a Spark Streaming DataFrame
Description
Given a streaming Spark DataFrame as input, this function returns another streaming DataFrame that contains all columns of the input plus column(s) shifted behind by the offset(s) specified in ... (see example).
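The description can be read row by row: with offset `k`, each output row holds the value the input column had `k` rows earlier, and rows with no such predecessor are left empty. As a rough, non-streaming analogy, the sketch below applies that rule to an ordinary R vector using a hypothetical helper `lag_by()` that is not part of sparklyr. It assumes missing predecessors become `NA` (as with SQL `LAG`) and makes no claim about how `stream_lag` orders rows or carries values across micro-batches.

```r
# Hypothetical helper for illustration only; not part of sparklyr.
# Returns `v` shifted behind by `k` positions.
lag_by <- function(v, k) {
  idx <- seq_len(length(v)) - k  # position each row should read from
  idx[idx < 1] <- NA             # no k-th predecessor: NA (avoids R's negative-index exclusion)
  v[idx]
}

days <- weekdays(as.Date(seq(7), origin = "1970-01-01"))
local_df <- data.frame(
  today = days,
  yesterday = lag_by(days, 1),     # offset 1
  two_days_ago = lag_by(days, 2),  # offset 2
  stringsAsFactors = FALSE
)
print(local_df)
```

The first row of `yesterday` and the first two rows of `two_days_ago` are `NA`; every other row repeats a day name from one or two rows above.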
Usage
stream_lag(x, cols, thresholds = NULL)
Arguments
Argument | Description |
---|---|
x | An object coercible to a Spark Streaming DataFrame. |
cols | A list of expressions of the form `output_col = input_col ~ offset`, e.g. `yesterday = today ~ 1` (see example). |
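Each `cols` entry packs three pieces into one expression: the name of the new column, the input column, and the offset. The sketch below pulls those pieces out of the example's specification with base R formula indexing. `parse_lag_spec()` is a hypothetical helper for illustration, not sparklyr's own parser.

```r
# Hypothetical helper for illustration; not sparklyr's internal parser.
# Splits each `output = input ~ offset` entry into its three parts.
parse_lag_spec <- function(cols) {
  data.frame(
    output = names(cols),  # left of `=`: name of the new column
    input = vapply(cols, function(f) as.character(f[[2]]),  # left of `~`
                   character(1), USE.NAMES = FALSE),
    offset = vapply(cols, function(f) as.integer(eval(f[[3]])),  # right of `~`
                    integer(1), USE.NAMES = FALSE),
    stringsAsFactors = FALSE
  )
}

# c() on formulas yields a named list of formula objects
spec <- parse_lag_spec(c(yesterday = today ~ 1, two_days_ago = today ~ 2))
print(spec)
```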
Examples
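library(sparklyr)
library(dplyr)  # provides collect() and %>%

sc <- spark_connect(master = "local", version = "2.2.0")

# Generate a test stream from days_df: num_iters iterations,
# nrow(days_df) rows per iteration
streaming_path <- tempfile("days_df_")
days_df <- tibble::tibble(
  today = weekdays(as.Date(seq(7), origin = "1970-01-01"))
)
num_iters <- 7
stream_generate_test(
  df = days_df,
  path = streaming_path,
  distribution = rep(nrow(days_df), num_iters),
  iterations = num_iters
)

# Read the stream back and add lagged copies of `today`:
# offset 1 as `yesterday`, offset 2 as `two_days_ago`
stream_read_csv(sc, streaming_path) %>%
  stream_lag(cols = c(yesterday = today ~ 1, two_days_ago = today ~ 2)) %>%
  collect() %>%
  print(n = 10L)

spark_disconnect(sc)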