From 775ef4d7f8452d884610d66305dc00dafb2b82f5 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Mon, 29 Apr 2024 12:38:35 -0700 Subject: [PATCH] [native] Add reserved memory capacity configs Add two system configs for reserved query memory capacity: query-reserved-memory-gb: the total amount of query memory capacity reserved to ensure that a query has minimal memory capacity to run memory-pool-reserved-capacity: the minimal amount of memory capacity in bytes reserved for each query memory pool. --- .../presto_cpp/main/PrestoServer.cpp | 9 +++++ .../presto_cpp/main/common/Configs.cpp | 13 ++++++- .../presto_cpp/main/common/Configs.h | 23 +++++++++++ .../main/types/PrestoToVeloxConnector.cpp | 2 +- .../property/NativeExecutionSystemConfig.java | 38 ++++++++++++++++++- .../TestNativeExecutionSystemConfig.java | 4 ++ 6 files changed, 86 insertions(+), 3 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index a833a55ac671..fd2b00c21661 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -669,7 +669,16 @@ void PrestoServer::initializeVeloxMemory() { memoryGb, "Query memory capacity must not be larger than system memory capacity"); options.arbitratorCapacity = queryMemoryGb << 30; + const uint64_t queryReservedMemoryGb = + systemConfig->queryReservedMemoryGb(); + VELOX_USER_CHECK_LE( + queryReservedMemoryGb, + queryMemoryGb, + "Query reserved memory capacity must not be larger than query memory capacity"); + options.arbitratorReservedCapacity = queryReservedMemoryGb << 30; options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity(); + options.memoryPoolReservedCapacity = + systemConfig->memoryPoolReservedCapacity(); options.memoryPoolTransferCapacity = systemConfig->memoryPoolTransferCapacity(); options.memoryReclaimWaitMs = systemConfig->memoryReclaimWaitMs(); diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 1169abebacc9..e7bf5f367873 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -12,8 +12,8 @@ * limitations under the License. */ -#include "presto_cpp/main/common/ConfigReader.h" #include "presto_cpp/main/common/Configs.h" +#include "presto_cpp/main/common/ConfigReader.h" #include "presto_cpp/main/common/Utils.h" #include "velox/core/QueryConfig.h" @@ -185,6 +185,7 @@ SystemConfig::SystemConfig() { BOOL_PROP(kUseMmapAllocator, true), STR_PROP(kMemoryArbitratorKind, ""), NUM_PROP(kQueryMemoryGb, 38), + NUM_PROP(kQueryReservedMemoryGb, 4), BOOL_PROP(kEnableVeloxTaskLogging, false), BOOL_PROP(kEnableVeloxExprSetLogging, false), NUM_PROP(kLocalShuffleMaxPartitionBytes, 268435456), @@ -462,12 +463,22 @@ int32_t SystemConfig::queryMemoryGb() const { return optionalProperty(kQueryMemoryGb).value(); } +int32_t SystemConfig::queryReservedMemoryGb() const { + return optionalProperty(kQueryReservedMemoryGb).value(); +} + uint64_t SystemConfig::memoryPoolInitCapacity() const { static constexpr uint64_t kMemoryPoolInitCapacityDefault = 128 << 20; return optionalProperty(kMemoryPoolInitCapacity) .value_or(kMemoryPoolInitCapacityDefault); } +uint64_t SystemConfig::memoryPoolReservedCapacity() const { + static constexpr uint64_t kMemoryPoolReservedCapacityDefault = 64 << 20; + return optionalProperty(kMemoryPoolReservedCapacity) + .value_or(kMemoryPoolReservedCapacityDefault); +} + uint64_t SystemConfig::memoryPoolTransferCapacity() const { static constexpr uint64_t kMemoryPoolTransferCapacityDefault = 32 << 20; return optionalProperty(kMemoryPoolTransferCapacity) diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 31b580915224..065821e6edab 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -244,8 +244,10 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kSpillerSpillPath{ "experimental.spiller-spill-path"}; static constexpr std::string_view kShutdownOnsetSec{"shutdown-onset-sec"}; + /// Memory allocation limit enforced via internal memory allocator. static constexpr std::string_view kSystemMemoryGb{"system-memory-gb"}; + /// Specifies the total memory capacity that can be used by query execution in /// GB. The query memory capacity should be configured less than the system /// memory capacity ('system-memory-gb') to reserve memory for system usage @@ -256,6 +258,18 @@ class SystemConfig : public ConfigBase { /// this config only applies if the memory arbitration has been enabled. static constexpr std::string_view kQueryMemoryGb{"query-memory-gb"}; + /// Specifies the amount of query memory capacity reserved to ensure that each + /// query has minimal memory capacity to run. A query can only allocate from + /// the reserved query memory if its current capacity is less than the minimal + /// memory capacity as specified by 'memory-pool-reserved-capacity'. The + /// exceeding capacity has to allocate from the non-reserved query memory. + /// + /// NOTE: the reserved query memory capacity is enforced by memory arbitrator + /// so that this config only applies if the memory arbitration has been + /// enabled. + static constexpr std::string_view kQueryReservedMemoryGb{ + "query-reserved-memory-gb"}; + /// If true, enable memory pushback when the server is under low memory /// condition. This only applies if 'system-mem-limit-gb' is set. static constexpr std::string_view kSystemMemPushbackEnabled{ @@ -335,6 +349,11 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kMemoryPoolInitCapacity{ "memory-pool-init-capacity"}; + /// The minimal amount of memory capacity in bytes reserved for each query + /// memory pool. + static constexpr std::string_view kMemoryPoolReservedCapacity{ + "memory-pool-reserved-capacity"}; + /// The minimal memory capacity in bytes transferred between memory pools /// during memory arbitration. /// @@ -620,8 +639,12 @@ class SystemConfig : public ConfigBase { int32_t queryMemoryGb() const; + int32_t queryReservedMemoryGb() const; + uint64_t memoryPoolInitCapacity() const; + uint64_t memoryPoolReservedCapacity() const; + uint64_t memoryPoolTransferCapacity() const; uint64_t memoryReclaimWaitMs() const; diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index fa1df1f19f76..030dde7873ee 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -1470,4 +1470,4 @@ std::unique_ptr TpchPrestoToVeloxConnector::createConnectorProtocol() const { return std::make_unique(); } -} // namespace facebook::presto \ No newline at end of file +} // namespace facebook::presto diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/NativeExecutionSystemConfig.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/NativeExecutionSystemConfig.java index fce7e58c57c4..b5187241be7e 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/NativeExecutionSystemConfig.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/NativeExecutionSystemConfig.java @@ -79,11 +79,18 @@ public class NativeExecutionSystemConfig // Set memory arbitrator capacity to the same as per-query memory capacity // as there is only one query running at Presto-on-Spark at a time. private static final String MEMORY_ARBITRATOR_CAPACITY_GB = "query-memory-gb"; + // Set memory arbitrator reserved capacity. Since there is only one query + // running at Presto-on-Spark at a time, then we shall set this to zero. + private static final String MEMORY_ARBITRATOR_RESERVED_CAPACITY_GB = "query-reserved-memory-gb"; // Set the initial memory capacity when we create a query memory pool. For // Presto-on-Spark, we set it to 'query-memory-gb' to allocate all the // memory arbitrator capacity to the query memory pool on its creation as // there is only one query running at a time. private static final String MEMORY_POOL_INIT_CAPACITY = "memory-pool-init-capacity"; + // Set the reserved memory capacity when we create a query memory pool. For + // Presto-on-Spark, we set this to zero as there is only one query running + // at a time. + private static final String MEMORY_POOL_RESERVED_CAPACITY = "memory-pool-reserved-capacity"; // Set the minimal memory capacity transfer between memory pools under // memory arbitration. For Presto-on-Spark, there is only one query running // so this specified how much memory to reclaim from a query when it runs @@ -125,12 +132,15 @@ public class NativeExecutionSystemConfig // Reserve 2GB from system memory for system operations such as disk // spilling and cache prefetch. private DataSize queryMemoryGb = new DataSize(8, DataSize.Unit.GIGABYTE); + + private DataSize queryReservedMemoryGb = new DataSize(0, DataSize.Unit.GIGABYTE); private boolean useMmapAllocator = true; private String memoryArbitratorKind = "SHARED"; private int memoryArbitratorCapacityGb = 8; + private int memoryArbitratorReservedCapacityGb; private long memoryPoolInitCapacity = 8L << 30; + private long memoryPoolReservedCapacity; private long memoryPoolTransferCapacity = 2L << 30; - private long memoryReclaimWaitMs = 300_000; private String spillerSpillPath = ""; private int concurrentLifespansPerTask = 5; @@ -170,7 +180,9 @@ public Map getAllProperties() .put(USE_MMAP_ALLOCATOR, String.valueOf(getUseMmapAllocator())) .put(MEMORY_ARBITRATOR_KIND, String.valueOf(getMemoryArbitratorKind())) .put(MEMORY_ARBITRATOR_CAPACITY_GB, String.valueOf(getMemoryArbitratorCapacityGb())) + .put(MEMORY_ARBITRATOR_RESERVED_CAPACITY_GB, String.valueOf(getMemoryArbitratorReservedCapacityGb())) .put(MEMORY_POOL_INIT_CAPACITY, String.valueOf(getMemoryPoolInitCapacity())) + .put(MEMORY_POOL_RESERVED_CAPACITY, String.valueOf(getMemoryPoolReservedCapacity())) .put(MEMORY_POOL_TRANSFER_CAPACITY, String.valueOf(getMemoryPoolTransferCapacity())) .put(MEMORY_RECLAIM_WAIT_MS, String.valueOf(getMemoryReclaimWaitMs())) .put(SPILLER_SPILL_PATH, String.valueOf(getSpillerSpillPath())) @@ -470,6 +482,18 @@ public int getMemoryArbitratorCapacityGb() return memoryArbitratorCapacityGb; } + @Config(MEMORY_ARBITRATOR_RESERVED_CAPACITY_GB) + public NativeExecutionSystemConfig setMemoryArbitratorReservedCapacityGb(int memoryArbitratorReservedCapacityGb) + { + this.memoryArbitratorReservedCapacityGb = memoryArbitratorReservedCapacityGb; + return this; + } + + public int getMemoryArbitratorReservedCapacityGb() + { + return memoryArbitratorReservedCapacityGb; + } + @Config(MEMORY_POOL_INIT_CAPACITY) public NativeExecutionSystemConfig setMemoryPoolInitCapacity(long memoryPoolInitCapacity) { @@ -482,6 +506,18 @@ public long getMemoryPoolInitCapacity() return memoryPoolInitCapacity; } + @Config(MEMORY_POOL_RESERVED_CAPACITY) + public NativeExecutionSystemConfig setMemoryPoolReservedCapacity(long memoryPoolReservedCapacity) + { + this.memoryPoolReservedCapacity = memoryPoolReservedCapacity; + return this; + } + + public long getMemoryPoolReservedCapacity() + { + return memoryPoolReservedCapacity; + } + @Config(MEMORY_POOL_TRANSFER_CAPACITY) public NativeExecutionSystemConfig setMemoryPoolTransferCapacity(long memoryPoolTransferCapacity) { diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/property/TestNativeExecutionSystemConfig.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/property/TestNativeExecutionSystemConfig.java index 1df7e81e9ab8..93809c083ced 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/property/TestNativeExecutionSystemConfig.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/property/TestNativeExecutionSystemConfig.java @@ -84,7 +84,9 @@ public void testNativeExecutionSystemConfig() .setUseMmapAllocator(true) .setMemoryArbitratorKind("SHARED") .setMemoryArbitratorCapacityGb(8) + .setMemoryArbitratorReservedCapacityGb(0) .setMemoryPoolInitCapacity(8L << 30) + .setMemoryPoolReservedCapacity(0) .setMemoryPoolTransferCapacity(2L << 30) .setMemoryReclaimWaitMs(300_000) .setSpillerSpillPath("") @@ -123,7 +125,9 @@ public void testNativeExecutionSystemConfig() .setUseMmapAllocator(false) .setMemoryArbitratorKind("") .setMemoryArbitratorCapacityGb(10) + .setMemoryArbitratorReservedCapacityGb(8) .setMemoryPoolInitCapacity(7L << 30) + .setMemoryPoolReservedCapacity(6L << 30) .setMemoryPoolTransferCapacity(1L << 30) .setMemoryReclaimWaitMs(123123123) .setSpillerSpillPath("dummy.spill.path")