After WRITE ORDERED BY is set on an Iceberg table, Spark generates an additional job that produces no shuffle output. Is this a bug? #12268

Open
SGITLOGIN opened this issue Feb 14, 2025 · 0 comments
Labels: bug

SGITLOGIN commented Feb 14, 2025

Apache Iceberg version

1.6.1

Query engine

Spark 3.4.2

Please describe the bug 🐞

Question

  1. After WRITE ORDERED BY is set on the Iceberg table, Spark generates an additional job, and that job produces no shuffle output. As a result, the data is read one more time and the scan time doubles. Is this a bug? (See the note after the physical plan below.)

The steps to reproduce are as follows:

Create table

spark.sql("""
CREATE TABLE test.iceberg_impression_log_05 (
  `motype` string,                 
  `third` string,                 
  `logdate` string, 
  `dt` string)
USING iceberg
PARTITIONED BY (dt)
LOCATION 'oss://huan-bigdata-test/test/changguowei/iceberg_impression_log_05/'
TBLPROPERTIES (
  'write.format.default' = 'orc',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '2',
  'write.target-file-size-bytes' = '134217728',
  'write.orc.compression-codec' = 'zlib')
""").show()

Add sort field

spark.sql("""
ALTER TABLE test.iceberg_impression_log_05 WRITE ORDERED BY logdate ASC NULLS FIRST
""").show(false)

Insert

spark.sql("""
INSERT OVERWRITE TABLE test.iceberg_impression_log_05 PARTITION (dt='2024-06-10')
SELECT
    motype,
    third,
    logdate
FROM test.hive_impression_log
WHERE dt = '2024-06-10'
""").show(false)

Spark execution process

[Spark WebUI screenshots]

Spark WebUI Physical Plan

== Physical Plan ==
OverwriteByExpression (12)
+- AdaptiveSparkPlan (11)
   +- == Final Plan ==
      * Sort (7)
      +- AQEShuffleRead (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=11.5 GiB, rowCount=1.25E+8)
            +- Exchange (4)
               +- * Project (3)
                  +- * ColumnarToRow (2)
                     +- Scan orc spark_catalog.test.hive_impression_log (1)
   +- == Initial Plan ==
      Sort (10)
      +- Exchange (9)
         +- Project (8)
            +- Scan orc spark_catalog.test.hive_impression_log (1)


(1) Scan orc spark_catalog.test.hive_impression_log
Output [4]: [motype#4, third#5, logdate#6, dt#62]
Batched: true
Location: InMemoryFileIndex [oss://..../hive_huan_ad_monitor_impression_log_orc/dt=2024-06-10]
PartitionFilters: [isnotnull(dt#62), (dt#62 = 2024-06-10)]
ReadSchema: struct<motype:string,third:string,logdate:string>

(2) ColumnarToRow [codegen id : 1]
Input [4]: [motype#4, third#5, logdate#6, dt#62]

(3) Project [codegen id : 1]
Output [4]: [motype#4, third#5, logdate#6, 2024-06-10 AS dt#71]
Input [4]: [motype#4, third#5, logdate#6, dt#62]

(4) Exchange
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: rangepartitioning(dt#71 ASC NULLS FIRST, logdate#6 ASC NULLS FIRST, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=31]

(5) ShuffleQueryStage
Output [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: 0

(6) AQEShuffleRead
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: coalesced

(7) Sort [codegen id : 2]
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: [dt#71 ASC NULLS FIRST, logdate#6 ASC NULLS FIRST], false, 0

(8) Project
Output [4]: [motype#4, third#5, logdate#6, 2024-06-10 AS dt#71]
Input [4]: [motype#4, third#5, logdate#6, dt#62]

(9) Exchange
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: rangepartitioning(dt#71 ASC NULLS FIRST, logdate#6 ASC NULLS FIRST, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=14]

(10) Sort
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: [dt#71 ASC NULLS FIRST, logdate#6 ASC NULLS FIRST], false, 0

(11) AdaptiveSparkPlan
Output [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: isFinalPlan=true

(12) OverwriteByExpression
Input [4]: [motype#4, third#5, logdate#6, dt#71]
Arguments: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3691/969421003@612bfb57, IcebergWrite(table=spark_catalog.test.iceberg_impression_log_05, format=ORC)
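
A note on the likely cause (my reading of the plan, not confirmed): Exchange (4) is a rangepartitioning exchange, and to compute the range bounds Spark's RangePartitioner first samples its input in a separate job. That sampling job re-reads the source and writes no shuffle output, which would match the extra job with no shuffle and the doubled scan time. The same behavior can be observed outside Iceberg with a minimal sketch (dataset and column names are made up):

// Sketch: range partitioning launches a sampling job before the shuffle job.
// Watch the Spark UI: the first job samples the input to compute range bounds
// and produces no shuffle output; the second job performs the actual shuffle.
import org.apache.spark.sql.functions.col

val df = spark.range(0L, 100000000L).selectExpr("id", "cast(id % 100 as string) AS k")
df.repartitionByRange(200, col("k"))
  .write.format("noop").mode("overwrite").save()

If that is the cause, one possible workaround is to keep only a task-local sort, which skips the range shuffle (and its sampling pass) entirely:

// Sketch of a possible workaround: local ordering instead of a global range sort.
spark.sql("""
ALTER TABLE test.iceberg_impression_log_05 WRITE LOCALLY ORDERED BY logdate ASC NULLS FIRST
""").show(false)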

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time