---
title: Spark SQL and Machine Learning with R
author: Ali Zaidi
date: '`r format(Sys.Date(), "%B %d, %Y")`'
output:
  html_notebook:
    toc: yes
    toc_float: true
  html_document:
    keep_md: yes
    toc: yes
    toc_float: true
---
# Using Spark SQL with R
There are three different R APIs we can use with Spark: [`SparkR`](http://spark.apache.org/docs/latest/sparkr.html), [`RxSpark`](https://msdn.microsoft.com/en-us/microsoft-r/scaler/rxspark), and [`sparklyr`](http://spark.rstudio.com).

We will examine `RxSpark` in depth later today. For now, let's take a look at `sparklyr`. The greatest advantage of `sparklyr` over `SparkR` is its clean, tidy interface to Spark SQL and Spark ML.
```{r, eval = FALSE}
library(sparklyr)
library(dplyr)

# Connect to Spark via YARN and list the tables registered in the metastore
sc <- spark_connect(master = "yarn-client")
src_tbls(sc)
```
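Because the connection object also implements the DBI interface, you can send Spark SQL directly when a query is easier to express that way. A minimal sketch, assuming the `pullrequest` table used later in this notebook is already registered in the metastore:

```{r, eval = FALSE}
library(DBI)

# Run a raw Spark SQL query and return the result as a local data frame
dbGetQuery(sc, "SELECT COUNT(*) AS n_pull_requests FROM pullrequest")
```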
## Configuration
To specify the configuration of your Spark session, use the `spark_config` function from the `sparklyr` package.
```{r}
config <- spark_config()
config$spark.executor.cores <- 3
config$spark.executor.memory <- "15G"

sc <- spark_connect(master = "yarn-client",
                    config = config)
```
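The object returned by `spark_config()` is a named list, so any standard Spark property can be set by name before calling `spark_connect()`. A small sketch; the property values below are placeholders, not tuning advice for this cluster:

```{r, eval = FALSE}
# Additional Spark properties can be added the same way (set before connecting)
config$spark.executor.instances <- 4
config$spark.yarn.executor.memoryOverhead <- "2g"
```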
## Working with Tables
```{r}
pullrequest <- tbl(sc, "pullrequest")
users <- tbl(sc, "users")

pullrequest %>% head()

# What is the schema of the users table?
sdf_schema(users)
```
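Before joining, it can help to check the size of each table. A quick sketch using `sdf_nrow()`, which computes the count on the cluster rather than locally:

```{r, eval = FALSE}
# Row counts are computed by Spark, not pulled into local memory
sdf_nrow(pullrequest)
sdf_nrow(users)
```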
## Joins
```{r}
users_sub <- select(users, login, userid, bio, blog, company) %>%
  sdf_register("usersSub")
tbl_cache(sc, 'usersSub')

pullrequest_sub <- select(pullrequest, repo, owner, pullrequestid,
                          additions, deletions, baserepofullname,
                          body, comments, commits, merged, mergeable,
                          userid, userlogin) %>%
  sdf_register('pr_sub')
tbl_cache(sc, 'pr_sub')

joined_tbl <- left_join(pullrequest_sub, users_sub, by = "userid")
joined_tbl %>% sdf_register('joined')
tbl_cache(sc, 'joined')
```
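A quick sanity check on the join; a sketch that assumes the `joined_tbl` reference from the chunk above:

```{r, eval = FALSE}
# Compare row counts before and after the join
sdf_nrow(pullrequest_sub)
sdf_nrow(joined_tbl)

# Peek at a few joined records
joined_tbl %>% select(repo, userlogin, login, merged) %>% head()
```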
## Calculate Number of Merged Pull Requests
```{r}
sum_pr_commit <- joined_tbl %>%
  group_by(repo) %>%
  summarise(counts = n(),
            ave_additions = mean(additions),
            ave_deletions = mean(deletions),
            merged_total = sum(as.numeric(merged)))

sum_pr_commit %>% sdf_register("summaryPRMerged")
tbl_cache(sc, 'summaryPRMerged')
```
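The summary table has one row per repository, so it is small enough to order and collect back into local R memory for inspection or plotting. A sketch:

```{r, eval = FALSE}
# Rank repositories by merged pull requests and collect the result locally
top_repos <- sum_pr_commit %>%
  arrange(desc(merged_total)) %>%
  head(20) %>%
  collect()

top_repos
```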
## Save Data to Parquet
Now that we have our joined dataset, let's save it to Parquet so that we can consume and analyze it with Microsoft R Server.
```{r save-to-parquet}
rxHadoopMakeDir("/joinedData/")
rxHadoopListFiles("/")

spark_write_parquet(joined_tbl,
                    path = "/joinedData/PR_users/")
```
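To verify the write before moving to Microsoft R Server, you can list the output directory, or read the Parquet files back through `sparklyr`; a hedged sketch:

```{r, eval = FALSE}
# The directory should now contain Parquet part files
rxHadoopListFiles("/joinedData/PR_users/")

# Optionally read it back into Spark to double-check the schema
pr_users_check <- spark_read_parquet(sc, "pr_users_check",
                                     path = "/joinedData/PR_users/")
sdf_schema(pr_users_check)
```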
## Loading Data in RevoScaleR
```{r}
myNameNode <- "default"
myPort <- 0

hdfsFS <- RxHdfsFileSystem(hostName = myNameNode,
                           port = myPort)

joined_parquet <- RxParquetData("/joinedData/PR_users",
                                fileSystem = hdfsFS)

computeContext <- RxSpark(consoleOutput = TRUE,
                          nameNode = myNameNode,
                          port = myPort,
                          executorCores = 14,
                          executorMem = "20g",
                          executorOverheadMem = "7g",
                          persistentRun = TRUE,
                          extraSparkConfig = "--conf spark.speculation=true")
rxSetComputeContext(computeContext)

spark_disconnect(sc)

rxGetInfo(joined_parquet, getVarInfo = TRUE, numRows = 10)

dtree <- rxDTree(merged ~ commits + comments, data = joined_parquet)

rxStopEngine(computeContext)
```
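Other ScaleR functions can run against the same Parquet source through the `RxSpark` compute context. A minimal sketch, meant to be run before the final `rxStopEngine()` call above, while the compute context is still active:

```{r, eval = FALSE}
# Summary statistics computed in Spark on the Parquet data
rxSummary(~ additions + deletions + commits, data = joined_parquet)
```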