Create rule S7194: PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame #4637

Draft
wants to merge 2 commits into
base: master
2 changes: 2 additions & 0 deletions rules/S7194/metadata.json
@@ -0,0 +1,2 @@
{
}
25 changes: 25 additions & 0 deletions rules/S7194/python/metadata.json
@@ -0,0 +1,25 @@
{
"title": "PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame",
"type": "CODE_SMELL",
"status": "ready",
"remediation": {
"func": "Constant\/Issue",
"constantCost": "5min"
},
"tags": [
"data-science",
"pyspark"
],
"defaultSeverity": "Major",
"ruleSpecification": "RSPEC-7194",
"sqKey": "S7194",
"scope": "All",
"defaultQualityProfiles": ["Sonar way"],
"quickfix": "unknown",
"code": {
"impacts": {
"RELIABILITY": "LOW"

Review comment (Contributor): I am not sure about the impact when the attribute is efficient

},
"attribute": "EFFICIENT"
}
}
70 changes: 70 additions & 0 deletions rules/S7194/python/rule.adoc
@@ -0,0 +1,70 @@
This rule raises an issue when a small DataFrame is joined to another DataFrame without the use of the broadcast operation.

== Why is this an issue?

In PySpark, shuffling refers to the process of transferring data between worker nodes within a cluster.
This operation, while necessary for tasks such as join and aggregation on DataFrames, can be resource-intensive.
Although Spark handles shuffling automatically, there are strategies to minimize it, thereby enhancing the performance of these operations.

When joining DataFrames in PySpark, consider the relative size of the inputs.
If a small DataFrame is joined to a larger one, wrapping the small DataFrame in the `broadcast` function asks Spark to send a full copy of it to every worker node.
Each worker can then join its partitions of the larger DataFrame locally, so the larger DataFrame does not need to be shuffled across the cluster.
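
To make the shuffle argument concrete, here is a toy cost model in plain Python. It is not PySpark and does not reflect how Spark is implemented: it assumes four hypothetical workers, counts a row as "moved" whenever it has to leave the worker that currently holds it, and ignores details such as the driver collecting the broadcast data first. All function names and data are illustrative assumptions.

```python
def shuffle_join_rows_moved(large_parts, small_parts, num_workers):
    """Count rows sent between workers by a hash-partitioned (shuffle) join.

    Every row of both inputs is routed to the worker that owns hash(key);
    rows that already sit on that worker are not counted.
    """
    moved = 0
    for parts in (large_parts, small_parts):
        for worker, rows in enumerate(parts):
            for key, _value in rows:
                if hash(key) % num_workers != worker:
                    moved += 1
    return moved


def broadcast_join_rows_moved(small_parts, num_workers):
    """Count rows sent when the small side is copied to every other worker.

    The large side never moves in this model.
    """
    small_rows = sum(len(rows) for rows in small_parts)
    return small_rows * (num_workers - 1)


NUM_WORKERS = 4  # hypothetical cluster size

# Large side: 1000 rows per worker, keys cycling through 10 departments.
large_parts = [
    [(i % 10, f"row-{w}-{i}") for i in range(1000)]
    for w in range(NUM_WORKERS)
]
# Small side: 10 department rows, all initially on worker 0.
small_parts = [[(k, f"dept-{k}") for k in range(10)], [], [], []]

shuffle_cost = shuffle_join_rows_moved(large_parts, small_parts, NUM_WORKERS)
broadcast_cost = broadcast_join_rows_moved(small_parts, NUM_WORKERS)
print("rows moved by shuffle join:  ", shuffle_cost)
print("rows moved by broadcast join:", broadcast_cost)
```

In this model the shuffle cost grows with the size of the large side, while the broadcast cost depends only on the size of the small side and the number of workers, which is why broadcasting pays off only when one input is genuinely small.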

== How to fix it

To fix this issue, use the `broadcast` function on the small DataFrame before performing the join operation.

=== Code examples

==== Noncompliant code example

[source,python,diff-id=1,diff-type=noncompliant]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myspark').getOrCreate()

data = [
(1, "Alice"),
(2, "Bob"),
(2, "Charlie"),
(1, "Dan"),
(2, "Elsa")
]
large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])

joined_df = large_df.join(small_df, on="department_id", how="left") # Noncompliant: the small DataFrame is not broadcast
----

==== Compliant solution

[source,python,diff-id=1,diff-type=compliant]
----
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName('myspark').getOrCreate()

data = [
(1, "Alice"),
(2, "Bob"),
(2, "Charlie"),
(1, "Dan"),
(2, "Elsa")
]
large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])

joined_df = large_df.join(broadcast(small_df), on="department_id", how="left") # Compliant
----
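
One way to check whether the join was planned as a broadcast join is to inspect the physical plan printed by `DataFrame.explain()`. The helper below is a minimal sketch rather than part of the rule: `explain_text` and `plan_mentions_broadcast` are hypothetical names, and the sketch assumes that `explain()` writes through Python's standard output and that a broadcast join appears in the plan text as a `BroadcastHashJoin` or `BroadcastNestedLoopJoin` operator.

```python
import contextlib
import io


def explain_text(df) -> str:
    """Capture the text that df.explain() prints to standard output."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def plan_mentions_broadcast(plan_text: str) -> bool:
    """Return True if the plan text names a broadcast join operator."""
    return (
        "BroadcastHashJoin" in plan_text
        or "BroadcastNestedLoopJoin" in plan_text
    )
```

With the compliant example, `plan_mentions_broadcast(explain_text(joined_df))` should return `True`. Treat this as a debugging aid only, since Spark may also choose a broadcast join on its own for inputs it estimates to be small.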

== Resources
=== Documentation

* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.broadcast.html[pyspark.sql.functions.broadcast]

=== Articles & blog posts

* Medium Article - https://aspinfo.medium.com/what-is-broadcast-join-how-to-perform-broadcast-in-pyspark-699aef2eff5a[What is broadcast join, how to perform broadcast in pyspark?]