From f9500f5c0e958465a53711e4cd0e58aedb26b888 Mon Sep 17 00:00:00 2001
From: joke1196
Date: Thu, 30 Jan 2025 13:21:41 +0000
Subject: [PATCH 1/2] Create rule S7194

---
 rules/S7194/metadata.json        |  2 ++
 rules/S7194/python/metadata.json | 25 ++++++++++++++++++
 rules/S7194/python/rule.adoc     | 44 ++++++++++++++++++++++++++++++++
 3 files changed, 71 insertions(+)
 create mode 100644 rules/S7194/metadata.json
 create mode 100644 rules/S7194/python/metadata.json
 create mode 100644 rules/S7194/python/rule.adoc

diff --git a/rules/S7194/metadata.json b/rules/S7194/metadata.json
new file mode 100644
index 00000000000..2c63c085104
--- /dev/null
+++ b/rules/S7194/metadata.json
@@ -0,0 +1,2 @@
+{
+}
diff --git a/rules/S7194/python/metadata.json b/rules/S7194/python/metadata.json
new file mode 100644
index 00000000000..92dc5b3ee62
--- /dev/null
+++ b/rules/S7194/python/metadata.json
@@ -0,0 +1,25 @@
+{
+  "title": "FIXME",
+  "type": "CODE_SMELL",
+  "status": "ready",
+  "remediation": {
+    "func": "Constant\/Issue",
+    "constantCost": "5min"
+  },
+  "tags": [
+  ],
+  "defaultSeverity": "Major",
+  "ruleSpecification": "RSPEC-7194",
+  "sqKey": "S7194",
+  "scope": "All",
+  "defaultQualityProfiles": ["Sonar way"],
+  "quickfix": "unknown",
+  "code": {
+    "impacts": {
+      "MAINTAINABILITY": "HIGH",
+      "RELIABILITY": "MEDIUM",
+      "SECURITY": "LOW"
+    },
+    "attribute": "CONVENTIONAL"
+  }
+}
diff --git a/rules/S7194/python/rule.adoc b/rules/S7194/python/rule.adoc
new file mode 100644
index 00000000000..caae0d69054
--- /dev/null
+++ b/rules/S7194/python/rule.adoc
@@ -0,0 +1,44 @@
+FIXME: add a description
+
+// If you want to factorize the description uncomment the following line and create the file.
+//include::../description.adoc[]
+
+== Why is this an issue?
+
+FIXME: remove the unused optional headers (that are commented out)
+
+//=== What is the potential impact?
+
+== How to fix it
+//== How to fix it in FRAMEWORK NAME
+
+=== Code examples
+
+==== Noncompliant code example
+
+[source,python,diff-id=1,diff-type=noncompliant]
+----
+FIXME
+----
+
+==== Compliant solution
+
+[source,python,diff-id=1,diff-type=compliant]
+----
+FIXME
+----
+
+//=== How does this work?
+
+//=== Pitfalls
+
+//=== Going the extra mile
+
+
+//== Resources
+//=== Documentation
+//=== Articles & blog posts
+//=== Conference presentations
+//=== Standards
+//=== External coding guidelines
+//=== Benchmarks

From 6916af074b3fe979ab19eaf43438fbfefab33ff8 Mon Sep 17 00:00:00 2001
From: David Kunzmann
Date: Thu, 30 Jan 2025 15:11:06 +0100
Subject: [PATCH 2/2] Create rule S7194: PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame.

---
 rules/S7194/python/metadata.json | 10 ++---
 rules/S7194/python/rule.adoc     | 80 +++++++++++++++++++++++++++++----------
 2 files changed, 66 insertions(+), 24 deletions(-)

diff --git a/rules/S7194/python/metadata.json b/rules/S7194/python/metadata.json
index 92dc5b3ee62..5b8baac2bc0 100644
--- a/rules/S7194/python/metadata.json
+++ b/rules/S7194/python/metadata.json
@@ -1,5 +1,5 @@
 {
-  "title": "FIXME",
+  "title": "PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame",
   "type": "CODE_SMELL",
   "status": "ready",
   "remediation": {
@@ -7,6 +7,8 @@
     "constantCost": "5min"
   },
   "tags": [
+    "data-science",
+    "pyspark"
   ],
   "defaultSeverity": "Major",
   "ruleSpecification": "RSPEC-7194",
@@ -16,10 +18,8 @@
   "quickfix": "unknown",
   "code": {
     "impacts": {
-      "MAINTAINABILITY": "HIGH",
-      "RELIABILITY": "MEDIUM",
-      "SECURITY": "LOW"
+      "RELIABILITY": "LOW"
    },
-    "attribute": "CONVENTIONAL"
+    "attribute": "EFFICIENT"
   }
 }
diff --git a/rules/S7194/python/rule.adoc b/rules/S7194/python/rule.adoc
index caae0d69054..3649705fceb 100644
--- a/rules/S7194/python/rule.adoc
+++ b/rules/S7194/python/rule.adoc
@@ -1,16 +1,26 @@
-FIXME: add a description
-
-// If you want to factorize the description uncomment the following line and create the file.
-//include::../description.adoc[]
+This rule raises an issue when a small DataFrame is joined to a larger DataFrame without the use of the `broadcast` function.
 
 == Why is this an issue?
 
-FIXME: remove the unused optional headers (that are commented out)
+In PySpark, shuffling refers to the process of transferring data between worker nodes within a cluster.
+This operation, while necessary for tasks such as joins and aggregations on DataFrames, can be resource-intensive.
+Although Spark handles shuffling automatically, there are strategies to minimize it, thereby enhancing the performance of these operations.
 
-//=== What is the potential impact?
+When performing join operations with multiple DataFrames in PySpark, it is crucial to consider the size of the DataFrames involved.
+If a small DataFrame is being joined to a larger one, using the `broadcast` function to distribute the small DataFrame across all worker nodes can be beneficial.
+This approach significantly reduces the volume of data shuffled between nodes, thereby improving the efficiency of the join operation.
 
 == How to fix it
-//== How to fix it in FRAMEWORK NAME
+
+To fix this issue, use the `broadcast` function on the small DataFrame before performing the join operation.
 
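+Note that Spark can also broadcast small tables automatically: any table smaller than the `spark.sql.autoBroadcastJoinThreshold` configuration value (10MB by default) is broadcast without an explicit hint. A minimal sketch of raising that threshold, assuming a 50MB limit suits the cluster:
+
+[source,python]
+----
+# Assumed example value: let Spark auto-broadcast tables smaller than 50MB
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
+----
+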
 === Code examples
 
@@ -18,27 +28,59 @@ ==== Noncompliant code example
 
 [source,python,diff-id=1,diff-type=noncompliant]
 ----
-FIXME
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName('myspark').getOrCreate()
+
+data = [
+    (1, "Alice"),
+    (2, "Bob"),
+    (2, "Charlie"),
+    (1, "Dan"),
+    (2, "Elsa")
+]
+large_df = spark.createDataFrame(data, ["department_id", "name"])
+small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])
+
+joined_df = large_df.join(small_df, on="department_id", how="left") # Noncompliant: the small DataFrame is not broadcast
 ----
 
 ==== Compliant solution
 
 [source,python,diff-id=1,diff-type=compliant]
 ----
-FIXME
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import broadcast
+
+spark = SparkSession.builder.appName('myspark').getOrCreate()
+
+data = [
+    (1, "Alice"),
+    (2, "Bob"),
+    (2, "Charlie"),
+    (1, "Dan"),
+    (2, "Elsa")
+]
+large_df = spark.createDataFrame(data, ["department_id", "name"])
+small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])
+
+joined_df = large_df.join(broadcast(small_df), on="department_id", how="left") # Compliant
 ----
 
-//=== How does this work?
+== Resources
+=== Documentation
 
-//=== Pitfalls
+* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.broadcast.html[pyspark.sql.functions.broadcast]
 
-//=== Going the extra mile
+=== Articles & blog posts
+* Medium Article - https://aspinfo.medium.com/what-is-broadcast-join-how-to-perform-broadcast-in-pyspark-699aef2eff5a[What is broadcast join, how to perform broadcast in pyspark?]
 
 
-//== Resources
-//=== Documentation
-//=== Articles & blog posts
-//=== Conference presentations
-//=== Standards
-//=== External coding guidelines
-//=== Benchmarks
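+
+To check that the broadcast actually takes effect, inspect the join's physical plan, where a broadcast join appears as a `BroadcastHashJoin` node. A minimal sketch, reusing `joined_df` from the compliant solution above:
+
+[source,python]
+----
+# The physical plan of a broadcast join contains a BroadcastHashJoin node
+joined_df.explain()
+----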