Skip to content

Commit

Permalink
Add Linux Memory Manager
Browse files Browse the repository at this point in the history
Co-authored-by: Minhan Cao <minhan.duc.cao@gmail.com>
  • Loading branch information
mohsaka and minhancao committed Oct 10, 2024
1 parent 7deadd1 commit 1ea7b0a
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 0 deletions.
9 changes: 9 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ is valid.
If the time period exceeds the parameter value, the request is rejected as
authentication failure (HTTP 401).

LinuxMemoryChecker
------------------

The LinuxMemoryChecker extends from PeriodicMemoryChecker and periodically checks
memory usage using memory calculation from inactive_anon + active_anon in the memory stat
file from Linux cgroups V1 or V2. The LinuxMemoryChecker is used for Linux systems only.

The LinuxMemoryChecker can be enabled by setting the CMake flag ``PRESTO_MEMORY_CHECKER_TYPE=LINUX_MEMORY_CHECKER``.

Async Data Cache and Prefetching
--------------------------------

Expand Down
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,37 @@ The configuration properties of Presto C++ workers are described here, in alphab
* **Default value:** ``0.25``

See description for ``shared-arbitrator.memory-pool-min-free-capacity``

Memory Checker Properties
-------------------------

The LinuxMemoryChecker extends from PeriodicMemoryChecker and is used for Linux systems only.
The LinuxMemoryChecker can be enabled by setting the CMake flag ``PRESTO_MEMORY_CHECKER_TYPE=LINUX_MEMORY_CHECKER``.
The following properties for PeriodicMemoryChecker are as follows:

``system-mem-pushback-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

If set to ``true``, starts memory limit checker to trigger memory pushback when
server is under low memory pressure.

``system-mem-limit-gb``
^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``0``

Specifies the system memory limit that triggers the memory pushback or heap dump if
the server memory usage is beyond this limit. A value of zero means no limit is set.

``system-mem-shrink-gb``
^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``0``

Specifies the amount of memory to shrink when the memory pushback is
triggered. This only applies if ``system-mem-pushback-enabled`` is ``true``.
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ endif
ifneq ($(PRESTO_STATS_REPORTER_TYPE),)
EXTRA_CMAKE_FLAGS += -DPRESTO_STATS_REPORTER_TYPE=$(PRESTO_STATS_REPORTER_TYPE)
endif
ifneq ($(PRESTO_MEMORY_CHECKER_TYPE),)
EXTRA_CMAKE_FLAGS += -DPRESTO_MEMORY_CHECKER_TYPE=$(PRESTO_MEMORY_CHECKER_TYPE)
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
16 changes: 16 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,19 @@ if(PRESTO_STATS_REPORTER_TYPE)
"${PRESTO_STATS_REPORTER_TYPE} is not a valid stats reporter name")
endif()
endif()

if(PRESTO_MEMORY_CHECKER_TYPE)
add_compile_definitions(PRESTO_MEMORY_CHECKER_TYPE)
# Check if the current platform is Linux and the memory checker type is
# LINUX_MEMORY_CHECKER.
if(UNIX
AND NOT APPLE
AND (PRESTO_MEMORY_CHECKER_TYPE STREQUAL "LINUX_MEMORY_CHECKER"))
add_library(presto_linux_memory_checker OBJECT LinuxMemoryChecker.cpp)
target_link_libraries(presto_server presto_linux_memory_checker)
else()
message(
FATAL_ERROR
"${PRESTO_MEMORY_CHECKER_TYPE} is not a valid memory checker name")
endif()
endif()
160 changes: 160 additions & 0 deletions presto-native-execution/presto_cpp/main/LinuxMemoryChecker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <boost/regex.hpp>
#include <folly/Conv.h>
#include <folly/CppAttributes.h>
#include <folly/File.h>
#include <folly/FileUtil.h>
#include <folly/Format.h>
#include <folly/Range.h>
#include <folly/Singleton.h>
#include <folly/String.h>
#include <folly/gen/Base.h>
#include <folly/gen/File.h>
#include <folly/gen/String.h>
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/common/Configs.h"

#include <sys/stat.h>

namespace facebook::presto {

class LinuxMemoryChecker : public PeriodicMemoryChecker {
public:
explicit LinuxMemoryChecker(const PeriodicMemoryChecker::Config& config)
: PeriodicMemoryChecker(config) {
// Find out what cgroup version (v1 or v2) we have based on the directory
// it's mounted.
struct stat buffer;
if ((stat(kCgroupV1Path, &buffer) == 0)) {
statFile_ = kCgroupV1Path;
} else if ((stat(kCgroupV2Path, &buffer) == 0)) {
statFile_ = kCgroupV2Path;
} else {
statFile_ = "None";
}
LOG(INFO) << fmt::format("Using memory stat file {}", statFile_);
}

~LinuxMemoryChecker() override {}

protected:
// Current memory calculation used is inactive_anon + active_anon
// Our first attempt was using memInfo memTotal - memAvailable.
// However memInfo is not containerized so we reserve this as a
// last resort.
//
// Next we tried to use what docker/kubernetes uses for their
// calculation. cgroup usage_in_bytes - total_inactive_files.
// However we found out that usage_in_bytes is a fuzz value
// and has a chance for the sync to occur after the shrink
// polling interval. This would result in double shrinks.
//
// Therefore we decided on values from the memory.stat file
// that are real time statistics. At first we tried to use
// the calculation suggested by the kernel team RSS+CACHE(+SWAP)
// However we noticed that this value was not closely related to the
// value in usage_in_bytes which is used to OOMKill. We then looked
// at all of the values in the stat file and decided that
// inactive_anon + active_anon moves closest to that of
// usage_in_bytes
//
// NOTE: We do not know if cgroup V2 memory.current is a fuzz
// value. It may be better than what we currently use. For
// consistency we will match cgroup V1 and change if
// necessary.
int64_t systemUsedMemoryBytes() override {
size_t memAvailable = 0;
size_t memTotal = 0;
size_t inactiveAnon = 0;
size_t activeAnon = 0;

if (statFile_ != "None") {
folly::gen::byLine(statFile_.c_str()) |
[&](const folly::StringPiece& line) -> void {
inactiveAnon =
extractNumericConfigValueWithRegex(line, kInactiveAnonRegex);
activeAnon = extractNumericConfigValueWithRegex(line, kActiveAnonRegex);
};

// Unit is in bytes.
return inactiveAnon + activeAnon;
}

// Last resort use host machine info.
folly::gen::byLine("/proc/meminfo") |
[&](const folly::StringPiece& line) -> void {
memAvailable =
extractNumericConfigValueWithRegex(line, kMemAvailableRegex) * 1024;
memTotal =
extractNumericConfigValueWithRegex(line, kMemTotalRegex) * 1024;
};
// Unit is in bytes.
return (memAvailable && memTotal) ? memTotal - memAvailable : 0;
}

int64_t mallocBytes() const override {
VELOX_UNSUPPORTED();
}

void periodicCb() const override {
return;
}

bool heapDumpCb(const std::string& filePath) const override {
VELOX_UNSUPPORTED();
}

void removeDumpFile(const std::string& filePath) const override {
VELOX_UNSUPPORTED();
}

private:
const boost::regex kInactiveAnonRegex{R"!(inactive_anon\s*(\d+)\s*)!"};
const boost::regex kActiveAnonRegex{R"!(active_anon\s*(\d+)\s*)!"};
const boost::regex kMemAvailableRegex{R"!(MemAvailable:\s*(\d+)\s*kB)!"};
const boost::regex kMemTotalRegex{R"!(MemTotal:\s*(\d+)\s*kB)!"};
const char* kCgroupV1Path = "/sys/fs/cgroup/memory/memory.stat";
const char* kCgroupV2Path = "/sys/fs/cgroup/memory.stat";
std::string statFile_;

size_t extractNumericConfigValueWithRegex(
const folly::StringPiece& line,
const boost::regex& regex) {
boost::cmatch match;
if (boost::regex_match(line.data(), match, regex)) {
if (match.size() > 1) {
std::string numStr(match[1].str());
return std::stroull(numStr);
}
}
return 0;
}
};

folly::Singleton<facebook::presto::PeriodicMemoryChecker> checker(
[]() -> facebook::presto::PeriodicMemoryChecker* {
PeriodicMemoryChecker::Config config;
auto* systemConfig = SystemConfig::instance();
config.systemMemPushbackEnabled =
systemConfig->systemMemPushbackEnabled();
config.systemMemLimitBytes =
static_cast<uint64_t>(systemConfig->systemMemLimitGb()) << 30;
config.systemMemShrinkBytes =
static_cast<uint64_t>(systemConfig->systemMemShrinkGb()) << 30;
return std::make_unique<LinuxMemoryChecker>(config).release();
});

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,12 @@ void PeriodicMemoryChecker::pushbackMemory() {
kCounterMemoryPushbackLatencyMs, latencyMs * 1000);
LOG(INFO) << "Shrunk " << velox::succinctBytes(freedBytes);
}

#ifndef PRESTO_MEMORY_CHECKER_TYPE
// Initialize singleton for the checker to be nullptr if
// PRESTO_MEMORY_CHECKER_TYPE is not defined.
folly::Singleton<facebook::presto::PeriodicMemoryChecker> checker([]() {
return nullptr;
});
#endif
} // namespace facebook::presto
17 changes: 17 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <glog/logging.h>
#include "CoordinatorDiscoverer.h"
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
Expand Down Expand Up @@ -573,6 +574,8 @@ void PrestoServer::run() {
addAdditionalPeriodicTasks();
periodicTaskManager_->start();

addMemoryCheckerPeriodicTask();

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters());
Expand All @@ -592,6 +595,8 @@ void PrestoServer::run() {

stopAdditionalPeriodicTasks();

stopMemoryCheckerPeriodicTask();

// Destroy entities here to ensure we won't get any messages after Server
// object is gone and to have nice log in case shutdown gets stuck.
PRESTO_SHUTDOWN_LOG(INFO) << "Destroying Task Resource";
Expand Down Expand Up @@ -974,6 +979,18 @@ void PrestoServer::updateAnnouncerDetails() {
}
}

void PrestoServer::addMemoryCheckerPeriodicTask() {
if (folly::Singleton<PeriodicMemoryChecker>::try_get()) {
folly::Singleton<PeriodicMemoryChecker>::try_get()->start();
}
}

void PrestoServer::stopMemoryCheckerPeriodicTask() {
if (folly::Singleton<PeriodicMemoryChecker>::try_get()) {
folly::Singleton<PeriodicMemoryChecker>::try_get()->stop();
}
}

void PrestoServer::addServerPeriodicTasks() {
periodicTaskManager_->addTask(
[server = this]() { server->populateMemAndCPUInfo(); },
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class PrestoServer {

virtual void stopAdditionalPeriodicTasks(){};

virtual void addMemoryCheckerPeriodicTask();

virtual void stopMemoryCheckerPeriodicTask();

virtual void initializeCoordinatorDiscoverer();

virtual std::shared_ptr<velox::exec::TaskListener> getTaskListener();
Expand Down

0 comments on commit 1ea7b0a

Please sign in to comment.