Merge pull request #23 from lyft/branch-3.0
Update to Spark 3.0.2
catalinii authored Feb 17, 2021
2 parents 9607109 + c156570 commit a339cf3
Showing 566 changed files with 7,949 additions and 2,334 deletions.
275 changes: 180 additions & 95 deletions .github/workflows/build_and_test.yml

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions .github/workflows/publish_snapshot.yml
@@ -0,0 +1,33 @@
name: Publish Snapshot

on:
push:
branches:
- branch-3.0

jobs:
publish-snapshot:
if: github.repository == 'apache/spark'
runs-on: ubuntu-latest
steps:
- name: Checkout Spark repository
uses: actions/checkout@master
- name: Cache Maven local repository
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: snapshot-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
snapshot-maven-
- name: Install Java 8
uses: actions/setup-java@v1
with:
java-version: 8
- name: Publish snapshot
env:
ASF_USERNAME: ${{ secrets.NEXUS_USER }}
ASF_PASSWORD: ${{ secrets.NEXUS_PW }}
GPG_KEY: "not_used"
GPG_PASSPHRASE: "not_used"
GIT_REF: "branch-3.0"
run: ./dev/create-release/release-build.sh publish-snapshot
1 change: 1 addition & 0 deletions .gitignore
@@ -79,6 +79,7 @@ target/
unit-tests.log
work/
docs/.jekyll-metadata
docs/.jekyll-cache

# For Hive
TempStatsStore/
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 3.0.1
Version: 3.0.2
Title: R Front End for 'Apache Spark'
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
3 changes: 2 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -1240,7 +1240,8 @@ setMethod("collect",
arrowTable <- arrow::read_arrow(readRaw(conn))
}
as.data.frame(arrowTable, stringsAsFactors = stringsAsFactors)
}, finally = {
},
finally = {
close(conn)
})
return(output)
6 changes: 4 additions & 2 deletions R/pkg/R/functions.R
@@ -4028,7 +4028,8 @@ setMethod("date_trunc",
})

#' @details
#' \code{current_date}: Returns the current date as a date column.
#' \code{current_date}: Returns the current date at the start of query evaluation as a date column.
#' All calls of current_date within the same query return the same value.
#'
#' @rdname column_datetime_functions
#' @aliases current_date current_date,missing-method
@@ -4044,7 +4045,8 @@ setMethod("current_date",
})

#' @details
#' \code{current_timestamp}: Returns the current timestamp as a timestamp column.
#' \code{current_timestamp}: Returns the current timestamp at the start of query evaluation as
#' a timestamp column. All calls of current_timestamp within the same query return the same value.
#'
#' @rdname column_datetime_functions
#' @aliases current_timestamp current_timestamp,missing-method
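As a small aside on the documentation change above, a minimal SparkR sketch (assuming an active SparkR session; the column names are illustrative) of the documented behaviour: both current_timestamp() calls are evaluated once at the start of query evaluation, so the two derived columns hold the same value on every row.

df <- createDataFrame(data.frame(id = 1:3))
result <- select(
  df,
  column("id"),
  alias(current_timestamp(), "t1"),  # evaluated at the start of the query
  alias(current_timestamp(), "t2"))  # same value as "t1" on every row
head(result)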
201 changes: 60 additions & 141 deletions R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -19,7 +19,10 @@ library(testthat)

context("SparkSQL Arrow optimization")

sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sparkSession <- sparkR.session(
master = sparkRTestMaster,
enableHiveSupport = FALSE,
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

test_that("createDataFrame/collect Arrow optimization", {
skip_if_not_installed("arrow")
@@ -35,29 +38,13 @@ test_that("createDataFrame/collect Arrow optimization", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(mtcars)), expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
expect_equal(collect(createDataFrame(mtcars)), expected)
})

test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
skip_if_not_installed("arrow")

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
collect(createDataFrame(mtcars, numPartitions = 1)))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
collect(createDataFrame(mtcars, numPartitions = 1)))
})

test_that("createDataFrame/collect Arrow optimization - type specification", {
@@ -81,13 +68,7 @@ test_that("createDataFrame/collect Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_equal(collect(createDataFrame(rdf)), expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
expect_equal(collect(createDataFrame(rdf)), expected)
})

test_that("dapply() Arrow optimization", {
@@ -98,36 +79,30 @@ test_that("dapply() Arrow optimization", {
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- dapply(df,
function(rdf) {
stopifnot(is.data.frame(rdf))
rdf
},
schema(df))
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df,
function(rdf) {
stopifnot(is.data.frame(rdf))
# mtcars' hp is more then 50.
stopifnot(all(rdf$hp > 50))
rdf
},
schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
expect_equal(count(ret), nrow(mtcars))
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

ret <- dapply(df,
function(rdf) {
stopifnot(is.data.frame(rdf))
# mtcars' hp is more then 50.
stopifnot(all(rdf$hp > 50))
rdf
},
schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
expect_equal(count(ret), nrow(mtcars))
})

test_that("dapply() Arrow optimization - type specification", {
@@ -154,34 +129,18 @@ test_that("dapply() Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df, function(rdf) { rdf }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
ret <- dapply(df, function(rdf) { rdf }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
})

test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
skip_if_not_installed("arrow")
rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
b = as.POSIXct("1990-02-24 12:34:56"))))
df <- createDataFrame(rdf)

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- dapply(df, function(rdf) { rdf }, schema(df))
expect_equal(collect(ret), rdf)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
ret <- dapply(df, function(rdf) { rdf }, schema(df))
expect_equal(collect(ret), rdf)
})

test_that("gapply() Arrow optimization", {
@@ -209,28 +168,22 @@ test_that("gapply() Arrow optimization", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"gear",
function(key, grouped) {
if (length(key) > 0) {
stopifnot(is.numeric(key[[1]]))
}
stopifnot(is.data.frame(grouped))
stopifnot(length(colnames(grouped)) == 11)
# mtcars' hp is more then 50.
stopifnot(all(grouped$hp > 50))
grouped
},
schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
expect_equal(count(ret), nrow(mtcars))
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
ret <- gapply(df,
"gear",
function(key, grouped) {
if (length(key) > 0) {
stopifnot(is.numeric(key[[1]]))
}
stopifnot(is.data.frame(grouped))
stopifnot(length(colnames(grouped)) == 11)
# mtcars' hp is more then 50.
stopifnot(all(grouped$hp > 50))
grouped
},
schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
expect_equal(count(ret), nrow(mtcars))
})

test_that("gapply() Arrow optimization - type specification", {
@@ -250,84 +203,50 @@ test_that("gapply() Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
"a",
function(key, grouped) { grouped }, schema(df))
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})


callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
})

test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
skip_if_not_installed("arrow")
rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
b = as.POSIXct("1990-02-24 12:34:56"))))
df <- createDataFrame(rdf)

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
expect_equal(collect(ret), rdf)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
expect_equal(collect(ret), rdf)
})

test_that("Arrow optimization - unsupported types", {
skip_if_not_installed("arrow")

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
"not support nested struct type")
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
"not support nested struct type")
})

test_that("SPARK-32478: gapply() Arrow optimization - error message for schema mismatch", {
skip_if_not_installed("arrow")
df <- createDataFrame(list(list(a = 1L, b = "a")))

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
expect_error(
expect_error(
count(gapply(df, "a", function(key, group) { group }, structType("a int, b int"))),
"expected IntegerType, IntegerType, got IntegerType, StringType")
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
})

sparkR.session.stop()
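The refactoring pattern in this test file, as a short sketch using names from the file itself: the Arrow optimization is now enabled once when the test session is created, and a test only touches the conf, wrapped in tryCatch/finally to restore it, when it needs Arrow turned off.

sparkSession <- sparkR.session(
  master = sparkRTestMaster,
  enableHiveSupport = FALSE,
  sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

df <- createDataFrame(mtcars)

# Only a test that needs Arrow disabled flips the conf, and restores it afterwards.
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
  expected <- collect(dapply(df, function(rdf) { rdf }, schema(df)))
}, finally = {
  callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})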
