Skip to content

Commit

Permalink
Unified Spill numPartitionBits (#9095)
Browse files Browse the repository at this point in the history
Summary:
Deprecate the old `QueryConfig::kDeprecatedJoinSpillPartitionBits`,
and use the unified `QueryConfig::kNumSpillPartitionBits`.

Pull Request resolved: #9095

Reviewed By: tanjialiang

Differential Revision: D54980978

Pulled By: xiaoxmeng

fbshipit-source-id: 12e94c2280069076acef85e72085ec210a525686
  • Loading branch information
duanmeng authored and facebook-github-bot committed Mar 17, 2024
1 parent 6c2441b commit 901662f
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 100 deletions.
12 changes: 4 additions & 8 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ SpillConfig::SpillConfig(
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
uint8_t _rowNumberPartitionBits,
uint8_t _numPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
Expand All @@ -48,8 +47,7 @@ SpillConfig::SpillConfig(
minSpillableReservationPct(_minSpillableReservationPct),
spillableReservationGrowthPct(_spillableReservationGrowthPct),
startPartitionBit(_startPartitionBit),
joinPartitionBits(_joinPartitionBits),
rowNumberPartitionBits(_rowNumberPartitionBits),
numPartitionBits(_numPartitionBits),
maxSpillLevel(_maxSpillLevel),
maxSpillRunRows(_maxSpillRunRows),
writerFlushThresholdSize(_writerFlushThresholdSize),
Expand All @@ -61,9 +59,7 @@ SpillConfig::SpillConfig(
"Spillable memory reservation growth pct should not be lower than minimum available pct");
}

int32_t SpillConfig::spillLevel(
uint8_t startBitOffset,
uint8_t numPartitionBits) const {
int32_t SpillConfig::spillLevel(uint8_t startBitOffset) const {
VELOX_CHECK_LE(
startBitOffset + numPartitionBits,
64,
Expand All @@ -90,6 +86,6 @@ bool SpillConfig::exceedSpillLevelLimit(
if (maxSpillLevel == -1) {
return false;
}
return spillLevel(startBitOffset, numPartitionBits) > maxSpillLevel;
return spillLevel(startBitOffset) > maxSpillLevel;
}
} // namespace facebook::velox::common
13 changes: 4 additions & 9 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ struct SpillConfig {
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
uint8_t _rowNumberPartitionBits,
uint8_t _numPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
Expand All @@ -69,7 +68,7 @@ struct SpillConfig {
///
/// NOTE: we advance (or right shift) the partition bit offset when goes to
/// the next level of recursive spilling.
int32_t spillLevel(uint8_t startBitOffset, uint8_t numPartitionBits) const;
int32_t spillLevel(uint8_t startBitOffset) const;

/// Checks if the given 'startBitOffset' and 'numPartitionBits' has exceeded
/// the max hash join spill limit.
Expand Down Expand Up @@ -119,13 +118,9 @@ struct SpillConfig {
/// The start partition bit offset of the top (the first level) partitions.
uint8_t startPartitionBit;

/// Used to calculate the spill hash partition number for hash join with
/// 'startPartitionBit'.
uint8_t joinPartitionBits;

/// Used to calculate the spill partition number of the hash table in
/// Used to calculate the spill hash partition number for hash join and
/// RowNumber with 'startPartitionBit'.
uint8_t rowNumberPartitionBits;
uint8_t numPartitionBits;

/// The max allowed spilling level with zero being the initial spilling
/// level. This only applies for hash build spilling which needs recursive
Expand Down
10 changes: 2 additions & 8 deletions velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ TEST(SpillConfig, spillLevel) {
0,
kInitialBitOffset,
kNumPartitionsBits,
kNumPartitionsBits,
0,
0,
0,
Expand All @@ -64,12 +63,9 @@ TEST(SpillConfig, spillLevel) {
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
if (testData.expectedLevel == -1) {
ASSERT_ANY_THROW(
config.spillLevel(testData.bitOffset, kNumPartitionsBits));
ASSERT_ANY_THROW(config.spillLevel(testData.bitOffset));
} else {
ASSERT_EQ(
config.spillLevel(testData.bitOffset, kNumPartitionsBits),
testData.expectedLevel);
ASSERT_EQ(config.spillLevel(testData.bitOffset), testData.expectedLevel);
}
}
}
Expand Down Expand Up @@ -126,7 +122,6 @@ TEST(SpillConfig, spillLevelLimit) {
0,
testData.startBitOffset,
testData.numBits,
testData.numBits,
testData.maxSpillLevel,
0,
0,
Expand Down Expand Up @@ -175,7 +170,6 @@ TEST(SpillConfig, spillableReservationPercentages) {
0,
0,
0,
0,
1'000'000,
0,
"none");
Expand Down
6 changes: 3 additions & 3 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ DEBUG_ONLY_TEST_F(
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillPartitionBits, "2")
.config(core::QueryConfig::kNumSpillPartitionBits, "2")
.maxDrivers(numDrivers)
.plan(PlanBuilder()
.values(vectors)
Expand Down Expand Up @@ -756,7 +756,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) {
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillPartitionBits, "2")
.config(core::QueryConfig::kNumSpillPartitionBits, "2")
.maxDrivers(numDrivers)
.plan(PlanBuilder()
.values(vectors)
Expand Down Expand Up @@ -817,7 +817,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) {
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillEnabled, "true")
.config(core::QueryConfig::kJoinSpillPartitionBits, "2")
.config(core::QueryConfig::kNumSpillPartitionBits, "2")
.plan(PlanBuilder()
.values(vectors)
.localPartition({"c0", "c1"})
Expand Down
1 change: 0 additions & 1 deletion velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
0,
0,
0,
0,
writerFlushThreshold,
"none");
}
Expand Down
15 changes: 9 additions & 6 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,12 @@ class QueryConfig {
static constexpr const char* kSpillStartPartitionBit =
"spiller_start_partition_bit";

/// !!! DEPRECATED: do not use.
static constexpr const char* kJoinSpillPartitionBits =
"join_spiller_partition_bits";

static constexpr const char* kRowNumberSpillPartitionBits =
"row_number_spiller_partition_bits";
static constexpr const char* kNumSpillPartitionBits =
"num_spill_partition_bits";

static constexpr const char* kMinSpillableReservationPct =
"min_spillable_reservation_pct";
Expand Down Expand Up @@ -584,6 +585,8 @@ class QueryConfig {
/// for hash join. The number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
///
/// DEPRECATED.
uint8_t joinSpillPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
constexpr uint8_t kMaxBits = 3;
Expand All @@ -592,14 +595,14 @@ class QueryConfig {
}

/// Returns the number of bits used to calculate the spill partition number
/// for RowNumber. The number of spill partitions will be power of two.
///
/// for hash join and RowNumber. The number of spill partitions will be power
/// of tow.
/// NOTE: as for now, we only support up to 8-way spill partitioning.
uint8_t rowNumberSpillPartitionBits() const {
uint8_t numSpillPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
constexpr uint8_t kMaxBits = 3;
return std::min(
kMaxBits, get<uint8_t>(kRowNumberSpillPartitionBits, kDefaultBits));
kMaxBits, get<uint8_t>(kNumSpillPartitionBits, kDefaultBits));
}

uint64_t writerFlushThresholdBytes() const {
Expand Down
4 changes: 2 additions & 2 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ Spilling
- integer
- 29
- The start partition bit which is used with `spiller_partition_bits` together to calculate the spilling partition number.
* - join_spiller_partition_bits
* - num_spill_partition_bits
- integer
- 2
- 3
- The number of bits (N) used to calculate the spilling partition number for hash join and RowNumber: 2 ^ N. At the moment the maximum
value is 3, meaning we only support up to 8-way spill partitioning.ing.
* - testing.spill_pct
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ class E2EWriterTest : public testing::Test {
0,
0,
0,
0,
writerFlushThresholdSize,
"none");
}
Expand Down
3 changes: 1 addition & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
queryConfig.spillStartPartitionBit(),
queryConfig.joinSpillPartitionBits(),
queryConfig.rowNumberSpillPartitionBits(),
queryConfig.numSpillPartitionBits(),
queryConfig.maxSpillLevel(),
queryConfig.maxSpillRunRows(),
queryConfig.writerFlushThresholdBytes(),
Expand Down
12 changes: 6 additions & 6 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
const auto& spillConfig = spillConfig_.value();
HashBitRange hashBits(
spillConfig.startPartitionBit,
spillConfig.startPartitionBit + spillConfig.joinPartitionBits);
spillConfig.startPartitionBit + spillConfig.numPartitionBits);

if (spillPartition != nullptr) {
LOG(INFO) << "Setup reader to read spilled input from "
Expand All @@ -222,11 +222,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
spillInputReader_ = spillPartition->createUnorderedReader(pool());

const auto startBit = spillPartition->id().partitionBitOffset() +
spillConfig.joinPartitionBits;
spillConfig.numPartitionBits;
// Disable spilling if exceeding the max spill level and the query might run
// out of memory if the restored partition still can't fit in memory.
if (spillConfig.exceedSpillLevelLimit(
startBit, spillConfig.joinPartitionBits)) {
startBit, spillConfig.numPartitionBits)) {
RECORD_METRIC_VALUE(kMetricMaxSpillLevelExceededCount);
LOG(WARNING) << "Exceeded spill level limit: "
<< spillConfig.maxSpillLevel
Expand All @@ -235,7 +235,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
exceededMaxSpillLevelLimit_ = true;
return;
}
hashBits = HashBitRange(startBit, startBit + spillConfig.joinPartitionBits);
hashBits = HashBitRange(startBit, startBit + spillConfig.numPartitionBits);
}

spiller_ = std::make_unique<Spiller>(
Expand Down Expand Up @@ -906,8 +906,8 @@ void HashBuild::addRuntimeStats() {
if (spiller_ != nullptr && spiller_->isAnySpilled()) {
lockedStats->addRuntimeStat(
"maxSpillLevel",
RuntimeCounter(spillConfig()->spillLevel(
spiller_->hashBits().begin(), spillConfig()->joinPartitionBits)));
RuntimeCounter(
spillConfig()->spillLevel(spiller_->hashBits().begin())));
}
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void HashProbe::maybeSetupSpillInput(
HashBitRange(
spillInputPartitionIds_.begin()->partitionBitOffset(),
spillInputPartitionIds_.begin()->partitionBitOffset() +
spillConfig.joinPartitionBits),
spillConfig.numPartitionBits),
&spillConfig);
// Set the spill partitions to the corresponding ones at the build side. The
// hash probe operator itself won't trigger any spilling.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ void RowNumber::spill() {

spillPartitionBits_ = HashBitRange(
spillConfig_->startPartitionBit,
spillConfig_->startPartitionBit + spillConfig_->joinPartitionBits);
spillConfig_->startPartitionBit + spillConfig_->numPartitionBits);

const auto spillPartitionSet = spillHashTable();

Expand Down
12 changes: 6 additions & 6 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3144,7 +3144,7 @@ TEST_P(MultiThreadedHashJoinTest, noSpillLevelLimit) {
"SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t.t_k0 = u.u_k0")
.maxSpillLevel(-1)
.config(core::QueryConfig::kSpillStartPartitionBit, "48")
.config(core::QueryConfig::kJoinSpillPartitionBits, "3")
.config(core::QueryConfig::kNumSpillPartitionBits, "3")
.checkSpillStats(false)
.verifier([&](const std::shared_ptr<Task>& task, bool hasSpill) {
if (!hasSpill) {
Expand Down Expand Up @@ -4944,7 +4944,7 @@ TEST_F(HashJoinTest, spillFileSize) {
.referenceQuery(
"SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t.t_k0 = u.u_k0")
.config(core::QueryConfig::kSpillStartPartitionBit, "48")
.config(core::QueryConfig::kJoinSpillPartitionBits, "3")
.config(core::QueryConfig::kNumSpillPartitionBits, "3")
.config(
core::QueryConfig::kMaxSpillFileSize, std::to_string(spillFileSize))
.checkSpillStats(false)
Expand Down Expand Up @@ -4978,7 +4978,7 @@ TEST_F(HashJoinTest, spillPartitionBitsOverlap) {
.referenceQuery(
"SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1")
.config(core::QueryConfig::kSpillStartPartitionBit, "8")
.config(core::QueryConfig::kJoinSpillPartitionBits, "1")
.config(core::QueryConfig::kNumSpillPartitionBits, "1")
.checkSpillStats(false)
.maxSpillLevel(0);
VELOX_ASSERT_THROW(builder.run(), "vs. 8");
Expand Down Expand Up @@ -6480,7 +6480,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimFromJoinBuild) {
std::unordered_map<std::string, std::string> config{
{core::QueryConfig::kSpillEnabled, "true"},
{core::QueryConfig::kJoinSpillEnabled, "true"},
{core::QueryConfig::kJoinSpillPartitionBits, "2"},
{core::QueryConfig::kNumSpillPartitionBits, "2"},
};
joinQueryCtx->testingOverrideConfigUnsafe(std::move(config));

Expand Down Expand Up @@ -6903,7 +6903,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) {
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kJoinSpillEnabled, true)
.config(core::QueryConfig::kJoinSpillPartitionBits, 2)
.config(core::QueryConfig::kNumSpillPartitionBits, 2)
// Set multiple hash build drivers to trigger parallel build.
.maxDrivers(4)
.queryCtx(joinQueryCtx)
Expand Down Expand Up @@ -6970,7 +6970,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringJoinTableBuild) {
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kJoinSpillEnabled, true)
.config(core::QueryConfig::kJoinSpillPartitionBits, 2)
.config(core::QueryConfig::kNumSpillPartitionBits, 2)
// Set multiple hash build drivers to trigger parallel build.
.maxDrivers(4)
.queryCtx(joinQueryCtx)
Expand Down
Loading

0 comments on commit 901662f

Please sign in to comment.