MemoryManager and Page Size + Configuration Properties
jaceklaskowski committed Jun 13, 2023
1 parent 7a97a27 commit 8ef51da
Showing 3 changed files with 127 additions and 49 deletions.
134 changes: 92 additions & 42 deletions docs/configuration-properties.md
@@ -1,27 +1,5 @@
# Configuration Properties

## spark.scheduler

### barrier.maxConcurrentTasksCheck.interval { #spark.scheduler.barrier.maxConcurrentTasksCheck.interval }

**spark.scheduler.barrier.maxConcurrentTasksCheck.interval**

### barrier.maxConcurrentTasksCheck.maxFailures { #spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures }

**spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures**

## <span id="spark.shuffle.sort.io.plugin.class"><span id="SHUFFLE_IO_PLUGIN_CLASS"> shuffle.sort.io.plugin.class

**spark.shuffle.sort.io.plugin.class**

Name of the class to use for [shuffle IO](shuffle/ShuffleDataIO.md)

Default: [LocalDiskShuffleDataIO](shuffle/LocalDiskShuffleDataIO.md)

Used when:

* `ShuffleDataIOUtils` is requested to [loadShuffleDataIO](shuffle/ShuffleDataIOUtils.md#loadShuffleDataIO)

## <span id="spark.app.id"> spark.app.id

Unique identifier of a Spark application that Spark uses to identify [metric sources](metrics/MetricsSystem.md#buildRegistryName).
@@ -55,6 +33,18 @@ Used when:
* `TorrentBroadcast` is requested to [setConf](broadcast-variables/TorrentBroadcast.md#compressionCodec)
* `SerializerManager` is [created](serializer/SerializerManager.md#compressBroadcast)

## <span id="BUFFER_PAGESIZE"> spark.buffer.pageSize { #spark.buffer.pageSize }

**spark.buffer.pageSize**

The amount of memory used per page (in bytes)

Default: (undefined)

Used when:

* `MemoryManager` is [created](memory/MemoryManager.md#pageSizeBytes)
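
For illustration, here is a minimal sketch of setting the property explicitly through `SparkConf` (the app name is hypothetical; the value is a number of bytes, per the description above):

```scala
import org.apache.spark.SparkConf

// Request 2 MB pages explicitly; when the property is unset,
// MemoryManager falls back to its default page size heuristic.
val conf = new SparkConf()
  .setAppName("page-size-demo") // hypothetical
  .set("spark.buffer.pageSize", (2 * 1024 * 1024).toString)
```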

## <span id="spark.cleaner.referenceTracking"><span id="CLEANER_REFERENCE_TRACKING"> spark.cleaner.referenceTracking

Controls whether to enable [ContextCleaner](core/ContextCleaner.md)
@@ -483,15 +473,29 @@ Used when:
* `Executor` is [created](executor/Executor.md#maxDirectResultSize)
* `MapOutputTrackerMaster` is [created](scheduler/MapOutputTrackerMaster.md#maxRpcMessageSize) (and makes sure that [spark.shuffle.mapOutput.minSizeForBroadcast](#spark.shuffle.mapOutput.minSizeForBroadcast) is below the threshold)

## <span id="spark.scheduler.minRegisteredResourcesRatio"> spark.scheduler.minRegisteredResourcesRatio
## spark.scheduler

### barrier.maxConcurrentTasksCheck.interval { #spark.scheduler.barrier.maxConcurrentTasksCheck.interval }

**spark.scheduler.barrier.maxConcurrentTasksCheck.interval**

### barrier.maxConcurrentTasksCheck.maxFailures { #spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures }

**spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures**

### minRegisteredResourcesRatio { #spark.scheduler.minRegisteredResourcesRatio }

**spark.scheduler.minRegisteredResourcesRatio**

Minimum ratio of (registered resources / total expected resources) before submitting tasks

Default: (undefined)

## <span id="spark.scheduler.revive.interval"><span id="SCHEDULER_REVIVE_INTERVAL"> spark.scheduler.revive.interval
### <span id="SCHEDULER_REVIVE_INTERVAL"> spark.scheduler.revive.interval { #spark.scheduler.revive.interval }

**Revive Interval**: the time (in millis) between revives of resource offers
**spark.scheduler.revive.interval**

The time (in milliseconds) between revives of resource offers

Default: `1s`

@@ -510,21 +514,41 @@ Used when:
* `SparkEnv` utility is used to [create a SparkEnv](SparkEnv.md#create)
* `SparkConf` is requested to [registerKryoClasses](SparkConf.md#registerKryoClasses) (as a side-effect)

## <span id="spark.shuffle.checksum.enabled"><span id="SHUFFLE_CHECKSUM_ENABLED"> spark.shuffle.checksum.enabled
## spark.shuffle

### sort.io.plugin.class { #spark.shuffle.sort.io.plugin.class }

**spark.shuffle.sort.io.plugin.class**

Name of the class to use for [shuffle IO](shuffle/ShuffleDataIO.md)

Default: [LocalDiskShuffleDataIO](shuffle/LocalDiskShuffleDataIO.md)

Used when:

* `ShuffleDataIOUtils` is requested to [loadShuffleDataIO](shuffle/ShuffleDataIOUtils.md#loadShuffleDataIO)

### <span id="SHUFFLE_CHECKSUM_ENABLED"> checksum.enabled { #spark.shuffle.checksum.enabled }

**spark.shuffle.checksum.enabled**

Controls checksumming of shuffle data.
If enabled, Spark calculates checksum values for each partition's data within the map output file and stores the values in a checksum file on disk.
When shuffle data corruption is detected, Spark uses the checksum file to try to diagnose the cause of the corruption (e.g., a network or disk issue).

Default: `true`

## <span id="spark.shuffle.compress"> spark.shuffle.compress
### compress { #spark.shuffle.compress }

**spark.shuffle.compress**

Controls whether to compress shuffle output when stored
Enables compressing shuffle output when stored

Default: `true`

## <span id="spark.shuffle.detectCorrupt"><span id="SHUFFLE_DETECT_CORRUPT"> spark.shuffle.detectCorrupt
### <span id="SHUFFLE_DETECT_CORRUPT"> detectCorrupt { #spark.shuffle.detectCorrupt }

**spark.shuffle.detectCorrupt**

Controls corruption detection in fetched blocks

@@ -534,7 +558,9 @@ Used when:

* `BlockStoreShuffleReader` is requested to [read combined records for a reduce task](shuffle/BlockStoreShuffleReader.md#read)

## <span id="spark.shuffle.detectCorrupt.useExtraMemory"><span id="SHUFFLE_DETECT_CORRUPT_MEMORY"> spark.shuffle.detectCorrupt.useExtraMemory
### <span id="SHUFFLE_DETECT_CORRUPT_MEMORY"> detectCorrupt.useExtraMemory { #spark.shuffle.detectCorrupt.useExtraMemory }

**spark.shuffle.detectCorrupt.useExtraMemory**

If enabled, part of a compressed/encrypted stream will be decompressed/decrypted using extra memory to detect corruption early. Any `IOException` thrown causes the task to be retried once; if it fails again with the same exception, a `FetchFailedException` is thrown to retry the previous stage

@@ -544,7 +570,9 @@ Used when:

* `BlockStoreShuffleReader` is requested to [read combined records for a reduce task](shuffle/BlockStoreShuffleReader.md#read)

## <span id="spark.shuffle.file.buffer"> spark.shuffle.file.buffer
### file.buffer { #spark.shuffle.file.buffer }

**spark.shuffle.file.buffer**

Size of the in-memory buffer for each shuffle file output stream, in KiB unless otherwise specified. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.

@@ -560,7 +588,9 @@ Used when the following are created:
* [ExternalAppendOnlyMap](shuffle/ExternalAppendOnlyMap.md)
* [ExternalSorter](shuffle/ExternalSorter.md)

## <span id="spark.shuffle.manager"> spark.shuffle.manager
### manager { #spark.shuffle.manager }

**spark.shuffle.manager**

A fully-qualified class name or the alias of the [ShuffleManager](shuffle/ShuffleManager.md) in a Spark application

@@ -573,7 +603,9 @@ Supported aliases:

Used when `SparkEnv` object is requested to [create a "base" SparkEnv for a driver or an executor](SparkEnv.md#create)
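
As a quick sketch, either form below selects the same [ShuffleManager](shuffle/ShuffleManager.md) (assuming the standard `sort` alias and its fully-qualified counterpart):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// ...by alias
conf.set("spark.shuffle.manager", "sort")

// ...or by fully-qualified class name
conf.set("spark.shuffle.manager",
  "org.apache.spark.shuffle.sort.SortShuffleManager")
```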

## <span id="spark.shuffle.mapOutput.parallelAggregationThreshold"><span id="SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD"> spark.shuffle.mapOutput.parallelAggregationThreshold
### <span id="SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD"> mapOutput.parallelAggregationThreshold { #spark.shuffle.mapOutput.parallelAggregationThreshold }

**spark.shuffle.mapOutput.parallelAggregationThreshold**

**(internal)** Multiple threads are used when the number of mappers * shuffle partitions is greater than or equal to this threshold. Note that the actual parallelism is calculated as (number of mappers * shuffle partitions) / this threshold + 1, so this threshold should be positive.

@@ -583,15 +615,19 @@ Used when:

* `MapOutputTrackerMaster` is requested for the [statistics of a ShuffleDependency](scheduler/MapOutputTrackerMaster.md#getStatistics)

## <span id="spark.shuffle.minNumPartitionsToHighlyCompress"><span id="SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS"> spark.shuffle.minNumPartitionsToHighlyCompress
### <span id="SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS"> minNumPartitionsToHighlyCompress { #spark.shuffle.minNumPartitionsToHighlyCompress }

**spark.shuffle.minNumPartitionsToHighlyCompress**

**(internal)** Minimum number of partitions (threshold) for `MapStatus` utility to prefer a [HighlyCompressedMapStatus](scheduler/MapStatus.md#HighlyCompressedMapStatus) (over [CompressedMapStatus](scheduler/MapStatus.md#CompressedMapStatus)) (for [ShuffleWriters](shuffle/ShuffleWriter.md)).

Default: `2000`

Must be a positive integer (above `0`)

## <span id="spark.shuffle.push.enabled"><span id="PUSH_BASED_SHUFFLE_ENABLED"> spark.shuffle.push.enabled
### <span id="PUSH_BASED_SHUFFLE_ENABLED"> push.enabled { #spark.shuffle.push.enabled }

**spark.shuffle.push.enabled**

Enables push-based shuffle on the client side

@@ -603,21 +639,27 @@ Used when:

* `Utils` utility is used to [determine whether push-based shuffle is enabled or not](Utils.md#isPushBasedShuffleEnabled)

## <span id="spark.shuffle.readHostLocalDisk"><span id="SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED"> spark.shuffle.readHostLocalDisk
### <span id="SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED"> readHostLocalDisk { #spark.shuffle.readHostLocalDisk }

**spark.shuffle.readHostLocalDisk**

If enabled (with [spark.shuffle.useOldFetchProtocol](#spark.shuffle.useOldFetchProtocol) disabled and [spark.shuffle.service.enabled](external-shuffle-service/configuration-properties.md#spark.shuffle.service.enabled) enabled), shuffle blocks requested from block managers running on the same host are read directly from disk instead of being fetched as remote blocks over the network.

Default: `true`

## <span id="spark.shuffle.registration.maxAttempts"><span id="SHUFFLE_REGISTRATION_MAX_ATTEMPTS"> spark.shuffle.registration.maxAttempts
### <span id="SHUFFLE_REGISTRATION_MAX_ATTEMPTS"> registration.maxAttempts { #spark.shuffle.registration.maxAttempts }

**spark.shuffle.registration.maxAttempts**

How many attempts to [register a BlockManager with External Shuffle Service](storage/BlockManager.md#registerWithExternalShuffleServer)

Default: `3`

Used when `BlockManager` is requested to [register with External Shuffle Server](storage/BlockManager.md#registerWithExternalShuffleServer)

## <span id="spark.shuffle.sort.bypassMergeThreshold"><span id="SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD"> spark.shuffle.sort.bypassMergeThreshold
### <span id="SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD"> sort.bypassMergeThreshold { #spark.shuffle.sort.bypassMergeThreshold }

**spark.shuffle.sort.bypassMergeThreshold**

Maximum number of reduce partitions below which [SortShuffleManager](shuffle/SortShuffleManager.md) avoids merge-sorting data when there is no map-side aggregation

@@ -628,15 +670,19 @@ Used when:
* `SortShuffleWriter` utility is used to [shouldBypassMergeSort](shuffle/SortShuffleWriter.md#shouldBypassMergeSort)
* `ShuffleExchangeExec` ([Spark SQL]({{ book.spark_sql }}/physical-operators/ShuffleExchangeExec)) physical operator is requested to `prepareShuffleDependency`
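
The decision itself is a simple predicate. The following is a paraphrased sketch (not the exact Spark source) of what `shouldBypassMergeSort` checks:

```scala
// Bypass merge-sorting only when the shuffle needs no map-side
// aggregation and has few enough reduce partitions.
def shouldBypassMergeSort(
    bypassMergeThreshold: Int, // spark.shuffle.sort.bypassMergeThreshold
    numPartitions: Int,
    mapSideCombine: Boolean): Boolean =
  !mapSideCombine && numPartitions <= bypassMergeThreshold
```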

## <span id="spark.shuffle.spill.initialMemoryThreshold"> spark.shuffle.spill.initialMemoryThreshold
### spill.initialMemoryThreshold { #spark.shuffle.spill.initialMemoryThreshold }

**spark.shuffle.spill.initialMemoryThreshold**

Initial threshold for the size of an in-memory collection

Default: 5MB

Used by [Spillable](shuffle/Spillable.md#initialMemoryThreshold)

## <span id="spark.shuffle.spill.numElementsForceSpillThreshold"><span id="SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD"> spark.shuffle.spill.numElementsForceSpillThreshold
### <span id="SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD"> spill.numElementsForceSpillThreshold { #spark.shuffle.spill.numElementsForceSpillThreshold }

**spark.shuffle.spill.numElementsForceSpillThreshold**

**(internal)** The maximum number of elements in memory before forcing the shuffle sorter to spill.

@@ -653,15 +699,19 @@ Used when:
* Spark SQL's `UnsafeExternalRowSorter` is created
* Spark SQL's `UnsafeFixedWidthAggregationMap` is requested for an `UnsafeKVExternalSorter`

## <span id="spark.shuffle.sync"><span id="SHUFFLE_SYNC"> spark.shuffle.sync
### <span id="SHUFFLE_SYNC"> sync { #spark.shuffle.sync }

**spark.shuffle.sync**

Controls whether `DiskBlockObjectWriter` should force outstanding writes to disk while [committing a single atomic block](storage/DiskBlockObjectWriter.md#commitAndGet) (i.e. all operating system buffers should synchronize with the disk to ensure that all changes to a file are in fact recorded in the storage)

Default: `false`

Used when `BlockManager` is requested for a [DiskBlockObjectWriter](storage/BlockManager.md#getDiskWriter)

## <span id="spark.shuffle.useOldFetchProtocol"><span id="SHUFFLE_USE_OLD_FETCH_PROTOCOL"> spark.shuffle.useOldFetchProtocol
### <span id="SHUFFLE_USE_OLD_FETCH_PROTOCOL"> useOldFetchProtocol { #spark.shuffle.useOldFetchProtocol }

**spark.shuffle.useOldFetchProtocol**

Whether to use the old protocol while fetching shuffle blocks. It should only be enabled for compatibility, when jobs on a newer Spark version have to fetch shuffle blocks from an older-version external shuffle service.

36 changes: 31 additions & 5 deletions docs/memory/MemoryManager.md
@@ -1,6 +1,6 @@
# MemoryManager

`MemoryManager` is an [abstraction](#contract) of [memory managers](#implementations) that can share available memory between task execution ([TaskMemoryManager](TaskMemoryManager.md#memoryManager)) and storage ([BlockManager](../storage/BlockManager.md#memoryManager)).
`MemoryManager` is an [abstraction](#contract) of [memory managers](#implementations) that can share available memory between tasks ([TaskMemoryManager](TaskMemoryManager.md#memoryManager)) and storage ([BlockManager](../storage/BlockManager.md#memoryManager)).

![MemoryManager and Core Services](../images/memory/MemoryManager.png)

@@ -95,16 +95,23 @@ Used when:
??? note "Abstract Class"
`MemoryManager` is an abstract class and cannot be created directly. It is created indirectly for the [concrete MemoryManagers](#implementations).

## <span id="SparkEnv"> Accessing MemoryManager (SparkEnv)
## Accessing MemoryManager { #SparkEnv }

`MemoryManager` is available as [SparkEnv.memoryManager](../SparkEnv.md#memoryManager) on the driver and executors.

```text
```scala
import org.apache.spark.SparkEnv
val mm = SparkEnv.get.memoryManager
```

```scala
// MemoryManager is private[spark]
// the following won't work unless within org.apache.spark package
// import org.apache.spark.memory.MemoryManager
// assert(mm.isInstanceOf[MemoryManager])

scala> :type mm
org.apache.spark.memory.MemoryManager
// we have to revert to string comparison 😔
assert("UnifiedMemoryManager".equals(mm.getClass.getSimpleName))
```

## <span id="setMemoryStore"> Associating MemoryStore with Storage Memory Pools
@@ -241,3 +248,22 @@ MemoryMode | MemoryAllocator
`tungstenMemoryAllocator` is used when:

* `TaskMemoryManager` is requested to [allocate a memory page](TaskMemoryManager.md#allocatePage), [release a memory page](TaskMemoryManager.md#freePage) and [clean up all the allocated memory](TaskMemoryManager.md#cleanUpAllAllocatedMemory)

## Page Size { #pageSizeBytes }

`pageSizeBytes` is either [spark.buffer.pageSize](../configuration-properties.md#spark.buffer.pageSize), if defined, or the [default page size](#defaultPageSizeBytes).

`pageSizeBytes` is used when:

* `TaskMemoryManager` is requested for the [page size](TaskMemoryManager.md#pageSizeBytes)
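
As a paraphrased sketch (not the exact Spark source; `conf` is the manager's `SparkConf` and `defaultPageSizeBytes` is assumed to be in scope), the selection boils down to:

```scala
// Prefer the explicit spark.buffer.pageSize setting (a byte-size
// value), falling back to the computed default page size.
val pageSizeBytes: Long =
  conf.getSizeAsBytes("spark.buffer.pageSize", defaultPageSizeBytes)
```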

### Default Page Size { #defaultPageSizeBytes }

```scala
defaultPageSizeBytes: Long
```

??? note "Lazy Value"
`defaultPageSizeBytes` is a Scala **lazy value** to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
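
A tiny, self-contained illustration of the semantics (plain Scala, not Spark code):

```scala
object LazyValDemo extends App {
  // The initializer runs once, on first access; the result is cached.
  lazy val defaultPageSizeBytes: Long = {
    println("computing page size...") // printed exactly once
    4L * 1024 * 1024
  }

  println(defaultPageSizeBytes) // triggers initialization
  println(defaultPageSizeBytes) // reuses the cached value
}
```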
6 changes: 4 additions & 2 deletions docs/memory/TaskMemoryManager.md
@@ -172,13 +172,15 @@ In the end, `releaseExecutionMemory` requests the [MemoryManager](#memoryManager
* `MemoryConsumer` is requested to [free up memory](MemoryConsumer.md#freeMemory)
* `TaskMemoryManager` is requested to [allocatePage](#allocatePage) and [freePage](#freePage)

## <span id="pageSizeBytes"> Page Size
## Page Size { #pageSizeBytes }

```java
long pageSizeBytes()
```

`pageSizeBytes` requests the [MemoryManager](#memoryManager) for the [pageSizeBytes](MemoryManager.md#pageSizeBytes).
`pageSizeBytes` requests the [MemoryManager](#memoryManager) for the [page size](MemoryManager.md#pageSizeBytes).

---

`pageSizeBytes` is used when:

