Intel-bigdata / Spark Adaptive
Licence: apache-2.0
Spark SQL Adaptive Execution
Adaptive Execution has three main features: automatically setting the number of shuffle partitions, optimizing the join strategy at runtime, and handling skewed joins. Each feature can be enabled separately. To get started with Adaptive Execution on Spark 2.3, build the `ae-2.3-08` branch and set at least `spark.sql.adaptive.enabled` to `true`. If you have the external shuffle service enabled, also upgrade it so that it supports adaptive execution.
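As a minimal sketch of how these settings might be passed on the command line, the snippet below renders property pairs as `--conf key=value` arguments for `spark-submit`. The `AdaptiveConf` object and its `toSubmitArgs` helper are hypothetical names introduced for illustration; only the property keys come from this document. It assumes you want all three features on, which is why `spark.shuffle.statistics.verbose` is included for skewed join handling.

```scala
// Hypothetical helper (not part of the project): collects adaptive execution
// settings and renders them as `--conf key=value` arguments for spark-submit.
object AdaptiveConf {
  // Only spark.sql.adaptive.enabled is required; the rest are optional.
  val settings: Seq[(String, String)] = Seq(
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.join.enabled" -> "true",
    "spark.sql.adaptive.skewedJoin.enabled" -> "true",
    // Needed so that row counts are collected for skewed join handling.
    "spark.shuffle.statistics.verbose" -> "true"
  )

  // Turns each (key, value) pair into the two tokens "--conf" and "key=value".
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    toSubmitArgs(settings).foreach(println)
}
```

The same key/value pairs could instead be set programmatically with `SparkSession.builder().config(key, value)`.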
An English version of the design doc is available on Google Docs. A Chinese-language blog post on CSDN introduces the features and benchmark results. SPARK-23128 is the JIRA issue for contributing this work to Apache Spark.
Auto Setting The Shuffle Partition Number
Property Name | Default | Meaning |
---|---|---|
spark.sql.adaptive.enabled | false | When true, enable adaptive query execution. |
spark.sql.adaptive.minNumPostShufflePartitions | 1 | The minimum number of post-shuffle partitions used in adaptive execution. This can be used to control the minimum parallelism. |
spark.sql.adaptive.maxNumPostShufflePartitions | 500 | The maximum number of post-shuffle partitions used in adaptive execution. This is also used as the initial shuffle partition number, so set it to a reasonable value. |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 67108864 | The target post-shuffle input size of a task, in bytes. The default is 64 MB. |
spark.sql.adaptive.shuffle.targetPostShuffleRowCount | 20000000 | The target post-shuffle row count of a task. This only takes effect if row count information is collected. |
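To illustrate how the target input size can drive the partition count, here is a simplified sketch. It is not the project's actual implementation. It assumes adjacent post-shuffle partitions are merged greedily until adding the next one would exceed the target size. The `PartitionCoalescer` object and its method are hypothetical, and the row-count target and the minimum partition count are ignored for brevity.

```scala
// Simplified, hypothetical sketch of size-based partition coalescing.
object PartitionCoalescer {
  /**
   * Given the byte size of each initial shuffle partition, returns the start
   * index of every coalesced partition. A new partition starts whenever
   * adding the next input would push the running total past targetSize.
   */
  def startIndices(sizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer.empty[Int]
    var current = 0L
    for ((size, i) <- sizes.zipWithIndex) {
      if (i == 0 || current + size > targetSize) {
        starts += i      // begin a new coalesced partition at index i
        current = size
      } else {
        current += size  // keep packing into the current partition
      }
    }
    starts.toSeq
  }
}
```

Under this assumption, the number of reducers equals the number of start indices, so many small initial partitions collapse into a few tasks of roughly the target size.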
Optimizing Join Strategy at Runtime
Property Name | Default | Meaning |
---|---|---|
spark.sql.adaptive.join.enabled | true | When true and spark.sql.adaptive.enabled is also true, a better join strategy is determined at runtime. |
spark.sql.adaptiveBroadcastJoinThreshold | equal to spark.sql.autoBroadcastJoinThreshold | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join in adaptive execution mode. If not set, it equals spark.sql.autoBroadcastJoinThreshold. |
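The threshold fallback and the runtime decision can be sketched as follows. This is an illustration under stated assumptions, not the project's code: it assumes the choice reduces to comparing the smaller join side's runtime size against the threshold, and it ignores join types that restrict which side may be broadcast. The `JoinStrategySketch` object and its members are hypothetical. The 10 MB fallback is the default of `spark.sql.autoBroadcastJoinThreshold` in Apache Spark.

```scala
// Hypothetical sketch of choosing a join strategy from runtime sizes.
object JoinStrategySketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object SortMergeJoin extends Strategy

  /** Uses the adaptive threshold if set, else the static one, else 10 MB. */
  def effectiveThreshold(conf: Map[String, String]): Long =
    conf.get("spark.sql.adaptiveBroadcastJoinThreshold")
      .orElse(conf.get("spark.sql.autoBroadcastJoinThreshold"))
      .map(_.toLong)
      .getOrElse(10L * 1024 * 1024)

  /** Broadcasts when the smaller side fits within the threshold. */
  def choose(leftBytes: Long, rightBytes: Long, threshold: Long): Strategy =
    if (math.min(leftBytes, rightBytes) <= threshold) BroadcastHashJoin
    else SortMergeJoin
}
```

Because the sizes are measured after the shuffle stage has run, a join that was planned as a sort-merge join from inaccurate estimates could be switched to a broadcast join under this rule.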
Handling Skewed Join
Property Name | Default | Meaning |
---|---|---|
spark.sql.adaptive.skewedJoin.enabled | false | When true and spark.sql.adaptive.enabled is also true, a skewed join is automatically handled at runtime. |
spark.sql.adaptive.skewedPartitionFactor | 10 | A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than spark.sql.adaptive.skewedPartitionSizeThreshold, or if its row count is larger than this factor multiplied by the median row count and also larger than spark.sql.adaptive.skewedPartitionRowCountThreshold. |
spark.sql.adaptive.skewedPartitionSizeThreshold | 67108864 | Configures the minimum size in bytes for a partition to be considered skewed in adaptive skewed join. |
spark.sql.adaptive.skewedPartitionRowCountThreshold | 10000000 | Configures the minimum row count for a partition to be considered skewed in adaptive skewed join. |
spark.shuffle.statistics.verbose | false | Collect shuffle statistics in verbose mode, including row counts. This is required for handling skewed join. |
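The skew condition described for `spark.sql.adaptive.skewedPartitionFactor` can be written as a small predicate. The sketch below uses the default values from the table. The `SkewDetection` object is hypothetical, and the median definition (upper middle element for even-length input) is an assumption, since the document does not specify one.

```scala
// Hypothetical sketch of the skewed-partition test, using the table defaults.
object SkewDetection {
  val Factor = 10L                    // spark.sql.adaptive.skewedPartitionFactor
  val SizeThreshold = 67108864L       // skewedPartitionSizeThreshold (64 MB)
  val RowCountThreshold = 10000000L   // skewedPartitionRowCountThreshold

  /** Median of a non-empty sequence (assumption: upper middle when even). */
  def median(values: Seq[Long]): Long = values.sorted.apply(values.length / 2)

  /** True if either the size condition or the row-count condition holds. */
  def isSkewed(size: Long, rowCount: Long,
               medianSize: Long, medianRowCount: Long): Boolean =
    (size > Factor * medianSize && size > SizeThreshold) ||
      (rowCount > Factor * medianRowCount && rowCount > RowCountThreshold)

  /** Indices of skewed partitions, given per-partition sizes and row counts. */
  def skewedPartitions(sizes: Seq[Long], rowCounts: Seq[Long]): Seq[Int] = {
    val medianSize = median(sizes)
    val medianRows = median(rowCounts)
    sizes.indices.filter(i => isSkewed(sizes(i), rowCounts(i), medianSize, medianRows))
  }
}
```

Both parts of each condition matter: the factor check finds partitions that are large relative to their peers, while the absolute threshold keeps small partitions from being flagged when every partition is tiny.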